Contains a queue-based channel implementation
A channel is similar to a file-like object. It has a write end as well as one or more read ends. If data is in the channel, it can be read; if not, the read operation will block until data becomes available. If the channel is closed, any read operation will result in an exception.
This base class is not instantiated directly, but instead serves as a constructor for reader/writer pairs.
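The blocking and closing behaviour described above can be sketched with the standard library alone. This is not the library's implementation: `MiniChannel`, `ChannelClosed` and the `timeout` parameter are hypothetical names chosen for illustration.

```python
import threading
from collections import deque


class ChannelClosed(Exception):
    """Hypothetical exception raised when reading from a closed channel."""


class MiniChannel:
    """Illustrative channel: writers append, readers block until data arrives."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()
        self._closed = False

    def write(self, item):
        with self._cond:
            if self._closed:
                raise ChannelClosed("cannot write to a closed channel")
            self._items.append(item)
            self._cond.notify()  # wake one blocked reader

    def read(self, timeout=None):
        with self._cond:
            # Block until an item arrives, the channel closes, or we time out.
            ready = self._cond.wait_for(
                lambda: self._closed or len(self._items) > 0, timeout
            )
            if self._closed:
                raise ChannelClosed("channel is closed")
            if not ready:
                raise TimeoutError("no item arrived in time")
            return self._items.popleft()

    def close(self):
        with self._cond:
            self._closed = True      # closing twice is harmless
            self._cond.notify_all()  # wake every blocked reader
```

A reader blocked in `read()` wakes up either because a writer delivered an item or because `close()` was called, in which case it raises instead of returning.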
Create a new channel
alias of AsyncQueue
A slightly faster version of a Channel, which sacrifices thread-safety for performance
alias of SyncQueue
A writer is an object providing write access to a possibly blocking reading device
Close the channel. Multiple close calls on a closed channel are not an error
Returns: | True if the channel was closed |
---|
Returns: | number of items already in the device; they could be read with a reader |
---|
Write the given item into the device.
:param block: True if the device may block until space for the item is available
:param timeout: The time in seconds to wait for the device to become ready in blocking mode
The write end of a channel, a file-like interface for a channel
Close the channel. Multiple close calls on a closed channel are not an error
Returns: | True if the channel was closed |
---|
Implements a channel writer with callback functionality
Allows reading from a device
Support the Python 2 iterator syntax
Read a list of items from the device. The list, as a sequence of items, is similar to the string of characters returned when reading from file-like objects.
Parameters: |
|
---|---|
Returns: | single item in a list if count is 1, or a list of count items. If the device was empty and count was 1, an empty list will be returned. If count was greater than 1, a list with fewer than count items will be returned. If count was < 1, a list with all items that could be read will be returned. |
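The return-value rules for `count` can be illustrated with a small non-blocking helper. This is a sketch over a plain `deque`, not the library's reader; `read_nonblocking` is a hypothetical name.

```python
from collections import deque


def read_nonblocking(items, count=0):
    """Pop up to `count` items from the left of `items` without blocking.

    count < 1 means "everything that is currently available".
    """
    if count < 1:
        count = len(items)
    out = []
    while items and len(out) < count:
        out.append(items.popleft())
    return out


device = deque([1, 2, 3])
print(read_nonblocking(device, 1))  # a single item, still wrapped in a list
print(read_nonblocking(device, 5))  # fewer than 5 items remain
print(read_nonblocking(device, 1))  # the device is empty now
```

The result is always a list, so callers can treat every read the same way regardless of how many items were available.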
Allows reading from a channel. The reader is thread-safe if the channel is as well
Implements a channel reader with callback functionality
Create a channel, with a reader and a writer.
:param ctype: Channel to instantiate
:param wctype: The type of the write channel to instantiate
:param rctype: The type of the read channel to instantiate
:return: tuple(reader, writer)
Thrown when trying to write to a read-only queue
A Reader that reads items from an iterator instead of a channel. Reads will never block. It is thread-safe.
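One way such a reader could look is sketched below. The class name `IterReader` is hypothetical, and the sketch assumes that the wrapped iterator itself does not block when advanced.

```python
import itertools
import threading


class IterReader:
    """Hypothetical reader that pulls items from any iterable."""

    def __init__(self, iterable):
        self._iter = iter(iterable)
        self._lock = threading.Lock()

    def read(self, count=0):
        # The lock keeps concurrent callers from advancing the iterator
        # at the same time.
        with self._lock:
            if count < 1:
                return list(self._iter)  # everything that is left
            return list(itertools.islice(self._iter, count))
```

Once the iterator is exhausted, every further `read` returns an empty list instead of waiting for more data.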
allocate_lock() -> lock object (allocate() is an obsolete synonym)
Create a new lock object. See help(LockType) for information about locks.
Non-Blocking implementation of read
A channel which invokes a callback before items are read from the channel
Install a callback to call after items have been read, but before they are returned to the caller. The callback may adjust the items and/or the list. If no function is provided, the callback is uninstalled
Returns: | the previously installed function |
---|
Install a callback to call with the item count to be read before any item is actually read from the channel. Exceptions will be propagated. If a function is not provided, the callback is effectively uninstalled.
Returns: | the previously installed callback or None |
---|---|
Note: | The callback must be threadsafe if the channel is used by multiple threads. |
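The two hooks described above can be sketched as follows. The method names `set_pre_cb` and `set_post_cb`, and the convention that the post-read callback returns the list to hand to the caller, are assumptions made for this example; the reader wraps a plain list rather than a real channel.

```python
class CallbackReader:
    """Hypothetical reader over a list, with pre- and post-read hooks."""

    def __init__(self, items):
        self._items = list(items)
        self._pre_cb = None
        self._post_cb = None

    def set_pre_cb(self, fun=None):
        """Install `fun(count)`; return the previously installed callback."""
        prev, self._pre_cb = self._pre_cb, fun
        return prev

    def set_post_cb(self, fun=None):
        """Install `fun(items) -> items`; return the previous callback."""
        prev, self._post_cb = self._post_cb, fun
        return prev

    def read(self, count=0):
        if self._pre_cb is not None:
            self._pre_cb(count)  # exceptions propagate to the caller
        n = len(self._items) if count < 1 else count
        items, self._items = self._items[:n], self._items[n:]
        if self._post_cb is not None:
            items = self._post_cb(items)  # may adjust or replace the list
        return items
```

Returning the previous callback from each setter lets a caller install a temporary hook and restore the old one afterwards.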
The write end of a channel which allows you to set up a callback to be called after an item was written to the channel
Install a callback to be called before the given item is written. It returns a possibly altered item which will be written to the channel instead, making it useful for pre-write item conversions. Providing None uninstalls the current method.
Returns: | the previously installed function or None |
---|---|
Note: | Must be thread-safe if the channel is used in multiple threads |
Simplistic implementation of a graph
A node in the graph. Nodes know their neighbours and have an id which should resolve into a string
A simple graph implementation, keeping nodes and providing basic access and editing functions. The performance is only suitable for small graphs of not more than 10 nodes!
Add an undirected edge between the given nodes u and v.
Returns: | self |
---|---|
Raises ValueError: | If the new edge would create a cycle |
Add a new node to the graph.
:return: the newly added node
Return all input nodes of the given node, depth first. The actual input node is returned last, as the pool requires it that way.
Delete a node from the graph.
:return: self
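A minimal sketch of such a graph follows, under stated assumptions: the edge from u to v is stored as "v is an output of u, u is an input of v", the cycle check is a simple reachability search, and the names `SmallGraph`, `in_nodes`, `out_nodes` and `inputs_depth_first` are hypothetical.

```python
class Node:
    def __init__(self, id):
        self.id = id
        self.in_nodes = []   # nodes this node depends on
        self.out_nodes = []  # nodes depending on this node


class SmallGraph:
    def __init__(self):
        self.nodes = []

    def add_node(self, node):
        self.nodes.append(node)
        return node

    def _reaches(self, start, target):
        """True if `target` can be reached from `start` via output edges."""
        stack, seen = [start], set()
        while stack:
            n = stack.pop()
            if n is target:
                return True
            if id(n) not in seen:
                seen.add(id(n))
                stack.extend(n.out_nodes)
        return False

    def add_edge(self, u, v):
        # An edge u -> v closes a cycle if u is already reachable from v.
        if self._reaches(v, u):
            raise ValueError("edge %s -> %s would create a cycle" % (u.id, v.id))
        u.out_nodes.append(v)
        v.in_nodes.append(u)
        return self

    def inputs_depth_first(self, node):
        """Inputs of `node` in dependency order, with `node` itself last."""
        order, seen = [], set()

        def visit(n):
            if id(n) in seen:
                return
            seen.add(id(n))
            for parent in n.in_nodes:
                visit(parent)
            order.append(n)

        visit(node)
        return order
```

Listing the requested node last means a consumer can process the list front to back and always find a node's inputs handled before the node itself.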
Implementation of a thread-pool working with channels
A reader designed to read from channels which take part in pools. It acts like a handle to the underlying task in the pool.
Returns: | pool our task belongs to |
---|---|
Raises ValueError: | if the instance does not belong to a pool |
Returns: | reference to the pool we belong to |
---|
Read an item that was processed by one of our threads.
:note: Triggers task dependency handling needed to provide the necessary input
Returns: | task we read from |
---|---|
Raises ValueError: | If the instance is not attached to a task |
Returns: | reference to the task producing our items |
---|
A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which the number of threads is zero.
Work is distributed via Channels, which form a dependency graph. The evaluation is lazy, as work will only be done once an output is requested.
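The lazy, pull-based evaluation can be illustrated without any threads. In this sketch, which is not the pool's implementation, each stage only processes input when its own `read` is called; `LazyStage` and `ListSource` are hypothetical names.

```python
class ListSource:
    """Hypothetical input: hands out items from a list on request."""

    def __init__(self, items):
        self._items = list(items)

    def read(self, count):
        out, self._items = self._items[:count], self._items[count:]
        return out


class LazyStage:
    """Hypothetical stage: applies `fn` to items pulled from `source`."""

    def __init__(self, fn, source):
        self._fn = fn
        self._source = source  # any object with read(count) -> list
        self.calls = 0         # number of items processed so far

    def read(self, count):
        # Pull only as many inputs as the caller asked for.
        items = self._source.read(count)
        self.calls += len(items)
        return [self._fn(i) for i in items]


double = LazyStage(lambda x: x * 2, ListSource(range(10)))
plus_one = LazyStage(lambda x: x + 1, double)
# Nothing has been computed yet; reading from the last stage drives the chain.
first_three = plus_one.read(3)
```

Requesting three items from the last stage causes exactly three items to be processed upstream, which is the dependency-driven behaviour the paragraph above describes.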
The thread pool's inherent issue is the global interpreter lock (GIL) that it will hit, which gets worse considering that a few C extensions lock their part globally as well. The only way this will improve is if custom C extensions are written which do some bulk work, but release the GIL once they have acquired their resources.
Due to the nature of having multiple objects in git, it's easy to distribute that work cleanly among threads.
Note: | the current implementation returns channels which are meant to be used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it. |
---|
Add a new task to be processed.
Returns: | a read channel to retrieve processed items. If that handle is lost, the task will be considered orphaned and will be deleted on the next occasion. |
---|
Returns: | number of tasks |
---|
Delete the task. Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume its items.
This method blocks until all tasks to be removed have been processed, if they are currently being processed.
Returns: | self |
---|
Set the number of workers to use in this pool. When reducing the size, threads will continue with their work until they are done before effectively being removed.
Returns: | self |
---|---|
Parameters: | size – if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given number of threads. If the size is 0, newly added tasks will use channels which are NOT threadsafe to optimize item throughput. |
Note: | currently NOT threadsafe! |
Returns: | number of workers in the pool |
---|---|
Note: | method is not threadsafe! |
Abstracts a named task, which contains additional information on how the task should be queued and processed.
Results of the item processing are sent to a writer, which is to be set by the creator using the set_writer method.
Items are read using the internal _read callable. Subclasses are meant to set this to a callable that supports the Reader interface’s read function.
max_chunksize: if someone wants all items to be processed using read(0), the whole task would go to one worker, as well as dependent tasks. If you want finer granularity, you can specify it here, causing chunks to be no larger than max_chunksize
worker function. If False, a list of possibly multiple items will be passed instead.
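The effect of max_chunksize on a large read can be sketched as a simple split of the requested item count. The helper name `split_into_chunks` is hypothetical, and treating a max_chunksize of 0 as "no limit" is an assumption of this example.

```python
def split_into_chunks(count, max_chunksize):
    """Return chunk sizes that sum to `count`, none above `max_chunksize`.

    A max_chunksize of 0 is taken to mean "no limit" (an assumption).
    """
    if count <= 0:
        return []
    if max_chunksize <= 0 or count <= max_chunksize:
        return [count]
    full, rest = divmod(count, max_chunksize)
    return [max_chunksize] * full + ([rest] if rest else [])


# Ten items with a chunk limit of four become three units of work,
# which could then be handed to different workers.
chunks = split_into_chunks(10, 4)
```

Smaller chunks give the pool more units to spread across workers, at the cost of more scheduling overhead per item.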
A closed task will close its channel to ensure the readers will wake up.
:note: it's safe to call this method multiple times
Returns: | Exception caught during last processing or None |
---|
Returns: | True if the task’s write channel is closed |
---|
Returns: | True if we are finished processing |
---|
Process count items and send the result individually to the output channel
Set ourselves to being done, as we have completed the processing
Set the write channel to the given one
Returns: | a proxy to our write channel or None if none is set |
---|---|
Note: | you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. In the worst case, this will block your reads. |
Describes tasks which can be used with threaded pools
Implements a task which processes items from an iterable in a multi-processing safe manner
An input iterator for threaded pools
Uses an input channel as source for reading items. For instantiation, it takes all arguments of its base; the first one needs to be the input channel to read from, though.
Returns: | pool we are attached to, or None |
---|
Returns: | input channel from which we read |
---|
Adjust the read method to the given one
Module with threading utilities
Simple function which terminates all of our threads.
:param whitelist: If whitelist is given, only the given threads will be terminated
Kills all worker threads the method has created by sending the quit signal. This takes over in case of an error in the main function
A simple thread able to terminate itself on behalf of the user.
Terminate a thread as follows:
t.stop_and_join()
Derived classes call _should_terminate() to determine whether they should abort gracefully
Schedule this thread to be terminated as soon as possible.
:note: this method does not block.
Start the thread and return self
Ask the thread to stop its operation and wait for it to terminate.
:note: Depending on the implementation, this might block a moment
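The cooperative termination protocol described above can be sketched with `threading.Event`. This is an illustration, not the library's class: `StoppableThread` and `schedule_termination` are hypothetical names, and the sleeping loop stands in for real work.

```python
import threading
import time


class StoppableThread(threading.Thread):
    """Hypothetical thread that stops cooperatively when asked."""

    def __init__(self):
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self.iterations = 0

    def _should_terminate(self):
        return self._stop_requested.is_set()

    def run(self):
        # Derived classes would poll _should_terminate() between work units.
        while not self._should_terminate():
            self.iterations += 1
            time.sleep(0.01)  # stand-in for one unit of real work

    def start(self):
        super().start()
        return self  # allows StoppableThread().start() chaining

    def schedule_termination(self):
        self._stop_requested.set()  # returns immediately, does not block

    def stop_and_join(self):
        self.schedule_termination()
        self.join()  # blocks until the current work unit has finished
```

Because the thread only checks the flag between work units, `stop_and_join` blocks for at most the duration of one unit.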
This base allows calling functions on class instances natively. As it is meant to work with a pool, the result of the call must be handled by the callee. The thread runs forever unless it receives the terminate signal using its task queue.
Tasks could be anything, but should usually be class methods and arguments to allow the following:
    inq = Queue()
    w = WorkerThread(inq)
    w.start()
    inq.put((WorkerThread.<method>, args, kwargs))

Finally we call quit to terminate asap.

Alternatively, you can make a call more intuitively; the output is the output queue, allowing you to get the result right away or later:

    w.call(arg, kwarg='value').get()

    inq.put(WorkerThread.quit)
    w.join()
You may provide the following tuples as task:
t[0] = class method, function or instance method
t[1] = optional, tuple or list of arguments to pass to the routine
t[2] = optional, dictionary of keyword arguments to pass to the routine
Process input tasks until we receive the quit signal
If sent via the inq of the thread, it will stop once it processed the function
Send stop message to ourselves - we don’t block, the thread will terminate once it has finished processing its input queue to receive our termination event
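A worker loop that accepts the task tuples described above might look like the following sketch. It uses a plain `threading.Thread` and a sentinel object instead of the library's WorkerThread and quit method; `_QUIT`, `worker_loop` and `record` are hypothetical names.

```python
import queue
import threading

_QUIT = object()  # hypothetical sentinel standing in for the quit signal


def worker_loop(inq):
    """Run tasks from `inq` until the quit sentinel arrives."""
    while True:
        task = inq.get()
        if task is _QUIT:
            return
        if callable(task):
            task = (task,)  # a bare callable means "no arguments"
        func = task[0]
        args = task[1] if len(task) > 1 else ()
        kwargs = task[2] if len(task) > 2 else {}
        func(*args, **kwargs)  # the task itself must store its result


results = []


def record(value, scale=1):
    results.append(value * scale)


inq = queue.Queue()
worker = threading.Thread(target=worker_loop, args=(inq,))
worker.start()
inq.put((record, (2,), {"scale": 5}))
inq.put((record, [3]))
inq.put(_QUIT)
worker.join()
```

Since the quit signal travels through the same queue as the tasks, everything queued before it is processed before the thread exits.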
Module with utilities related to async operations
A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers that there is nothing more to get here. All default-queue code was cleaned up for performance.
Set the writable flag of this queue to True or False.
:return: The previous state
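The writable flag can be sketched on top of the standard `queue.Queue`. This is an assumption-laden illustration: `FlaggedQueue`, `set_writable`, `writable` and the `ReadOnly` exception name are hypothetical, and unlike the queue described above, this sketch does not wake readers that are already blocked.

```python
import queue
import threading


class ReadOnly(Exception):
    """Raised when writing to a queue whose writable flag is False."""


class FlaggedQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._writable = True
        self._flag_lock = threading.Lock()

    def set_writable(self, state):
        """Set the flag and return its previous value."""
        with self._flag_lock:
            prev, self._writable = self._writable, bool(state)
        return prev

    def writable(self):
        with self._flag_lock:
            return self._writable

    def put(self, item, block=True, timeout=None):
        if not self.writable():
            raise ReadOnly("queue is no longer writable")
        super().put(item, block, timeout)
```

Items that were queued before the flag was cleared remain readable, so consumers can drain the queue after the producer has finished.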
An object providing a do-nothing lock interface for use in sync mode
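A do-nothing lock can be as small as the sketch below. `DummyLock` and `drain` are hypothetical names; the point is only that code written against the lock interface runs unchanged with either a real lock or the no-op one.

```python
import threading


class DummyLock:
    """Lock-shaped object whose methods do nothing."""

    def acquire(self, blocking=True, timeout=-1):
        return True  # always "succeeds" immediately

    def release(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # never swallow exceptions


def drain(items, lock):
    """Empty `items` under `lock`, whatever kind of lock it is."""
    with lock:
        out = list(items)
        items.clear()
    return out


serial = drain([1, 2], DummyLock())         # sync mode: no locking cost
threaded = drain([3], threading.Lock())     # threaded mode: real lock
```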
Cleaned up code of the original condition object in order to make it run and respond faster.
It's vital that this method is threadsafe: we absolutely have to get a lock at the beginning of this method to be sure we get the correct number of waiters back. If we bail out although a waiter is about to be added, it will miss its wakeup notification and possibly block forever.
Thrown when trying to write to a read-only queue
Adapter to allow using a deque like a queue, without locking
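Such an adapter can be sketched by adding queue-style method names to `collections.deque`. The class name `DequeQueue` is hypothetical, and raising `queue.Empty` on an empty read is a choice made for this example to mirror the standard queue interface.

```python
import queue
from collections import deque


class DequeQueue(deque):
    """Hypothetical adapter exposing queue-style names on a deque."""

    def put(self, item, block=True, timeout=None):
        self.append(item)  # never blocks; block and timeout are ignored

    def get(self, block=True, timeout=None):
        try:
            return self.popleft()
        except IndexError:
            raise queue.Empty from None

    def empty(self):
        return len(self) == 0
```

Without locks or condition variables, each call is a plain deque operation, which is what makes this variant attractive for single-threaded use.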
:return: number of CPUs in the system
:note: inspired by multiprocessing