Recipes

This section provides a set of recipes that add functionality to inspyred. These recipes are not part of the core library, but they have proven useful in real-world programs. If they continue to be useful, they may be incorporated into inspyred in a future version.

Lexicographic Ordering

In multiobjective optimization problems, alternatives to Pareto preference include linear weighting of the objectives and prioritizing the objectives from most to least important. Both of these methods essentially reduce the problem to a single-objective optimization. The weighting of the objectives can be handled entirely in the evaluator for the problem, so no special recipe is needed. But the prioritizing of the objectives, which is also known as lexicographic ordering, requires some additional effort.
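
As a minimal sketch of the weighting approach, the evaluator below collapses two objectives into a single fitness value. The two objective formulas are borrowed from the lexicographic recipe in this section, and the `objective_weights` keyword is a hypothetical name chosen for illustration, not an inspyred argument.

```python
def weighted_sum_evaluator(candidates, args):
    """Collapse two objectives into one fitness value per candidate."""
    # 'objective_weights' is a hypothetical keyword, not part of inspyred.
    weights = args.get('objective_weights', (0.5, 0.5))
    fitness = []
    for candidate in candidates:
        f = candidate[0] ** 2 + 1
        g = candidate[0] ** 2 - 1
        # A single number per candidate, so ordinary comparison applies.
        fitness.append(weights[0] * f + weights[1] * g)
    return fitness

print(weighted_sum_evaluator([[0.0], [2.0]],
                             {'objective_weights': (0.75, 0.25)}))
# [0.5, 4.5]
```

Because the result is an ordinary number, any single-objective selector or replacer can be used with it unchanged.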

The fitness values for two individuals, x and y, should be compared such that, if the first objective for x is “better” (i.e., lower when minimizing or higher when maximizing) than the first objective for y, then x is considered “better” than y. If they are equal in that objective, then the second objective is considered in the same way. This process is repeated for all objectives.
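
The comparison rule just described can be sketched with plain Python tuples, which already compare element by element. This is only an illustration of the rule for numeric objectives, independent of the class-based recipe in this section; `lexicographic_key` is a hypothetical helper name.

```python
def lexicographic_key(values, maximize):
    """Return a tuple whose natural ordering ranks better fitness higher."""
    # Negate minimized objectives so that "larger is better" holds everywhere.
    return tuple(v if m else -v for v, m in zip(values, maximize))

x = [1, 5, 2]
y = [1, 4, 9]

# Maximizing everything: tie on the first objective, x wins on the second.
print(lexicographic_key(x, [True] * 3) > lexicographic_key(y, [True] * 3))
# True

# Minimizing everything: the same tie is broken the other way, so y is better.
print(lexicographic_key(x, [False] * 3) > lexicographic_key(y, [False] * 3))
# False
```

The third objective is never consulted in either comparison, because the second objective already decides the outcome.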

The following recipe provides a class that handles such comparisons and is intended to function much like the inspyred.ec.emo.Pareto class.

import functools

@functools.total_ordering
class Lexicographic(object):
    def __init__(self, values=None, maximize=True):
        if values is None:
            values = []
        self.values = values
        try:
            iter(maximize)
        except TypeError:
            maximize = [maximize for v in values]
        self.maximize = maximize

    def __len__(self):
        return len(self.values)
    
    def __getitem__(self, key):
        return self.values[key]
        
    def __iter__(self):
        return iter(self.values)
        
    def __lt__(self, other):
        for v, o, m in zip(self.values, other.values, self.maximize):
            if m:
                if v < o:
                    return True
                elif v > o:
                    return False
            else:
                if v > o:
                    return True
                elif v < o:
                    return False
        return False

    def __eq__(self, other):
        return (self.values == other.values and self.maximize == other.maximize)

    def __str__(self):
        return str(self.values)
        
    def __repr__(self):
        return str(self.values)


def my_evaluator(candidates, args):
    fitness = []
    for candidate in candidates:
        f = candidate[0] ** 2 + 1
        g = candidate[0] ** 2 - 1
        fitness.append(Lexicographic([f, g], maximize=False))
    return fitness

def my_generator(random, args):
    return [random.random()]
    
if __name__ == '__main__':
    a = Lexicographic([1, 2, 3], maximize=True)
    b = Lexicographic([1, 3, 2], maximize=True)
    c = Lexicographic([2, 1, 3], maximize=True)
    d = Lexicographic([2, 3, 1], maximize=True)
    e = Lexicographic([3, 1, 2], maximize=True)
    f = Lexicographic([3, 2, 1], maximize=True)
    
    u = Lexicographic([1, 2, 3], maximize=False)
    v = Lexicographic([1, 3, 2], maximize=False)
    w = Lexicographic([2, 1, 3], maximize=False)
    x = Lexicographic([2, 3, 1], maximize=False)
    y = Lexicographic([3, 1, 2], maximize=False)
    z = Lexicographic([3, 2, 1], maximize=False)
    
    for p in [a, b, c, d, e, f]:
        for q in [a, b, c, d, e, f]:
            print('%s < %s : %s' % (p, q, p < q))
    print('----------------------------------------')
    for p in [u, v, w, x, y, z]:
        for q in [u, v, w, x, y, z]:
            print('%s < %s : %s' % (p, q, p < q))
    

Constraint Selection

Optimization problems often have to deal with constraints and constraint violations. The following recipe provides one example of how to handle them with inspyred. Here, candidates represent ordered pairs, and their fitness is the square of their distance from the origin. However, a constraint punishes candidates that lie outside the unit circle. Because fitness is maximized, such a scenario should produce a candidate that lies on the unit circle. Note also that crowding_replacement or some other fitness sharing or niching scheme could be used to generate many such points on the circle.

import random
from inspyred import ec
from inspyred.ec import variators
from inspyred.ec import replacers
from inspyred.ec import terminators
from inspyred.ec import observers

def my_constraint_function(candidate):
    """Return the number of constraints that candidate violates."""
    # In this case, we'll just say that the point has to lie 
    # within a circle centered at (0, 0) of radius 1.
    if candidate[0]**2 + candidate[1]**2 > 1:
        return 1
    else:
        return 0

def my_generator(random, args):
    # Create pairs in the range [-2, 2].
    return [random.uniform(-2.0, 2.0) for i in range(2)]

def my_evaluator(candidates, args):
    # The fitness will be the squared distance of the point
    # from the origin. (We're maximizing, in this case.)
    # Note that the constraint heavily punishes individuals
    # that go beyond the unit circle. Therefore, these
    # two functions combined focus the evolution toward
    # finding individuals that lie ON the circle.
    fitness = []
    for c in candidates:
        if my_constraint_function(c) > 0:
            fitness.append(-1)
        else:
            fitness.append(c[0]**2 + c[1]**2)
    return fitness

def constrained_tournament_selection(random, population, args):
    num_selected = args.setdefault('num_selected', 1)
    constraint_func = args.setdefault('constraint_function', None)
    tournament_size = 2
    pop = list(population)
    selected = []
    for _ in range(num_selected):
        tournament = random.sample(pop, tournament_size)
        # If there is not a constraint function,
        # just do regular tournament selection.
        if constraint_func is None:
            selected.append(max(tournament))
        else:
            cons = [constraint_func(t.candidate) for t in tournament]
            # If no constraints are violated, just do 
            # regular tournament selection.
            if max(cons) == 0:
                selected.append(max(tournament))
            # Otherwise, choose the least violator 
            # (which may be a non-violator).
            else:
                selected.append(tournament[cons.index(min(cons))])
    return selected

r = random.Random()
myec = ec.EvolutionaryComputation(r)
myec.selector = constrained_tournament_selection
myec.variator = variators.gaussian_mutation
myec.replacer = replacers.generational_replacement
myec.terminator = terminators.evaluation_termination
myec.observer = observers.stats_observer
pop = myec.evolve(my_generator, my_evaluator, 
                  pop_size=100, 
                  bounder=ec.Bounder(-2, 2),
                  num_selected=100,
                  constraint_function=my_constraint_function,
                  mutation_rate=0.5,
                  max_evaluations=2000)
                  
import pylab
x = []
y = []
c = []
pop.sort()
num_feasible = len([p for p in pop if p.fitness >= 0])
feasible_count = 0
for i, p in enumerate(pop):
    x.append(p.candidate[0])
    y.append(p.candidate[1])
    if i == len(pop) - 1:
        c.append('r')
    elif p.fitness < 0:
        c.append('0.98')
    else:
        c.append(str(1 - feasible_count / float(num_feasible)))
        feasible_count += 1
angles = pylab.linspace(0, 2*pylab.pi, 100)
pylab.plot(pylab.cos(angles), pylab.sin(angles), color='b')
pylab.scatter(x, y, color=c)
pylab.savefig('constraint_example.pdf', format='pdf')
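
To see why this setup pushes the search toward the circle itself, the evaluator's rule can be checked in isolation. The sketch below restates that rule as a standalone function (`penalized_fitness` is a hypothetical name) and samples it along the positive x-axis; it does not use inspyred.

```python
def penalized_fitness(point):
    """Squared distance from the origin, or -1 outside the unit circle."""
    squared_distance = point[0] ** 2 + point[1] ** 2
    return -1 if squared_distance > 1 else squared_distance

# Walk outward from the origin along the x-axis.
for radius in (0.0, 0.5, 0.9, 1.0, 1.1, 2.0):
    fitness = penalized_fitness((radius, 0.0))
    print('radius {0:.1f} -> fitness {1}'.format(radius, fitness))
```

Fitness rises as the point approaches the boundary, peaks at radius 1, and drops to the penalty value just beyond it, so the best feasible points are those on the circle.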

Meta-Evolutionary Computation

The following recipe shows how an evolutionary computation can be used to evolve near-optimal operators and parameters for another evolutionary computation. In the EC literature, such a thing is generally referred to as a “meta-EC”.

import csv
import time
import random
from inspyred import ec
from inspyred.ec import selectors
from inspyred.ec import replacers
from inspyred.ec import variators
from inspyred.ec import terminators
from inspyred.ec import observers


class MetaEC(ec.EvolutionaryComputation):
    def __init__(self, random):
        ec.EvolutionaryComputation.__init__(self, random)
        self.selector = selectors.tournament_selection
        self.replacer = replacers.generational_replacement
        self.variator = [variators.uniform_crossover, self._internal_variator]
        self.terminator = self._internal_meta_terminator
        
    def _create_selector_replacer(self, random):
        pop_size = random.randint(1, 100)
        selector = random.choice(range(0, 5))
        replacer = random.choice(range(0, 8))
        sel = [selector]
        if selector > 0:
            if replacer == 0 or replacer == 2 or replacer == 3:
                sel.append(pop_size)
            else:
                sel.append(random.randint(1, pop_size))
        if selector == 2:
            sel.append(random.randint(min(2, pop_size), pop_size))
        rep = [replacer]
        if replacer == 1:
            rep.append(random.randint(min(2, pop_size), pop_size))
        elif replacer == 3 or replacer == 5:
            rep.append(random.randint(0, pop_size))
        return [pop_size, sel, rep]
    
    def _create_variators(self, random):
        crossover = random.choice([0, 1, 2, 3, 4, 6])
        mutator = random.choice([5, 6])
        variators = ([crossover], [mutator])
        if crossover == 0 or crossover == 4:
            variators[0].append(random.random())
            variators[0].append(random.random())
        elif crossover == 1:
            variators[0].append(random.random())
        elif crossover == 2:
            variators[0].append(random.random())
            variators[0].append(random.randint(1, 10))
        elif crossover == 3:
            variators[0].append(random.randint(0, 30))
        if mutator == 5:
            variators[1].append(random.random())
            variators[1].append(random.random())
        return variators
    
    def _internal_generator(self, random, args):
        cross, mut = self._create_variators(random)
        return [self._create_selector_replacer(random), cross, mut]
        
    def _internal_variator(self, random, candidates, args):
        cs_copy = list(candidates)
        for i, cs in enumerate(cs_copy):
            if random.random() < 0.1:
                cs_copy[i][0] = self._create_selector_replacer(random)
            if random.random() < 0.1:
                cross, mut = self._create_variators(random)
                cs_copy[i][1] = cross
                cs_copy[i][2] = mut
        return cs_copy

    def _internal_observer(self, population, num_generations, num_evaluations, args):
        for i, p in enumerate(population):
            self._observer_file.write('{0}, {1}, {2}\n'.format(i, p.fitness, str(p.candidate)))
            self._observer_file.flush()
    
    def _internal_terminator(self, population, num_generations, num_evaluations, args):
        maxevals = args.get('max_evaluations', 0)
        self._meta_evaluations += num_evaluations
        return num_evaluations >= maxevals or self._meta_evaluations >= self._max_meta_evaluations
        
    def _internal_meta_terminator(self, population, num_generations, num_evaluations, args):
        return self._meta_evaluations >= self._max_meta_evaluations
        
    def _internal_evaluator(self, candidates, args):
        the_generator = args.get('the_generator')
        the_evaluator = args.get('the_evaluator')
        do_maximize = args.get('do_maximize', True)
    
        fitness = []
        for candidate in candidates:
            popsize, selector, replacer, crossover, mutator, myargs = self.interpret_candidate(candidate)
            myargs['max_evaluations'] = args.get('num_trial_evaluations', popsize * 10)
            num_trials = args.get('num_trials', 1)
            evo = ec.EvolutionaryComputation(self._random)
            evo.terminator = self._internal_terminator
            evo.observer = self._internal_observer
            evo.selector = selector
            evo.variator = [crossover, mutator]
            evo.replacer = replacer
            best_fit = []
            for i in range(num_trials):
                final_pop = evo.evolve(generator=the_generator,
                                       evaluator=the_evaluator,
                                       pop_size=popsize,
                                       maximize=do_maximize,
                                       **myargs)
                best_fit.append(final_pop[0].fitness)
            fitness.append(sum(best_fit) / float(len(best_fit)))
        return fitness
    
    def interpret_candidate(self, candidate):
        selector_mapping = (selectors.default_selection,
                            selectors.rank_selection,
                            selectors.tournament_selection,
                            selectors.truncation_selection,
                            selectors.uniform_selection)
        variator_mapping = (variators.blend_crossover, 
                            variators.heuristic_crossover,
                            variators.n_point_crossover,
                            variators.simulated_binary_crossover,
                            variators.uniform_crossover,
                            variators.gaussian_mutation,
                            variators.default_variation)
        replacer_mapping = (replacers.comma_replacement,
                            replacers.crowding_replacement, 
                            replacers.default_replacement,
                            replacers.generational_replacement,
                            replacers.plus_replacement,
                            replacers.random_replacement,
                            replacers.steady_state_replacement,
                            replacers.truncation_replacement)
        
        myargs = dict()
        # Selectors
        if candidate[0][1][0] == 1:
            myargs['num_selected'] = candidate[0][1][1]
        elif candidate[0][1][0] == 2:
            myargs['num_selected'] = candidate[0][1][1]
            myargs['tournament_size'] = candidate[0][1][2]
        elif candidate[0][1][0] == 3:
            myargs['num_selected'] = candidate[0][1][1]
        elif candidate[0][1][0] == 4:
            myargs['num_selected'] = candidate[0][1][1]
            
        # Replacers
        if candidate[0][2][0] == 1:
            myargs['crowding_distance'] = candidate[0][2][1]
        elif candidate[0][2][0] == 3:
            myargs['num_elites'] = candidate[0][2][1]
        elif candidate[0][2][0] == 5:
            myargs['num_elites'] = candidate[0][2][1]
            
        # Crossovers
        if candidate[1][0] == 0:
            myargs['crossover_rate'] = candidate[1][1]
            myargs['blx_alpha'] = candidate[1][2]
        elif candidate[1][0] == 1:
            myargs['crossover_rate'] = candidate[1][1]
        elif candidate[1][0] == 2:
            myargs['crossover_rate'] = candidate[1][1]
            myargs['num_crossover_points'] = candidate[1][2]
        elif candidate[1][0] == 3:
            myargs['sbx_distribution_index'] = candidate[1][1]
        elif candidate[1][0] == 4:
            myargs['crossover_rate'] = candidate[1][1]
            myargs['ux_bias'] = candidate[1][2]
            
        # Mutators
        if candidate[2][0] == 5:
            myargs['mutation_rate'] = candidate[2][1]
            myargs['gaussian_stdev'] = candidate[2][2]
            
        return (candidate[0][0], 
                selector_mapping[candidate[0][1][0]], 
                replacer_mapping[candidate[0][2][0]], 
                variator_mapping[candidate[1][0]],
                variator_mapping[candidate[2][0]],
                myargs)
                
    def evolve(self, generator, evaluator, pop_size=100, seeds=[], maximize=True, **args):
        args.setdefault('the_generator', generator)
        args.setdefault('the_evaluator', evaluator)
        args.setdefault('do_maximize', maximize)
        args.setdefault('num_elites', 1)
        args.setdefault('num_selected', pop_size)
        self._observer_file = open('metaec-individuals-file-' + time.strftime('%m%d%Y-%H%M%S') + '.csv', 'w')
        self._meta_evaluations = 0
        self._max_meta_evaluations = args.get('max_evaluations', 0)
        final_pop = ec.EvolutionaryComputation.evolve(self, self._internal_generator, 
                                                      self._internal_evaluator, pop_size, 
                                                      seeds, maximize, **args)
        self._observer_file.close()
        return final_pop
        
if __name__ == '__main__':  
    import math
    import inspyred

    prng = random.Random()
    prng.seed(time.time())
    problem = inspyred.benchmarks.Rastrigin(3)
    mec = MetaEC(prng)
    mec.observer = observers.stats_observer
    final_pop = mec.evolve(generator=problem.generator, 
                           evaluator=problem.evaluator, 
                           pop_size=10, 
                           maximize=problem.maximize,
                           bounder=problem.bounder,
                           num_trials=1, 
                           num_trial_evaluations=5000, 
                           max_evaluations=100000)
        
    pop_size, selector, replacer, crossover, mutator, args = mec.interpret_candidate(final_pop[0].candidate)
    print('Best Fitness: {0}'.format(final_pop[0].fitness))
    print('Population Size: {0}'.format(pop_size))
    print('Selector: {0}'.format(selector.__name__))
    print('Replacer: {0}'.format(replacer.__name__))
    print('Crossover: {0}'.format(crossover.__name__))
    print('Mutator: {0}'.format(mutator.__name__))
    print('Parameters:')
    for key in args:
        print('    {0}: {1}'.format(key, args[key]))
    print('Actual Evaluations Used: {0}'.format(mec._meta_evaluations))
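
The nested-list encoding that the meta-EC evolves can be hard to read at a glance. The sketch below decodes one hand-written candidate into operator names, using strings in place of the inspyred functions so that it runs without the library. The tuple contents mirror the mappings in interpret_candidate; `describe` and the example candidate are hypothetical.

```python
SELECTORS = ('default_selection', 'rank_selection', 'tournament_selection',
             'truncation_selection', 'uniform_selection')
REPLACERS = ('comma_replacement', 'crowding_replacement',
             'default_replacement', 'generational_replacement',
             'plus_replacement', 'random_replacement',
             'steady_state_replacement', 'truncation_replacement')
VARIATORS = ('blend_crossover', 'heuristic_crossover', 'n_point_crossover',
             'simulated_binary_crossover', 'uniform_crossover',
             'gaussian_mutation', 'default_variation')

def describe(candidate):
    """Name the operators encoded in a meta-EC candidate."""
    # Layout: [[pop_size, [selector, ...], [replacer, ...]],
    #          [crossover, ...], [mutator, ...]]
    (pop_size, sel, rep), cross, mut = candidate
    return {'pop_size': pop_size,
            'selector': (SELECTORS[sel[0]], sel[1:]),
            'replacer': (REPLACERS[rep[0]], rep[1:]),
            'crossover': (VARIATORS[cross[0]], cross[1:]),
            'mutator': (VARIATORS[mut[0]], mut[1:])}

# Population of 50, tournament selection (50 selected, tournament size 4),
# generational replacement with 5 elites, uniform crossover, Gaussian mutation.
example = [[50, [2, 50, 4], [3, 5]], [4, 0.9, 0.5], [5, 0.1, 0.3]]
for name, value in sorted(describe(example).items()):
    print('{0}: {1}'.format(name, value))
```

The first element of each inner list is an index into the matching tuple, and the remaining elements are that operator's parameters.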

Micro-Evolutionary Computation

Another approach that has been successfully applied to some difficult problems is to run many small-population ECs in succession, each for a small number of evaluations. Each succeeding EC is seeded with the best solution from the previous run. This is somewhat akin to a random-restart hill-climbing approach, except that information about the best solution so far is passed along during each restart.

import collections
import inspyred

class MicroEC(inspyred.ec.EvolutionaryComputation):
    def __init__(self, random):
        inspyred.ec.EvolutionaryComputation.__init__(self, random)
        
    def evolve(self, generator, evaluator, pop_size=10, seeds=None, maximize=True, bounder=None, **args):
        self._kwargs = args
        self._kwargs['_ec'] = self
        
        if seeds is None:
            seeds = []
        if bounder is None:
            bounder = inspyred.ec.Bounder()
        
        self.termination_cause = None
        self.generator = generator
        self.evaluator = evaluator
        self.bounder = bounder
        self.maximize = maximize
        self.population = []
        self.archive = []
        microseeds = seeds
        
        while not self._should_terminate(list(self.population), self.num_generations, self.num_evaluations):
            microec = inspyred.ec.EvolutionaryComputation(self._random)
            microec.selector = self.selector
            microec.variator = self.variator
            microec.replacer = self.replacer
            microec.observer = self.observer
            microec.terminator = inspyred.ec.terminators.evaluation_termination
            maxevals = args['max_evaluations']
            args['max_evaluations'] = args['micro_evaluations']
            result = microec.evolve(generator=generator, evaluator=evaluator, 
                                    pop_size=pop_size, seeds=microseeds, 
                                    maximize=maximize, **args)
            self.population = list(result)
            args['max_evaluations'] = maxevals
            result.sort(reverse=True)
            microseeds = [result[0].candidate]
            self.num_evaluations += microec.num_evaluations

            # Migrate individuals.
            self.population = self.migrator(random=self._random, 
                                            population=self.population, 
                                            args=self._kwargs)
            
            # Archive individuals.
            self.archive = self.archiver(random=self._random, archive=self.archive, 
                                         population=list(self.population), args=self._kwargs)
            
            self.num_generations += microec.num_generations
            # collections.Iterable was removed in Python 3.10, so test
            # for iterability directly instead.
            if hasattr(self.observer, '__iter__'):
                for obs in self.observer:
                    obs(population=list(self.population), num_generations=self.num_generations, 
                        num_evaluations=self.num_evaluations, args=self._kwargs)
            else:
                self.observer(population=list(self.population), num_generations=self.num_generations, 
                              num_evaluations=self.num_evaluations, args=self._kwargs)
        return self.population


if __name__ == '__main__':
    import random
    import math
    import time

    def rastrigin_generator(random, args):
        return [random.uniform(-5.12, 5.12) for _ in range(2)]

    def rastrigin_evaluator(candidates, args):
        fitness = []
        for cand in candidates:
            fitness.append(10 * len(cand) + sum([x**2 - 10 * (math.cos(2*math.pi*x)) for x in cand]))
        return fitness
        
    prng = random.Random()
    prng.seed(time.time())
    micro = MicroEC(prng)
    micro.selector = inspyred.ec.selectors.tournament_selection
    micro.replacer = inspyred.ec.replacers.steady_state_replacement
    micro.variator = [inspyred.ec.variators.uniform_crossover, inspyred.ec.variators.gaussian_mutation]
    micro.archiver = inspyred.ec.archivers.best_archiver
    micro.observer = inspyred.ec.observers.stats_observer
    micro.terminator = inspyred.ec.terminators.evaluation_termination
    final_pop = micro.evolve(rastrigin_generator, 
                             rastrigin_evaluator, 
                             pop_size=10, 
                             maximize=False, 
                             bounder=inspyred.ec.Bounder(-5.12, 5.12),
                             max_evaluations=3000,
                             micro_evaluations=300,
                             num_selected=2, 
                             gaussian_stdev=0.1)
                             
    print('Actual evaluations: {0}'.format(micro.num_evaluations))

    for p in micro.archive:
        print(p)
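
The restart-with-seed idea can also be sketched without inspyred. In the illustration below, each short run starts from the previous best plus fresh random points, so the best value never gets worse between runs. The toy objective, the simple mutation loop, and all function names are assumptions made for this sketch, not part of the recipe.

```python
import random

def sphere(x):
    """Toy objective to minimize (an assumption, not from the recipe)."""
    return sum(v * v for v in x)

def micro_run(prng, seed, pop_size, evaluations, stdev=0.3):
    """One short run: the seed plus random points, refined by mutation."""
    population = [list(seed)] + [[prng.uniform(-5, 5) for _ in seed]
                                 for _ in range(pop_size - 1)]
    used = len(population)  # counts the candidates generated in this run
    best = min(population, key=sphere)
    while used < evaluations:
        child = [v + prng.gauss(0, stdev) for v in best]
        used += 1
        if sphere(child) < sphere(best):
            best = child
    return best, used

def restart_with_seed(prng, max_evaluations=3000, micro_evaluations=300,
                      pop_size=10):
    """Chain short runs, passing the best solution into the next one."""
    best = [prng.uniform(-5, 5) for _ in range(2)]
    total = 0
    history = []
    while total < max_evaluations:
        best, used = micro_run(prng, best, pop_size, micro_evaluations)
        total += used
        history.append(sphere(best))
    return best, total, history

best, total, history = restart_with_seed(random.Random(42))
print('runs: {0}, candidates generated: {1}'.format(len(history), total))
```

With these defaults the loop performs ten short runs of 300 candidates each, which matches the ratio of max_evaluations to micro_evaluations used in the recipe's example.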

Network Migrator

The following custom migrator is a callable class (because the migrator must behave like a callback function) that allows solutions to migrate from one networked machine to another. It is assumed that the EC islands are running on the given IP:port combinations.

import sys
import socket
import pickle
import threading
import collections
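try:
    import socketserver as SocketServer  # Python 3 module name
except ImportError:
    import SocketServer  # Python 2 module name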

class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """Defines a migration function across a network.
    
    This callable class acts as a migration function that 
    allows candidate solutions to migrate from one population
    to another via TCP/IP connections.
    
    The migrator is constructed by specifying the IP address
    of the server (hosting the population from which individuals
    emigrate) as an IP-port tuple and the addresses of the clients 
    (hosting the populations to which individuals from the server 
    immigrate) as a list of IP-port tuples. The ``max_migrants`` 
    parameter specifies the size of the queue of migrants waiting 
    to immigrate to the server from the clients; the newest migrants 
    replace older ones in the queue.
    
    Note: In order to use this migration operator, individuals
    must be pickle-able.
        
    The following is an example of the use of this operator::
    
        m = NetworkMigrator(('192.168.1.10', 25125),
                            [('192.168.1.11', 12345), ('192.168.1.12', 54321)], 
                            max_migrants=3)
                        
    Since the NetworkMigrator object is a server, it should always
    call the ``shutdown()`` method when it is no longer needed, in
    order to give back its resources.

    Public Attributes:
    
    - *client_addresses* -- the list of IP address tuples
      (IP, port) to which individuals should migrate
    - *migrants* -- the deque of migrants (of maximum size 
      specified by ``max_migrants``) received from other machines 
      and waiting to immigrate to the local population
    
    """
    def __init__(self, server_address, client_addresses, max_migrants=1):
        self._lock = threading.Lock()
        SocketServer.TCPServer.__init__(self, server_address, None)
        self.client_addresses = client_addresses
        self.migrants = collections.deque(maxlen=max_migrants)
        t = threading.Thread(target=self.serve_forever)
        t.setDaemon(True)
        t.start()
        self.__name__ = self.__class__.__name__

    def finish_request(self, request, client_address):
        try:
            rbufsize = -1
            wbufsize = 0
            rfile = request.makefile('rb', rbufsize)
            wfile = request.makefile('wb', wbufsize)

            pickle_data = rfile.readline().strip()
            migrant = pickle.loads(pickle_data)
            with self._lock:
                self.migrants.append(migrant)
            
            if not wfile.closed:
                wfile.flush()
            wfile.close()
            rfile.close()        
        finally:
            sys.exc_traceback = None

    def __call__(self, random, population, args):
        """Perform the migration.
        
        This function serves as the migration operator. Here, a random address
        is chosen from the ``client_addresses`` list, and a random individual
        is chosen from the population to become the migrant. A socket is opened
        to the chosen client address, and the chosen migrant is pickled and
        sent to the NetworkMigrator object running at the client address. Then
        the migrant queue on the current machine is queried for a migrant
        to replace the one sent. If one is found, it replaces the newly
        migrated individual; otherwise, the individual remains in the population.
        
        Any immigrants may also be re-evaluated before insertion into the
        current population by setting the ``evaluate_migrant`` keyword
        argument in ``args`` to True. This is useful if the evaluation
        functions in different populations are different and we want to compare
        "apples to apples," as they say.

        Arguments:
        
        - *random* -- the random number generator object
        - *population* -- the population of Individuals
        - *args* -- a dictionary of keyword arguments
        
        Optional keyword arguments in the ``args`` parameter:
        
        - *evaluate_migrant* -- whether to re-evaluate the immigrant (default False)
        
        """
        evaluate_migrant = args.setdefault('evaluate_migrant', False)
        client_address = random.choice(self.client_addresses)
        migrant_index = random.randint(0, len(population) - 1)
        pickle_data = pickle.dumps(population[migrant_index])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(client_address)
            sock.sendall(pickle_data + b'\n')
        finally:
            sock.close()
        migrant = None
        with self._lock:
            if len(self.migrants) > 0:
                migrant = self.migrants.popleft()
        if migrant is not None:
            if evaluate_migrant:
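                # args is a dictionary, so the EC is looked up by key, and
                # the evaluator expects candidates rather than Individuals.
                fit = args['_ec'].evaluator([migrant.candidate], args)
                migrant.fitness = fit[0]
                args['_ec'].num_evaluations += 1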
            population[migrant_index] = migrant
        return population
		
    def __str__(self):
        return str(self.migrants)
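
The queue behavior described in the docstring, where the newest migrants replace older ones, comes from the bounded deque. The sketch below shows that behavior together with a pickle round trip, using plain dictionaries as stand-ins for individuals; no sockets are involved.

```python
import collections
import pickle

# A queue that holds at most three waiting migrants.
migrants = collections.deque(maxlen=3)

for name in ['a', 'b', 'c', 'd', 'e']:
    # Stand-in for an Individual: any pickle-able object works here.
    payload = pickle.dumps({'candidate': name})
    migrants.append(pickle.loads(payload))

print(list(migrants))
# [{'candidate': 'c'}, {'candidate': 'd'}, {'candidate': 'e'}]

# The migrator takes the oldest waiting migrant first.
oldest = migrants.popleft()
print(oldest)
# {'candidate': 'c'}
```

Appending to a full deque silently discards the item at the opposite end, so 'a' and 'b' are dropped without any explicit bookkeeping.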
