# Source code for cxmanage_api.tasks
# Copyright (c) 2012, Calxeda Inc.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of Calxeda Inc. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
# DAMAGE.
from collections import deque
from threading import Thread, Lock, Event
from time import sleep
class Task(object):
    """A task object represents some unit of work to be done.

    The outcome of the work is exposed through three attributes:

    * ``status`` -- one of ``"Queued"``, ``"In Progress"``, ``"Completed"``
      or ``"Failed"``.
    * ``result`` -- the return value of the method (``None`` until the task
      completes successfully).
    * ``error`` -- the exception raised by the method (``None`` unless the
      task failed).

    :param method: The actual method (function) to execute.
    :type method: function
    :param args: Arguments to pass to the named method to run.
    :type args: list

    """

    def __init__(self, method, *args):
        """Default constructor for the Task class."""
        self.status = "Queued"
        self.result = None
        self.error = None

        self._method = method
        self._args = args
        # Set exactly once, when _run() is done (successfully or not).
        self._finished = Event()

    def join(self):
        """Wait for this task to finish."""
        self._finished.wait()

    def is_alive(self):
        """Return true if this task hasn't been finished.

        :returns: Whether or not the task is still alive.
        :rtype: boolean

        """
        return not self._finished.is_set()

    def _run(self):
        """Execute this task. Should only be called by TaskWorker.

        Exceptions derived from ``Exception`` are captured in ``self.error``
        and are not propagated. Anything else (e.g. ``SystemExit``) is
        recorded the same way and then re-raised.

        In every case the task is marked as finished, so that threads
        blocked in :meth:`join` are always released.

        """
        self.status = "In Progress"
        try:
            self.result = self._method(*self._args)
            self.status = "Completed"
        except Exception as exc:
            self.error = exc
            self.status = "Failed"
        except BaseException as exc:
            # Not ours to swallow, but still record the outcome for callers.
            self.error = exc
            self.status = "Failed"
            raise
        finally:
            # Must run on every exit path, otherwise join() hangs forever.
            self._finished.set()
class TaskQueue(object):
    """A task queue, consisting of a queue and a number of workers.

    :param threads: Number of threads to create (if needed).
    :type threads: integer
    :param delay: Delay handed to every worker this queue spawns; a worker
                  sleeps this long before fetching each task.
    :type delay: integer or float (seconds)

    """

    def __init__(self, threads=48, delay=0):
        """Default constructor for the TaskQueue class."""
        self.threads = threads
        self.delay = delay

        # Guards both the pending-task deque and the live worker count.
        self._lock = Lock()
        self._queue = deque()
        self._workers = 0

    def put(self, method, *args):
        """Add a task to the task queue, and spawn a worker if we're not full.

        :param method: Method (function) to run.
        :type method: function
        :param args: Arguments to pass to the named method to run.
        :type args: list

        :returns: A Task that will be executed by a worker at a later time.
        :rtype: Task

        """
        # Building the task touches no shared state, so do it outside the
        # critical section.
        task = Task(method, *args)

        # The context manager guarantees the lock is released even if
        # spawning the worker raises (e.g. the thread cannot be started).
        with self._lock:
            self._queue.append(task)
            if self._workers < self.threads:
                TaskWorker(task_queue=self, delay=self.delay)
                self._workers += 1

        return task

    def get(self):
        """Get a task from the task queue. Mainly used by workers.

        :returns: A Task object that hasn't been executed yet.
        :rtype: Task

        :raises IndexError: If there are no tasks in the queue.

        """
        with self._lock:
            return self._queue.popleft()

    def _remove_worker(self):
        """Decrement the worker count. Should only be used by TaskWorker."""
        # NOTE(review): a worker sees the empty queue in get() and
        # decrements the count here in two separate critical sections. A
        # put() landing in between may skip spawning a worker while the
        # last one is exiting -- confirm whether callers can hit this.
        with self._lock:
            self._workers -= 1
class TaskWorker(Thread):
    """A worker thread that runs tasks from a TaskQueue.

    The thread is a daemon and is started as soon as it is constructed.

    :param task_queue: Task queue to get tasks from.
    :type task_queue: TaskQueue
    :param delay: Time to wait in-between execution.
    :type delay: integer or float (seconds)

    """

    def __init__(self, task_queue, delay=0):
        super(TaskWorker, self).__init__()
        self.daemon = True

        self._task_queue = task_queue
        self._delay = delay

        self.start()

    def run(self):
        """Repeatedly get tasks from the TaskQueue and execute them.

        The loop ends normally once the queue reports that it is empty by
        raising ``IndexError`` from ``get()``. Any other error is no longer
        swallowed silently: it propagates out of the thread after the worker
        has been deregistered from the queue.

        """
        try:
            while True:
                sleep(self._delay)
                try:
                    task = self._task_queue.get()
                except IndexError:
                    # Nothing left to do; let this worker exit.
                    break
                task._run()
        finally:
            # Always give the slot back, whatever ended the loop.
            self._task_queue._remove_worker()
# Module-level TaskQueue built with the constructor defaults
# (threads=48, delay=0). It is created as a side effect of importing
# this module.
DEFAULT_TASK_QUEUE = TaskQueue()
# End of file: ./tasks.py