# -*- encoding: utf-8 -*-
"""
GAplic
======
Container of gobjs.
:class:`GAplic` supplies the main loop in a thread o subprocess context.
:class:`GAplic` has the next methods:
* :meth:`GAplic.create_gobj`
* :meth:`GAplic.destroy_gobj`
* :meth:`GAplic.find_unique_gobj`
* :meth:`GAplic.send_event_to_external_gaplic`
* :meth:`GAplic.send_event_to_external_role`
* :meth:`GAplic.subscribe_event_from_external_gaplic`
* :meth:`GAplic.subscribe_event_from_external_role`
* :meth:`GAplic.unsubscribe_event_from_external_gaplic`
* :meth:`GAplic.unsubscribe_event_from_external_role`
* :meth:`GAplic.add_callback`
* :meth:`GAplic.start`
* :meth:`GAplic.stop`
* :meth:`GAplic.mt_subprocess`
A gaplic can run in a thread or subprocess context,
always with the same interface.
Auxiliary functions:
====================
To run a :class:`GAplic` instance like a **thread**:
:func:`setup_gaplic_thread`.
To run a :class:`GAplic` instance like a **subprocess**:
:func:`setup_gaplic_process`.
To use configuration file .ini
with `PasteDeploy <http://pythonpaste.org/deploy>`_ in composite mode:
:func:`gaplic_factory`.
GAplic class
============
.. autoclass:: GAplic
:members: create_gobj, destroy_gobj,
find_unique_gobj,
send_event_to_external_gaplic,
send_event_to_external_role,
subscribe_event_from_external_gaplic,
subscribe_event_from_external_role,
unsubscribe_event_from_external_gaplic,
unsubscribe_event_from_external_role,
add_callback,
start,
stop,
mt_subprocess
Thead context
=============
Running as thread::
ga = GAplic()
worker = setup_gaplic_thread(ga)
worker.start()
.. autofunction:: setup_gaplic_thread
.. autoclass:: GAplicThreadWorker
:members: run, join
Subprocess context
==================
Running as subprocess::
ga = GAplic()
worker = setup_gaplic_process(ga)
worker.start()
.. autofunction:: setup_gaplic_process
.. autoclass:: GAplicProcessWorker
:members: run, join
Runnig several threads or subprocesses::
from ginsfsm.gaplic import GAplic, setup_gaplic_thread
# run one gaplic as thread
ga_srv = GAplic('Server')
srv_worker = setup_gaplic_thread(ga_srv)
srv_worker.start()
ga_cli = GAplic('Client')
try:
# run the main gaplic as main process
ga_cli.start()
except (KeyboardInterrupt, SystemExit):
# stop the main gaplic
ga_srv.stop()
# wait to finish the other gaplic
srv_worker.join()
print('Program stopped')
Ini file configuration
======================
You can configure and run your gaplic applications with PasteDeploy.
Available are the :term:`gcreate` and :term:`gserve` commands,
similar to pcreate and pserve of Pyramid.
.. autofunction:: gaplic_factory
"""
import time
import threading
from collections import deque
from ginsfsm.globals import (
set_global_main_gaplic,
set_global_app,
add_global_thread,
add_global_subprocess,
get_gaplic_by_thread_ident,
)
from ginsfsm.compat import (
string_types,
iterkeys_,
)
from ginsfsm.deferred import (
DeferredList,
Deferred,
)
from ginsfsm.c_sock import (
poll_loop,
close_all_sockets,
_poll,
GSock,
)
from ginsfsm.gobj import GObj
from ginsfsm.router import GRouter
def _start_timer(seconds):
""" Start a timer of :param:`seconds` seconds.
The returned value must be used to check the end of the timer
with _test_timer() function.
"""
timer = time.time()
timer = timer + seconds
return timer
def _test_timer(value):
""" Check if timer :param:`value` has ended.
Return True if the timer has elapsed, False otherwise.
WARNING: it will fail when system clock has been changed.
TODO: check if system clock has been changed.
"""
timer_actual = time.time()
if timer_actual >= value:
return True
else:
return False
class _XTimer(object):
""" Group attributes for timing.
:param:`got_timer` callback will be executed :param:`sec` seconds.
The callback will be called with :param:`param1` parameter.
If :param:`autostart` is True, the timer will be cyclic.
"""
def __init__(self, sec, got_timer_func, param1, autostart):
self.sec = sec
self.got_timer_func = got_timer_func
self.param1 = param1
self.autostart = autostart
GAPLIC_FSM = {}
#def ac_deferred_callback(self, event):
# deferred_ref = event.deferred_ref
# self.deferred_list(deferred_ref, ext_event=event)
# self.deferred_list.delete(deferred_ref)
#
#GAPLIC_FSM = {
# 'event_list': ('EV_DEFERRED_CALLBACK: top input',),
# 'state_list': ('ST_IDLE',),
# 'machine': {
# 'ST_IDLE':
# (
# ('EV_DEFERRED_CALLBACK', ac_deferred_callback, 'ST_IDLE'),
# ),
# }
#}
GAPLIC_GCONFIG = {
'roles': [tuple, (), 0, None, 'Roles of gaplic'],
'ini_settings': [
dict, {}, 0, None,
'The ini settings will be set to all new created gobj'
' by overwrite_parameters() function'
],
# trace_mach is inherited from SMachine.
'trace_mach': [bool, False, 0, None, 'Display simple machine activity'],
# logger is inherited from SMachine.
'logger': [None, None, 0, None, ''],
'router_enabled': [
bool, False, 0, None,
'True if a (NOT Pyramid) router is enabled.'
],
'all_unique_names': [
bool, False, 0, None,
'All named-gobjs are unique-named gobjs'
],
}
[docs]class GAplic(GObj):
""" Container of gobj's running under the same process or thread.
:param name: name of the gaplic, default is ``None``.
:param ini_settings: keyword arguments,
with the parameters from a ini configfile.
The ini settings will be set to all new created gobj
by :func:`ginsfsm.gobj.GObj.overwrite_parameters` function.
.. note::
The parameters can be dot named to include
the :term:`named-gobj`'s destination of the parameters.
GAplic is the main boss.
Manage the timer's, event queues, etc.
Supplies register, deregister and search or named-events.
.. ginsfsm::
:fsm: GAPLIC_FSM
:gconfig: GAPLIC_GCONFIG
Example::
from ginsfsm.gaplic import GAplic
if __name__ == "__main__":
ga = GAplic(name='Example1')
ga.create_gobj('test_aplic', GPrincipal, None)
try:
ga.start()
except KeyboardInterrupt:
print('Program stopped')
"""
def __init__(self, name=None, roles=None, **ini_settings):
GObj.__init__(self, GAPLIC_FSM, GAPLIC_GCONFIG)
self.name = name
# TODO: register in gaplic-dns: (gaplic_name, roles, urls)
if isinstance(roles, (list, tuple)):
self.roles = roles
else:
self.roles = (roles,)
self.ini_settings = ini_settings.copy()
# Call stop() to stop gaplic
self.do_exit = multiprocessing.Event()
""" threading.Event() or multiprocessing.Event() object
to signal the stop of gaplic."""
self.loop_timeout = 0.5 # timeout to select(),poll() function.
""" Loop timeout. Default 0.5 seconds.
It's the minimun timer resolution you can have.
"""
self._impl_poll = _poll() # Used by gsock. epoll() implementation.
self._socket_map = {} # Used by gsock. Dict {fd:Gobj}
self._gotter_timers = {} # Dict with timers {_XTimer:timer value}
self._qevent = deque() # queue for post events.
self._callbacks = [] # callbacks compatible with tornado.io_loop
self._inside = 0 # to tab machine trace.
self._unique_named_gobjs = {}
self._thread_ident = None
self._thread_name = 0
self.gaplic = self
self.deferred_list = DeferredList()
logger = ini_settings.get('logger', None)
if not logger:
import logging
self.logger = logging.getLogger(__name__)
else:
if isinstance(logger, string_types):
import logging
self.logger = logging.getLogger(logger)
else:
self.logger = logger
if not self.logger:
logging.basicConfig(level=logging.DEBUG)
self.logger = logging
self.start_up()
def start_up(self):
""" Initialization zone.
"""
self.router_enabled = self.ini_settings.pop('router_enabled', False)
self.all_unique_names = self.ini_settings.pop('all_unique_names',
False)
if self.router_enabled:
self.start_up_router()
self.logger and self.logger.info('GAplic (%r) initiated' % self.name)
def start_up_router(self, pyramid_url=None, pyramid_root=None):
self.router = self.create_gobj(
'router',
GRouter,
self,
__unique_name__=True,
pyramid_root=pyramid_root,
pyramid_url=pyramid_url
)
def _increase_inside(self):
self._inside += 1
def _decrease_inside(self):
self._inside -= 1
def _tab(self):
if self._inside <= 0:
spaces = 1
else:
spaces = self._inside * 2
pad = ' ' * spaces
return pad
[docs] def create_gobj(self, name, gclass, parent, **kw):
""" Factory function to create gobj's instances.
Subclass of :meth:`ginsfsm.gobj.GObj.create_gobj` to do something else,
like to let :term:`unique-named-gobj` instances.
:param name: Name of the gobj.
If the key `__unique_name__` passed in *kw* is True,
then the gobj will be a :term:`unique-named-gobj` and.
the :meth:`GObj._register_unique_gobj` will be called.
If :meth:`GObj._register_unique_gobj` fails, a
:exc:GObjError exception will be raised.
:param gclass: `gclass` is the GObj type used to create the new gobj.
It's must be a derived class of :class:`ginsfsm.gobj.GObj`.
:param parent: parent of the new :term:`gobj`.
If `None`, the gobj will be a :term:`principal` gobj.
:param kw: Attributes that are added to the new :term:`gobj`.
All the keyword arguments used in the creation function
**are added as attributes** to ``config`` object.
You must consult the attributes supported
by each `gclass` type.
The attributes must be defined in the gclass GCONFIG,
otherwise they are ignored.
Special kw:
``'__unique_name__'``: Register the gobj as unique name gobj.
:rtype: new gobj instance.
When a :term:`gobj` is created by the factory function, it's added to
their parent child list :attr:`ginsfsm.gobj.GObj.dl_childs`,
and several attributes are created:
* **parent**: the parent :term:`gobj` of the created :term:`gobj`.
* **gaplic**: the :term:`gaplic` of the created :term:`gobj`.
If the `gclass` is subclass of :class:`ginsfsm.c_sock.GSock`
two private attributes are added to the created :term:`gobj`:
* **_socket_map**: dictionary of open sockets.
* **_impl_poll**: poll implementation: can be epoll, select, KQueue..
the best found option.
It's the base of the asynchronous behavior.
"""
attrs = {
'logger': self.logger,
'gaplic': self,
'create_gobj': self.create_gobj,
'destroy_gobj': self.destroy_gobj,
'_increase_inside': self._increase_inside,
'_decrease_inside': self._decrease_inside,
'_tab': self._tab,
}
if issubclass(gclass, GSock):
attrs.update({
'_socket_map': self._socket_map,
'_impl_poll': self._impl_poll
})
if self.all_unique_names:
attrs.update({'__unique_name__': True})
kw.update(attrs)
gobj = GObj.create_gobj(self, name, gclass, parent, **kw)
return gobj
@staticmethod
[docs] def destroy_gobj(gobj):
""" Destroy a gobj
"""
if gobj.__unique_name__:
gobj.gaplic._deregister_unique_gobj(gobj)
GObj.destroy_gobj(gobj)
def get_unique_named_gobjs(self):
""" Return the list of :term:`unique-named-gobj`'s.
"""
return [name for name in iterkeys_(self._unique_named_gobjs)]
def _register_unique_gobj(self, gobj):
""" Register a :term:`unique-named-gobj`.
"""
named_gobj = self._unique_named_gobjs.get(gobj.name, None)
if named_gobj is not None:
self.logger and self.logger.info(
'ERROR _register_unique_gobj() "%s" ALREADY REGISTERED' %
gobj.name)
return False
self._unique_named_gobjs[gobj.name] = gobj
self.__unique_name__ = True
return True
def _deregister_unique_gobj(self, gobj):
""" Deregister a :term:`unique-named-gobj`.
"""
named_gobj = self._unique_named_gobjs.get(gobj.name, None)
if named_gobj is not None:
del self._unique_named_gobjs[gobj.name]
return True
return False
[docs] def find_unique_gobj(self, gobj_name):
""" Find a :term:`unique-named-gobj`.
"""
named_gobj = self._unique_named_gobjs.get(gobj_name, None)
return named_gobj
[docs] def send_event_to_external_role(
self, role, gobj_name, event_name, subscriber_gobj, **kw):
""" Send an event to an external gaplic.
:param role: name of external role.
:param gobj_name: name of external gobj.
:param event_name: name of the event to send.
:param subscriber_gobj: subscriber obj that wants receive the response.
:param kw: keyword arguments.
Possible values for **kw** arguments:
* `__subscribe_response__`: ``Bool`` Subscribe the
response of external executed action.
Received with the same event.
"""
if not isinstance(role, string_types):
raise TypeError(
'Destination role %r must be a string'
% (role)
)
if not isinstance(gobj_name, string_types):
raise TypeError(
'Destination gobj %r must be a string'
% (gobj_name)
)
if not isinstance(event_name, string_types):
raise TypeError(
'Event name %r must be a string'
% (event_name)
)
if not isinstance(subscriber_gobj, (string_types, GObj)):
raise TypeError(
'Subscriber gobj %r must be a string'
% (subscriber_gobj)
)
if role in self.roles:
raise TypeError(
"Please don't use external methods to self gaplic."
% (role)
)
if isinstance(subscriber_gobj, (GObj)):
subscriber_gobj = subscriber_gobj.name
subs_gobj = self.find_unique_gobj(subscriber_gobj)
if not subs_gobj:
raise TypeError(
'Subscriber gobj %r must be a __unique_named__ gobj'
% (subscriber_gobj)
)
return self.router.mt_send_event_to_external_role(
role,
gobj_name,
event_name,
subscriber_gobj,
kw
)
[docs] def send_event_to_external_gaplic(
self, gaplic_name, gobj_name, event_name, subscriber_gobj, **kw):
""" Send an event to an external gaplic.
:param gaplic_name: name of external gaplic.
:param gobj_name: name of external gobj.
:param event_name: name of the event to send.
:param subscriber_gobj: subscriber obj that wants receive the response.
:param kw: keyword arguments.
Possible values for **kw** arguments:
* `__subscribe_response__`: ``Bool`` Subscribe the
response of external executed action.
Received with the same event.
"""
if not isinstance(gaplic_name, string_types):
raise TypeError(
'Destination gaplic %r must be a string'
% (gaplic_name)
)
if not isinstance(gobj_name, string_types):
raise TypeError(
'Destination gobj %r must be a string'
% (gobj_name)
)
if not isinstance(event_name, string_types):
raise TypeError(
'Event name %r must be a string'
% (event_name)
)
if not isinstance(subscriber_gobj, (string_types, GObj)):
raise TypeError(
'Subscriber gobj %r must be a string'
% (subscriber_gobj)
)
if gaplic_name == self.name:
raise TypeError(
"Please don't use external methods to self gaplic."
% (gaplic_name)
)
if isinstance(subscriber_gobj, (GObj)):
subscriber_gobj = subscriber_gobj.name
subs_gobj = self.find_unique_gobj(subscriber_gobj)
if not subs_gobj:
raise TypeError(
'Subscriber gobj %r must be a __unique_named__ gobj'
% (subscriber_gobj)
)
return self.router.mt_send_event_to_external_gaplic(
gaplic_name,
gobj_name,
event_name,
subscriber_gobj,
kw
)
[docs] def subscribe_event_from_external_gaplic(
self, gaplic_name, gobj_name, event_name, subscriber_gobj, **kw):
""" Subscribe an event of an external gaplic by name.
"""
kw.update({'__subscribe_event__': True})
return self.send_event_to_external_gaplic(
gaplic_name,
gobj_name,
event_name,
subscriber_gobj,
**kw
)
[docs] def subscribe_event_from_external_role(
self, role, gobj_name, event_name, subscriber_gobj, **kw):
""" Subscribe an event of an external gaplic by role.
"""
kw.update({'__subscribe_event__': True})
return self.send_event_to_external_role(
role,
gobj_name,
event_name,
subscriber_gobj,
**kw
)
[docs] def unsubscribe_event_from_external_gaplic(
self, gaplic_name, gobj_name, event_name, subscriber_gobj, **kw):
""" Subscribe an event of an external gaplic by name.
"""
kw.update({'__unsubscribe_event__': True})
return self.send_event_to_external_gaplic(
gaplic_name,
gobj_name,
event_name,
subscriber_gobj,
**kw
)
[docs] def unsubscribe_event_from_external_role(
self, role, gobj_name, event_name, subscriber_gobj, **kw):
""" Subscribe an event of an external gaplic by role.
"""
kw.update({'__unsubscribe_event__': True})
return self.send_event_to_external_role(
role,
gobj_name,
event_name,
subscriber_gobj,
**kw
)
def delete_all_references(self, gobj):
""" Delete all references of gobj in timer and event queues.
"""
# TODO: by the moment, be care with your event generation
def _loop(self):
""" process event queue, timer queue, and epoll.
Return True if there is some remain event for be proccessed.
Useful for testing purposes.
"""
if not self._thread_ident:
self._thread_ident = threading.current_thread().ident
self._thread_name = threading.current_thread().name
timeout = self.loop_timeout # iniatially wait loop_timeout seconds
remain = False # some work pending.
# Callbacks compatible with tornado.io_loop
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
callbacks = self._callbacks
if callbacks:
remain = True
self._callbacks = []
for callback in callbacks:
self._run_callback(callback)
if self._callbacks:
# If any callbacks or timeouts called add_callback,
# we don't want to wait in poll() before we run them.
timeout = 0.0
remain |= self._process_qevent()
if remain:
# They are remain events,
# we don't want to wait in poll() before we run them.
# wait some time, t
# o avoid recursive send events that puts 100% cpu.
# TODO: try set 0.0.
timeout = 0.1
poll_loop(self._socket_map, self._impl_poll, timeout)
#
remain |= self._process_timer() # don't remove or the tests will fail.
# oportunity for subclass.
self.mt_subprocess()
return remain
[docs] def start(self):
""" Run the infinite i/o event loop.
"""
while True:
# with do_exit Event set (being thread or process),
# wait to event set to exit, ignoring KeyboardInterrupt.
try:
if self.do_exit.is_set():
close_all_sockets(self._socket_map)
break
self._loop()
except (KeyboardInterrupt, SystemExit):
close_all_sockets(self._socket_map)
raise
[docs] def stop(self):
""" Signalize the gaplic instance to stop.
"""
self.do_exit.set()
[docs] def mt_subprocess(self):
""" Subclass :class:`GAplic` class and override this function
to do extra work in the infinite loop.
"""
def enqueue_event(self, event):
""" Post the event in the next :term:`gaplic` loop cycle,
not right now.
:param event: :term:`event` to send.
:param destination: destination :term:`gobj` of the event.
:param kw: keyword argument with data associated to event.
.. note::
All the keyword arguments **are added as attributes** to
the sent :term:`event`.
Same as :meth:`send_event` function but the event is sent in the
next :term:`gaplic` loop cycle, not right now.
It **does not return** the return of the executed action because the
action it's executed later, in the next loop cycle.
It's mandatory use this function, if the `destination`
:term:`gobj` is not local.
.. note:: It **DOES NOT** return the return of the executed action
because the action it's executed later, in the next loop cycle,
so you **CANNOT** receive valid direct data from the action.
.. warning:: If you use :meth:`post_event` without a :term:`gaplic`
then a :exc:`GAplicError` exception will be raised.
``destination`` must be a `string` or :class:`GObj` types, otherwise a
:exc:`GObjError` will be raised.
``event`` must be a `string` or :class:`Event` types, otherwise a
:exc:`EventError` will be raised.
If ``event`` is an :class:`Event` instance, a new :class:`Event`
duplicated instance is returned, but it will be updated with
the new ``destination`` and ``kw`` keyword arguments.
.. note::
All the keyword arguments used in the factory function
**are added as attributes** to the created :term:`event` instance.
You must consult the attributes supported by each machine's event.
"""
self._qevent.append(event)
def _process_qevent(self):
""" Return True if remains events.
"""
# ln = len(self._qevent)
# print "qevent...........%d" % (ln)
it = 0
maximum = 10
while True:
if it > maximum:
# balance the work
return True
try:
event = self._qevent.popleft()
except IndexError:
break
else:
it += 1
# To send to external gaplics, destination must be 'router'
try:
destination = self._resolv_destination(event.destination)
cur_ident = threading.current_thread().ident
if cur_ident != self._thread_ident:
if self.logger:
self.logger.error("??????????????????")
dst_ident = destination.gaplic._thread_ident
if cur_ident == dst_ident:
self.send_event(
destination,
event.event_name,
**event.kw)
else:
# Yeah, send to another gaplic
gaplic = get_gaplic_by_thread_ident(dst_ident)
if gaplic:
gaplic.enqueue_event(event)
except Exception:
if self.logger:
self.logger.error("Exception gaplic.__process_qevent")
return False
def _process_timer(self):
# don't use iteritems() items(),
# some xtimer can be removed during processing timers
some_event = False
try:
for xtimer in iterkeys_(self._gotter_timers):
try:
value = self._gotter_timers[xtimer]
except KeyError:
# timer deleted while loop.
continue
some_event = True
if value and _test_timer(value):
if xtimer.autostart:
self._gotter_timers[xtimer] = _start_timer(xtimer.sec)
else:
self._gotter_timers[xtimer] = 0
if xtimer.param1 is None:
xtimer.got_timer_func()
else:
xtimer.got_timer_func(xtimer.param1)
if not xtimer.autostart:
self._gotter_timers.pop(xtimer)
except RuntimeError:
# timer deleted while loop.
some_event = True
return some_event
def _setTimeout(self, sec, got_timer_func, param1=None, autostart=False):
""" Set a callback to be executed in ``sec`` seconds.
Function used by :class:`GTimer` gobj. Not for general use.
Return an object to be used in :func:`clearTimeout`.
"""
xtimer = _XTimer(sec, got_timer_func, param1, autostart)
self._gotter_timers[xtimer] = _start_timer(sec)
return xtimer
def _clearTimeout(self, xtimer):
""" Clear callback timeout.
Function used by :class:`GTimer` gobj. Not for general use.
"""
t = self._gotter_timers.get(xtimer, None)
if t:
# prevent timer cleared in proces_timer loop
self._gotter_timers[xtimer] = 0
self._gotter_timers.pop(xtimer)
def add_timeout(self, deadline, callback):
""" Compatible with tornado.io_loop
``deadline`` only seconds please.
Calls the given callback at the time deadline from the I/O loop.
Returns a handle that may be passed to remove_timeout to cancel.
``deadline`` may be a number denoting a unix timestamp (as returned
by ``time.time()`` or a ``datetime.timedelta`` object for a deadline
relative to the current time.
Note that it is not safe to call `add_timeout` from other threads.
Instead, you must use `add_callback` to transfer control to the
IOLoop's thread, and then call `add_timeout` from there.
"""
timer_id = self._setTimeout(deadline, callback)
return timer_id
def remove_timeout(self, timeout):
""" Compatible with tornado.io_loop
Cancels a pending timeout.
The argument is a handle as returned by add_timeout.
"""
# Removing from a heap is complicated, so just leave the defunct
# timeout object in the queue (see discussion in
# http://docs.python.org/library/heapq.html).
# If this turns out to be a problem, we could add a garbage
# collection pass whenever there are too many dead timeouts.
timeout.callback = None
self._clearTimeout(timeout)
[docs] def add_callback(self, callback, *args, **kwargs):
""" Call the given callback in the next I/O loop iteration.
"""
list_empty = not self._callbacks
deferred = Deferred(0, callback, *args, **kwargs)
self._callbacks.append(deferred)
if list_empty:
if threading.current_thread().ident != self._thread_ident:
# If we're in the IOLoop's thread, we know it's not currently
# polling. If we're not, and we added the first callback to an
# empty list, we may need to wake it up (it may wake up on its
# own, but an occasional extra wake is harmless). Waking
# up a polling IOLoop is relatively expensive, so we try to
# avoid it when we can.
pass
# TODO: study this from tornado
# self._waker.wake()
def _run_callback(self, deferred):
try:
deferred()
except Exception:
self.handle_callback_exception(deferred)
def handle_callback_exception(self, callback):
"""This method is called whenever a callback run by the IOLoop
throws an exception.
By default simply logs the exception as an error. Subclasses
may override this method to customize reporting of exceptions.
The exception itself is not passed explicitly, but is available
in sys.exc_info.
"""
if self.logger:
self.logger.error(
"Exception in callback %r",
callback,
exc_info=True)
#===============================================================
# Thread wrapper for gaplic
#===============================================================
[docs]class GAplicThreadWorker(threading.Thread):
""" Class derived from :class:`threading.Thread` to run gaplic
in thread environment.
"""
def __init__(self, gaplic):
threading.Thread.__init__(self)
self.daemon = True
self.gaplic = gaplic
[docs] def run(self):
""" Override the :meth:`threading.Thread.run` method.
Run the gaplic loop in a separate thread.
"""
self.gaplic.start()
def stop(self):
""" Stop the worker.
"""
self.gaplic.stop()
[docs] def join(self, timeout=10.0): # wait until 10 seconds for thread killed.
""" Wait for worker to stop, until ``timeout`` seconds."""
super(GAplicThreadWorker, self).join(timeout)
[docs]def setup_gaplic_thread(gaplic):
""" Run gaplic as thread.
Return the worker.
You must call worker.start() to run the thread.
"""
worker = GAplicThreadWorker(gaplic)
add_global_thread(worker)
return worker
#===============================================================
# Process wrapper for gaplic
#===============================================================
import multiprocessing
[docs]class GAplicProcessWorker(multiprocessing.Process):
""" Class derived from :class:`multiprocessing.Process` to run gaplic
in subprocess environment.
"""
def __init__(self, gaplic):
multiprocessing.Process.__init__(self)
self.daemon = True
self.gaplic = gaplic
[docs] def run(self):
""" Override the :meth:`multiprocessing.Process.run` method.
Run the gaplic loop in a separate process.
"""
self.gaplic.start()
def stop(self):
""" Stop the worker.
"""
self.gaplic.stop()
[docs] def join(self, timeout=10.0): # wait until 10 seconds for process killed.
""" Wait for worker to stop, until ``timeout`` seconds."""
super(GAplicProcessWorker, self).join(timeout)
[docs]def setup_gaplic_process(gaplic):
""" Run gaplic as process.
Return the worker.
You must call worker.start() to run the subprocess.
"""
worker = GAplicProcessWorker(gaplic)
add_global_subprocess(worker)
return worker
[docs]def gaplic_factory(loader, global_conf, **local_conf):
""" To use with PasteDeploy in composite.
Items of *composite* section:
:main: name of a section that must return a :term:`gaplic`
instance. It will be the **principal** :term:`gaplic`.
:threads: name of sections that must return :term:`gaplic`
instances. They will run in threads.
:subprocesses: name of sections that must return :term:`gaplic`
instances. They will run in subprocesses.
:wsgi: name of sections that must return a *app paste factory*.
Wsgi applications are saved as global apps (set_global_app()).
Example::
[composite:main]
use = call:ginsfsm.gaplic:gaplic_factory
main = wsgi-server
wsgi = wsgi-application
[app:wsgi-server]
use = call:ginsfsm.examples.wsgi.simple_wsgi_server:main
host = 0.0.0.0
port = 8000
application = wsgi-application
GSock.trace_dump = true
GObj.trace_mach = true
[app:wsgi-application]
use=call:ginsfsm.examples.wsgi.simple_wsgi_server:paste_app_factory
The prototype for ``wsgi`` (paste app factory) is::
def paste_app_factory(global_conf, **local_conf):
return wsgi-application
The prototype for ``main``, ``threads`` and ``subprocesses`` is::
def main(global_conf, **local_config):
return gaplic-instance
"""
main = local_conf.get('main')
wsgis = local_conf.get('wsgi', '').split()
threads = local_conf.get('threads', '').split()
subprocesses = local_conf.get('subprocesses', '').split()
# Firstly create main gaplic, to can pass it to wsgi's
main_gaplic = loader.get_app(main, global_conf=global_conf)
set_global_main_gaplic(main_gaplic)
for thread in threads:
gaplic = loader.get_app(thread, global_conf=global_conf)
worker = setup_gaplic_thread(gaplic)
worker.start()
for subprocess in subprocesses:
gaplic = loader.get_app(subprocess, global_conf=global_conf)
worker = setup_gaplic_process(gaplic)
worker.start()
for wsgi in wsgis:
app = loader.get_app(wsgi, global_conf=global_conf)
set_global_app(wsgi, app)
return main_gaplic