# -*- coding: utf-8 -*-
"""
Define Core Classes
"""
import numpy as np
from . import tool
class NeuroLabError(Exception):
    """Base class for all neurolab-specific exceptions."""
    pass
class TrainStop(NeuroLabError):
    """Raised to stop training early (goal reached or epoch limit hit)."""
    pass
class Net(object):
    """
    Neural Network class

    :Parameters:
        inp_minmax: minmax: list ci x 2
            Range of input value
        co: int
            Number of output
        layers: list of Layer
            Network layers
        connect: list of list
            Connection scheme of layers*
        trainf: callable
            Train function
        errorf: callable
            Error function with derivative

    :Connect format:
        Example 1: for two-layers feed forwad network
            >>> connect = [[-1], # - layer 0 receives the input network signal;
            ...            [0],  # - layer 1 receives the output signal
            ...                  # from the layer 0;
            ...            [1]]  # - the network exit receives the output
            ...                  # signal from the layer 1.
        Example 2: for two-layers Elman network with derivatives:
            >>> connect = [[-1, 0], # - layer 0 receives the input network
            ...                     # signal and output signal from layer 0;
            ...            [0],     # - layer 1 receives the output
            ...                     # signal from the layer 0;
            ...            [1]]     # - the network exit receives the output
            ...                     # signals from the layer 1.
    """

    def __init__(self, inp_minmax, co, layers, connect, trainf, errorf):
        # np.asarray(..., dtype=float) is equivalent to np.asfarray,
        # which was removed in NumPy 2.0
        self.inp_minmax = np.asarray(inp_minmax, dtype=float)
        self.out_minmax = np.zeros([co, 2])
        self.ci = self.inp_minmax.shape[0]
        self.co = co
        self.layers = layers
        self.trainf = trainf
        self.errorf = errorf
        self.inp = np.zeros(self.ci)
        self.out = np.zeros(self.co)
        # Check connect format
        assert self.inp_minmax.ndim == 2
        assert self.inp_minmax.shape[1] == 2
        if len(connect) != len(layers) + 1:
            raise ValueError("Connect error")
        # Check connect links: tmp[s] counts how many consumers layer s's
        # output signal has; every layer but the network exit slot must
        # feed at least one destination
        tmp = [0] * len(connect)
        for con in connect:
            for s in con:
                if s != -1:
                    tmp[s] += 1
        for l, c in enumerate(tmp):
            if c == 0 and l != len(layers):
                # BUGFIX: was str(l - 1); tmp[l] tracks layer l itself,
                # so the unconsumed signal comes from layer l
                raise ValueError("Connect error: Lost the signal " +
                                 "from the layer " + str(l))
        self.connect = connect
        # Set inp_minmax for all layers (the last connect entry describes
        # the network output, so it fills out_minmax instead)
        for nl, nums_signal in enumerate(self.connect):
            if nl == len(self.layers):
                minmax = self.out_minmax
            else:
                minmax = self.layers[nl].inp_minmax
            ni = 0
            for ns in nums_signal:
                t = self.layers[ns].out_minmax if ns != -1 else self.inp_minmax
                if ni + len(t) > len(minmax):
                    # BUGFIX: was str(l - 1) using the stale loop variable
                    # from the check above; the offending layer is nl
                    raise ValueError("Connect error: on layer " + str(nl))
                minmax[ni: ni + len(t)] = t
                ni += len(t)
            if ni != len(minmax):
                # BUGFIX: same stale-variable issue — report layer nl
                raise ValueError("Connect error: Empty inputs on layer " +
                                 str(nl))
        self.init()

    def step(self, inp):
        """
        Simulated step

        :Parameters:
            inp: array like
                Input vector
        :Returns:
            out: array
                Output vector
        """
        # Store as ndarray (resolves old TODO); also keeps reset()'s
        # self.inp.fill(0) valid when callers pass plain lists
        self.inp = np.asarray(inp)
        for nl, nums in enumerate(self.connect):
            if len(nums) > 1:
                # Several sources feed this destination: concatenate them
                signal = []
                for ns in nums:
                    s = self.layers[ns].out if ns != -1 else inp
                    signal.append(s)
                signal = np.concatenate(signal)
            else:
                ns = nums[0]
                signal = self.layers[ns].out if ns != -1 else inp
            if nl != len(self.layers):
                self.layers[nl].step(signal)
        # The last assembled signal is the network output
        self.out = signal
        return self.out

    def sim(self, input):
        """
        Simulate a neural network

        :Parameters:
            input: array like
                array input vectors
        :Returns:
            outputs: array like
                array output vectors
        """
        input = np.asarray(input, dtype=float)
        assert input.ndim == 2
        assert input.shape[1] == self.ci
        output = np.zeros([len(input), self.co])
        for inp_num, inp in enumerate(input):
            output[inp_num, :] = self.step(inp)
        return output

    def init(self):
        """
        Initialization layers
        """
        for layer in self.layers:
            layer.init()

    def train(self, *args, **kwargs):
        """
        Train network

        see net.trainf.__doc__
        """
        return self.trainf(self, *args, **kwargs)

    def reset(self):
        """
        Clear of delay (zero all stored input/output signals)
        """
        self.inp.fill(0)
        self.out.fill(0)
        for layer in self.layers:
            layer.inp.fill(0)
            layer.out.fill(0)

    def save(self, fname):
        """
        Save network on file

        :Parameters:
            fname: file name
        """
        tool.save(self, fname)

    def copy(self):
        """
        Copy network
        """
        import copy
        cnet = copy.deepcopy(self)
        return cnet
