Examples¶

For many people, it is easiest to learn a new library by adapting existing examples to their purposes. For that reason, many examples are presented in this section in the hope that they will prove useful to others using inspyred.

Standard Algorithms¶

The following examples illustrate how to use the different, built-in, evolutionary computations. They are (hopefully) simple and self-explanatory. Please note that each one uses an existing benchmark problem. This is just an expedience; it is not necessary. The full list of existing benchmarks can be found in the Library Reference. See the Tutorial for examples that do not make use of a benchmark problem.

Genetic Algorithm¶

In this example, a GA is used to evolve a solution to the binary version of the Schwefel benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Binary(inspyred.benchmarks.Schwefel(2),
dimension_bits=30)
ea = inspyred.ec.GA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
maximize=problem.maximize,
bounder=problem.bounder,
max_evaluations=30000,
num_elites=1)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Evolution Strategy¶

In this example, an ES is used to evolve a solution to the Rosenbrock benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Rosenbrock(2)
ea = inspyred.ec.ES(prng)
ea.terminator = [inspyred.ec.terminators.evaluation_termination,
inspyred.ec.terminators.diversity_termination]
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Simulated Annealing¶

In this example, an SA is used to evolve a solution to the Sphere benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Sphere(2)
ea = inspyred.ec.SA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(evaluator=problem.evaluator,
generator=problem.generator,
maximize=problem.maximize,
bounder=problem.bounder,
max_evaluations=30000)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Differential Evolution Algorithm¶

In this example, a DEA is used to evolve a solution to the Griewank benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Griewank(2)
ea = inspyred.ec.DEA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Estimation of Distribution Algorithm¶

In this example, an EDA is used to evolve a solution to the Rastrigin benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Rastrigin(2)
ea = inspyred.ec.EDA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(evaluator=problem.evaluator,
generator=problem.generator,
pop_size=1000,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000,
num_selected=500,
num_offspring=1000,
num_elites=1)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Pareto Archived Evolution Strategy (PAES)¶

In this example, a PAES is used to evolve a solution to the Kursawe multiobjective benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Kursawe(3)
ea = inspyred.ec.emo.PAES(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=10000,
max_archive_size=100,
num_grid_divisions=4)

if display:
final_arc = ea.archive
print('Best Solutions: \n')
for f in final_arc:
print(f)
import pylab
x = []
y = []
for f in final_arc:
x.append(f.fitness[0])
y.append(f.fitness[1])
pylab.scatter(x, y, color='b')
pylab.savefig('{0} Example ({1}).pdf'.format(ea.__class__.__name__,
problem.__class__.__name__),
format='pdf')
pylab.show()
return ea

if __name__ == '__main__':
main(display=True)


Nondominated Sorting Genetic Algorithm (NSGA-II)¶

In this example, an NSGA2 is used to evolve a solution to the Kursawe multiobjective benchmark. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Kursawe(3)
ea = inspyred.ec.emo.NSGA2(prng)
ea.variator = [inspyred.ec.variators.blend_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.terminator = inspyred.ec.terminators.generation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
maximize=problem.maximize,
bounder=problem.bounder,
max_generations=80)

if display:
final_arc = ea.archive
print('Best Solutions: \n')
for f in final_arc:
print(f)
import pylab
x = []
y = []
for f in final_arc:
x.append(f.fitness[0])
y.append(f.fitness[1])
pylab.scatter(x, y, color='b')
pylab.savefig('{0} Example ({1}).pdf'.format(ea.__class__.__name__,
problem.__class__.__name__),
format='pdf')
pylab.show()
return ea

if __name__ == '__main__':
main(display=True)


Particle Swarm Optimization¶

In this example, a PSO is used to evolve a solution to the Ackley benchmark. [download]

from time import time
from random import Random
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Ackley(2)
ea = inspyred.swarm.PSO(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
ea.topology = inspyred.swarm.topologies.ring_topology
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000,
neighborhood_size=5)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Ant Colony Optimization¶

In this example, an ACS is used to evolve a solution to the TSP benchmark. [download]

from random import Random
from time import time
import math
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

