1. Interprocess queues

Turberfield provides a pipes module for interprocess communication. Your programs can send and receive messages consisting of simple Python objects.

The module provides interprocess Queues. Two variants are available; one for use with an asyncio event loop, and one for code written in a blocking-call style. Both variants accept the following Python structures: strings, bytes, numbers, tuples, lists, dicts, sets, booleans, and None.

The Queues are implemented with POSIX named pipes; this module works only on those operating systems which support them.

class turberfield.utils.pipes.SimplePipeQueue(path, history=True)
Parameters:
  • path – supplies the path to the underlying POSIX named pipe.
  • history – If True, a pipe which already exists will be reused, and not removed after exiting the Queue.

This class can send messages without blocking your code:

pq = SimplePipeQueue.pipequeue("/tmp/pq.fifo")
pq.put_nowait((0, "First message."))
pq.close()

You can also use this class as a context manager. Don’t forget that get() is a blocking operation:

with SimplePipeQueue("/tmp/pq.fifo") as pq:
    msg = pq.get()
classmethod pipequeue(*args, **kwargs)

This is a factory function which creates and initialises a Queue. Your code should call close() on the queue when finished.

put_nowait(msg)

Put an item into the queue without blocking.

get()

Remove and return an item from the queue. If queue is empty, block until an item is available.

close()

Completes the use of the queue.

class turberfield.utils.pipes.PipeQueue(*args, **kwargs)
Parameters:
  • path – supplies the path to the underlying POSIX named pipe.
  • history – If True, a pipe which already exists will be reused, and not removed after exiting the Queue.

This is a subclass of SimplePipeQueue, extended for use like an asyncio.Queue:

pq = PipeQueue.pipequeue("/tmp/pq.fifo")
yield from pq.put((0, "First message."))
pq.close()

and:

pq = PipeQueue.pipequeue("/tmp/pq.fifo")
msg = yield from pq.get()
pq.close()
get()

Remove and return an item from the queue. If queue is empty, wait until an item is available.

This method is a coroutine.

put(msg)

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

This method is a coroutine.