# -*- coding: utf-8 -*-
import os
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt
from jinja2 import Environment, FileSystemLoader
from sklearn.cross_validation import StratifiedKFold, KFold
from sklearn.grid_search import GridSearchCV
from sklearn.learning_curve import learning_curve
from sklearn.svm import SVC, LinearSVC, SVR
from sklearn.metrics import classification_report, f1_score
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression, Ridge, SGDRegressor,\
SGDClassifier
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from algorithm import Algorithm
from data import Data
class MALSS(object):
    """Machine Learning Support System.

    Selects candidate algorithms suited to the data size, tunes their
    hyper-parameters by grid search, and optionally renders an HTML
    analysis report and a ready-to-use prediction module sample.
    """

    def __init__(self, task, shuffle=True, standardize=True, scoring=None,
                 cv=5, n_jobs=-1, random_state=0, lang='en', verbose=True):
        """
        Initialize parameters.

        Parameters
        ----------
        task : string
            Specifies the task of the analysis. It must be one of
            'classification', 'regression'.
        shuffle : boolean, optional (default=True)
            Whether to shuffle the data.
        standardize : boolean, optional (default=True)
            Whether to standardize the data.
        scoring : string, callable or None, optional, default: None
            A string (see scikit-learn's model evaluation documentation) or
            a scorer callable object / function with
            signature scorer(estimator, X, y).
            mean_squared_error (for regression task) or f1 (for
            classification task) is used by default.
        cv : integer or cross-validation generator, optional (default=5)
            If an integer is passed, it is the number of folds.
            K-fold cv (for regression task) or Stratified k-fold cv
            (for classification task) is used by default.
            Specific cross-validation objects can be passed, see
            sklearn.cross_validation module for the list of possible objects.
        n_jobs : integer, optional (default=-1)
            The number of jobs to run in parallel. If -1, then the number of
            jobs is set to the number of cores - 1.
        random_state : int seed, RandomState instance, or None (default=0)
            The seed of the pseudo random number generator
        lang : string (default='en')
            Specifies the language in the report. It must be one of
            'en' (English), 'jp' (Japanese).
        verbose : boolean, default: True
            Enable verbose output.

        Raises
        ------
        ValueError
            If ``task`` is not 'classification'/'regression' or ``lang``
            is not 'en'/'jp'.
        """
        self.is_ready = False
        self.shuffle = shuffle
        self.standardize = standardize
        self.task = task
        self.cv = cv
        if n_jobs == -1:
            # Leave one core free so the machine stays responsive,
            # but never go below one job.
            self.n_jobs = np.max([multiprocessing.cpu_count() - 1, 1])
        else:
            self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose
        if lang != 'en' and lang != 'jp':
            raise ValueError('lang:%s is not supported' % lang)
        self.lang = lang
        # True when a lower score is better (loss-type metrics).
        self.minimized_score = False
        if task == 'classification':
            self.scoring = 'f1' if scoring is None else scoring
        elif task == 'regression':
            self.scoring = 'mean_squared_error' if scoring is None else scoring
            if self.scoring in ('mean_squared_error', 'mean_absolute_error'):
                self.minimized_score = True
        else:
            raise ValueError('task:%s is not supported' % task)

    def __choose_algorithm(self):
        """Pick candidate algorithms suited to the data size.

        Heuristics: kernel SVMs are only tried when
        n_samples**2 * n_features <= 1e9 (libsvm training cost grows with
        the squared sample count); when n_samples * n_features > 1e6 only
        an SGD-based linear model is used.
        """
        algorithms = []
        n_samples, n_features = self.data.X.shape[0], self.data.X.shape[1]
        if self.task == 'classification':
            if n_samples * n_features <= 1e+06:
                if n_samples ** 2 * n_features <= 1e+09:
                    algorithms.append(
                        Algorithm(
                            SVC(random_state=self.random_state),
                            [{'kernel': ['rbf'],
                              'C': [1, 10, 100, 1000],
                              'gamma': [1e-3, 1e-2, 1e-1, 1.0]}],
                            'Support Vector Machine (RBF Kernel)',
                            ('http://scikit-learn.org/stable/modules/'
                             'generated/sklearn.svm.SVC.html')))
                algorithms.append(
                    Algorithm(
                        RandomForestClassifier(
                            random_state=self.random_state,
                            n_jobs=self.n_jobs),
                        [{'n_estimators': [10, 100, 1000],
                          'max_features': [0.3, 0.6, 0.9],
                          'max_depth': [3, 7, None]}],
                        'Random Forest',
                        ('http://scikit-learn.org/stable/modules/'
                         'generated/'
                         'sklearn.ensemble.RandomForestClassifier.html')))
                algorithms.append(
                    Algorithm(
                        LinearSVC(random_state=self.random_state),
                        [{'C': [0.1, 1, 10, 100]}],
                        'Support Vector Machine (Linear Kernel)',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.svm.LinearSVC.html')))
                algorithms.append(
                    Algorithm(
                        LogisticRegression(random_state=self.random_state),
                        [{'penalty': ['l2', 'l1'],
                          'C': [0.1, 0.3, 1, 3, 10],
                          'class_weight': [None, 'auto']}],
                        'Logistic Regression',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.linear_model.LogisticRegression.html')))
                algorithms.append(
                    Algorithm(
                        DecisionTreeClassifier(random_state=self.random_state),
                        [{'max_depth': [3, 5, 7, 9, 11]}],
                        'Decision Tree',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.tree.DecisionTreeClassifier.html')))
                algorithms.append(
                    Algorithm(
                        KNeighborsClassifier(),
                        [{'n_neighbors': [2, 6, 10, 14, 18]}],
                        'k-Nearest Neighbors',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.neighbors.KNeighborsClassifier.html')))
            else:
                algorithms.append(
                    Algorithm(
                        SGDClassifier(
                            random_state=self.random_state,
                            n_jobs=self.n_jobs),
                        [{'loss': ['hinge', 'log'],
                          'penalty': ['l2', 'l1'],
                          'alpha': [1e-05, 3e-05, 1e-04, 3e-04, 1e-03],
                          'class_weight': [None, 'auto']}],
                        'SGD Classifier',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.linear_model.SGDClassifier.html')))
        elif self.task == 'regression':
            if n_samples * n_features <= 1e+06:
                if n_samples ** 2 * n_features <= 1e+09:
                    algorithms.append(
                        Algorithm(
                            SVR(random_state=self.random_state),
                            [{'kernel': ['rbf'],
                              'C': [1, 10, 100, 1000],
                              'gamma': [1e-3, 1e-2, 1e-1, 1.0]}],
                            'Support Vector Machine (RBF Kernel)',
                            ('http://scikit-learn.org/stable/modules/'
                             'generated/sklearn.svm.SVR.html')))
                algorithms.append(
                    Algorithm(
                        RandomForestRegressor(
                            random_state=self.random_state,
                            n_jobs=self.n_jobs),
                        [{'n_estimators': [10, 100, 1000],
                          'max_features': [0.3, 0.6, 0.9],
                          'max_depth': [3, 7, None]}],
                        'Random Forest',
                        ('http://scikit-learn.org/stable/modules/'
                         'generated/'
                         'sklearn.ensemble.RandomForestRegressor.html')))
                algorithms.append(
                    Algorithm(
                        Ridge(),
                        [{'alpha':
                          [0.01, 0.1, 1, 10, 100]}],
                        'Ridge Regression',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.linear_model.Ridge.html')))
                algorithms.append(
                    Algorithm(
                        DecisionTreeRegressor(random_state=self.random_state),
                        [{'max_depth': [3, 5, 7, 9, 11]}],
                        'Decision Tree',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.tree.DecisionTreeRegressor.html')))
            else:
                algorithms.append(
                    Algorithm(
                        SGDRegressor(
                            random_state=self.random_state),
                        [{'penalty': ['l2', 'l1'],
                          'alpha': [1e-05, 3e-05, 1e-04, 3e-04, 1e-03]}],
                        'SGD Regressor',
                        ('http://scikit-learn.org/stable/modules/generated/'
                         'sklearn.linear_model.SGDRegressor.html')))
        return algorithms

    def add_algorithm(self, estimator, param_grid, name):
        """
        Add arbitrary scikit-learn-compatible algorithm.

        Parameters
        ----------
        estimator : object type that implements the "fit" and "predict"
            methods. A object of that type is instantiated for each grid
            point.
        param_grid : dict or list of dictionaries
            Dictionary with parameters names (string) as keys and
            lists of parameter settings to try as values, or a list of
            such dictionaries, in which case the grids spanned by
            each dictionary in the list are explored.
            This enables searching over any sequence of parameter settings.
        name : string
            Algorithm name (used for report)
        """
        self.algorithms.append(Algorithm(estimator, param_grid, name))

    def remove_algorithm(self, index=-1):
        """
        Remove algorithm

        Parameters
        ----------
        index : int (default=-1)
            Remove an algorithm from list by index.
            By default, last algorithm is removed.
        """
        del self.algorithms[index]

    def get_algorithms(self):
        """
        Get algorithm names and grid parameters.

        Returns
        -------
        algorithms : list
            List of tuples(name, grid_params).
        """
        return [(algorithm.name, algorithm.parameters)
                for algorithm in self.algorithms]

    def fit(self, X, y, dname=None, algorithm_selection_only=False):
        """
        Tune parameters and search best algorithm

        Parameters
        ----------
        X : {numpy.ndarray, pandas.DataFrame}, shape = [n_samples, n_features]
            Training vector, where n_samples in the number of samples and
            n_features is the number of features.
        y : {numpy.ndarray, pandas.Series}, shape = [n_samples]
            Target values (class labels in classification, real numbers in
            regression)
        dname : string (default=None)
            If not None, make an analysis report in this directory.
        algorithm_selection_only : boolean, optional (default=False)
            If True, only algorithm selection is executed.
            This option is needed for (get|add|remove)_algorithm(s) methods.

        Returns
        -------
        self : object
            Returns self.
        """
        if self.verbose:
            print('Set data.')
        self.data = Data(self.shuffle, self.standardize, self.random_state)
        self.data.fit_transform(X, y)
        if not self.is_ready:
            if self.verbose:
                print('Choose algorithm.')
            self.algorithms = self.__choose_algorithm()
            if self.verbose:
                for algorithm in self.algorithms:
                    print('    %s' % algorithm.name)
            self.is_ready = True
        if algorithm_selection_only:
            return self
        if isinstance(self.cv, int):
            # Replace the fold count with a concrete cv generator;
            # stratification only makes sense for classification.
            if self.task == 'classification':
                self.cv = StratifiedKFold(self.data.y, n_folds=self.cv,
                                          shuffle=self.shuffle,
                                          random_state=self.random_state)
            elif self.task == 'regression':
                self.cv = KFold(self.data.X.shape[0], n_folds=self.cv,
                                shuffle=self.shuffle,
                                random_state=self.random_state)
        if self.verbose:
            print('Analyze. (take some time)')
        self.__tune_parameters()
        if self.task == 'classification':
            self.__report_classification_result()
        if dname is not None:
            if self.verbose:
                print('Make report.')
            self.__make_report(dname)
        if self.verbose:
            print('Done.')
        return self

    def predict(self, X):
        """Predict with the best tuned algorithm.

        ``X`` goes through the same preprocessing (encoding /
        standardization) that was fitted on the training data.
        """
        return self.algorithms[self.best_index].estimator.predict(
            self.data.transform(X))

    def __search_best_algorithm(self):
        """Find and flag the algorithm with the best cross-validation score.

        For minimized (loss-type) scores the smallest value wins;
        otherwise the largest does.
        """
        self.best_score = float('-Inf')
        self.best_index = -1
        sign = 1.0
        if self.minimized_score:
            sign = -1.0
            self.best_score = float('Inf')
        for i, algorithm in enumerate(self.algorithms):
            if sign * algorithm.best_score > sign * self.best_score:
                self.best_score = algorithm.best_score
                self.best_index = i
        self.algorithms[self.best_index].is_best_algorithm = True

    def __tune_parameters(self):
        """Grid-search every candidate algorithm's parameter grid and
        record the best estimator, score, params and per-point scores."""
        # 'f1' uses a custom scorer (mean of per-class F1 scores).
        sc = f1score if self.scoring == 'f1' else self.scoring
        for algorithm in self.algorithms:
            clf = GridSearchCV(
                algorithm.estimator, algorithm.parameters, cv=self.cv,
                scoring=sc, n_jobs=self.n_jobs)
            clf.fit(self.data.X, self.data.y)
            if self.minimized_score:
                # scikit-learn negates loss-type scores internally;
                # flip the sign back so reported values are actual losses.
                clf.best_score_ *= -1.0
                clf.grid_scores_ = [
                    (params, -1.0 * mean_score, -1.0 * cv_scores)
                    for params, mean_score, cv_scores in clf.grid_scores_]
            algorithm.estimator = clf.best_estimator_
            algorithm.best_score = clf.best_score_
            algorithm.best_params = clf.best_params_
            algorithm.grid_scores = clf.grid_scores_
        self.__search_best_algorithm()

    def __report_classification_result(self):
        """Attach a precision/recall/F1 text report (computed on the
        training data) to every tuned algorithm."""
        for algorithm in self.algorithms:
            predicted = algorithm.estimator.predict(self.data.X)
            algorithm.classification_report = classification_report(
                self.data.y, predicted)

    def __plot_learning_curve(self, dname=None):
        """Save a learning-curve PNG for every tuned algorithm.

        Plots training and cross-validation score against training-set
        size; files go into ``dname`` (created if needed) or the current
        directory when ``dname`` is None.
        """
        if dname is not None and not os.path.exists(dname):
            os.mkdir(dname)
        sc = f1score if self.scoring == 'f1' else self.scoring
        for alg in self.algorithms:
            estimator = alg.estimator
            train_sizes, train_scores, test_scores = learning_curve(
                estimator,
                self.data.X,
                self.data.y,
                cv=self.cv,
                scoring=sc,
                n_jobs=self.n_jobs)
            if self.minimized_score:
                # Undo scikit-learn's sign flip for loss-type scores.
                train_scores *= -1.0
                test_scores *= -1.0
            train_scores_mean = np.mean(train_scores, axis=1)
            train_scores_std = np.std(train_scores, axis=1)
            test_scores_mean = np.mean(test_scores, axis=1)
            test_scores_std = np.std(test_scores, axis=1)
            plt.figure()
            plt.title(estimator.__class__.__name__)
            plt.xlabel("Training examples")
            plt.ylabel("Score")
            plt.grid()
            # Shaded bands show +-1 standard deviation across folds.
            plt.fill_between(train_sizes, train_scores_mean - train_scores_std,
                             train_scores_mean + train_scores_std, alpha=0.1,
                             color="r")
            plt.fill_between(train_sizes, test_scores_mean - test_scores_std,
                             test_scores_mean + test_scores_std,
                             alpha=0.1, color="g")
            plt.plot(train_sizes, train_scores_mean, 'o-', color="r",
                     label="Training score")
            plt.plot(train_sizes, test_scores_mean, 'o-', color="g",
                     label="Cross-validation score")
            # Losses decrease with more data, so the legend corner differs.
            if self.minimized_score:
                plt.legend(loc='upper right')
            else:
                plt.legend(loc="lower right")
            if dname is not None:
                plt.savefig('%s/learning_curve_%s.png' %
                            (dname, estimator.__class__.__name__),
                            bbox_inches='tight', dpi=75)
            else:
                plt.savefig('learning_curve_%s.png' %
                            estimator.__class__.__name__,
                            bbox_inches='tight', dpi=75)
            plt.close()

    def __make_report(self, dname='report'):
        """Render the HTML analysis report (plus learning-curve images)
        into directory ``dname``."""
        if not os.path.exists(dname):
            os.mkdir(dname)
        self.__plot_learning_curve(dname)
        env = Environment(
            loader=FileSystemLoader(
                os.path.abspath(
                    os.path.dirname(__file__)) + '/template', encoding='utf8'))
        if self.lang == 'jp':
            tmpl = env.get_template('report_jp.html.tmp')
        else:
            tmpl = env.get_template('report.html.tmp')
        if isinstance(self.scoring, str):
            scoring_name = self.scoring
        else:
            # Callable scorer: report it by its function name.
            scoring_name = self.scoring.__name__
        html = tmpl.render(algorithms=self.algorithms,
                           scoring=scoring_name,
                           task=self.task,
                           data=self.data).encode('utf-8')
        with open(dname + '/report.html', 'w') as fo:
            fo.write(html)

    def generate_module_sample(self, fname='module_sample.py'):
        """
        Generate a module sample to be able to add in the model
        in your system for prediction.

        Parameters
        ----------
        fname : string (default="module_sample.py")
            A string containing a path to a output file.
        """
        env = Environment(
            loader=FileSystemLoader(
                os.path.abspath(
                    os.path.dirname(__file__)) + '/template', encoding='utf8'))
        tmpl = env.get_template('sample_code.py.tmp')
        # del_columns is non-empty when categorical columns were encoded
        # during preprocessing; the sample must replay that encoding.
        encoded = len(self.data.del_columns) > 0
        code = tmpl.render(algorithm=self.algorithms[self.best_index],
                           encoded=encoded,
                           standardize=self.standardize).encode('utf-8')
        with open(fname, 'w') as fo:
            fo.write(code)
def f1score(estimator, X, y):
    """Scorer returning the unweighted mean of the per-class F1 scores.

    With ``average=None`` scikit-learn yields one F1 value per class;
    averaging them weighs every class equally regardless of support.
    """
    predicted = estimator.predict(X)
    per_class = f1_score(y, predicted, average=None)
    return per_class.mean()
# Library module: nothing happens when executed directly.
if __name__ == "__main__":
    pass