points = [(110.0, 225.0), (161.0, 280.0), (325.0, 554.0), (490.0, 285.0),
(157.0, 443.0), (283.0, 379.0), (397.0, 566.0), (306.0, 360.0),
(343.0, 110.0), (552.0, 199.0)]
weights = [[0 for _ in range(len(points))] for _ in range(len(points))]
for i, p in enumerate(points):
for j, q in enumerate(points):
weights[i][j] = math.sqrt((p[0] - q[0])**2 + (p[1] - q[1])**2)

problem = inspyred.benchmarks.TSP(weights)
ac = inspyred.swarm.ACS(prng, problem.components)
ac.terminator = inspyred.ec.terminators.generation_termination
final_pop = ac.evolve(generator=problem.constructor,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=10,
max_generations=50)

if display:
best = max(ac.archive)
print('Best Solution:')
for b in best.candidate:
print(points[b.element[0]])
print(points[best.candidate[-1].element[1]])
print('Distance: {0}'.format(1/best.fitness))
return ac

if __name__ == '__main__':
main(display=True)


Customized Algorithms¶

The true benefit of the inspyred library is that it allows the programmer to customize almost every aspect of the algorithm. This is accomplished primarily through the use of function (or function-like) callbacks that can be specified by the programmer. The following examples show how to customize many different parts of the evolutionary computation.

Custom Evolutionary Computation¶

In this example, an evolutionary computation is created which uses tournament selection, uniform crossover, Gaussian mutation, and steady-state replacement. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Ackley(2)
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = [inspyred.ec.variators.uniform_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.terminator = inspyred.ec.terminators.generation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
tournament_size=7,
num_selected=2,
max_generations=300,
mutation_rate=0.2)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Custom Archiver¶

The purpose of the archiver is to provide a mechanism for candidate solutions to be maintained without necessarily remaining in the population. This is important for most multiobjective evolutionary approaches, but it can also be useful for single-objective problems, as well. In this example, an archiver is created that maintains the worst individual found. (There is no imaginable reason why one might actually do this. It is just for illustration purposes.) [download]

from random import Random
from time import time
import inspyred

def my_archiver(random, population, archive, args):
worst_in_pop = min(population)
if len(archive) > 0:
worst_in_arc = min(archive)
if worst_in_pop < worst_in_arc:
return [worst_in_pop]
else:
return archive
else:
return [worst_in_pop]

if __name__ == '__main__':
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Rosenbrock(2)
ea = inspyred.ec.ES(prng)
ea.observer = [inspyred.ec.observers.stats_observer,
inspyred.ec.observers.archive_observer]
ea.archiver = my_archiver
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
print(ea.archive)


Custom Observer¶

Sometimes it is helpful to see certain aspects of the current population as it evolves. The purpose of the “observer” functions is to provide a function that executes at the end of each generation so that the process can be monitored accordingly. In this example, the only information desired at each generation is the current best individual. [download]

from random import Random
from time import time
import inspyred

def my_observer(population, num_generations, num_evaluations, args):
best = max(population)
print('{0:6} -- {1} : {2}'.format(num_generations,
best.fitness,
str(best.candidate)))

if __name__ == '__main__':
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Rastrigin(2)
ea = inspyred.ec.ES(prng)
ea.observer = my_observer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))


Custom Replacer¶

The replacers are used to determine which of the parents, offspring, and current population should survive into the next generation. In this example, survivors are determined to be top 50% of individuals from the population along with 50% chosen randomly from the offspring. (Once again, this is simply an example. There may be no good reason to create such a replacement scheme.) [download]

from random import Random
from time import time
import inspyred

def my_replacer(random, population, parents, offspring, args):
psize = len(population)
population.sort(reverse=True)
survivors = population[:psize // 2]
num_remaining = psize - len(survivors)
for i in range(num_remaining):
survivors.append(random.choice(offspring))
return survivors

if __name__ == '__main__':
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Ackley(2)
ea = inspyred.ec.ES(prng)
ea.replacer = my_replacer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))


Custom Selector¶

