API Reference

Channel

Contains a queue-based channel implementation

class async.channel.Channel

A channel is similar to a file-like object. It has a write end as well as one or more read ends. If data is in the channel, it can be read; if not, the read operation will block until data becomes available. If the channel is closed, any read operation will result in an exception

This base class is not instantiated directly, but instead serves as a constructor for Reader/Writer pairs.

Create a new channel

QueueCls

alias of AsyncQueue

queue
class async.channel.SerialChannel

A slightly faster version of a Channel which sacrifices thread-safety for performance

QueueCls

alias of SyncQueue

class async.channel.Writer(device)

A writer is an object providing write access to a possibly blocking reading device

close()

Close the channel. Multiple close calls on a closed channel are not an error

closed()
Returns:True if the channel was closed
size()
Returns:number of items already in the device that could be read with a reader
write(item, block=True, timeout=None)

Write the given item into the device

Parameters:
  • block – True if the device may block until space for the item is available
  • timeout – The time in seconds to wait for the device to become ready in blocking mode

class async.channel.ChannelWriter(channel)

The write end of a channel, a file-like interface for a channel

channel
close()

Close the channel. Multiple close calls on a closed channel are not an error

closed()
Returns:True if the channel was closed
size()
write(item, block=False, timeout=None)
class async.channel.CallbackChannelWriter(*args)

Implements a channel writer with callback functionality

class async.channel.Reader(device)

Allows reading from a device

next()

Support the Python 2 iterator syntax

read(count=0, block=True, timeout=None)

Read a list of items from the device. The list, as a sequence of items, is similar to the string of characters returned when reading from file-like objects.

Parameters:
  • count – number of items to read. If < 1, all items will be read
  • block – if True, the call will block until an item is available
  • timeout – if positive and block is True, it will block only for the given amount of seconds, returning the items it received so far. The timeout is applied to each read item, not for the whole operation.
Returns:

single item in a list if count is 1, or a list of count items. If the device was empty and count was 1, an empty list will be returned. If count was greater than 1, a list with fewer than count items will be returned. If count was < 1, a list with all items that could be read will be returned.

class async.channel.ChannelReader(channel)

Allows reading from a channel. The reader is thread-safe if the channel is as well

channel
read(count=0, block=True, timeout=None)
class async.channel.CallbackChannelReader(*args)

Implements a channel reader with callback functionality

async.channel.mkchannel(ctype=<class 'async.channel.Channel'>, wtype=<class 'async.channel.ChannelWriter'>, rtype=<class 'async.channel.ChannelReader'>)

Create a channel, with a reader and a writer

Parameters:
  • ctype – the Channel type to instantiate
  • wtype – the type of the write channel to instantiate
  • rtype – the type of the read channel to instantiate
Returns:tuple(reader, writer)
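
For illustration, here is a minimal sketch of wiring the pieces together by hand, which is what mkchannel does for you; the constructor usage follows the class descriptions above, and the printed result is an assumption based on the read() semantics documented for Reader:

    from async.channel import Channel, ChannelWriter, ChannelReader

    chan = Channel()
    writer = ChannelWriter(chan)
    reader = ChannelReader(chan)

    writer.write(1)
    writer.write(2)
    writer.close()          # wakes up blocked readers

    print(reader.read())    # expected: [1, 2]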

exception async.channel.ReadOnly

Thrown when trying to write to a read-only queue

class async.channel.IteratorReader(iterator)

A Reader that allows reading items from an iterator instead of a channel. Reads will never block. It is thread-safe.

lock_type()

allocate_lock() -> lock object (allocate() is an obsolete synonym)

Create a new lock object. See help(LockType) for information about locks.

read(count=0, block=True, timeout=None)

Non-blocking implementation of read
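
A short sketch of reading from a plain iterator instead of a channel; the chunking follows the read() semantics documented above:

    from async.channel import IteratorReader

    reader = IteratorReader(iter(range(5)))
    print(reader.read(2))   # [0, 1]
    print(reader.read())    # all remaining items: [2, 3, 4]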

class async.channel.CallbackReaderMixin(*args)

A reader which invokes a callback before items are read from the channel

read(count=0, block=True, timeout=None)
set_post_cb(fun=<lambda>)

Install a callback to call after items have been read, but before they are returned to the caller. The callback may adjust the items and/or the list. If no function is provided, the callback is uninstalled

Returns:the previously installed function
set_pre_cb(fun=<lambda>)

Install a callback to call with the item count to be read before any item is actually read from the channel. Exceptions will be propagated. If a function is not provided, the call is effectively uninstalled.

Returns:the previously installed callback or None
Note:The callback must be threadsafe if the channel is used by multiple threads.
class async.channel.CallbackWriterMixin(*args)

The write end of a channel which allows you to set up a callback to be called after an item was written to the channel

set_pre_cb(fun=<lambda>)

Install a callback to be called before the given item is written. It returns a possibly altered item which will be written to the channel instead, making it useful for pre-write item conversions. Providing None uninstalls the current method.

Returns:the previously installed function or None
Note:Must be thread-safe if the channel is used in multiple threads
write(item, block=True, timeout=None)
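
A hedged sketch of the callback machinery, assuming the callback writer and reader take the channel as their single constructor argument just like ChannelWriter and ChannelReader:

    from async.channel import (Channel, CallbackChannelWriter,
                               CallbackChannelReader)

    chan = Channel()
    writer = CallbackChannelWriter(chan)
    reader = CallbackChannelReader(chan)

    # pre-write callback: returns a possibly altered item to be written instead
    writer.set_pre_cb(lambda item: item * 2)

    # post-read callback: adjusts the list of items before it is returned
    def bump(items):
        items[:] = [i + 1 for i in items]
        return items
    reader.set_post_cb(bump)

    writer.write(3)         # 6 is stored
    writer.close()
    print(reader.read())    # expected: [7]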

Graph

Simplistic implementation of a graph

class async.graph.Node(id=None)

A node in the graph. Nodes know their neighbours and have an id which should resolve into a string

id
in_nodes
out_nodes
class async.graph.Graph

A simple graph implementation, keeping nodes and providing basic access and editing functions. The performance is only suitable for small graphs of no more than 10 nodes!

add_edge(u, v)

Add an undirected edge between the given nodes u and v.

Returns:self
Raises ValueError:
 If the new edge would create a cycle
add_node(node)

Add a new node to the graph

Returns:the newly added node

input_inclusive_dfirst_reversed(node)

Return all input nodes of the given node, depth first. The given node itself is returned last, as the pool requires that order

nodes
remove_node(node)

Delete a node from the graph

Returns:self
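
A small sketch of building a graph; the edge direction (u to v) and the resulting traversal order are assumptions based on the descriptions above:

    from async.graph import Graph, Node

    g = Graph()
    a, b, c = Node('a'), Node('b'), Node('c')
    for n in (a, b, c):
        g.add_node(n)

    g.add_edge(a, b)
    g.add_edge(b, c)

    # inputs of c, depth first, with c itself returned last
    print([n.id for n in g.input_inclusive_dfirst_reversed(c)])   # e.g. ['a', 'b', 'c']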

Pool

Implementation of a thread-pool working with channels

class async.pool.PoolReader(channel, task, pool)

A reader designed to read from channels which take part in pools. It acts like a handle to the underlying task in the pool.

pool()
Returns:pool our task belongs to
Raises ValueError:
 if the instance does not belong to a pool
pool_ref()
Returns:reference to the pool we belong to
read(count=0, block=True, timeout=None)

Read an item that was processed by one of our threads

Note:Triggers task dependency handling needed to provide the necessary input

task()
Returns:task we read from
Raises ValueError:
 If the instance is not attached to a task
task_ref()
Returns:reference to the task producing our items
class async.pool.Pool(size=0)

A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero.

Work is distributed via Channels, which form a dependency graph. The evaluation is lazy, as work will only be done once an output is requested.

The thread pool's inherent issue is the global interpreter lock that it will hit, which gets worse considering that a few C extensions specifically lock their part globally as well. The only way this will improve is if custom C extensions are written which do some bulk work, but release the GIL once they have acquired their resources.

Due to the nature of having multiple objects in git, it's easy to distribute that work cleanly among threads.

Note:the current implementation returns channels which are meant to be used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.
LockCls = None
TaskQueueCls = None
WorkerCls = None
add_task(task)

Add a new task to be processed.

Returns:a read channel to retrieve processed items. If that handle is lost, the task will be considered orphaned and will be deleted on the next occasion.
num_tasks()
Returns:amount of tasks
remove_task(task, _from_destructor_=False)

Delete the task. Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume its items.

This method blocks until all tasks to be removed have been processed, if they are currently being processed.

Returns:self
set_size(size=0)

Set the amount of workers to use in this pool. When reducing the size, threads will continue with their work until they are done before effectively being removed.

Returns:self
Parameters:size – if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads. If the size is 0, newly added tasks will use channels which are NOT threadsafe to optimize item throughput.
Note:currently NOT threadsafe !
size()
Returns:amount of workers in the pool
Note:method is not threadsafe !
class async.pool.ThreadPool(size=0)

A pool using threads as workers

LockCls()

allocate_lock() -> lock object (allocate() is an obsolete synonym)

Create a new lock object. See help(LockType) for information about locks.

TaskQueueCls

alias of AsyncQueue

WorkerCls

alias of WorkerThread
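
A minimal sketch of the typical workflow; the task constructor arguments follow the Task signature in the next section, and result ordering with multiple workers is not guaranteed:

    from async.pool import ThreadPool
    from async.task import IteratorThreadTask

    pool = ThreadPool(size=2)

    # apply the function to every item pulled from the iterator
    task = IteratorThreadTask(iter(range(10)), "square", lambda x: x * x)

    reader = pool.add_task(task)    # a PoolReader handle to the task's output
    print(reader.read())            # evaluates lazily; e.g. [0, 1, 4, ..., 81]
    pool.set_size(0)                # drop the worker threads when done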

Task

class async.task.Task(id, fun, apply_single=True, min_count=None, max_chunksize=0, writer=None)

Abstracts a named task, which contains additional information on how the task should be queued and processed.

Results of the item processing are sent to a writer, which is to be set by the creator using the set_writer method.

Items are read using the internal _read callable, subclasses are meant to set this to a callable that supports the Reader interface’s read function.

  • min_count assures that no fewer than min_count items will be processed per call.
  • max_chunksize assures that multi-threading happens in smaller chunks. If someone wants all items to be processed, using read(0), the whole task would go to one worker, as well as dependent tasks. If you want finer granularity, you can specify it here, causing chunks to be no larger than max_chunksize.
  • apply_single if True (the default), individual items will be given to the worker function. If False, a list of possibly multiple items will be passed instead.

apply_single
close()

A closed task will close its channel to assure the readers will wake up

Note:it is safe to call this method multiple times

error()
Returns:Exception caught during last processing or None
fun
is_closed()
Returns:True if the task’s write channel is closed
is_done()
Returns:True if we are finished processing
max_chunksize
min_count
process(count=0)

Process count items and send the results individually to the output channel

set_done()

Set ourselves to being done, as we have completed the processing

set_writer(writer)

Set the write channel to the given one

writer()
Returns:a proxy to our write channel or None if none is set
Note:you must not hold a reference to our write channel when the task is being processed. This would cause the write channel never to be closed as the task will think there is still another instance being processed which can close the channel once it is done. In the worst case, this will block your reads.
class async.task.ThreadTaskBase

Describes tasks which can be used with threaded pools

class async.task.IteratorTaskBase(iterator, *args, **kwargs)

Implements a task which processes items from an iterable in a multi-processing safe manner

class async.task.IteratorThreadTask(iterator, *args, **kwargs)

An input iterator for threaded pools

lock_type()

allocate_lock() -> lock object (allocate() is an obsolete synonym)

Create a new lock object. See help(LockType) for information about locks.

class async.task.ChannelThreadTask(in_reader, *args, **kwargs)

Uses an input channel as the source for reading items. For instantiation, it takes all arguments of its base; the first one needs to be the input channel to read from, though.

pool()
Returns:pool we are attached to, or None
reader()
Returns:input channel from which we read
set_pool(pool)
set_read(read)

Adjust the read method to the given one
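
Tasks can be chained by feeding one task's output reader into a ChannelThreadTask; a hedged sketch, reusing the pool workflow shown in the Pool section:

    from async.pool import ThreadPool
    from async.task import IteratorThreadTask, ChannelThreadTask

    pool = ThreadPool(size=2)

    producer = IteratorThreadTask(iter(range(5)), "produce", lambda x: x + 1)
    reader = pool.add_task(producer)

    # the first argument is the input channel to read from
    consumer = ChannelThreadTask(reader, "consume", lambda x: x * 10)
    final_reader = pool.add_task(consumer)

    print(final_reader.read())      # e.g. [10, 20, 30, 40, 50]; order may vary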

Thread

Module with threading utilities

async.thread.do_terminate_threads(whitelist=[])

Simple function which terminates all of our threads

Parameters:whitelist – If whitelist is given, only the given threads will be terminated

async.thread.terminate_threads(func)

Decorator which kills all worker threads the wrapped function has created by sending the quit signal. This takes over in case of an error in the main function
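
Used as a decorator; a hedged sketch, where the function body is hypothetical:

    from async.thread import terminate_threads

    @terminate_threads
    def run_job():
        # start worker threads here; if this function raises, the threads
        # it created are sent the quit signal
        ...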

class async.thread.TerminatableThread

A simple thread able to terminate itself on behalf of the user.

Terminate a thread as follows:

    t.stop_and_join()

Derived classes call _should_terminate() to determine whether they should abort gracefully

schedule_termination()

Schedule this thread to be terminated as soon as possible.

Note:this method does not block.

start()

Start the thread and return self

stop_and_join()

Ask the thread to stop its operation and wait for it to terminate

Note:Depending on the implementation, this might block for a moment
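
A hedged sketch of a derived thread; the Poller subclass is hypothetical, while _should_terminate(), start() and stop_and_join() are the members described above:

    import time
    from async.thread import TerminatableThread

    class Poller(TerminatableThread):
        def run(self):
            # do small units of work until asked to terminate
            while not self._should_terminate():
                time.sleep(0.01)

    t = Poller().start()    # start() returns self
    t.stop_and_join()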

class async.thread.WorkerThread(inq=None)

This base class allows calling functions on class instances natively. As it is meant to work with a pool, the result of the call must be handled by the callee. The thread runs forever unless it receives the terminate signal via its task queue.

Tasks could be anything, but should usually be class methods and arguments to allow the following:

    inq = Queue()
    w = WorkerThread(inq)
    w.start()
    inq.put((WorkerThread.<method>, args, kwargs))

Finally we call quit to terminate asap:

    inq.put(WorkerThread.quit)
    w.join()

Alternatively, you can make a call more intuitively - the output is the output queue, allowing you to get the result right away or later:

    w.call(arg, kwarg='value').get()

You may provide the following tuples as task:

  • t[0] = class method, function or instance method
  • t[1] = optional, tuple or list of arguments to pass to the routine
  • t[2] = optional, dictionary of keyword arguments to pass to the routine

inq
run()

Process input tasks until we receive the quit signal

shutdown_check_time_s = 0.5
classmethod stop(*args)

If sent via the inq of the thread, it will stop once it has processed the function

stop_and_join()

Send the stop message to ourselves - we don't block; the thread will terminate once it has finished processing its input queue and receives our termination event

Util

Module with utilities related to async operations

class async.util.AsyncQueue(maxsize=0)

A queue using different condition objects to gain multithreading performance. Additionally, it has a thread-safe writable flag which will alert all readers that there is nothing more to get. All default-queue code was cleaned up for performance.

empty()
get(block=True, timeout=None)
mutex
not_empty
put(item, block=True, timeout=None)
qsize()
set_writable(state)

Set the writable flag of this queue to True or False

Returns:The previous state

writable()
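
A minimal sketch of the writable flag; the assumption, consistent with the ReadOnly exception documented in this module, is that put() raises ReadOnly once the flag has been cleared:

    from async.util import AsyncQueue, ReadOnly

    q = AsyncQueue()
    q.put('work item')

    q.set_writable(False)           # tell readers nothing more will arrive
    try:
        q.put('late item')
    except ReadOnly:
        pass                        # writes are rejected now

    print(q.get(block=False))       # 'work item'
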
class async.util.DummyLock

An object providing a do-nothing lock interface for use in sync mode

acquire()
release()
class async.util.HSCondition(lock=None)

Cleaned up code of the original condition object in order to make it run and respond faster.

acquire(block=None)
delay = 0.0002
notify(n=1)

It is vital that this method is thread-safe - we absolutely have to acquire a lock at the beginning of this method to be sure we get the correct number of waiters back. If we bail out although a waiter is about to be added, it will miss its wakeup notification and possibly block forever

notify_all()
release()
wait(timeout=None)
exception async.util.ReadOnly

Thrown when trying to write to a read-only queue

class async.util.SyncQueue

Adapter to allow using a deque like a queue, without locking

empty()
get(block=True, timeout=None)
put(item, block=True, timeout=None)
set_writable(state)
writable()
async.util.cpu_count()

Returns:number of CPUs in the system

Note:inspired by multiprocessing
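
A common use is sizing a pool to the machine, for example:

    from async.pool import ThreadPool
    from async.util import cpu_count

    pool = ThreadPool(size=cpu_count())     # one worker per CPU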
