from __future__ import print_function
from __future__ import print_function
import uuid
import numpy as np
import scipy.optimize
from ._population import Population
from pychemia.utils.mathematics import unit_vector
class RealFunction(Population):
    """
    Population of points in R^N whose fitness is the value of a scalar
    function.

    Each member is stored in ``self.db`` under a short random identifier as a
    dictionary ``{'x': <numpy array>, 'fx': <function value or None>}``.
    """

    def __init__(self, function, ndim, limits, local_minimization=False):
        """
        Creates a simple population of points in R^N with
        N=ndim the dimensionality of space and
        using an univaluated function 'function'

        :param function: Routine to evaluate a function
        :param ndim: (int) Dimensions of function space
        :param limits: Either a pair ``(lower, upper)`` whose items are
                       scalars or sequences of length ``ndim``, or an array
                       of shape ``(ndim, 2)`` with one ``(lower, upper)`` row
                       per dimension. Any input of length 2 is read as the
                       ``(lower, upper)`` pair.
        :param local_minimization: (bool) If True, every evaluation first
                       runs ``scipy.optimize.minimize`` starting from the
                       point (see ``evaluate_entry``)
        :raises ValueError: If ``limits`` is neither a pair nor an array of
                       shape ``(ndim, 2)``
        """
        Population.__init__(self, 'Euclidean', 'global', use_mongo=False)
        self.tag = 'global'
        self.name = 'Real Function'
        self.function = function
        self.ndim = ndim
        if len(limits) == 2:
            # (lower, upper): broadcast each bound over all dimensions
            self.limits = np.zeros((ndim, 2))
            self.limits[:, 0] = limits[0]
            self.limits[:, 1] = limits[1]
        else:
            # Internally limits are always indexed as [dimension, lower/upper]
            self.limits = np.array(limits)
            if self.limits.shape != (ndim, 2):
                raise ValueError('limits must be a (lower, upper) pair or have shape (%d, 2), got shape %s'
                                 % (ndim, str(self.limits.shape)))
        self._members = []
        self._actives = []
        self.db = {}
        # Trajectory of positions for entries created or updated by move()
        self.moves = {}
        self.pcdb = None
        self.local_minimization = local_minimization

    def __str__(self):
        """
        Short human-readable summary of the population.

        Relies on ``self.evaluated``, which is not defined in this class
        (presumably provided by ``Population``).
        """
        ret = ' Euclidean Population\n\n'
        ret += ' Name: %s\n' % self.name
        ret += ' Tag: %s\n' % self.tag
        ret += '\n'
        ret += ' Members: %d\n' % len(self.members)
        ret += ' Actives: %d\n' % len(self.actives)
        ret += ' Evaluated: %d\n' % len(self.evaluated)
        return ret

    def new_entry(self, data, active=True):
        """
        Store a new point, evaluate it and register it as a member.

        :param data: Coordinates of the point (scalar or sequence)
        :param active: (bool) If True the entry is also added to the actives
        :return: Identifier of the new entry
        """
        ident = self.new_identifier()
        x = np.atleast_1d(data)
        self.db[ident] = {'x': x, 'fx': None}
        self.evaluate_entry(ident)
        if active:
            self.actives.append(ident)
        self.members.append(ident)
        return ident

    def add_random(self):
        """
        Add a new active member drawn uniformly inside the limits.

        :return: Tuple ``(identifier, None)``
        """
        x = np.random.random_sample(self.ndim)
        # Rescale from [0, 1) to [lower, upper) on every dimension
        x = x * (self.limits[:, 1] - self.limits[:, 0]) + self.limits[:, 0]
        return self.new_entry(x), None

    def check_duplicates(self, ids):
        """
        Find entries closer than 1E-2 (Euclidean distance) to another entry.

        The identifiers are ordered with ``self.ids_sorted`` (not defined in
        this class, presumably provided by ``Population``) and every pair is
        compared once.

        :param ids: Identifiers to compare
        :return: (dict) Maps the later identifier of each close pair to the
                 earlier one, following the order given by ``ids_sorted``
        """
        ret = {}
        if len(ids) == 0:
            return ret
        selection = self.ids_sorted(ids)
        # Iterate over the sorted selection itself, so a selection shorter
        # than 'ids' cannot produce an out-of-range index
        for i, ident1 in enumerate(selection):
            for ident2 in selection[i + 1:]:
                if self.distance(ident1, ident2) < 1E-2:
                    ret[ident2] = ident1
        return ret

    def coordinate(self, i):
        """
        Return the coordinates stored for entry ``i``.
        """
        return self.db[i]['x']

    def cross(self, ids):
        """
        Create two children from two parents.

        For ndim == 1 the children are the mean and the absolute difference
        of the parents. For ndim == 2 each child takes one coordinate from
        each parent. Otherwise a one-point crossover is done at a random
        split position. Both children are evaluated and added to the members
        (not to the actives).

        :param ids: Sequence with exactly two identifiers
        :return: Tuple with the identifiers of the two children, the one with
                 the lowest function value first
        """
        assert len(ids) == 2
        parent1 = self.coordinate(ids[0])
        parent2 = self.coordinate(ids[1])
        if self.ndim == 1:
            son1 = 0.5 * (parent1 + parent2)
            son2 = np.abs(parent1 - parent2)
        elif self.ndim == 2:
            son1 = np.array([parent1[0], parent2[1]])
            son2 = np.array([parent2[0], parent1[1]])
        else:
            # randint excludes the upper bound, so this draws from
            # 1 .. ndim - 1: every split leaving both sides non-empty
            split = np.random.randint(1, self.ndim)
            son1 = np.concatenate((parent1[:split], parent2[split:]))
            son2 = np.concatenate((parent2[:split], parent1[split:]))

        new_ident1 = self.new_identifier()
        self.members.append(new_ident1)
        new_ident2 = self.new_identifier()
        self.members.append(new_ident2)

        self.db[new_ident1] = {'x': son1, 'fx': None}
        self.evaluate_entry(new_ident1)
        self.db[new_ident2] = {'x': son2, 'fx': None}
        self.evaluate_entry(new_ident2)

        if self.db[new_ident1]['fx'] > self.db[new_ident2]['fx']:
            return new_ident2, new_ident1
        else:
            return new_ident1, new_ident2

    def distance(self, imember, jmember):
        """
        Euclidean distance between the coordinates of two entries.
        """
        # The trivial metric
        x1 = self.db[imember]['x']
        x2 = self.db[jmember]['x']
        return np.linalg.norm(x2 - x1)

    def enable(self, ident):
        """
        Add ``ident`` to the actives if it is not there already.
        """
        if ident not in self.actives:
            self.actives.append(ident)

    def evaluate(self):
        """
        (Re)evaluate every member of the population.
        """
        for ident in self.members:
            self.evaluate_entry(ident)

    def evaluate_entry(self, ident):
        """
        Compute and store the function value of one entry.

        With ``local_minimization`` enabled, ``scipy.optimize.minimize`` is
        started from the stored point. If the minimizer lies inside the
        limits, both the position and the value of the entry are replaced by
        the minimizer; otherwise the function is evaluated at the original
        point.

        :param ident: Identifier of the entry to evaluate
        """
        x = self.db[ident]['x']
        if self.local_minimization:
            localmin = scipy.optimize.minimize(self.function, x)
            if self.is_inside(localmin.x):
                self.db[ident]['x'] = localmin.x
                self.db[ident]['fx'] = localmin.fun
            else:
                self.db[ident]['fx'] = self.function(x)
        else:
            self.db[ident]['fx'] = self.function(x)

    # NOTE: disabled draft of a background evaluator, kept for reference.
    # It needs 'time', 'Manager' and 'Process', which this module does not
    # import.
    #
    # def evaluator_daemon(self):
    #
    #     def worker(db, function, d):
    #         while True:
    #             for entry_id in db:
    #                 if db[entry_id]['fx'] is None:
    #                     self.evaluate_entry(entry_id)
    #             for entry_id in self.db:
    #                 d[entry_id] = self.db[entry_id]
    #             time.sleep(5)
    #
    #     manager = Manager()
    #     d = manager.dict()
    #
    #     p = Process(target=worker, args=(self.db, self.function, d))
    #     p.start()
    #     return p, d

    def is_evaluated(self, i):
        """
        True if entry ``i`` exists and has a function value stored.
        """
        return i in self.db and self.db[i]['fx'] is not None

    def from_dict(self, population_dict):
        """
        Placeholder, does nothing.
        """
        pass

    def disable(self, ident):
        """
        Remove ``ident`` from the actives if present.
        """
        if ident in self.actives:
            self.actives.remove(ident)

    def get_values(self, selection):
        """
        Return a dictionary mapping each identifier in ``selection`` to its
        stored function value.
        """
        return {i: self.value(i) for i in selection}

    def member_str(self, imember):
        """
        One-line representation ``(x0, x1, ...) -> f(x)`` of an entry.
        The value is printed as ``None`` when it is not set.
        """
        x = self.db[imember]['x']
        coords = ', '.join('%5.2f' % x[i] for i in range(self.ndim))
        ret = '(' + coords + ') -> '
        fx = self.value(imember)
        if fx is not None:
            ret += '%5.2f' % fx
        else:
            ret += 'None'
        return ret

    def move(self, imember, jmember, factor=0.2, in_place=False):
        """
        Moves imember in the direction of jmember
        If in_place is True the movement occurs on the
        same address as imember

        The new position is ``x1 + factor * (x2 - x1)``. The positions are
        also recorded in ``self.moves`` under the identifier that receives
        the new position. The new position is not checked against the limits.

        :param factor: Fraction of the separation to travel
        :param imember: Identifier of the entry to move
        :param jmember: Identifier of the entry to move towards
        :param in_place: (bool) Overwrite imember instead of creating a new
                         active member
        :return: Identifier holding the new position
        """
        x1 = self.db[imember]['x']
        x2 = self.db[jmember]['x']
        newx = x1 + factor * (x2 - x1)
        if not in_place:
            new_ident = self.new_identifier()
            self.actives.append(new_ident)
            self.members.append(new_ident)
        else:
            new_ident = imember
        if new_ident not in self.moves:
            # First record: keep both the origin and the destination
            self.moves[new_ident] = np.vstack((x1, newx))
        else:
            self.moves[new_ident] = np.vstack((self.moves[new_ident], newx))
        self.db[new_ident] = {'x': newx, 'fx': None}
        self.evaluate_entry(new_ident)
        return new_ident

    def is_inside(self, x):
        """
        True if the first ``ndim`` components of ``x`` are within the limits
        (bounds included).
        """
        # Written as "not outside" so that comparisons that evaluate to
        # False (e.g. against NaN) count as inside, as before
        return not any(self.limits[i, 0] > x[i] or x[i] > self.limits[i, 1]
                       for i in range(self.ndim))

    def move_random(self, imember, factor=0.2, in_place=False, kind='move'):
        """
        Move an entry a distance ``factor`` along a random direction.

        Directions are drawn repeatedly until the new position is inside the
        limits.
        NOTE(review): the loop has no iteration cap; it looks like it cannot
        terminate when no point at distance ``factor`` from the entry lies
        inside the limits -- confirm callers never hit that case.

        :param imember: Identifier of the entry to move
        :param factor: Length of the displacement
        :param in_place: (bool) Overwrite imember instead of creating a new
                         active member
        :param kind: Not used by this implementation
        :return: Identifier holding the new position
        """
        x = np.array(self.db[imember]['x'])
        newx = x
        outside = True
        while outside:
            dx = 2 * np.random.random_sample(self.ndim) - 1
            dx = unit_vector(dx)
            newx = x + factor * dx
            outside = not self.is_inside(newx)
        if not in_place:
            new_ident = self.new_identifier()
            self.actives.append(new_ident)
            self.members.append(new_ident)
        else:
            new_ident = imember
        self.db[new_ident] = {'x': newx, 'fx': None}
        self.evaluate_entry(new_ident)
        return new_ident

    @staticmethod
    def new_identifier():
        """
        Return a new identifier: the last 12 characters of a random UUID4.
        """
        return str(uuid.uuid4())[-12:]

    def random_population(self, n):
        """
        Add ``n`` random members (see ``add_random``).
        """
        for _ in range(n):
            self.add_random()

    def replace_failed(self):
        """
        Placeholder, does nothing.
        """
        pass

    def recover(self):
        """
        Placeholder, does nothing.
        """
        pass

    def save(self):
        """
        Write the population to the current working directory.

        ``population.dat`` gets one line per member with the identifier
        followed by all its coordinates; ``members.dat`` gets one identifier
        per line. Members are written sorted by identifier.
        """
        ordered = sorted(self.members)
        with open('population.dat', 'w') as wf:
            for i in ordered:
                # One column per coordinate, whatever the dimensionality
                coords = ''.join(' %12.3f' % xi for xi in self.db[i]['x'])
                wf.write("%15s%s\n" % (i, coords))
        with open('members.dat', 'w') as wf:
            for i in ordered:
                wf.write("%15s\n" % i)

    def save_info(self):
        """
        Placeholder, does nothing.
        """
        pass

    def set_value(self, i, y):
        """
        Store ``y`` as the function value of entry ``i``.
        """
        self.db[i]['fx'] = y

    def str_entry(self, entry_id):
        """
        Representation ``x = [...] f(x) = ...`` of an entry in scientific
        notation. The value is printed as ``None`` when it is not set.
        """
        entry = self.db[entry_id]
        coords = ','.join('%7.2e ' % i for i in entry['x'])
        fx = entry['fx']
        fx_str = 'None' if fx is None else '%7.2e' % fx
        return 'x = [' + coords + '] f(x) = ' + fx_str

    def value(self, imember):
        """
        Return the stored function value of an entry (None if not set).
        """
        return self.db[imember]['fx']

    def write_change(self, change):
        """
        Record the origin of an entry in its ``'from'`` field.

        :param change: (dict) Uses the keys 'change', 'from' and 'to'. For a
                       'promoted' change the entry change['from'] is tagged
                       with its own identifier; for any other change the
                       entry change['to'] is tagged with change['from'].
        """
        if change['change'] == 'promoted':
            self.db[change['from']]['from'] = change['from']
        else:
            self.db[change['to']]['from'] = change['from']

    @property
    def actives(self):
        """
        List of identifiers of the active entries.
        """
        return self._actives

    @property
    def members(self):
        """
        List of identifiers of all the members.
        """
        return self._members

    def get_duplicates(self, ids):
        """
        Placeholder, does nothing.
        """
        pass