The selectors are used to determine which individuals in the population should become parents. In this example, parents are chosen such that 50% of the time the best individual in the population is chosen to be a parent and 50% of the time a random individual is chosen. As before, this is an example selector that may have little practical value. [download]

from random import Random
from time import time
import inspyred

def my_selector(random, population, args):
n = args.get('num_selected', 2)
best = max(population)
selected = []
for i in range(n):
if random.random() <= 0.5:
selected.append(best)
else:
selected.append(random.choice(population))
return selected

if __name__ == '__main__':
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Griewank(2)
ea = inspyred.ec.DEA(prng)
ea.selector = my_selector
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)

best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))


Custom Terminator¶

The terminators are used to determine when the evolutionary process should end. All terminators return a Boolean value where True implies that the evolution should end. In this example, the evolution should continue until the average Hamming distance between all combinations of candidates falls below a specified minimum. [download]

from random import Random
from time import time
import itertools
import inspyred

def my_terminator(population, num_generations, num_evaluations, args):
min_ham_dist = args.get('minimum_hamming_distance', 30)
ham_dist = []
for x, y in itertools.combinations(population, 2):
ham_dist.append(sum(a != b for a, b in zip(x.candidate, y.candidate)))
avg_ham_dist = sum(ham_dist) / float(len(ham_dist))
return avg_ham_dist <= min_ham_dist

if __name__ == '__main__':
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Binary(inspyred.benchmarks.Schwefel(2),
dimension_bits=30)
ea = inspyred.ec.GA(prng)
ea.terminator = my_terminator
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=10,
maximize=problem.maximize,
bounder=problem.bounder,
num_elites=1,
minimum_hamming_distance=12)

best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))


Custom Variator¶

The variators provide what are normally classified as “crossover” and “mutation,” however even more exotic variators can be defined. Remember that a list of variators can be specified that will act as a pipeline with the output of the first being used as the input to the second, etc. In this example, the binary candidate is mutated such that two points are chosen and the bits between each point are put in reverse order. For example, 0010100100 would become 0010010100 if the third and eighth bits are the chosen points. [download]

from random import Random
from time import time
import inspyred

# Note that we could have used the @inspyred.ec.variators.mutator
# decorator here and simplified our custom variator to
#
#     def my_variator(random, candidate, args)
#
# where candidate is a single candidate. Such a function would
# just return the single mutant.
def my_variator(random, candidates, args):
mutants = []
for c in candidates:
points = random.sample(range(len(c)), 2)
x, y = min(points), max(points)
if x == 0:
mutants.append(c[y::-1] + c[y+1:])
else:
mutants.append(c[:x] + c[y:x-1:-1] + c[y+1:])
return mutants

if __name__ == '__main__':
prng = Random()
prng.seed(time())

problem = inspyred.benchmarks.Binary(inspyred.benchmarks.Schwefel(2),
dimension_bits=30)
ea = inspyred.ec.GA(prng)
ea.variator = my_variator
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=10,
maximize=problem.maximize,
bounder=problem.bounder,
num_elites=1,
max_evaluations=20000)

best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))


The examples in this section deal with less commonly used aspects of the library. Be aware that these parts may not have received as much testing as the more core components exemplified above.

Discrete Optimization¶

Discrete optimization problems often present difficulties for naive evolutionary computation approaches. Special care must be taken to generate and maintain feasible solutions and/or to sufficiently penalize infeasible solutions. Ant colony optimization approaches were created to deal with discrete optimization problems. In these examples, we consider two of the most famous discrete optimization benchmark problems – the Traveling Salesman Problem (TSP) and the Knapsack problem. The background on these problems is omitted here because it can easily be found elsewhere.

The Traveling Salesman Problem¶

Candidate solutions for the TSP can be most easily be represented as permutations of the list of city numbers (enumerating the order in which cities should be visited). For instance, if there are 5 cities, then a candidate solution might be [4, 1, 0, 2, 3]. This is how the TSP benchmark represents solutions. However, this simple representation forces us to work harder to ensure that our solutions remain feasible during crossover and mutation. Therefore, we need to use variators suited to the task, as shown in the example below. [download]

from random import Random
from time import time
import math
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

