Source code for b3j0f.annotation.async

# -*- coding: utf-8 -*-

# --------------------------------------------------------------------
# The MIT License (MIT)
#
# Copyright (c) 2015 Jonathan Labéjof <jonathan.labejof@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --------------------------------------------------------------------

"""
Decorators dedicated to asynchronous programming.
"""

try:
    from threading import Thread, RLock
except ImportError as IE:
    from dummythreading import Thread, RLock

from time import sleep

try:
    from Queue import Queue
except ImportError:
    from queue import Queue

from signal import signal, SIGALRM, alarm

from b3j0f.annotation.interception import PrivateInterceptor
from b3j0f.annotation import Annotation
from b3j0f.annotation.oop import Mixin


[docs]class Synchronized(PrivateInterceptor): """ Transform a target into a thread safe target. """ #: lock attribute name _LOCK = '_lock' __slots__ = (_LOCK,) + PrivateInterceptor.__slots__ def __init__(self, lock=None, *args, **kwargs): super(Synchronized, self).__init__(*args, **kwargs) self._lock = RLock() if lock is None else lock def _interception(self, annotation, advicesexecutor): self._lock.acquire() result = advicesexecutor.execute() self._lock.release() return result
[docs]class SynchronizedClass(Synchronized): """ Transform a class into a thread safe class. """
[docs] def on_bind_target(self, target): for attribute in target.__dict__: if callable(attribute): Synchronized(attribute, self._lock)
[docs]class Asynchronous(Annotation): """ Transform a target into an asynchronous callable target. """
[docs] def threaded(self, *args, **kwargs): result = self.__wraps(args, kwargs) self.queue.put(result)
[docs] def on_bind_target(self, target): # add start function to wrapper super(Asynchronous, self).on_bind_target(target) setattr(target, 'start', self.start)
[docs] def start(self, *args, **kwargs): self.queue = Queue() thread = Thread(target=self.threaded, args=args, kwargs=kwargs) thread.start() return Asynchronous.Result(self.queue, thread)
[docs] class NotYetDoneException(Exception): pass
[docs] class Result(object): __slots__ = ('queue', 'thread') def __init__(self, queue, thread): super(Asynchronous.Result, self).__init__() self.queue = queue self.thread = thread
[docs] def is_done(self): return not self.thread.is_alive()
[docs] def get_result(self, wait=-1): if not self.is_done(): if wait >= 0: self.thread.join(wait) else: raise Asynchronous.NotYetDoneException( 'the call has not yet completed its task') if not hasattr(self, 'result'): self.result = self.queue.get() return self.result
[docs]class TimeOut(PrivateInterceptor): """ Raise an Exception if the target call has not finished in time. """
[docs] class TimeOutError(Exception): """ Exception thrown if time elapsed before the end of the target call. """ """ Default time out error message. """ DEFAULT_MESSAGE = \ 'Call of {0} with parameters {1} and {2} is timed out' def __init__(self, timeout_interceptor, frame): super(TimeOut.TimeOutError, self).__init__( timeout_interceptor.message.format( timeout_interceptor.target, timeout_interceptor.args, timeout_interceptor.kwargs) )
SECONDS = 'seconds' ERROR_MESSAGE = 'error_message' __slots__ = (SECONDS, ERROR_MESSAGE) + PrivateInterceptor.__slots__ def __init__( self, seconds, error_message=TimeOutError.DEFAULT_MESSAGE, *args, **kwargs ): super(TimeOut, self).__init__(*args, **kwargs) self.seconds = seconds self.error_message = error_message def _handle_timeout(self, signum, frame): raise TimeOut.TimeOutError(self) def _interception(self, advicesexecutor): signal(SIGALRM, self._handle_timeout) alarm(self.seconds) try: result = advicesexecutor.execute() finally: signal.alarm(0) return result
[docs]class Wait(PrivateInterceptor): """ Define a time to wait before and after a target call. """ DEFAULT_WAIT = 1 #: before attribute name BEFORE = 'before' #: after attribute name AFTER = 'after' __slots__ = (BEFORE, AFTER) + PrivateInterceptor.__slots__ def __init__( self, before=DEFAULT_WAIT, after=DEFAULT_WAIT, *args, **kwargs ): super(Wait, self).__init__(*args, **kwargs) self.before = before self.after = after def _interception(self, advicesexecutor): sleep(self.before) result = advicesexecutor.execute() sleep(self._after_seconds) return result
[docs]class Observable(PrivateInterceptor): """ Imlementation of the observer design pattern. It transforms a target into an observable object in adding method registerObserver, unregisterObserver and notify_observers. Observers listen to pre/post target interception. """ def __init__(self): self._super(Observable).__init__() self.observers = set()
[docs] def registerObserver(self, observer): """ Register an observer. """ self.observers.add(observer)
[docs] def unregisterObserver(self, observer): """ Unregister an observer. """ self.observers.remove(observer)
[docs] def notify_observers(self, target, args, kwargs, result=None, post=False): """ Notify observers with parameter calls and information about pre/post call. """ _observers = tuple(self.observers) for observer in _observers: observer.notify(target, args, kwargs, result, post)
[docs] def on_bind_target(self, target): Mixin.set_mixin(target, self.registerObserver) Mixin.set_mixin(target, self.unregisterObserver) Mixin.set_mixin(target, self.notify_observers)
def _pre_intercepts(self, target, args, kwargs): self._super(Observable)._pre_intercepts(target, args, kwargs) self.notify_observers(target, args, kwargs) def _post_intercepts(self, target, args, kwargs, result): self._super(Observable)._post_intercepts(target, args, kwargs) self.notify_observers(target, args, kwargs, result, post=True)