# -*- coding: utf-8 -*-
"""
.. moduleauthor:: David Toro <davsamirtor@gmail.com>
:platform: Unix, Windows
:synopsis: Serialize and Memoize.
Contains memoizing, caching, serializing and memory-mapping methods so as to let the package
save its state (persistence) and to let a method "remember" what it processed in a session (with cache) or
between sessions (memoization and serializization) of the same input contend once processed. It also wraps mmapping
functions to let objects "live" in the disk (slower but almost unlimited) rather than in memory (faster but limited).
*@cache* is used as replacement of *@property* to compute a class method once.
It is computed only one time after which an attribute of the same name is generated in its place.
*@cachedProperty* is used as replacement of *@property* to compute
a class method depending on changes in its watched variables.
*@memoize* used as a general memoizer decorator for functions
where metadata is generated to disk for persistence.
Made by Davtoh, powered by joblib.
Dependent project: https://github.com/joblib/joblib
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import zip
from past.utils import old_div
from builtins import object
from .root import NotCallable, NotCreatable, VariableNotSettable, VariableNotDeletable, \
CorruptPersistent
__license__ = """
joblib is BSD-licenced (3 clause):
This software is OSI Certified Open Source Software. OSI Certified is a
certification mark of the Open Source Initiative.
Copyright (c) 2009-2011, joblib developpers All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
3. Neither the name of Gael Varoquaux. nor the names of other joblib contributors may be used
to endorse or promote products derived from this software without specific prior written permission.
This software is provided by the copyright holders and contributors "as is" and any
express or implied warranties, including, but not limited to, the implied warranties
of merchantability and fitness for a particular purpose are disclaimed.
In no event shall the copyright owner or contributors be liable for any direct, indirect,
incidental, special, exemplary, or consequential damages (including, but not limited to,
procurement of substitute goods or services; loss of use, data, or profits; or business
interruption) however caused and on any theory of liability, whether in contract, strict
liability, or tort (including negligence or otherwise) arising in any way out of the use
of this software, even if advised of the possibility of such damage.
"""
## READ: https://wiki.python.org/moin/PythonDecoratorLibrary
import joblib
from functools import wraps
from weakref import ref
from collections import MutableMapping
from time import time
from numpy.lib import load as numpyLoad, save as numpySave
import os
#print "using joblib version", joblib.__version__
[docs]class NotMemorizedFunc(joblib.memory.NotMemorizedFunc):
pass
[docs]class MemorizedFunc(joblib.memory.MemorizedFunc):
pass
[docs]class DynamicMemoizedFunc(object):
def __init__(self, func, cachedir = None, ignore=None, mmap_mode=None,
compress=False, verbose=1, timestamp=None, banned = False):
self._func = func # the only one that should not be able to change
self._mmap_mode = mmap_mode
self._ignore = ignore
self._verbose = verbose
self._cachedir = cachedir
self._compress = compress
self._timestamp = timestamp
self._enabled = banned
self._use = None
self._build()
def _build(self):
if self._cachedir is None or not self._enabled:
self._use = NotMemorizedFunc(self._func)
else:
self._use = MemorizedFunc(func=self._func,cachedir= self._cachedir, ignore=self._ignore,
mmap_mode=self._mmap_mode,compress=self._compress,
verbose=self._verbose, timestamp=self._timestamp)
self.__doc__ = self._use.__doc__
@property
def func(self):
return self._func
@func.setter
def func(self,value):
if value != self._func:
if isinstance(value, (MemorizedFunc,NotMemorizedFunc,DynamicMemoizedFunc)):
value = value.func
self._func = value
self._build()
@func.deleter
def func(self):
raise Exception("property cannot be deleted")
@property
def mmap_mode(self):
return self._mmap_mode
@mmap_mode.setter
def mmap_mode(self,value):
if value != self._mmap_mode:
self._mmap_mode = value
self._build()
@mmap_mode.deleter
def mmap_mode(self):
raise Exception("property cannot be deleted")
@property
def ignore(self):
return self._ignore
@ignore.setter
def ignore(self,value):
if value != self._ignore:
self._ignore = value
self._build()
@ignore.deleter
def ignore(self):
raise Exception("property cannot be deleted")
@property
def verbose(self):
return self._verbose
@verbose.setter
def verbose(self,value):
if value != self._verbose:
self._verbose = value
self._build()
@verbose.deleter
def verbose(self):
raise Exception("property cannot be deleted")
@property
def cachedir(self):
return self._cachedir
@cachedir.setter
def cachedir(self,value):
if self._cachedir != value:
self._cachedir = value
self._build()
@cachedir.deleter
def cachedir(self):
raise Exception("property cannot be deleted")
@property
def compress(self):
return self._compress
@compress.setter
def compress(self,value):
if self._compress != value:
self._compress = value
self._build()
@compress.deleter
def compress(self):
raise Exception("property cannot be deleted")
@property
def enabled(self):
return self._enabled
@enabled.setter
def enabled(self, value):
if self._enabled != value:
self._enabled = value
self._build()
@enabled.deleter
def enabled(self):
raise Exception("property cannot be deleted")
def __call__(self, *args, **kwargs):
return self._use(*args, **kwargs)
[docs] def call_and_shelve(self, *args, **kwargs):
return self._use.call_and_shelve(*args, **kwargs)
def __reduce__(self):
return self._use.__reduce__()
def __repr__(self):
return self._use.__repr__()
[docs] def clear(self, warn=True):
return self._use.clear(warn=warn)
[docs]class Memory(joblib.Memory):
"""
A wrapper to joblib Memory to have better control.
"""
def __init__(self, cachedir, mmap_mode=None, compress=False, verbose=1):
super(Memory,self).__init__(None, mmap_mode, compress, verbose)
if cachedir is None:
self.cachedir = None
else:
self.cachedir = cachedir
joblib.memory.mkdirp(self.cachedir)
'''
def cache(self, func=None, ignore=None, verbose=None,
mmap_mode=False):
""" Decorates the given function func to only compute its return
value for input arguments not cached on disk.
Parameters
----------
func: callable, optional
The function to be decorated
ignore: list of strings
A list of arguments name to ignore in the hashing
verbose: integer, optional
The verbosity mode of the function. By default that
of the memory object is used.
mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, optional
The memmapping mode used when loading from cache
numpy arrays. See numpy.load for the meaning of the
arguments. By default that of the memory object is used.
Returns
-------
decorated_func: MemorizedFunc object
The returned object is a MemorizedFunc object, that is
callable (behaves like a function), but offers extra
methods for cache lookup and management. See the
documentation for :class:`joblib.memory.MemorizedFunc`.
"""
if func is None:
# Partial application, to be able to specify extra keyword
# arguments in decorators
return partial(self.cache, ignore=ignore,
verbose=verbose, mmap_mode=mmap_mode)
if verbose is None:
verbose = self._verbose
if mmap_mode is False:
mmap_mode = self.mmap_mode
if isinstance(func, (MemorizedFunc,NotMemorizedFunc,DynamicMemoizedFunc)):
func = func.func
return DynamicMemoizedFunc(func, cachedir=self.cachedir,
mmap_mode=mmap_mode,
ignore=ignore,
compress=self.compress,
verbose=verbose,
timestamp=self.timestamp)'''
__call__ = joblib.Memory.cache
[docs]class Memoizer(object):
memoizers = {}
def __init__(self, ignore=(), ignoreAll=False):
self._ignore = None
self.ignoreAll = ignoreAll
self.ignore = ignore
self.memoized = {} # FOR CLEANING UP handle # flag = True is safe to remove, flag = False is unsafe to remove
Memoizer.memoizers[id(self)] = ref(self)
@property
def ignore(self):
return self._ignore
@ignore.setter
def ignore(self,value):
temp = set()
for f in value:
if not callable(f):
raise Exception("{} must be callable".format(f))
temp.add(id(f))
self._ignore = temp
@ignore.deleter
def ignore(self):
self._ignore = set()
[docs] def makememory(self,cachedir = None, mmap_mode=None, compress=False, verbose=0):
"""
Make memory for :func:`memoize` decorator.
:param cachedir: path to save metadata, if left None function is not cached.
:param mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, optional.
The memmapping mode used when loading from cache
numpy arrays. See numpy.load for the meaning of the
arguments.
:param compress: (boolean or integer)
Whether to zip the stored data on disk. If an integer is
given, it should be between 1 and 9, and sets the amount
of compression. Note that compressed arrays cannot be
read by memmapping.
:param verbose: (int, optional)
Verbosity flag, controls the debug messages that are issued
as functions are evaluated.
:return:
"""
"""
if not cachedir:
cachedir = tempfile.mkdtemp() # tempfile.mkdtemp(dir=cachedir)"""
MEMORY = Memory(cachedir, mmap_mode, compress, verbose)
return MEMORY
[docs] def memoize(self, memory=None, ignore=None, verbose=0, mmap_mode=False):
"""
Decorated functions are faster by trading memory for time, only hashable values can be memoized.
:param memory: (Memory or path to folder) if left None function is not cached.
:param ignore: (list of strings) A list of arguments name to ignore in the hashing.
:param verbose: (integer) Verbosity flag, controls the debug messages that are issued as functions are evaluated.
:param mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, optional. The memmapping mode used when loading from cache
numpy arrays. See numpy.load for the meaning of the arguments.
:return: decorator
"""
def decorator(fn):
if isinstance(memory,joblib.Memory): # use provided memory
memoizedfn = memory.cache(fn,ignore,verbose, mmap_mode)
else:
#memoizedfn = DynamicMemoizedFunc(func=fn,cachedir=str(memory),
# ignore=ignore,verbose=verbose, mmap_mode=mmap_mode)
memoizedfn = self.makememory(cachedir=str(memory)).cache(func=fn,
ignore=ignore,verbose=verbose, mmap_mode=mmap_mode)
@wraps(fn)
def wrapper(*args,**kwargs):
# TODO solve this, how to test quickly to ignore memoization
#perhaps it is better to use dynamicMemoizedFuciton to prevent comparitions when ignoring a memoization
if not self.ignoreAll and id(fn) not in self._ignore and id(wrapper) not in self._ignore:
return memoizedfn(*args,**kwargs)
else:
return fn(*args,**kwargs)
self.memoized[id(fn)] = ref(memoizedfn) # safe to remove
return wrapper
return decorator
__call__ = memoize
memoize = Memoizer() # make memoizer manager
[docs]class Cache(object):
"""
Descriptor (non-data) for building an attribute on-demand at first use.
@cache decorator is used for class methods without inputs (only self reference to the object)
and it caches on first compute. ex::
class x(object):
@cache
def method_x(self):
return self.data
.. note:: Cached data can be deleted in the decorated object to recalculate its value.
"""
def __init__(self, func):
"""
Initialize cache with a property function.
"""
self.func = func # function handle
# if method simulating getattr
def __get__(self, instance, owner): # if trying to get attribute
if instance is not None: # Class not instantiated if instance is None
# Only for instances of Class
# Build the attribute.
cached = self.func(instance) #evaluate function over instance
# Cache the value;
# Creates variable (name) of value (cached) in (instance).
# instance.name = cached
setattr(instance, self.func.__name__, cached)
return cached
else: # return method signature
return self.func
[docs]def cachedProperty(watch=[],handle=[]):
"""
A memoize decorator of @property decorator specifying what to trigger caching.
:param watch: (list of strings) A list of arguments name to watch in the hashing.
:param handle: (list of handles or empty list) Provided list is appended with the Memo
handle were data is stored for the method and where a clear() function is provided.
:return:
"""
#http://code.activestate.com/recipes/576563-cached-property/
class Memo(object):
""" Memo function for cache """
def __init__(self):
self._cache ={}
self._input_cache = {}
def clear(self):
self._cache ={}
self._input_cache = {}
this = Memo()
handle.append(this)
def noargs(f): # if not watch use this funciton
@wraps(f)
def get(self):
try:
return this._cache[f]
except AttributeError:
this._cache = {}
except KeyError:
pass
ret = this._cache[f] = f(self)
return ret
return property(get)
def withargs(f): # if watch use this function
@wraps(f)
def get(self):
input_values = dict((key,getattr(self,key)) for key in watch )
try:
x = this._cache[f]
if input_values == this._input_cache[f]:
return x
except AttributeError:
this._cache ={}
this._input_cache = {}
except KeyError:
pass
x = this._cache[f] = f(self)
this._input_cache[f] = input_values
return x
return property(get)
if type(watch) is not list: # if not arguments
return noargs(watch)
elif watch==[]:
return noargs
return withargs
[docs]class ObjectGetter(object):
"""
Creates or get instance object depending if it is alive.
"""
def __init__(self, callfunc = None, obj=None, callback=None, **annotations):
"""
:param callfunc: function to create object
:param obj: (optional) alive object already obtained from callfunc
:param callback: function called on object destruction
:param annotations: annotations to self (this object can be used to save info or statistics)
Example::
class constructor: pass
myobj = constructor() # created hard reference
getobj = objectGetter(myobj, callfunc=constructor) # created getter
assert myobj is getobj() # it uses the same reference as myobj
del myobj # myobj reference lost
a = getobj() # created (+1) other object from constructor
b = getobj() # it uses the same hard reference a
myobj = constructor()
getobj = objectGetter(None, callfunc=constructor)
assert myobj is not getobj() # created (+1) other object from constructor
a = getobj() # this created (+1) again because there was not reference to object
b = getobj() # it uses the same hard reference a
"""
super(ObjectGetter, self).__init__()
self._ref = None
self._callfunc = None
self._callback = None
self.update(obj=obj, callback=callback, callfunc = callfunc, **annotations)
[docs] def update(self, **kwargs):
callfunc = kwargs.pop("callfunc",None)
if callfunc:
if not callable(callfunc):
raise NotCallable("callfunc {} is not callable".format(callfunc))
self._callfunc = callfunc
obj = kwargs.pop("obj", None)
callback = kwargs.pop("callback", None)
if obj is not None or callback is not None:
if callback is not None: self._callback = callback
self._ref = ref(obj, self._callback)
for k, v in kwargs.items():
setattr(self, k, v)
[docs] def getObj(self, throw = False):
ob = self.raw()
if ob is None: ob = self.create(throw)
return ob
__call__ = getObj
[docs] def create(self, throw = False):
"""
Creates an object and keep reference.
:param throw: if there is not creation function throws error.
:return: created object.
.. warning:: previous object reference is lost even if it was alive.
.. note:: Recommended only to use when object from current reference is dead.
"""
if self.isCreatable(): # to create
ob = self._callfunc()
self._ref = ref(ob, self._callback)
return ob
elif throw:
raise NotCreatable("No callfunc to create object")
[docs] def raw(self):
"""
get object from reference.
:return: None if object is dead, object itself if is alive.
"""
if self._ref: # if in references
ob = self._ref()
return ob
[docs] def isAlive(self):
"""
test if object of reference is alive
"""
return self.raw() is not None
[docs] def isCreatable(self):
"""
test if can create object
"""
return self._callfunc
[docs] def isGettable(self):
"""
test if object can be gotten either by reference or creation.
"""
return self.isCreatable() or self.isAlive()
[docs]class Retriever(MutableMapping):
"""
keep track of references and create objects on demand if needed.
"""
def __init__(self):
self.references = {}
self._lastKey = None
[docs] def register(self, key, method = None, instance = None):
"""
Register object to retrieve.
:param key: hashable key to retrieve
:param method: callable method to get object
:param instance: object instance already created from method
:return:
Example::
def mymethod():
class constructor: pass
return constructor()
ret = retriever()
ret["obj"] = mymethod # register creating method in "obj"
im = ret["obj"] # get object (created obj +1, with reference)
assert im is ret["obj"] # check that it gets the same object
# it remembers that "obj" is last registered or fetched object too
assert ret() is ret()
# lets register with better control (created obj2 +1, no reference)
ret.register("obj2",mymethod(),mymethod)
# proves that obj2 is not the same as obj (created obj2 +1, no reference)
assert ret() is not ret["obj"]
print list(ret.iteritems()) # get items
"""
self.references[key] = ObjectGetter(obj=instance, callfunc=method)
self._lastKey = key # register last key
def __call__(self):
return self[self._lastKey]
def __getitem__(self, key):
data = self.references[key]()
self._lastKey = key
return data
def __setitem__(self, key, method):
self.register(key, method=method)
def __delitem__(self, key):
del self.references[key]
def __iter__(self):
return iter(self.references)
def __len__(self):
return len(self.references)
[docs]class LazyDict(MutableMapping):
"""
Create objects on demand if needed. call the instance with keys to prevent it
from using lazy evaluations (e.g. instead of self[key] use self(key) to prevent
recursion). Containing operations are safe to prevent recursion (e.g. if key in self
instead of self[key]). In addition use self.isLazy flag to enable or disable lazy
operations to prevent possible recursions when getter is called.
"""
def __init__(self, getter, dictionary=None):
if not callable(getter):
raise NotCallable("getter must be a callable function to process keys")
if dictionary is None:
dictionary = {}
self.getter = getter
self.dictionary = dictionary
self._lastKey = None
self.isLazy = True
self.cached = True
def __call__(self, key = None):
if key is None:
return self[self._lastKey]
else:
return self.dictionary[key]
def __getitem__(self, key):
try:
if self.cached:
# try to use cached data
data = self.dictionary[key]
else: # always compute data
data = self.getter(key)
self.dictionary[key] = data
except Exception as e:
if self.isLazy or not self.cached:
# for lazy or to recompute
data = self.getter(key)
self.dictionary[key] = data
else:
raise e
self._lastKey = key
return data
def __setitem__(self, key, val):
self.dictionary[key] = val
def __delitem__(self, key):
del self.dictionary[key]
def __iter__(self):
return iter(self.dictionary)
def __len__(self):
return len(self.dictionary)
def __contains__(self, key):
try:
#self[key] # It can create a recursion
self.dictionary[key]
except KeyError:
return False
else:
return True
[docs]class ResourceManager(Retriever):
"""
keep track of references, create objects on demand, manage their memory and optimize for better performance.
:param maxMemory: (None) max memory in specified unit to keep in check optimization (it does
not mean that memory never surpasses maxMemory).
:param margin: (0.8) margin from maxMemory to trigger optimization.
It is in percentage of maxMemory ranging from 0 (0%) to maximum 1 (100%).
So optimal memory is inside range: maxMemory*margin < Memory < maxMemory
:param unit: (MB) maxMemory unit, it can be GB (Gigabytes), MB (Megabytes), B (bytes)
:param all: if True used memory is from all alive references,
if False used memory is only from keptAlive references.
"""
def __init__(self, maxMemory = None, margin = 0.8, unit = "MB", all = True):
super(ResourceManager, self).__init__()
#self.references is a dictionary containing all the references
#self._lastkey is effectively the last key to use when Manager is called
self._keptMemory = 0 # used memory in bytes of references
self._refMemory = 0 # used memory in bytes of keptAlive
self._maxMemory = None # maximum memory in bytes
self._unit = None # unit of memory
self._conv = 1 # convert to any unit: equivalence with bytes from unit
self._margin = None # private data of margin
self._limit = None # private data representing _maxMemory*_margin in bytes
self._all = all
self.blacklist = set() # key of objects likely to destroy
self.whitelist = set() # key of objects likely to keep in memory and only delete in extreme cases
self.keptAlive = {} # objects currently being kept alive
self.verbosity = True # if true print debugging messages
self.invert = False # invert any order made by user
self.methods = {} # mapping methods for user defined fields
self.method = None # ("size","_call","_fail","_mean")
self.unit = unit # set units
self.margin = margin # margin of percentage (0 to 1) of memory
if maxMemory is not None: self.maxMemory = maxMemory # set maximum memory
def __getitem__(self, key):
"""
gets object from key and collect statistical data
"""
t1 = time()
getter = self.references[key]
getter._iddleT = (t1 - getter._iddleT) # iddle time
wasAlive = getter.isAlive()
wasAtFail = getter._fails>0
try:
obj = getter(throw = True)
if wasAtFail: self.resetGetter(getter)
except NotCreatable:
obj = None
getter._fails += 1 # keep accumulating fails
getter._calls += 1 # actual calls
# time it and increase _calls
if getter._calls: # get successive times
getter._processT = old_div((time()-t1 + getter._processT),2) # process time
else: # if first call then get first profile time
getter._processT = time()-t1
if key not in self.keptAlive:
toWhiteList = getter._fails==0 and key not in self.whitelist and getter._processT > 3
if not wasAlive: # it was not alive but now it was created
self.optimizeObject(key,getter,toWhiteList=toWhiteList)
self._lastKey = key # update key once finished
return obj
def __setitem__(self, key, value):
self.register(key, value)
def __delitem__(self, key):
self._free(key) # tries to _free if kept alive
del self.references[key] # delete entry
if key in self.blacklist: # clear form black list
self.blacklist.remove(key)
if key in self.whitelist: # clear form white list
self.whitelist.remove(key)
[docs] def getSizeOf(self, item):
return item.__sizeof__()#getsizeof(item,0)
[docs] def optimizeObject(self, key, getter, toWhiteList = False):
if getter.isAlive():
obj = getter.raw()
if obj is None:
raise Exception("given a dead reference")
flag = self.keepAlive(key,obj)
if flag and toWhiteList:
self.whitelist.add(key)
return flag
[docs] def keepAlive(self, key, obj):
if key in self.keptAlive:
#self._free(key)
raise Exception("Already ketp alive")
s = self.getSizeOf(obj)# needed memory to allocate
flag = self._optimizeMemory(needed=s)
if flag is not None and flag<=0: # manage memory to allocate new object
self.keptAlive[key] = obj
self._keptMemory += s # update kept memory
return True
def _free(self, key):
"""
_free memory from keptAlive
:param key:
:return: Liberated memory
"""
s = self.getSizeOf(self.keptAlive[key])
del self.keptAlive[key]
self._keptMemory -= s # update kept memory
return s
[docs] def bytes2units(self,value):
"""
converts value from bytes to user units
"""
return old_div(value,self._conv)
[docs] def units2bytes(self,value):
"""
converts value from user units two bytes
"""
return value*self._conv
@property
def usedMemory(self):
"""
:return: used memory in user units
"""
return self.bytes2units(self._keptMemory + self._refMemory)
@usedMemory.setter
def usedMemory(self, value):
raise Exception("variable not settable")
@property
def maxMemory(self):
if self._maxMemory is not None:
return self.bytes2units(self._maxMemory)
@maxMemory.setter
def maxMemory(self, value):
if value is None:
print("WARNING: maximum memory configured to be unlimited")
self._limit = None
self._maxMemory = None
else:
print("WARNING: maximum memory is {} {}".format(value,self.unit))
self._maxMemory = self.units2bytes(value) # pass in bytes
self._limit = self._maxMemory * self.margin # re calculate limit
self._optimizeMemory()
@property
def margin(self):
"""
:return: margin used for triggering memory optimization from maxMemory.
"""
return self._margin
@margin.setter
def margin(self, value):
if value<0 or value>1: raise Exception("Margin must be between 0 and 1")
if value is None:
self._limit = self._maxMemory # set limit to maxMemory
else:
if self._maxMemory is None:
self._limit = None
else:
self._limit = self._maxMemory * value
self._margin = value
self._optimizeMemory()
@property
def all(self):
"""
:return: all flag, if True: used memory is from all alive references,
if False: used memory is only from keptAlive references.
"""
return self._all
@all.setter
def all(self, flag):
if self._all != flag:
self._all = flag
# TODO recaculate usedMemory every time all changes
@property
def unit(self):
"""
:return: user defined units
"""
return self._unit
@unit.setter
def unit(self, unit):
if unit.lower() in ("b","bytes","byte"):
self._conv = 1
elif unit.lower() in ("m","mb","megabytes","megas","mega"):
self._conv = 2**20
elif unit.lower() in ("g","gb","gigas","gigabytes","giga","gigabyte"):
self._conv = 1000*2**20
else:
raise Exception("unit '{}' not supported".format(unit))
self._unit = unit
@staticmethod
[docs] def resetGetter(getter):
"""
Helper function to reset getter parameters.
:param getter: any instance of objectGetter
"""
getter._fails = 0 # init fail count
getter._calls = 0 # init call count
getter._processT = 0 # init mean of retrieving time
getter._iddleT = time()
[docs] def register(self, key, method = None, instance = None):
"""
Register object to retrieve.
:param key: hashable key to retrieve
:param method: callable method to get object
:param instance: object instance already created from method
.. note:: This method is used in __setitem__ as self.register(key, value). Overwrite this
method to change key assignation behaviour.
Example::
def mymethod():
class constructor: pass
return constructor()
ret = retriever()
ret["obj"] = mymethod # register creating method in "obj"
im = ret["obj"] # get object (created obj +1, with reference)
assert im is ret["obj"] # check that it gets the same object
# it remembers that "obj" is last registered or fetched object too
assert ret() is ret()
# lets register with better control (created obj2 +1, no reference)
ret.register("obj2",mymethod(),mymethod)
# proves that obj2 is not the same as obj (created obj2 +1, no reference)
assert ret() is not ret["obj"]
print list(ret.iteritems()) # get items
"""
if key in self.references:
getter = self.references[key]
getter.update(obj = instance, callfunc=method)
else:
getter = ObjectGetter(obj = instance, callfunc=method)
self.resetGetter(getter)
self.references[key] = getter
if instance: self.optimizeObject(key,getter,toWhiteList= not getter.isCreatable())
self._lastKey = key # register last key
def _checkMemory(self, needed = 0):
"""
check if memory needs to be optimized.
:param asValue: True to return positive (how much memory needed to _free)
or negative (how much memory until limit reached) value.
False to return Value (how much memory needed to _free) or None.
:return Memory value or None if no capacity to allocate.
"""
limit = self._limit
if limit is not None:
if needed> self._maxMemory:
return # indicates not capacity
if needed> limit: limit = self._maxMemory
if self._all:
val = self._keptMemory + self._refMemory + needed
else:
val = self._keptMemory + needed
return val - limit
else:
return 0 # return 0 bytes to free
def _getOrderedData(self, method = None, all = None):
"""
Construct list of alive objects.
:param method: method to use from self.methods, if None use self.method.
:param all: if True: all alive objects in the references
(of course if they are kept alive by self.keptAlive then they are alive in references).
else False: only objects kept alive by self.keptAlive.
:param self.method: specifies the method to calculate and
sort according to the third column in list
:return: list with items (key,size,calculated)
"""
method = method or self.method
if all is None: all = self._all # let user data pass or choose default
usemethod = method is not None and method != "size" and self.methods[method]
data = [] # frame: key, size, calls, fails
if all:
for key,getter in self.references.items():
if getter.isAlive():
val = getter.raw()
size = self.getSizeOf(val)
if usemethod:
data.append((key,size,usemethod(val)))
else:
data.append((key,size))
else:
for key,val in self.keptAlive.items():
size = self.getSizeOf(val)
if size:
if usemethod:
data.append((key,size,usemethod(val)))
else:
data.append((key,size))
if method == "size": # sort just by size
data.sort(key=lambda x:x[1],reverse=self.invert)
elif usemethod: # sort by user defined val
data.sort(key=lambda x:x[2],reverse=self.invert)
return data
def _optimizeMemory(self, needed = 0):
"""
:param ret: dictionary or retriever
:param _limit: _limit of memory
:param margin:
:param unit:
:return:
"""
# TODO liberate needed memory
# ideal: total = media*len(sizes) < limit*margin
# if limit< total > limit*percent then eliminate
# methods: None, by ascendant, by descendant, by creations
tofree = self._checkMemory(needed=needed)
if tofree is None: return # nothing to do, not capacity to allocate
if tofree<=0:
return tofree # successful
c = self.bytes2units
unit = self.unit
if self.verbosity:
print("{1} {0} used of {2} {0}.".format(unit,self.usedMemory,self.maxMemory))
print("{1} {0} needs to be freed to allocate {2} {0}.".format(unit,c(tofree),c(needed)))
# FIRST STAGE
blacklist = list(self.blacklist)
# first eliminate in black list
freed = 0
while len(blacklist) and freed>=tofree:
key = blacklist.pop()
if key in self.keptAlive:
size = self._free(key)
freed += size
if self.verbosity: print("Eliminated '{}' of size {} {}".format(key,c(size),unit))
self.blacklist.pop(key) # liminate in real black list
tofree -= freed
if tofree<=0:
if self.verbosity: print("{0} {1} where freed".format(c(freed),unit))
return tofree # successful
# SECOND STAGE: if it did not work keep freeing
data = self._getOrderedData(all=False) # get only in keptAlive
if data:
keys,sizes = zip(*data)[:2] # just needed keys and sizes
total = sum(sizes) # total used memory in bytes
if abs(total - self._keptMemory) > 10: # difference of 10 bytes
self._keptMemory = total
tofree = self._checkMemory(needed=needed)
print("WARNING: data was not well collected")
if tofree>0:
#ratios = np.array(sizes)/total
freed = 0
for key,size in zip(keys,sizes): # free only normal ones
if freed>=tofree:
break
if key not in self.whitelist and key in self.keptAlive:
size2 = self._free(key)
freed += size2
if size2 != size:
print("DEBUG: key {0} had size {1} but was freed {2}".format(key,size,size2))
if self.verbosity: print("Eliminated '{}' of size {} {}".format(key,c(size),unit))
tofree-= freed
if tofree>0:
if self.verbosity: print("WARNING: {} {} not adequately freed".format(c(tofree),unit))
if self.verbosity: print("{0} {2} where freed, remaining {1} {2}".format(c(freed),c(total-freed),unit))
else:
if self.verbosity: print("{} {} is an optimal memory. Not optimized.".format(c(total),unit))
else:
if self.verbosity: print("{} {} is considered low memory. Not optimized.".format(self.usedMemory, unit))
return tofree # this means: > 0 bytes not able to free; < 0 bytes over freed; == 0 successful
[docs]def mapper(path, obj = None, mode =None, onlynumpy = False):
"""
Save and load or map live objects to disk to free RAM memory.
:param path: path to save mapped file.
:param obj: the object to map, if None it tries to load obj from path if exist
:param mode: {None, 'r+', 'r', 'w+', 'c'}.
:param onlynumpy: if True, it saves a numpy mapper from obj.
:return: mmap image, names of mmap files
"""
names = None
if onlynumpy:
if not path.endswith(".npy"):
path += ".npy" # correct path
if obj is not None:
numpySave(path,obj) # save numpy array
names = [path] # simulates answer as onlynumpy = False
return numpyLoad(path,mode),names
else:
if obj is not None:
names = joblib.dump(obj, path) # dump object to file for mapping
return joblib.load(path, mmap_mode=mode), names
[docs]class MemoizedDict(MutableMapping):
"""
Memoized dictionary with keys and values persisted to files.
:param path: path to save memo file
:param mode: loading mode from memo file {None, 'r+', 'r', 'w+', 'c'}
.. note::
If saveAtKey is True it will attempt to memoize each time a keyword is added
and throw an error if not successful. But if saveAtKey is False this process
will be carried out when the MemoizedDict instance is being destroyed in
a proper deletion, that is, if the program ends unexpectedly all data will be
lost or if data cannot be saved it will be lost without warning.
.. warning::
Some data structures cannot be memoize, so this structure is not save yet.
Use at your own risk.
"""
def __init__(self, path, mode = None):
#from directory import checkFile, checkDir, mkPath, rmFile
from .directory import mkPath
import pickle
# TODO: change serializer to use json (it seems it is more reliable and compatible)
# TODO: It is slow load key per key, consider making a way to load al de dictionary keys quickly
# TODO: when clear is called, delete all memoized folder instead each key. it could be dangerous but faster
self._map_serializer = pickle # serializer for keys
self._path = mkPath(path) # path to memoized keys and values
self._map_file = os.path.join(self._path, "metadata") # file containing the keys
#self._map = self._load_map() or {} # keeps the map to persistent files
self._map_old = None
self._mode = mode # mode to read the values
self._loader = joblib.load # loader for values
self._saver = joblib.dump # saver for values
self._hasher = hash # function to hash the keys
self._secure = False # this is an option to use dangerous and secure routines
@property
def _map(self):
self._map_old = self._load_map()
if self._map_old is None:
self._map_old = {}
return self._map_old
@_map.setter
def _map(self,value):
raise VariableNotSettable("_map cannot be set")
@_map.deleter
def _map(self):
raise VariableNotDeletable("_map cannot be deleted")
def _getHash(self, key):
"""
Hasher function to use in persistent files.
:param key: key to hash
"""
return '{}'.format(self._hasher(key))
def _load(self, key):
"""
Loads key from persistent file using map.
:param key: hashable key.
:return: value stored in key.
"""
hashed,files = self._map[key]
filename = os.path.join(self._path, hashed)
if os.path.isfile(filename):
try:
return self._loader(filename, self._mode)
except (EOFError,IOError):
raise CorruptPersistent("Persistent data is corrupt")
else:
raise KeyError
def _save(self, key, value):
"""
saves key to persistent files.
:param key: hashable key.
:param value: serializable object.
:return: None
"""
try:
hashed,files = self._map[key]
for file in files: # remove old keys
try:
os.remove(file) # FIXME enclose in try/except, who knows files could be deleted
except OSError:
pass # file could have been deleted
del self._map_old[key]
except KeyError:
hashed = self._getHash(key)
filename = os.path.join(self._path, hashed)
try:
# TODO: Consider implementing a set to keep track of hashes so that a hash is not repeated
# TODO: though add overloads, consider persisting the key along the value too for recovery purposes.
self._map_old[key] = (hashed,self._saver(value, filename))
self._save_map()
except OSError:
print(" Race condition in the creation of the directory ")
def _save_map(self):
"""
persist dictionary map to file (metadata).
"""
if self._map_old is not None:
with open(self._map_file, 'wb') as f: # FIXME: consumes too much time
return self._map_serializer.dump(self._map_old, f) # save dictionary
def _load_map(self):
"""
loads metadata of dictionary map from persisted file.
"""
try:
with open(self._map_file, 'rb') as f:
return self._map_serializer.load(f) # get session
except IOError as e:
return
except EOFError as e:
raise e
[docs] def clear(self): # overloads clear in the abc class
"""
Remove all items from D.
"""
# for security it is better to wait until all keys are
# safely deleted and not deleting everything at once
for key in list(self._map.keys()):
try:
del self[key]
except KeyError:
pass
"""
# This is really dangerous, an user could change the _path
# variable and delete something else. Or if there exists
# a symbolic link it could bring problems.
# http://stackoverflow.com/a/185941/5288758
folder = self._path
for the_file in os.listdir(folder):
file_path = os.path.join(folder, the_file)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
#elif os.path.isdir(file_path): shutil.rmtree(file_path)
except Exception as e:
print(e)
self._map.clear()
"""
def __setitem__(self, key, value):
self._save(key,value)
def __getitem__(self, key):
return self._load(key)
def __delitem__(self, key):
hashed,files = self._map[key]
for file in files: # remove old keys
try:
os.remove(file) # FIXME enclose in try/except, who knows files could be deleted
except OSError:
pass # file could have been deleted
del self._map_old[key]
self._save_map()
def __iter__(self):
return iter(self._map)
def __len__(self):
return len(self._map)
def __contains__(self, key):
try:
#self[key] # It has to load values taking too long
hashed,files = self._map[key] # test that key is in map
for file in files: # test that key really exists in disk
if not os.path.exists(file):
raise KeyError
except KeyError:
return False
else:
return True