points = [(110.0, 225.0), (161.0, 280.0), (325.0, 554.0), (490.0, 285.0),
(157.0, 443.0), (283.0, 379.0), (397.0, 566.0), (306.0, 360.0),
(343.0, 110.0), (552.0, 199.0)]
weights = [[0 for _ in range(len(points))] for _ in range(len(points))]
for i, p in enumerate(points):
for j, q in enumerate(points):
weights[i][j] = math.sqrt((p[0] - q[0])**2 + (p[1] - q[1])**2)

problem = inspyred.benchmarks.TSP(weights)
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = [inspyred.ec.variators.partially_matched_crossover,
inspyred.ec.variators.inversion_mutation]
ea.replacer = inspyred.ec.replacers.generational_replacement
ea.terminator = inspyred.ec.terminators.generation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=100,
max_generations=50,
tournament_size=5,
num_selected=100,
num_elites=1)

if display:
best = max(ea.population)
print('Best Solution: {0}: {1}'.format(str(best.candidate), 1/best.fitness))
return ea

if __name__ == '__main__':
main(display=True)


As an alternative, we can use ant colony optimization to solve the TSP. This example was shown previously, but it is presented again here for completeness. [download]

from random import Random
from time import time
import math
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

points = [(110.0, 225.0), (161.0, 280.0), (325.0, 554.0), (490.0, 285.0),
(157.0, 443.0), (283.0, 379.0), (397.0, 566.0), (306.0, 360.0),
(343.0, 110.0), (552.0, 199.0)]
weights = [[0 for _ in range(len(points))] for _ in range(len(points))]
for i, p in enumerate(points):
for j, q in enumerate(points):
weights[i][j] = math.sqrt((p[0] - q[0])**2 + (p[1] - q[1])**2)

problem = inspyred.benchmarks.TSP(weights)
ac = inspyred.swarm.ACS(prng, problem.components)
ac.terminator = inspyred.ec.terminators.generation_termination
final_pop = ac.evolve(generator=problem.constructor,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=10,
max_generations=50)

if display:
best = max(ac.archive)
print('Best Solution:')
for b in best.candidate:
print(points[b.element[0]])
print(points[best.candidate[-1].element[1]])
print('Distance: {0}'.format(1/best.fitness))
return ac

if __name__ == '__main__':
main(display=True)


The Knapsack Problem¶

Candidate solutions for the Knapsack problem can be represented as either a binary list (for the 0/1 Knapsack) or as a list of non-negative integers (for the Knapsack with duplicates). In each case, the list is the same length as the number of items, and each element of the list corresponds to the quantity of the corresponding item to place in the knapsack. For the evolutionary computation, we can use uniform_crossover and gaussian_mutation. The reason we are able to use Gaussian mutation here, even though the candidates are composed of discrete values, is because the bounder created by the Knapsack benchmark is an instance of DiscreteBounder, which automatically moves an illegal component to its nearest legal value. [download]

from random import Random
from time import time
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

items = [(7,369), (10,346), (11,322), (10,347), (12,348), (13,383),
(8,347), (11,364), (8,340), (8,324), (13,365), (12,314),
(13,306), (13,394), (7,326), (11,310), (9,400), (13,339),
(5,381), (14,353), (6,383), (9,317), (6,349), (11,396),
(14,353), (9,322), (5,329), (5,386), (5,382), (4,369),
(6,304), (10,392), (8,390), (8,307), (10,318), (13,359),
(9,378), (8,376), (11,330), (9,331)]

problem = inspyred.benchmarks.Knapsack(15, items, duplicates=True)
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = [inspyred.ec.variators.uniform_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=100,
max_evaluations=2500,
tournament_size=5,
num_selected=2)

if display:
best = max(ea.population)
print('Best Solution: {0}: {1}'.format(str(best.candidate),
best.fitness))
return ea

if __name__ == '__main__':
main(display=True)


Once again, as an alternative we can use ant colony optimization. Just for variety, we’ll use it to solve the 0/1 Knapsack problem (duplicates=False). [download]

from random import Random
from time import time
import math
import inspyred

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