class Layer(object):
    """
    Abstract Neural Layer class

    :Parameters:
        ci: int
            Number of inputs
        cn: int
            Number of neurons
        co: int
            Number of outputs
        property: dict
            property: array shape
            example: {'w': (10, 1), 'b': 10}
    """

    def __init__(self, ci, cn, co, property):
        self.ci = ci
        self.cn = cn
        self.co = co
        # One uninitialised parameter array per named property
        self.np = {name: np.empty(shape) for name, shape in property.items()}
        # Last seen input/output signals
        self.inp = np.zeros(ci)
        self.out = np.zeros(co)
        # Property must be change when init Layer
        self.out_minmax = np.empty([self.co, 2])
        # Property will be change when init Net
        self.inp_minmax = np.empty([self.ci, 2])
        self.initf = None

    def step(self, inp):
        """ Layer simulation step """
        assert len(inp) == self.ci
        result = self._step(inp)
        # Remember both signals only after a successful step
        self.inp = inp
        self.out = result

    def init(self):
        """ Init Layer random values """
        if type(self.initf) is list:
            # Several init functions: apply each in order
            for fn in self.initf:
                fn(self)
        elif self.initf is not None:
            self.initf(self)

    def _step(self, inp):
        # Subclasses must implement the actual layer computation
        raise NotImplementedError("Call abstract metod Layer._step")
class Trainer(object):
    """
    Control of network training
    """

    def __init__(self, Train, epochs=500, goal=0.01, show=100, **kwargs):
        """
        :Parameters:
            Train: Train instance
                Train algorithm
            epochs: int (default 500)
                Number of train epochs
            goal: float (default 0.01)
                The goal of train
            show: int (default 100)
                Print period
            **kwargs: dict
                other Train parametrs
        """
        self._train_class = Train
        # Default train parameters; explicit kwargs go into 'train'
        self.defaults = {'goal': goal,
                         'show': show,
                         'epochs': epochs,
                         'train': kwargs}
        # Harvest the keyword defaults of the concrete Train.__init__
        # so they are visible in self.defaults['train'] too
        default_values = Train.__init__.__defaults__
        if default_values:
            code = Train.__init__.__code__
            argcount = code.co_argcount
            first = argcount - len(default_values)
            names = code.co_varnames[first:argcount]
            for name, value in zip(names, default_values):
                # Caller-supplied kwargs win over Train's own defaults
                self.defaults['train'].setdefault(name, value)
        self.params = self.defaults.copy()
        self.error = []

    def __str__(self):
        return 'Trainer(' + self._train_class.__name__ + ')'

    def __call__(self, net, input, target=None, **kwargs):
        """
        Run train process

        :Parameters:
            net: Net instance
                network
            input: array like (l x net.ci)
                train input patterns
            target: array like (l x net.co)
                train target patterns - only for train with teacher
            **kwargs: dict
                other Train parametrs
        """
        # Start from stored defaults, then overlay per-call kwargs;
        # unknown keys are forwarded to the Train algorithm
        self.params = self.defaults.copy()
        self.params['train'] = self.defaults['train'].copy()
        for key, value in kwargs.items():
            if key in self.params:
                self.params[key] = value
            else:
                self.params['train'][key] = value

        input = np.asfarray(input)
        assert input.ndim == 2
        assert input.shape[1] == net.ci
        args = [input]
        if target is not None:
            target = np.asfarray(target)
            assert target.ndim == 2
            assert target.shape[1] == net.co
            assert target.shape[0] == input.shape[0]
            args.append(target)

        def epochf(err, net, *args):
            """Need call on each epoch"""
            if err is None:
                err = train.error(net, *args)
            self.error.append(err)
            epoch = len(self.error)
            show = self.params['show']
            if show and (epoch % show) == 0:
                print("Epoch: {0}; Error: {1};".format(epoch, err))
            if err < self.params['goal']:
                raise TrainStop('The goal of learning is reached')
            if epoch >= self.params['epochs']:
                raise TrainStop('The maximum number of train epochs is reached')

        train = self._train_class(net, *args, **self.params['train'])
        # Re-run the base initializer so epochf/epochs are attached
        Train.__init__(train, epochf, self.params['epochs'])
        self.error = []
        try:
            train(net, *args)
        except TrainStop as msg:
            if self.params['show']:
                print(msg)
        else:
            if self.params['show'] and len(self.error) >= self.params['epochs']:
                print("The maximum number of train epochs is reached")
        return self.error
class Train(object):
    """Base train abstract class"""

    def __init__(self, epochf, epochs):
        # epochf: callback invoked once per epoch (may raise TrainStop)
        # epochs: maximum number of training epochs
        self.epochf = epochf
        self.epochs = epochs

    def __call__(self, net, *args):
        # Template method: subclasses provide learn() (and may override
        # error()); the per-epoch callback controls early termination
        for _ in range(self.epochs):
            current_error = self.error(net, *args)
            self.epochf(current_error, net, *args)
            self.learn(net, *args)

    def error(self, net, input, target, output=None):
        """Only for train with teacher"""
        if output is None:
            output = net.sim(input)
        return net.errorf(target, output)