Source code for hessianfree.demos

from __future__ import print_function

import ast
from cProfile import Profile
import pickle
import pstats
import sys

import numpy as np
import matplotlib.pyplot as plt

import hessianfree as hf


[docs]def xor(use_hf=True): """Run a basic xor training test. :param bool use_hf: if True run example using Hessian-free optimization, otherwise use stochastic gradient descent """ inputs = np.asarray([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=np.float32) targets = np.asarray([[0], [1], [1], [0]], dtype=np.float32) ff = hf.FFNet([2, 5, 1]) if use_hf: ff.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter=2), max_epochs=40, plotting=True) else: # using gradient descent (for comparison) ff.run_epochs(inputs, targets, optimizer=hf.opt.SGD(l_rate=1), max_epochs=10000, plotting=True) outputs = ff.forward(inputs)[-1] for i in range(4): print("-" * 2) print("input", inputs[i]) print("target", targets[i]) print("output", outputs[i])
[docs]def crossentropy(): """A network that modifies the layer types and loss function.""" inputs = np.asarray([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=np.float32) targets = np.asarray([[1, 0], [0, 1], [0, 1], [1, 0]], dtype=np.float32) ff = hf.FFNet([2, 5, 2], layers=[hf.nl.Linear(), hf.nl.Tanh(), hf.nl.Softmax()], loss_type=hf.loss_funcs.CrossEntropy()) ff.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter=2), max_epochs=40, plotting=True) outputs = ff.forward(inputs)[-1] for i in range(4): print("-" * 20) print("input", inputs[i]) print("target", targets[i]) print("output", outputs[i])
[docs]def connections(): """A network with non-standard connectivity between layers.""" inputs = np.asarray([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=np.float32) targets = np.asarray([[0], [1], [1], [0]], dtype=np.float32) ff = hf.FFNet([2, 5, 5, 1], layers=hf.nl.Tanh(), conns={0: [1, 2], 1: [2, 3], 2: [3]}) ff.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter=2), max_epochs=40, plotting=True) outputs = ff.forward(inputs)[-1] for i in range(4): print("-" * 20) print("input", inputs[i]) print("target", targets[i]) print("output", outputs[i])
[docs]def mnist(model_args=None, run_args=None): """Test on the MNIST (digit classification) dataset. Download dataset at http://deeplearning.net/data/mnist/mnist.pkl.gz :param dict model_args: kwargs that will be passed to the :class:`.FFNet` constructor :param dict run_args: kwargs that will be passed to :meth:`.run_epochs` """ with open("mnist.pkl", "rb") as f: try: train, _, test = pickle.load(f) except UnicodeDecodeError: # python 3 with open("mnist.pkl", "rb") as f2: train, _, test = pickle.load(f2, encoding="bytes") if model_args is None: ff = hf.FFNet([28 * 28, 1024, 512, 256, 32, 10], layers=([hf.nl.Linear()] + [hf.nl.ReLU()] * 4 + [hf.nl.Softmax()]), use_GPU=True, debug=False) else: ff = hf.FFNet([28 * 28, 1024, 512, 256, 32, 10], layers=([hf.nl.Linear()] + [hf.nl.ReLU()] * 4 + [hf.nl.Softmax()]), **model_args) inputs = train[0] targets = np.zeros((inputs.shape[0], 10), dtype=np.float32) targets[np.arange(inputs.shape[0]), train[1]] = 0.9 targets += 0.01 tmp = np.zeros((test[0].shape[0], 10), dtype=np.float32) tmp[np.arange(test[0].shape[0]), test[1]] = 0.9 tmp += 0.01 test = (test[0], tmp) if run_args is None: ff.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter=250, init_damping=45), minibatch_size=7500, test=test, max_epochs=125, test_err=hf.loss_funcs.ClassificationError(), plotting=True) else: CG_iter = run_args.pop("CG_iter", 250) init_damping = run_args.pop("init_damping", 45) ff.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter, init_damping), test=test, test_err=hf.loss_funcs.ClassificationError(), **run_args) output = ff.forward(test[0]) print("classification error", hf.loss_funcs.ClassificationError().batch_loss(output, test[1]))
[docs]def integrator(model_args=None, run_args=None, n_inputs=15, sig_len=10, plots=True): """A recurrent network implementing an integrator. :param dict model_args: kwargs that will be passed to the :class:`.RNNet` constructor :param dict run_args: kwargs that will be passed to :meth:`.run_epochs` :param int n_inputs: size of batch to train on :param int sig_len: number of timesteps to run for :param bool plots: display plots of trained output """ inputs = np.outer(np.linspace(0.1, 0.9, n_inputs), np.ones(sig_len))[:, :, None] targets = np.outer(np.linspace(0.1, 0.9, n_inputs), np.linspace(0, 1, sig_len))[:, :, None] inputs = inputs.astype(np.float32) targets = targets.astype(np.float32) test = (inputs, targets) if model_args is None: rnn = hf.RNNet(shape=[1, 10, 1], layers=hf.nl.Logistic(), debug=False, use_GPU=False) else: rnn = hf.RNNet(**model_args) if run_args is None: rnn.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter=100), test=test, max_epochs=30, plotting=plots) else: CG_iter = run_args.pop("CG_iter", 100) init_damping = run_args.pop("init_damping", 1) rnn.run_epochs(inputs, targets, optimizer=hf.opt.HessianFree(CG_iter, init_damping), test=test, plotting=plots, **run_args) if plots: plt.figure() plt.plot(inputs.squeeze().T) plt.title("inputs") plt.figure() plt.plot(targets.squeeze().T) plt.title("targets") outputs = rnn.forward(inputs)[-1] plt.figure() plt.plot(outputs.squeeze().T) plt.title("outputs") plt.show()
[docs]def adding(T=50, plots=True): """The canonical "adding" test of long-range dependency learning for RNNs. :param int T: length of the test signal :param bool plots: display plots of trained output """ # set up inputs N = 10000 test_cut = int(N * 0.9) vals = np.random.uniform(0, 1, size=(N, T, 1)).astype(np.float32) mask = np.zeros((N, T, 1), dtype=np.float32) for m in mask: m[np.random.randint(T / 10)] = 1 m[np.random.randint(T / 10, T / 2)] = 1 inputs = np.concatenate((vals, mask), axis=-1) tmp = np.zeros_like(vals) tmp[mask.astype(np.bool)] = vals[mask.astype(np.bool)] targets = np.zeros((N, T, 1), dtype=np.float32) targets[:] = np.nan targets[:, -1] = np.sum(tmp, axis=1, dtype=np.float32) test = (inputs[test_cut:], targets[test_cut:]) # build network optimizer = hf.opt.HessianFree(CG_iter=60, init_damping=20) rnn = hf.RNNet( shape=[2, 32, 64, 1], layers=[hf.nl.Linear(), hf.nl.ReLU(), hf.nl.Continuous(hf.nl.ReLU(), tau=20), hf.nl.ReLU()], W_init_params={"coeff": 0.25}, loss_type=[hf.loss_funcs.SquaredError(), hf.loss_funcs.StructuralDamping(1e-4, layers=[2], optimizer=optimizer)], rec_layers=[2], use_GPU=True, debug=False, rng=np.random.RandomState(0)) # scale spectral radius of recurrent weights W, _ = rnn.get_weights(rnn.W, (2, 2)) W *= 1.0 / np.max(np.abs(np.linalg.eigvals(W))) rnn.run_epochs(inputs[:test_cut], targets[:test_cut], optimizer=optimizer, minibatch_size=1024, test=test, max_epochs=5, plotting=plots, test_err=hf.loss_funcs.SquaredError()) if plots: outputs = rnn.forward(inputs[:20]) plt.figure() lines = plt.plot(outputs[-1][:].squeeze().T) plt.scatter(np.ones(outputs[-1].shape[0]) * outputs[-1].shape[1], targets[:20, -1], c=[plt.getp(l, "color") for l in lines]) plt.title("outputs") plt.show()
[docs]def plant(plots=True): """A network using a dynamic plant as the output layer. :param bool plots: display plots of trained output """ n_inputs = 32 sig_len = 15 class Plant(hf.nl.Plant): # this plant implements a simple dynamic system, with two-dimensional # state representing [position, velocity] def __init__(self, A, B, targets, init_state): super(Plant, self).__init__() self.A = np.asarray(A) self.B = B self.targets = targets self.init_state = init_state self.shape = [n_inputs, sig_len, len(A)] # derivative of output with respect to state (constant, so just # compute it once here) self.d_output = np.resize(np.eye(self.shape[-1]), (n_inputs, self.shape[-1], self.shape[-1], 1)) self.reset() def activation(self, x): self.act_count += 1 # this implements a basic s_{t+1} = A*s_t + B*x dynamic system. # but to make things a little more complicated we allow the B # matrix to be dynamic, so it's actually # s_{t+1} = A*s_t + B(s_t)*x self.B_matrix, self.d_B_matrix = self.B(self.state) self.state = (np.dot(self.state, self.A) + np.einsum("ij,ijk->ik", x, self.B_matrix)) return self.state[:x.shape[0]] def d_activation(self, x, _): self.d_act_count += 1 assert self.act_count == self.d_act_count # derivative of state with respect to input d_input = self.B_matrix.transpose((0, 2, 1))[..., None] # derivative of state with respect to previous state d_state = np.resize(self.A.T, np.concatenate(([x.shape[0]], self.A.shape))) d_state[:, 1, 0] += x[:, 1] * self.d_B_matrix[:, 1, 1] d_state = d_state[..., None] return np.concatenate((d_input, d_state, self.d_output), axis=-1) def __call__(self, _): self.inputs = np.concatenate((self.inputs, self.state[:, None, :]), axis=1) return self.state def get_vecs(self): return self.inputs, self.targets def reset(self, init=None): self.act_count = 0 self.d_act_count = 0 self.state = (self.init_state.copy() if init is None else init.copy()) self.inputs = np.zeros((self.shape[0], 0, self.shape[-1]), dtype=np.float32) self.B_matrix = self.d_B_matrix = None # static A matrix (converts velocity into a change in position) A = [[1, 0], [0.2, 1]] # dynamic B(s) matrix (converts input into velocity, modulated by current # state) # note that this dynamic B matrix doesn't really make much sense, it's # just here to demonstrate what happens with a plant whose dynamics # change over time def B(state): B = np.zeros((state.shape[0], state.shape[1], state.shape[1])) B[:, 1, 1] = np.tanh(state[:, 0]) d_B = np.zeros((state.shape[0], state.shape[1], state.shape[1])) d_B[:, 1, 1] = 1 - np.tanh(state[:, 0]) ** 2 return B, d_B # random initial position and velocity init_state = np.random.uniform(-0.5, 0.5, size=(n_inputs, 2)) # the target will be to end at position 1 with velocity 0 targets = np.ones((n_inputs, sig_len, 2), dtype=np.float32) targets[:, :, 1] = 0 targets[:, :-1, :] = np.nan plant = Plant(A, B, targets, init_state) rnn = hf.RNNet(shape=[2, 16, 2], layers=[hf.nl.Linear(), hf.nl.Tanh(), plant], W_init_params={"coeff": 0.1}, W_rec_params={"coeff": 0.1}, rng=np.random.RandomState(0)) rnn.run_epochs(plant, None, hf.opt.HessianFree(CG_iter=20, init_damping=10), max_epochs=150, plotting=plots) # using gradient descent (for comparison) # rnn.run_epochs(plant, None, optimizer=SGD(l_rate=0.01), # minibatch_size=None, test=test, max_epochs=10000, # plotting=True) if plots: outputs = rnn.forward(plant)[-1] plt.figure() plt.plot(outputs[:, :, 0].squeeze().T) plt.title("position") plt.figure() plt.plot(outputs[:, :, 1].squeeze().T) plt.title("velocity") plt.show()
[docs]def profile(func, max_epochs=1, use_GPU=False, cprofile=True): """Run a profiler on the code. :param str func: the demo function to be profiled (can be 'mnist' or 'integrator') :param int max_epochs: maximum number of iterations to run :param bool use_GPU: run optimization on GPU :param bool cprofile: if True then run the profiling on the CPU, otherwise use CUDA profiler """ if cprofile: p = Profile() p.enable() else: import pycuda pycuda.driver.start_profiler() if func == "mnist": mnist({'use_GPU': use_GPU, 'rng': np.random.RandomState(0)}, {'max_epochs': max_epochs, 'plotting': False, 'minibatch_size': 7500, 'CG_iter': 10}) elif func == "integrator": integrator({'shape': [1, 100, 1], 'layers': hf.nl.Logistic(), 'use_GPU': use_GPU, 'debug': False, 'rng': np.random.RandomState(0)}, {'max_epochs': max_epochs, 'CG_iter': 10}, n_inputs=500, sig_len=200, plots=False) else: raise ValueError("Unknown profile function") if cprofile: p.disable() ps = pstats.Stats(p) ps.strip_dirs().sort_stats('time').print_stats(20) else: pycuda.driver.stop_profiler()
if __name__ == "__main__": if len(sys.argv) < 2: xor() else: if sys.argv[1] in locals(): locals()[sys.argv[1]](*[ast.literal_eval(a) for a in sys.argv[2:]]) else: print("Unknown demo function (%s)" % sys.argv[1])