items = [(7,369), (10,346), (11,322), (10,347), (12,348), (13,383),
(8,347), (11,364), (8,340), (8,324), (13,365), (12,314),
(13,306), (13,394), (7,326), (11,310), (9,400), (13,339),
(5,381), (14,353), (6,383), (9,317), (6,349), (11,396),
(14,353), (9,322), (5,329), (5,386), (5,382), (4,369),
(6,304), (10,392), (8,390), (8,307), (10,318), (13,359),
(9,378), (8,376), (11,330), (9,331)]

problem = inspyred.benchmarks.Knapsack(15, items, duplicates=False)
ac = inspyred.swarm.ACS(prng, problem.components)
ac.terminator = inspyred.ec.terminators.generation_termination
final_pop = ac.evolve(problem.constructor, problem.evaluator,
maximize=problem.maximize, pop_size=50,
max_generations=50)

if display:
best = max(ac.archive)
print('Best Solution: {0}: {1}'.format(str(best.candidate),
best.fitness))
return ac

if __name__ == '__main__':
main(display=True)


Evaluating Individuals Concurrently¶

One of the most lauded aspects of many bio-inspired algorithms is their inherent parallelism. Taking advantage of this parallelism is important in real-world problems, which are often computationally expensive. There are two approaches available in inspyred to perform parallel evaluations for candidate solutions. The first makes use of the multiprocessing module that exists in core Python 2.6+. The second makes use of a third-party library called Parallel Python (pp), which can be used for either multi-core processing on a single machine or distributed processing across a network.

The multiprocessing Module¶

Using the multiprocessing approach to parallel evaluations is probably the better choice if all evaluations are going to be split among multiple processors or cores on a single machine. This is because the module is part of the core Python library (2.6+) and provides a simple, standard interface for setting up the parallelism. As shown in the example below, the only additional parameter is the name of the “actual” evaluation function and the (optional) number of CPUs to use. [download]

from random import Random
from time import time
import inspyred
import math

def generate_rastrigin(random, args):
size = args.get('num_inputs', 10)
return [random.uniform(-5.12, 5.12) for i in xrange(size)]

def evaluate_rastrigin(candidates, args):
fitness = []
for cs in candidates:
fit = 10 * len(cs) + sum([((x - 1)**2 - 10 *
math.cos(2 * math.pi * (x - 1)))
for x in cs])
fitness.append(fit)
return fitness

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

ea = inspyred.ec.DEA(prng)
if display:
ea.observer = inspyred.ec.observers.stats_observer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=generate_rastrigin,
evaluator=inspyred.ec.evaluators.parallel_evaluation_mp,
mp_evaluator=evaluate_rastrigin,
mp_num_cpus=8,
pop_size=8,
bounder=inspyred.ec.Bounder(-5.12, 5.12),
maximize=False,
max_evaluations=256,
num_inputs=3)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Parallel Python¶

The Parallel Python approach to multiprocessing is best suited for use across a network of computers (though it can be used on a single machine, as well). It takes just a little effort to install and setup pp on each machine, but once it’s complete, the network becomes a very efficient computing cluster. This is a capability that the multiprocessing approach simply does not have, and it provides incredible scalability. However, pp requires additional, non-standard parameters to be passed in, as illustrated in the example below. [download]

from random import Random
from time import time
import inspyred
import math

# Define an additional "necessary" function for the evaluator
# to see how it must be handled when using pp.
def my_squaring_function(x):
return x**2

def generate_rastrigin(random, args):
size = args.get('num_inputs', 10)
return [random.uniform(-5.12, 5.12) for i in xrange(size)]

def evaluate_rastrigin(candidates, args):
fitness = []
for cs in candidates:
fit = 10 * len(cs) + sum([(my_squaring_function(x - 1) -
10 * math.cos(2 * math.pi * (x - 1)))
for x in cs])
fitness.append(fit)
return fitness

def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())

