from __future__ import print_function
import time
from abc import ABCMeta, abstractmethod
from pychemia import pcm_log, HAS_PYMONGO
if HAS_PYMONGO:
import bson
[docs]class Searcher:
__metaclass__ = ABCMeta
"""
General class for all optimization algorithms that uses fixed and blocked
Generations
"""
def __init__(self, population, generation_size=32, stabilization_limit=10,
target_value=None, searcher_id=None):
self.population = population
self.generation_size = generation_size
self.stabilization_limit = stabilization_limit
self.target_value = target_value
self.current_generation = 0
self.generation = {}
self.sleep_time = 2
self.delta_change = 0.2
self.lineage = {}
self.lineage_inv = {}
self.old_actives = []
self.old_nextgen = []
self.pcdb = population.pcdb
if searcher_id is None:
self.searcher_id = self.population.tag
else:
self.searcher_id = searcher_id
[docs] def recover(self, changedb=False):
if self.pcdb is not None:
data = self.pcdb.db.searcher_info.find_one({'_id': self.searcher_id})
if data is not None:
self.stabilization_limit = data['stabilization_limit']
self.generation_size = data['generation_size']
self.set_params(data['params'])
self.generation = {}
self.current_generation = 0
for entry in self.get_all_generations():
if self.searcher_id in entry:
self.generation[entry['_id']] = entry[self.searcher_id]
if max(self.generation[entry['_id']]) > self.current_generation:
self.current_generation = max(self.generation[entry['_id']])
lineage_data = self.pcdb.db.lineage.find_one({'_id': self.searcher_id})
if lineage_data is not None:
lineage = lineage_data['lineage']
lineage_inv = lineage_data['lineage_inv']
changed = False
for i in lineage.keys():
for j in lineage[i]:
if j == 'None':
lineage[i].remove(j)
changed = True
if changed and changedb:
print('Lineage was changed (Some entry was null)')
self.pcdb.db.lineage.update({'_id': self.searcher_id}, {'$set': {'lineage': lineage}})
sizes = [len(lineage[x]) for x in lineage]
assert min(sizes) == max(sizes)
for i in lineage.keys():
if HAS_PYMONGO:
self.lineage[i] = [bson.ObjectId(x) for x in lineage[i]]
else:
self.lineage[i] = [x for x in lineage[i]]
for i in lineage_inv.keys():
if HAS_PYMONGO:
self.lineage_inv[bson.ObjectId(i)] = lineage_inv[i]
else:
self.lineage_inv[i] = lineage_inv[i]
self.correct_extras(changedb)
def __str__(self):
ret = ' Searcher Name: %s\n' % self.searcher_name
ret += ' Generation size: %d\n' % self.generation_size
ret += ' Stabilization limit: %d\n' % self.stabilization_limit
ret += ' Current Generation: %d\n' % self.current_generation
ret += ' Parameters: %s\n' % str(self.get_params())
return ret
[docs] def enforce_generation_size(self):
while True:
# Get a snapshot of the current status
# Otherwise some inconsistencies could appear if some entries become evaluated during the execution
# of this routine
actives = self.population.actives
evaluated = self.population.evaluated
actives_evaluated = self.population.actives_evaluated
actives_no_evaluated = self.population.actives_no_evaluated
if len(actives) == 0:
self.population.random_population(self.generation_size)
break
elif len(actives) == self.generation_size:
pcm_log.debug('Equal')
self.correct_extras(changedb=True)
break
elif len(actives) > self.generation_size:
pcm_log.debug('More %d' % len(actives))
self.recover(changedb=True)
self.correct_extras(changedb=True)
else:
pcm_log.debug('Less %d' % len(actives))
sizes = [len(self.lineage[x]) for x in self.lineage]
assert min(sizes) == max(sizes)
for i in self.lineage:
entry_id = self.lineage[i][-1]
print('Activating', i, entry_id)
self.population.enable(entry_id)
assert len(self.population.actives) == self.generation_size
assert len(self.get_generation()) == self.generation_size
sizes = [len(self.get_generation(i)) for i in range(self.current_generation)]
assert min(sizes) == max(sizes)
# if len(evaluated) >= self.generation_size:
# pcm_log.debug('Enabling some entries starting from the best candidates')
# candidates = self.population.ids_sorted(evaluated)
# index = 0
# while len(actives) < self.generation_size and index < len(evaluated):
# if candidates[index] not in actives:
# pcm_log.debug('Not in actives %s' % str(candidates[index]))
# self.population.enable(candidates[index])
# else:
# pcm_log.debug('In actives %s' % str(candidates[index]))
# index += 1
# actives = self.population.actives
# else:
# n = self.generation_size - len(actives)
# pcm_log.debug('Creating %d random structures' % n)
# self.population.random_population(n)
for i in self.population.actives:
self.set_generation(i, self.current_generation)
[docs] def get_generation(self, generation_number=None):
"""
Return all the elements tagged as belonging to a given generation
:param generation_number: Number of generation to get
:return: (list) Identifiers of all candidates for a given generation number
:rtype: list
"""
if generation_number is None:
generation_number = self.current_generation
return [x for x in self.generation if generation_number in self.generation[x]]
[docs] def print_status(self, level='DEBUG'):
if level == 'DEBUG':
pcm_log.debug(' %s (tag: %s)' % (self.population.name, self.population.tag))
pcm_log.debug(' Current Generation : %4d' % self.current_generation)
pcm_log.debug(' Population (evaluated/total) : %4d /%4d' % (len(self.population.evaluated),
len(self.population.members)))
pcm_log.debug(' Actives (evaluated/total) : %4d /%4d' % (len(self.population.actives_evaluated),
len(self.population.actives)))
pcm_log.debug(' Size of Generation (this/next) : %4d /%4d' % (len(self.get_generation()),
len(self.get_generation(
self.current_generation + 1))))
else:
pcm_log.info(' %s (tag: %s)' % (self.population.name, self.population.tag))
pcm_log.info(' Current Generation : %4d' % self.current_generation)
pcm_log.info(' Population (evaluated/total) : %4d /%4d' % (len(self.population.evaluated),
len(self.population.members)))
pcm_log.info(' Actives (evaluated/total) : %4d /%4d' % (len(self.population.actives_evaluated),
len(self.population.actives)))
pcm_log.info(' Size of Generation (this/next) : %4d /%4d' % (len(self.get_generation()),
len(self.get_generation(
self.current_generation + 1))))
if len(self.get_generation(self.current_generation + 1)) + len(self.population.actives) != self.generation_size:
pcm_log.debug('Change in generations')
for i in self.old_actives:
if i not in self.population.actives:
pcm_log.debug('This active disappeared: %s' % str(i))
for i in self.population.actives:
if i not in self.old_actives:
pcm_log.debug('This active appeared: %s' % str(i))
for i in self.old_nextgen:
if i not in self.get_generation(self.current_generation + 1):
pcm_log.debug('This nextgen disappeared: %s' % str(i))
for i in self.get_generation(self.current_generation + 1):
if i not in self.old_nextgen:
pcm_log.debug('This nextgen appeared: %s' % str(i))
self.old_actives = self.population.actives
self.old_nextgen = self.get_generation(self.current_generation + 1)
[docs] def advance(self, father, son, change):
self.write_change(father, change)
if father not in self.lineage_inv:
print('%s not in current lineage' % father)
print('Lineages %s' % self.lineage_inv.keys())
raise ValueError('Father not in lineage')
slot = self.lineage_inv[father]
self.lineage[slot].append(None)
self.lineage[slot][self.current_generation + 1] = son
self.lineage_inv[son] = slot
self.set_generation(son, self.current_generation + 1)
[docs] def pass_to_new_generation(self, entry_id, reason=None):
# pcm_log.debug('Moving to new generation : %s' % str(entry_id))
change = {'change': 'promoted', 'reason': reason}
self.advance(entry_id, entry_id, change)
[docs] def replace_by_changed(self, entry_id_old, reason=None):
entry_id_new = self.population.move_random(entry_id_old, factor=self.delta_change, in_place=False,
kind='change')
change = {'change': 'modified', 'to': entry_id_new, 'reason': reason}
pcm_log.debug('Modified %s -> %s' % (str(entry_id_old), str(entry_id_new)))
self.population.disable(entry_id_old)
self.advance(entry_id_old, entry_id_new, change)
[docs] def replace_by_other(self, entry_id_old, entry_id_new, reason=None):
change = {'change': 'replace_by_other', 'to': entry_id_new, 'reason': reason}
# pcm_log.debug('Changed %s -> %s' % (str(entry_id_old), str(entry_id_new)))
self.population.disable(entry_id_old)
self.advance(entry_id_old, entry_id_new, change)
[docs] def replace_by_random(self, entry_id_old, reason=None):
self.population.disable(entry_id_old)
entry_id_new, origin = self.population.add_random()
pcm_log.debug('Replace by random %s -> %s' % (str(entry_id_old), str(entry_id_new)))
change = {'change': 'replace_by_random', 'to': entry_id_new, 'reason': reason, 'origin': origin}
self.advance(entry_id_old, entry_id_new, change)
[docs] def save_generations(self):
if self.pcdb is not None:
for entry_id in sorted(self.generation):
info = self.generation[entry_id]
if self.pcdb.db.generations.find_one({'_id': entry_id}) is None:
self.pcdb.db.generations.insert({'_id': entry_id, self.searcher_id: info})
else:
self.pcdb.db.generations.update({'_id': entry_id}, {'$set': {self.searcher_id: info}})
lineage = {}
for i in self.lineage.keys():
lineage[i] = [str(x) for x in self.lineage[i]]
lineage_inv = {}
for i in self.lineage_inv.keys():
lineage_inv[str(i)] = self.lineage_inv[i]
if self.pcdb.db.lineage.find_one({'_id': self.searcher_id}) is None:
self.pcdb.db.lineage.insert({'_id': self.searcher_id,
'lineage_inv': lineage_inv,
'lineage': lineage})
else:
self.pcdb.db.lineage.update({'_id': self.searcher_id}, {'$set': {'lineage': lineage,
'lineage_inv': lineage_inv}})
[docs] def set_generation(self, entry_id, value):
if entry_id not in self.generation:
self.generation[entry_id] = [value]
else:
if value not in self.generation[entry_id]:
self.generation[entry_id].append(value)
@abstractmethod
[docs] def run_one(self):
pass
@abstractmethod
[docs] def set_params(self, params):
pass
@abstractmethod
[docs] def get_params(self):
pass
[docs] def update_lineages(self):
assert len(self.population.actives) == self.generation_size
# Complete lineage dictionary with generation_size entries
# Each entry on the dictionary is a list of current_generation+1 values
for i in range(self.generation_size):
if str(i) not in self.lineage:
self.lineage[str(i)] = [None]
while len(self.lineage[str(i)]) < self.current_generation + 1:
self.lineage[str(i)].append(None)
# Complete lineage for all actives with known lineage_inv
for entry_id in self.population.actives:
if entry_id in self.lineage_inv:
slot = self.lineage_inv[entry_id]
self.lineage[slot][self.current_generation] = entry_id
# Find empty slots for actives not in lineage_inv
for entry_id in self.population.actives:
if entry_id not in self.lineage_inv:
for i in range(self.generation_size):
# Search for the first empty slot and associate it to the entry_id
if self.lineage[str(i)][self.current_generation] is None:
self.lineage[str(i)][self.current_generation] = entry_id
self.lineage_inv[entry_id] = str(i)
break
# Check consistency between lineage and lineage_inv
for i in range(self.generation_size):
assert str(i) in self.lineage
assert self.lineage[str(i)][-1] in self.lineage_inv
assert self.lineage_inv[self.lineage[str(i)][-1]] == str(i)
# Setting Generation list
pcm_log.debug('Setting generation...')
for entry_id in self.population.actives:
self.set_generation(entry_id, self.current_generation)
[docs] def run(self):
"""
Execute the total number of cycles
:return:
"""
print(str(self))
print(str(self.population))
self.save_info()
self.population.save_info()
best_member = ''
best_recorded = None
survival_for_best = 0
while True:
print('\nGENERATION: %d' % self.current_generation)
self.print_status(level='DEBUG')
pcm_log.debug('[%s] Enforcing the size of generation: %d' % (self.searcher_name, self.generation_size))
self.enforce_generation_size()
self.update_lineages()
self.old_actives = self.population.actives
self.old_nextgen = self.get_generation(self.current_generation + 1)
self.print_status()
number_evaluated = len(self.population.actives_evaluated)
while self.population.fraction_evaluated < 1.0:
if len(self.population.actives_evaluated) != number_evaluated:
msg = "Population '%s' still not evaluated. %4.0f %%"
pcm_log.debug(msg % (self.population.name, 100 * self.population.fraction_evaluated))
self.print_status(level='DEBUG')
number_evaluated = len(self.population.actives_evaluated)
self.population.replace_failed()
time.sleep(self.sleep_time)
pcm_log.debug("Population '%s' evaluated. %4.0f %%" % (self.population.name,
100 * self.population.fraction_evaluated))
best_member = self.population.best_candidate
self.population.refine_progressive(best_member)
print('Current best candidate: [%s]:\n%s' % (best_member, self.population.str_entry(best_member)))
if best_member in self.get_generation():
print('This candidate have survived for %d generations' % len(self.generation[best_member]))
if len(self.generation[best_member]) >= self.stabilization_limit:
self.save_generations()
break
else:
pcm_log.debug('Best candidate %s is not in the current generation' % best_member)
# pcm_log.debug('Slot: %s' % self.lineage_inv[best_member])
# pcm_log.debug('Lineage: %s' % self.lineage[self.lineage_inv[best_member]])
if best_member != best_recorded:
survival_for_best = 0
best_recorded = best_member
else:
survival_for_best += 1
if survival_for_best >= self.stabilization_limit:
print('This candidate have survived for %d generations' % survival_for_best)
self.save_generations()
break
if self.target_value is not None:
if self.population.value(best_member) <= self.target_value:
print('Target value achieved: target=%9.3f best=%9.3f' % (self.population.value(best_member),
self.target_value))
self.save_generations()
break
else:
print('Best value = %7.3f target value = %7.3f' % (self.population.value(best_member),
self.target_value))
pcm_log.debug('[%s] Removing not evaluated: %d' %
(self.searcher_name, len(self.population.actives_no_evaluated)))
for entry_id in self.population.actives_no_evaluated:
self.replace_by_random(entry_id, reason='no evaluated')
self.print_status()
duplicates = self.population.check_duplicates(self.population.ids_sorted(self.population.actives_evaluated))
for entry_id in duplicates:
change = {'change': 'duplicate', 'to': duplicates[entry_id], 'reason': None}
self.write_change(entry_id, change)
self.replace_by_random(entry_id, reason='duplicate')
pcm_log.info(' Duplicates identified and disabled: %d' % len(duplicates))
self.print_status(level='INFO')
pcm_log.info(' Running one cycle for %s with %d candidates' % (self.searcher_name,
len(self.actives_in_generation)))
self.run_one()
self.update_generation()
print('Searcher ended after %d iterations' % self.current_generation)
print('Best candidate: [%s]:\n%s' % (best_member, self.population.str_entry(best_member)))
[docs] def write_change(self, entry_id, change):
if self.pcdb is not None:
change['method'] = self.searcher_name
change['tag'] = self.searcher_name
change['generation'] = self.current_generation
change['from'] = entry_id
self.pcdb.db.generation_changes.insert(change)
[docs] def update_generation(self):
# Increase the current generation number
assert len(self.get_generation()) == self.generation_size
self.current_generation += 1
# Enable all entries in the new generation
# Disable all entries not in new generation
pcm_log.debug('[%s] Activating new generation, disabling others' % self.searcher_name)
for entry_id in self.generation:
if self.current_generation in self.generation[entry_id]:
self.population.enable(entry_id)
else:
self.population.disable(entry_id)
self.save_generations()
@property
def searcher_name(self):
return self.__class__.__name__
@property
def to_dict(self):
return {'name': self.searcher_name,
'generation_size': self.generation_size,
'stabilization_limit': self.stabilization_limit,
'params': self.get_params()}
[docs] def save_info(self):
if self.pcdb is not None:
data = self.pcdb.db.searcher_info.find_one({'_id': self.searcher_id})
if data is None:
data = self.to_dict
data['_id'] = self.searcher_id
self.pcdb.db.searcher_info.insert(data)
else:
self.pcdb.db.searcher_info.update({'_id': self.searcher_id}, self.to_dict)
[docs] def clean(self):
if self.pcdb is not None:
self.pcdb.db.searcher_info.drop()
self.pcdb.db.generations.drop()
self.pcdb.db.generation_changes.drop()
self.pcdb.db.lineage.drop()
[docs] def replace_failed(self):
self.pcdb.replace_failed()
[docs] def get_all_generations(self, generation_number=None):
if generation_number is None:
return self.pcdb.db.generations.find()
else:
return self.pcdb.db.generations.find({self.population.tag: generation_number})
@property
def actives_in_generation(self):
return [x for x in self.population.actives if self.population.is_evaluated(x) and x in self.get_generation()]