# Recipes

This section provides a set of recipes that can be used to add additional functionality to inspyred. These recipes are not a part of the core library, but they have proven to be useful in the past for real-world programs. If they continue to be useful, they may be incorporated into inspyred in a future version.

## Lexicographic Ordering

In multiobjective optimization problems, alternatives to Pareto preference include linear weighting of the objectives and prioritizing the objectives from most to least important. Both of these methods essentially reduce the problem to a single-objective optimization. The weighting of the objectives can be handled entirely in the evaluator for the problem, so no special recipe is needed. Prioritizing the objectives, which is also known as lexicographic ordering, requires some additional effort.
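As a minimal sketch of the weighting approach, the evaluator can collapse several objectives into one weighted sum per candidate. The helper name `make_weighted_evaluator`, the two objective functions, and the weights are hypothetical and chosen only for illustration; the evaluator signature `(candidates, args)` follows the convention used by the recipes on this page.

```python
def make_weighted_evaluator(objectives, weights):
    """Build an evaluator that returns one weighted sum per candidate."""
    def evaluator(candidates, args):
        fitness = []
        for candidate in candidates:
            # Collapse all objectives into a single scalar fitness.
            total = sum(w * obj(candidate) for obj, w in zip(objectives, weights))
            fitness.append(total)
        return fitness
    return evaluator

# Hypothetical objectives for a one-element candidate.
def f(candidate):
    return candidate[0] ** 2 + 1

def g(candidate):
    return candidate[0] ** 2 - 1

weighted = make_weighted_evaluator([f, g], [0.75, 0.25])
print(weighted([[0.0], [2.0]], {}))  # [0.5, 4.5]
```

Because the result is a plain number, any single-objective setup can use such an evaluator unchanged. The choice of weights is problem-specific.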

The fitness values for two individuals, x and y, should be compared such that, if the first objective for x is “better” (i.e., lower when minimizing or higher when maximizing) than the first objective for y, then x is considered “better” than y. If they are equal in that objective, then the second objective is considered in the same way. This process is repeated for all objectives.
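The comparison rule described above can be sketched as a standalone function. This is an illustration only: the name `lex_better` is hypothetical, and it assumes a single `maximize` flag shared by all objectives.

```python
def lex_better(x, y, maximize=True):
    """Return True if fitness sequence x beats y under lexicographic ordering."""
    for a, b in zip(x, y):
        if a == b:
            continue  # Tie on this objective, so look at the next one.
        return a > b if maximize else a < b
    return False  # Equal on every objective: neither is better.

print(lex_better([1, 5], [1, 3]))                  # True
print(lex_better([1, 5], [1, 3], maximize=False))  # False
print(lex_better([2, 0], [1, 9]))                  # True
```

The last call shows the defining property of the ordering: a lead in an earlier objective cannot be overturned by any later objective.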

The following recipe provides a class to deal with such comparisons that is intended to function much like the inspyred.ec.emo.Pareto class.

import functools

@functools.total_ordering
class Lexicographic(object):
    def __init__(self, values=None, maximize=True):
        if values is None:
            values = []
        self.values = values
        # Accept either one flag for all objectives or one flag per objective.
        try:
            iter(maximize)
        except TypeError:
            maximize = [maximize for v in values]
        self.maximize = maximize

    def __len__(self):
        return len(self.values)

    def __getitem__(self, key):
        return self.values[key]

    def __iter__(self):
        return iter(self.values)

    def __lt__(self, other):
        # "Less than" means "worse than": the first objective that
        # differs decides the comparison.
        for v, o, m in zip(self.values, other.values, self.maximize):
            if m:
                if v < o:
                    return True
                elif v > o:
                    return False
            else:
                if v > o:
                    return True
                elif v < o:
                    return False
        return False

    def __eq__(self, other):
        return (self.values == other.values and self.maximize == other.maximize)

    def __str__(self):
        return str(self.values)

    def __repr__(self):
        return str(self.values)

def my_evaluator(candidates, args):
    fitness = []
    for candidate in candidates:
        # Each candidate is a one-element list (see my_generator).
        f = candidate[0] ** 2 + 1
        g = candidate[0] ** 2 - 1
        fitness.append(Lexicographic([f, g], maximize=False))
    return fitness

def my_generator(random, args):
    return [random.random()]

if __name__ == '__main__':
    a = Lexicographic([1, 2, 3], maximize=True)
    b = Lexicographic([1, 3, 2], maximize=True)
    c = Lexicographic([2, 1, 3], maximize=True)
    d = Lexicographic([2, 3, 1], maximize=True)
    e = Lexicographic([3, 1, 2], maximize=True)
    f = Lexicographic([3, 2, 1], maximize=True)

    u = Lexicographic([1, 2, 3], maximize=False)
    v = Lexicographic([1, 3, 2], maximize=False)
    w = Lexicographic([2, 1, 3], maximize=False)
    x = Lexicographic([2, 3, 1], maximize=False)
    y = Lexicographic([3, 1, 2], maximize=False)
    z = Lexicographic([3, 2, 1], maximize=False)

    for p in [a, b, c, d, e, f]:
        for q in [a, b, c, d, e, f]:
            print('%s < %s : %s' % (p, q, p < q))
    print('----------------------------------------')
    for p in [u, v, w, x, y, z]:
        for q in [u, v, w, x, y, z]:
            print('%s < %s : %s' % (p, q, p < q))



## Constraint Selection

Optimization problems often have to deal with constraints and constraint violations. The following recipe provides one example of how to handle them with inspyred. Here, candidates represent ordered pairs, and their fitness is their squared distance from the origin. However, a constraint punishes candidates that lie outside of the unit circle. Such a scenario should produce a candidate that lies on the unit circle. Note also that crowding_replacement or some other fitness sharing or niching scheme could be used to generate many such points on the circle.

import random
from inspyred import ec
from inspyred.ec import variators
from inspyred.ec import replacers
from inspyred.ec import terminators
from inspyred.ec import observers

def my_constraint_function(candidate):
    """Return the number of constraints that candidate violates."""
    # In this case, we'll just say that the point has to lie
    # within a circle centered at (0, 0) of radius 1.
    if candidate[0]**2 + candidate[1]**2 > 1:
        return 1
    else:
        return 0

def my_generator(random, args):
    # Create pairs in the range [-2, 2].
    return [random.uniform(-2.0, 2.0) for i in range(2)]

def my_evaluator(candidates, args):
    # The fitness will be how far the point is from
    # the origin. (We're maximizing, in this case.)
    # Note that the constraint heavily punishes individuals
    # who go beyond the unit circle. Therefore, these
    # two functions combined focus the evolution toward
    # finding individuals who lie ON the circle.
    fitness = []
    for c in candidates:
        if my_constraint_function(c) > 0:
            fitness.append(-1)
        else:
            fitness.append(c[0]**2 + c[1]**2)
    return fitness

def constrained_tournament_selection(random, population, args):
    num_selected = args.setdefault('num_selected', 1)
    constraint_func = args.setdefault('constraint_function', None)
    tournament_size = 2
    pop = list(population)
    selected = []
    for _ in range(num_selected):
        tournament = random.sample(pop, tournament_size)
        # If there is not a constraint function,
        # just do regular tournament selection.
        if constraint_func is None:
            selected.append(max(tournament))
        else:
            cons = [constraint_func(t.candidate) for t in tournament]
            # If no constraints are violated, just do
            # regular tournament selection.
            if max(cons) == 0:
                selected.append(max(tournament))
            # Otherwise, choose the least violator
            # (which may be a non-violator).
            else:
                selected.append(tournament[cons.index(min(cons))])
    return selected

r = random.Random()
myec = ec.EvolutionaryComputation(r)
myec.selector = constrained_tournament_selection
myec.variator = variators.gaussian_mutation
myec.replacer = replacers.generational_replacement
myec.terminator = terminators.evaluation_termination
myec.observer = observers.stats_observer
# The keyword must be 'constraint_function', which is the key
# that constrained_tournament_selection looks up in args.
pop = myec.evolve(my_generator, my_evaluator,
                  pop_size=100,
                  bounder=ec.Bounder(-2, 2),
                  num_selected=100,
                  constraint_function=my_constraint_function,
                  mutation_rate=0.5,
                  max_evaluations=2000)

import pylab
x = []
y = []
c = []
pop.sort()
num_feasible = len([p for p in pop if p.fitness >= 0])
feasible_count = 0
for i, p in enumerate(pop):
    x.append(p.candidate[0])
    y.append(p.candidate[1])
    if i == len(pop) - 1:
        # The best individual (last after sorting) is drawn in red.
        c.append('r')
    elif p.fitness < 0:
        # Infeasible individuals are drawn in near-white gray.
        c.append('0.98')
    else:
        # Feasible individuals get darker as fitness improves.
        c.append(str(1 - feasible_count / float(num_feasible)))
        feasible_count += 1
angles = pylab.linspace(0, 2*pylab.pi, 100)
pylab.plot(pylab.cos(angles), pylab.sin(angles), color='b')
pylab.scatter(x, y, color=c)
pylab.savefig('constraint_example.pdf', format='pdf')
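
The decision rule inside the constrained tournament can be tried in isolation, without running an evolution. The sketch below is an illustration under stated assumptions: `Individual` is a hypothetical stand-in for an inspyred individual (only `fitness` and `candidate` are modeled), and `pick_winner` is a hypothetical helper that mirrors the rule for a single tournament.

```python
import collections

# Hypothetical stand-in for an inspyred individual.
Individual = collections.namedtuple('Individual', ['fitness', 'candidate'])

def violations(candidate):
    """Count violated constraints: here, lying outside the unit circle."""
    return 1 if candidate[0]**2 + candidate[1]**2 > 1 else 0

def pick_winner(tournament, constraint_func):
    """Fittest wins if everyone is feasible; otherwise the least violator wins."""
    cons = [constraint_func(t.candidate) for t in tournament]
    if max(cons) == 0:
        return max(tournament, key=lambda t: t.fitness)
    return tournament[cons.index(min(cons))]

inside = Individual(fitness=0.5, candidate=[0.5, 0.5])
edge = Individual(fitness=1.0, candidate=[1.0, 0.0])
outside = Individual(fitness=-1, candidate=[1.5, 1.5])

print(pick_winner([inside, edge], violations).candidate)     # [1.0, 0.0]
print(pick_winner([outside, inside], violations).candidate)  # [0.5, 0.5]
```

In the first tournament both points are feasible, so the higher fitness wins. In the second, the feasible point wins regardless of fitness.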


## Meta-Evolutionary Computation

The following recipe shows how an evolutionary computation can be used to evolve near-optimal operators and parameters for another evolutionary computation. In the EC literature, such an approach is generally referred to as a “meta-EC”.

import csv
import time
import random
from inspyred import ec
from inspyred.ec import selectors
from inspyred.ec import replacers
from inspyred.ec import variators
from inspyred.ec import terminators
from inspyred.ec import observers

class MetaEC(ec.EvolutionaryComputation):
    def __init__(self, random):
        ec.EvolutionaryComputation.__init__(self, random)
        self.selector = selectors.tournament_selection
        self.replacer = replacers.generational_replacement
        self.variator = [variators.uniform_crossover, self._internal_variator]
        self.terminator = self._internal_meta_terminator

    def _create_selector_replacer(self, random):
        # Returns [pop_size, [selector, params...], [replacer, params...]].
        pop_size = random.randint(1, 100)
        selector = random.choice(range(0, 5))
        replacer = random.choice(range(0, 8))
        sel = [selector]
        if selector > 0:
            if replacer == 0 or replacer == 2 or replacer == 3:
                sel.append(pop_size)
            else:
                sel.append(random.randint(1, pop_size))
        if selector == 2:
            sel.append(random.randint(min(2, pop_size), pop_size))
        rep = [replacer]
        if replacer == 1:
            rep.append(random.randint(min(2, pop_size), pop_size))
        elif replacer == 3 or replacer == 5:
            rep.append(random.randint(0, pop_size))
        return [pop_size, sel, rep]

    def _create_variators(self, random):
        # Returns ([crossover, params...], [mutator, params...]).
        crossover = random.choice([0, 1, 2, 3, 4, 6])
        mutator = random.choice([5, 6])
        variators = ([crossover], [mutator])
        if crossover == 0 or crossover == 4:
            variators[0].append(random.random())
            variators[0].append(random.random())
        elif crossover == 1:
            variators[0].append(random.random())
        elif crossover == 2:
            variators[0].append(random.random())
            variators[0].append(random.randint(1, 10))
        elif crossover == 3:
            variators[0].append(random.randint(0, 30))
        if mutator == 5:
            variators[1].append(random.random())
            variators[1].append(random.random())
        return variators

    def _internal_generator(self, random, args):
        cross, mut = self._create_variators(random)
        return [self._create_selector_replacer(random), cross, mut]

    def _internal_variator(self, random, candidates, args):
        cs_copy = list(candidates)
        for i, cs in enumerate(cs_copy):
            if random.random() < 0.1:
                cs_copy[i][0] = self._create_selector_replacer(random)
            if random.random() < 0.1:
                cross, mut = self._create_variators(random)
                cs_copy[i][1] = cross
                cs_copy[i][2] = mut
        return cs_copy

    def _internal_observer(self, population, num_generations, num_evaluations, args):
        for i, p in enumerate(population):
            self._observer_file.write('{0}, {1}, {2}\n'.format(i, p.fitness, str(p.candidate)))
            self._observer_file.flush()

    def _internal_terminator(self, population, num_generations, num_evaluations, args):
        maxevals = args.get('max_evaluations', 0)
        self._meta_evaluations += num_evaluations
        return num_evaluations >= maxevals or self._meta_evaluations >= self._max_meta_evaluations

    def _internal_meta_terminator(self, population, num_generations, num_evaluations, args):
        return self._meta_evaluations >= self._max_meta_evaluations

    def _internal_evaluator(self, candidates, args):
        the_generator = args.get('the_generator')
        the_evaluator = args.get('the_evaluator')
        do_maximize = args.get('do_maximize', True)

        fitness = []
        for candidate in candidates:
            popsize, selector, replacer, crossover, mutator, myargs = self.interpret_candidate(candidate)
            myargs['max_evaluations'] = args.get('num_trial_evaluations', popsize * 10)
            num_trials = args.get('num_trials', 1)
            evo = ec.EvolutionaryComputation(self._random)
            evo.terminator = self._internal_terminator
            evo.observer = self._internal_observer
            evo.selector = selector
            evo.variator = [crossover, mutator]
            evo.replacer = replacer
            best_fit = []
            for i in range(num_trials):
                # Unpack myargs so that the operators and the terminator
                # see each parameter as its own keyword argument.
                final_pop = evo.evolve(generator=the_generator,
                                       evaluator=the_evaluator,
                                       pop_size=popsize,
                                       maximize=do_maximize,
                                       **myargs)
                # Individuals compare by fitness (respecting maximize),
                # so max() yields the best member of the final population.
                best_fit.append(max(final_pop).fitness)
            fitness.append(sum(best_fit) / float(len(best_fit)))
        return fitness

    def interpret_candidate(self, candidate):
        # Candidate layout (see _internal_generator):
        #   candidate[0] = [pop_size, [selector, ...], [replacer, ...]]
        #   candidate[1] = [crossover, ...]
        #   candidate[2] = [mutator, ...]
        selector_mapping = (selectors.default_selection,
                            selectors.rank_selection,
                            selectors.tournament_selection,
                            selectors.truncation_selection,
                            selectors.uniform_selection)
        variator_mapping = (variators.blend_crossover,
                            variators.heuristic_crossover,
                            variators.n_point_crossover,
                            variators.simulated_binary_crossover,
                            variators.uniform_crossover,
                            variators.gaussian_mutation,
                            variators.default_variation)
        # Eight replacers, indexed 0-7 by _create_selector_replacer.
        # steady_state_replacement is assumed here: the listing showed only
        # seven entries, which would leave index 7 out of range.
        replacer_mapping = (replacers.comma_replacement,
                            replacers.crowding_replacement,
                            replacers.default_replacement,
                            replacers.generational_replacement,
                            replacers.plus_replacement,
                            replacers.random_replacement,
                            replacers.steady_state_replacement,
                            replacers.truncation_replacement)

        pop_size, sel, rep = candidate[0]
        cross = candidate[1]
        mut = candidate[2]

        myargs = dict()
        # Selectors
        if sel[0] == 1:
            myargs['num_selected'] = sel[1]
        elif sel[0] == 2:
            myargs['num_selected'] = sel[1]
            myargs['tournament_size'] = sel[2]
        elif sel[0] == 3:
            myargs['num_selected'] = sel[1]
        elif sel[0] == 4:
            myargs['num_selected'] = sel[1]

        # Replacers
        if rep[0] == 1:
            myargs['crowding_distance'] = rep[1]
        elif rep[0] == 3:
            myargs['num_elites'] = rep[1]
        elif rep[0] == 5:
            myargs['num_elites'] = rep[1]

        # Crossovers
        if cross[0] == 0:
            myargs['crossover_rate'] = cross[1]
            myargs['blx_alpha'] = cross[2]
        elif cross[0] == 1:
            myargs['crossover_rate'] = cross[1]
        elif cross[0] == 2:
            myargs['crossover_rate'] = cross[1]
            myargs['num_crossover_points'] = cross[2]
        elif cross[0] == 3:
            myargs['sbx_distribution_index'] = cross[1]
        elif cross[0] == 4:
            myargs['crossover_rate'] = cross[1]
            myargs['ux_bias'] = cross[2]

        # Mutators
        if mut[0] == 5:
            myargs['mutation_rate'] = mut[1]
            myargs['gaussian_stdev'] = mut[2]

        return (pop_size,
                selector_mapping[sel[0]],
                replacer_mapping[rep[0]],
                variator_mapping[cross[0]],
                variator_mapping[mut[0]],
                myargs)

    def evolve(self, generator, evaluator, pop_size=100, seeds=None, maximize=True, **args):
        args.setdefault('the_generator', generator)
        args.setdefault('the_evaluator', evaluator)
        args.setdefault('do_maximize', maximize)
        args.setdefault('num_elites', 1)
        args.setdefault('num_selected', pop_size)
        self._observer_file = open('metaec-individuals-file-' + time.strftime('%m%d%Y-%H%M%S') + '.csv', 'w')
        self._meta_evaluations = 0
        self._max_meta_evaluations = args.get('max_evaluations', 0)
        final_pop = ec.EvolutionaryComputation.evolve(self, self._internal_generator,
                                                      self._internal_evaluator, pop_size,
                                                      seeds, maximize, **args)
        self._observer_file.close()
        return final_pop

if __name__ == '__main__':
    import math
    import inspyred

    prng = random.Random()
    prng.seed(time.time())
    problem = inspyred.benchmarks.Rastrigin(3)
    mec = MetaEC(prng)
    mec.observer = observers.stats_observer
    final_pop = mec.evolve(generator=problem.generator,
                           evaluator=problem.evaluator,
                           pop_size=10,
                           maximize=problem.maximize,
                           bounder=problem.bounder,
                           num_trials=1,
                           num_trial_evaluations=5000,
                           max_evaluations=100000)

    best = max(final_pop)
    pop_size, selector, replacer, crossover, mutator, args = mec.interpret_candidate(best.candidate)
    print('Best Fitness: {0}'.format(best.fitness))
    print('Population Size: {0}'.format(pop_size))
    print('Selector: {0}'.format(selector.__name__))
    print('Replacer: {0}'.format(replacer.__name__))
    print('Crossover: {0}'.format(crossover.__name__))
    print('Mutator: {0}'.format(mutator.__name__))
    print('Parameters:')
    for key in args:
        print('    {0}: {1}'.format(key, args[key]))
    print('Actual Evaluations Used: {0}'.format(mec._meta_evaluations))
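
The nested candidate that `_internal_generator` builds can be hard to picture, so here is a small sketch that decodes one by hand. The concrete values, the `SELECTOR_NAMES` tuple, and the `describe` helper are hypothetical and exist only for illustration; the layout itself follows the return values of `_create_selector_replacer` and `_create_variators` in the recipe.

```python
# Layout: [[pop_size, selector_spec, replacer_spec], crossover_spec, mutator_spec]
# where each spec is [operator_index, parameter, ...].
candidate = [[50, [2, 50, 4], [3, 1]], [0, 0.9, 0.3], [5, 0.1, 0.5]]

# Hypothetical labels, in the same order as the recipe's selector_mapping.
SELECTOR_NAMES = ('default', 'rank', 'tournament', 'truncation', 'uniform')

def describe(candidate):
    """Decode part of a meta-EC candidate into a readable dictionary."""
    (pop_size, sel, rep), cross, mut = candidate
    summary = {'pop_size': pop_size, 'selector': SELECTOR_NAMES[sel[0]]}
    if sel[0] == 2:  # Tournament selection carries two parameters.
        summary['num_selected'] = sel[1]
        summary['tournament_size'] = sel[2]
    if cross[0] == 0:  # Blend crossover.
        summary['crossover_rate'] = cross[1]
        summary['blx_alpha'] = cross[2]
    if mut[0] == 5:  # Gaussian mutation.
        summary['mutation_rate'] = mut[1]
        summary['gaussian_stdev'] = mut[2]
    return summary

for key, value in sorted(describe(candidate).items()):
    print('{0}: {1}'.format(key, value))
```

Only a few operator codes are decoded here. The recipe's `interpret_candidate` covers the full set.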


## Micro-Evolutionary Computation

Another approach that has been successfully applied to some difficult problems is to use many small-population ECs for small numbers of evaluations in succession. Each succeeding EC is seeded with the best solution from the previous run. This is somewhat akin to a random-restart hill-climbing approach, except that information about the best solution so far is passed along during each restart.

import collections.abc
import inspyred

class MicroEC(inspyred.ec.EvolutionaryComputation):
    def __init__(self, random):
        inspyred.ec.EvolutionaryComputation.__init__(self, random)

    def evolve(self, generator, evaluator, pop_size=10, seeds=None, maximize=True, bounder=None, **args):
        self._kwargs = args
        self._kwargs['_ec'] = self

        if seeds is None:
            seeds = []
        if bounder is None:
            bounder = inspyred.ec.Bounder()

        self.termination_cause = None
        self.generator = generator
        self.evaluator = evaluator
        self.bounder = bounder
        self.maximize = maximize
        self.population = []
        self.archive = []
        microseeds = seeds

        while not self._should_terminate(list(self.population), self.num_generations, self.num_evaluations):
            microec = inspyred.ec.EvolutionaryComputation(self._random)
            microec.selector = self.selector
            microec.variator = self.variator
            microec.replacer = self.replacer
            microec.observer = self.observer
            microec.terminator = inspyred.ec.terminators.evaluation_termination
            # Temporarily swap in the per-run evaluation budget.
            maxevals = args['max_evaluations']
            args['max_evaluations'] = args['micro_evaluations']
            result = microec.evolve(generator=generator, evaluator=evaluator,
                                    pop_size=pop_size, seeds=microseeds,
                                    maximize=maximize, bounder=bounder, **args)
            self.population = list(result)
            args['max_evaluations'] = maxevals
            # Seed the next micro-EC with the best individual of this run.
            result.sort(reverse=True)
            microseeds = [result[0].candidate]
            self.num_evaluations += microec.num_evaluations

            # Migrate individuals.
            self.population = self.migrator(random=self._random,
                                            population=self.population,
                                            args=self._kwargs)

            # Archive individuals.
            self.archive = self.archiver(random=self._random, archive=self.archive,
                                         population=list(self.population), args=self._kwargs)

            self.num_generations += microec.num_generations
            if isinstance(self.observer, collections.abc.Iterable):
                for obs in self.observer:
                    obs(population=list(self.population), num_generations=self.num_generations,
                        num_evaluations=self.num_evaluations, args=self._kwargs)
            else:
                self.observer(population=list(self.population), num_generations=self.num_generations,
                              num_evaluations=self.num_evaluations, args=self._kwargs)
        return self.population

if __name__ == '__main__':
    import random
    import math
    import time

    def rastrigin_generator(random, args):
        return [random.uniform(-5.12, 5.12) for _ in range(2)]

    def rastrigin_evaluator(candidates, args):
        fitness = []
        for cand in candidates:
            fitness.append(10 * len(cand) + sum([x**2 - 10 * (math.cos(2*math.pi*x)) for x in cand]))
        return fitness

    prng = random.Random()
    prng.seed(time.time())
    micro = MicroEC(prng)
    micro.selector = inspyred.ec.selectors.tournament_selection
    micro.variator = [inspyred.ec.variators.uniform_crossover, inspyred.ec.variators.gaussian_mutation]
    micro.archiver = inspyred.ec.archivers.best_archiver
    micro.observer = inspyred.ec.observers.stats_observer
    micro.terminator = inspyred.ec.terminators.evaluation_termination
    final_pop = micro.evolve(rastrigin_generator,
                             rastrigin_evaluator,
                             pop_size=10,
                             maximize=False,
                             bounder=inspyred.ec.Bounder(-5.12, 5.12),
                             max_evaluations=3000,
                             micro_evaluations=300,
                             num_selected=2,
                             gaussian_stdev=0.1)

    print('Actual evaluations: {0}'.format(micro.num_evaluations))

    for p in micro.archive:
        print(p)
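
The restart-with-seed idea can also be seen without inspyred. The following is a rough pure-Python sketch, not the recipe's implementation: `tiny_ec` is a hypothetical mutation-only EC with plus-style survivor selection, `sphere` is a stand-in objective to minimize, and all parameter values are arbitrary.

```python
import random

def sphere(x):
    """Stand-in objective (minimized): sum of squares."""
    return sum(v * v for v in x)

def tiny_ec(prng, seed, evaluate, pop_size=5, generations=20, stdev=0.1):
    """A deliberately small EC: the seed plus fresh random individuals."""
    pop = [list(seed)] + [[prng.uniform(-5.12, 5.12) for _ in seed]
                          for _ in range(pop_size - 1)]
    for _ in range(generations):
        children = [[v + prng.gauss(0, stdev) for v in p] for p in pop]
        # Keep the best pop_size of parents and children.
        pop = sorted(pop + children, key=evaluate)[:pop_size]
    return pop[0]

def micro_restarts(prng, start, evaluate, restarts=10):
    """Run tiny ECs in succession, passing the best solution along."""
    best = list(start)
    history = [evaluate(best)]
    for _ in range(restarts):
        best = tiny_ec(prng, best, evaluate)
        history.append(evaluate(best))
    return best, history

prng = random.Random(42)
best, history = micro_restarts(prng, [3.0, -2.0], sphere)
print('start: {0}, end: {1}'.format(history[0], history[-1]))
```

Because the seed always competes in the next run and survivors are chosen from parents and children together, the best fitness in this sketch can never get worse from one restart to the next.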


## Network Migrator

The following custom migrator is a callable class (because the migrator must behave like a callback function) that allows solutions to migrate from one network machine to another. It is assumed that the EC islands are running on the given IP:port combinations.

import socket
import pickle
import threading
import collections
import socketserver

# NOTE: The extracted listing lost several lines (the class statement, the
# __init__ signature, and the receive/connect steps). Lines marked "assumed"
# are reconstructed from the docstrings and the surviving code.

class NetworkMigrator(socketserver.ThreadingMixIn, socketserver.TCPServer):  # assumed
    """Defines a migration function across a network.

    This callable class acts as a migration function that
    allows candidate solutions to migrate from one population
    to another via TCP/IP connections.

    The migrator is constructed by specifying the IP address
    of the server (hosting the population from which individuals
    emigrate) as an IP-port tuple and the addresses of the clients
    (hosting the populations to which individuals from the server
    immigrate) as a list of IP-port tuples. The max_migrants
    parameter specifies the size of the queue of migrants waiting
    to immigrate to the server from the clients; the newest migrants
    replace older ones in the queue.

    Note: In order to use this migration operator, individuals
    must be pickle-able.

    The following is an example of the use of this operator::

        m = NetworkMigrator(('192.168.1.10', 25125),
                            [('192.168.1.11', 12345), ('192.168.1.12', 54321)],
                            max_migrants=3)

    Since the NetworkMigrator object is a server, it should always
    call the shutdown() method when it is no longer needed, in
    order to give back its resources.

    Public Attributes:

    - *client_addresses* -- the list of (IP, port) tuples
      to which individuals should migrate
    - *migrants* -- the deque of migrants (of maximum size
      specified by max_migrants) waiting to immigrate
      to client populations

    """
    def __init__(self, server_address, client_addresses, max_migrants=1):  # assumed
        self._lock = threading.Lock()  # assumed
        socketserver.TCPServer.__init__(self, server_address, None)  # assumed
        self.client_addresses = client_addresses  # assumed
        self.migrants = collections.deque(maxlen=max_migrants)
        t = threading.Thread(target=self.serve_forever)  # assumed
        t.daemon = True
        t.start()
        self.__name__ = self.__class__.__name__

    def finish_request(self, request, client_address):  # assumed
        rbufsize = -1
        wbufsize = 0
        rfile = request.makefile('rb', rbufsize)
        wfile = request.makefile('wb', wbufsize)
        try:
            # Assumed receive step: read until the sender closes the
            # connection, then unpickle. Unpickling executes arbitrary
            # code, so only accept connections from trusted machines.
            migrant = pickle.loads(rfile.read())
            with self._lock:
                self.migrants.append(migrant)

            if not wfile.closed:
                wfile.flush()
        finally:
            wfile.close()
            rfile.close()

    def __call__(self, random, population, args):
        """Perform the migration.

        This function serves as the migration operator. Here, a random address
        is chosen from the client_addresses list, and a random individual
        is chosen from the population to become the migrant. A socket is opened
        to the chosen client address, and the chosen migrant is pickled and
        sent to the NetworkMigrator object running at the client address. Then
        the migrant queue on the current machine is queried for a migrant
        to replace the one sent. If one is found, it replaces the newly
        migrated individual; otherwise, the individual remains in the population.

        Any immigrants may also be re-evaluated before insertion into the
        current population by setting the evaluate_migrant keyword
        argument in args to True. This is useful if the evaluation
        functions in different populations are different and we want to compare
        "apples to apples," as they say.

        Arguments:

        - *random* -- the random number generator object
        - *population* -- the population of Individuals
        - *args* -- a dictionary of keyword arguments

        Optional keyword arguments in the args parameter:

        - *evaluate_migrant* -- whether to re-evaluate the immigrant (default False)

        """
        evaluate_migrant = args.setdefault('evaluate_migrant', False)
        client_address = random.choice(self.client_addresses)  # assumed
        migrant_index = random.randint(0, len(population) - 1)
        pickle_data = pickle.dumps(population[migrant_index])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(client_address)  # assumed
            sock.sendall(pickle_data + b'\n')
        finally:
            sock.close()
        migrant = None
        with self._lock:
            if len(self.migrants) > 0:
                migrant = self.migrants.popleft()
        if migrant is not None:
            if evaluate_migrant:
                # args is a dictionary, and evaluators take a list of
                # candidates and return a list of fitness values.
                fit = args['_ec'].evaluator([migrant.candidate], args)
                migrant.fitness = fit[0]
                args['_ec'].num_evaluations += 1
            population[migrant_index] = migrant
        return population

    def __str__(self):
        return str(self.migrants)
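
Two details of the migrator can be checked locally without any network: the bounded queue in which the newest migrants replace older ones, and the pickle round trip that individuals must survive. This is a small illustration only; the dictionary used as an "individual" is a hypothetical stand-in for a real inspyred individual.

```python
import collections
import pickle

# With maxlen set, appending to a full deque discards the oldest entry.
migrants = collections.deque(maxlen=3)
for name in ['a', 'b', 'c', 'd', 'e']:
    migrants.append(name)
print(list(migrants))      # ['c', 'd', 'e']

# popleft() hands out the oldest migrant still waiting.
print(migrants.popleft())  # c

# Hypothetical individual: anything sent across the network must pickle.
individual = {'candidate': [0.1, 0.2], 'fitness': 0.05}
restored = pickle.loads(pickle.dumps(individual))
print(restored == individual)  # True
```

An object that cannot be pickled (for example, one holding an open file or a lambda) would fail at the `pickle.dumps` step before anything is sent.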