ea = inspyred.ec.DEA(prng)
if display:
ea.observer = inspyred.ec.observers.stats_observer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=generate_rastrigin,
evaluator=inspyred.ec.evaluators.parallel_evaluation_pp,
pp_evaluator=evaluate_rastrigin,
pp_dependencies=(my_squaring_function,),
pp_modules=("math",),
pop_size=8,
bounder=inspyred.ec.Bounder(-5.12, 5.12),
maximize=False,
max_evaluations=256,
num_inputs=3)

if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea

if __name__ == '__main__':
main(display=True)


Island Models¶

Along with parallel evaluation of the fitness, it is also possible to create different populations that evolve independently but are capable of sharing solutions among themselves. Such approaches are known as “island models” and can be accomplished within inspyred by making use of the migrator callback. The following example illustrates a very simple two-island model using the inspyred.ec.migrators.MultiprocessingMigrator migrator. Remember that custom migrators can easily be constructed for more specific needs. [download]

import random
import time
import multiprocessing
import inspyred

def create_island(rand_seed, island_number, problem, mp_migrator):
evals = 200
psize = 20
rand = random.Random()
rand.seed(rand_seed)
ec = inspyred.ec.EvolutionaryComputation(rand)
ec.observer = [inspyred.ec.observers.stats_observer, inspyred.ec.observers.file_observer]
ec.terminator = inspyred.ec.terminators.evaluation_termination
ec.selector = inspyred.ec.selectors.tournament_selection
ec.replacer = inspyred.ec.replacers.generational_replacement
ec.variator = [inspyred.ec.variators.blend_crossover, inspyred.ec.variators.gaussian_mutation]
ec.migrator = mp_migrator
final_pop = ec.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
statistics_file=open("stats_%d.csv" % island_number, "w"),
individuals_file=open("inds_%d.csv" % island_number, "w"),
max_evaluations=evals,
num_elites=1,
pop_size=psize,
num_selected=psize,
evaluate_migrant=False)

if __name__ == "__main__":
cpus = 2
problem = inspyred.benchmarks.Rastrigin(2)
mp_migrator = inspyred.ec.migrators.MultiprocessingMigrator(1)
rand_seed = int(time.time())
jobs = []
for i in range(cpus):
p = multiprocessing.Process(target=create_island, args=(rand_seed + i, i, problem, mp_migrator))
p.start()
jobs.append(p)
for j in jobs:
j.join()


Replacement via Niching¶

An example of a not-quite-standard replacer is the crowding_replacement which provides a niching capability. An example using this replacer is given below. Here, the candidates are just single numbers (but created as singleton lists so as to be Sequence types for some of the built-in operators) between 0 and 26. Their fitness values are simply the value of the sine function using the candidate value as input. Since the sine function is periodic and goes through four periods between 0 and 26, this function is multimodal. So we can use crowding_replacement to ensure that all maxima are found. [download]

import random
import time
import math
import itertools
import inspyred

def my_distance(x, y):
return sum([abs(a - b) for a, b in zip(x, y)])

def generate(random, args):
return [random.uniform(0, 26)]

def evaluate(candidates, args):
fitness = []
for cand in candidates:
fit = sum([math.sin(c) for c in cand])
fitness.append(fit)
return fitness

def main(prng=None, display=False):
if prng is None:
prng = random.Random()
prng.seed(time.time())

ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.replacer = inspyred.ec.replacers.crowding_replacement
ea.variator = inspyred.ec.variators.gaussian_mutation
ea.terminator = inspyred.ec.terminators.evaluation_termination

final_pop = ea.evolve(generate, evaluate, pop_size=30,
bounder=inspyred.ec.Bounder(0, 26),
max_evaluations=10000,
num_selected=30,
mutation_rate=1.0,
crowding_distance=10,
distance_function=my_distance)

if display:
import pylab
x = []
y = []
for p in final_pop:
x.append(p.candidate[0])
y.append(math.sin(p.candidate[0]))
t = [(i / 1000.0) * 26.0 for i in range(1000)]
s = [math.sin(a) for a in t]
pylab.plot(t, s, color='b')
pylab.scatter(x, y, color='r')
pylab.axis([0, 26, 0, 1.1])
pylab.savefig('niche_example.pdf', format='pdf')
pylab.show()
return ea

if __name__ == '__main__':
main(display=True)