API reference

General utilities

Graceful shutdown

aiotk.run_until_complete(coro, loop=None)

Run a task through to completion.

The .run_until_complete() method on asyncio event loop objects doesn’t finish tasks when it receives a SIGINT/CTRL-C. The method simply raises a KeyboardInterrupt exception and this usually results in warnings about unfinished tasks plus some “event loop closed” RuntimeError exceptions in pending tasks.

This is a really annoying default behavior and this function aims at replacing that behavior with something that ensures the task actually runs through to completion. When the KeyboardInterrupt exception is caught, the task is canceled and resumed to give it a chance to clean up properly.

aiotk.cancel(task, loop=None)

Cancel a task and wait until it’s done.

Note: this function is a coroutine.

Canceling a child task and returning without waiting for the child task to complete is a common cause of “event loop closed” RuntimeError exceptions, especially during program shutdown. Therefore, this becomes a common pattern:

task.cancel()
await asyncio.wait({task})

However, if the parent task itself is also canceled, then the asyncio.wait() call will be interrupted and the child task will still not complete. To solve this, we must also manage to trap the asyncio.CancelledError exception and call asyncio.wait({task}) again and properly re-raise the asyncio.CancelledError exception. For example:

task.cancel()
try:
    await asyncio.wait({task})
except asyncio.CancelledError:
    await asyncio.wait({task})
    raise

This is not trivial and must be done so many times in a program that cancels tasks that it merits a replacement API for task.cancel().

Parameters:
  • task – The asyncio.Task object to cancel.
  • loop – The event loop to use for awaiting. Defaults to the current event loop.
aiotk.cancel_all(tasks, loop=None)

Cancel a set of tasks and wait until they’re done.

Note: this function is a coroutine.

Canceling a set of child tasks and returning without waiting for the child task to complete is a common cause of “event loop closed” RuntimeError exceptions, especially during shutdown of servers with one ore more task per connection. Therefore, this becomes a common pattern:

for task in tasks:
    task.cancel()
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

However, if the parent task itself is also canceled, then the asyncio.wait() call will be interrupted and one or more of the child tasks will still not complete. To solve this, we must also manage to trap the asyncio.CancelledError exception and call asyncio.wait(tasks) again and properly re-raise the asyncio.CancelledError exception. For example:

for task in tasks:
    task.cancel()
try:
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
except asyncio.CancelledError:
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    raise

This is not trivial and must be done so many server programs that cancels tasks that it merits a helper.

Parameters:
  • tasks – The set of asyncio.Task objects to cancel.
  • loop – The event loop to use for awaiting. Defaults to the current event loop.
aiotk.follow_through(task, loop=None)

Wait for a task to complete (even if canceled while waiting).

Note: this function is a coroutine.

Not propagating cancellation to a child task and returning without waiting for the child task to complete is a common cause of “event loop closed” RuntimeError exceptions, especially during program shutdown. Therefore, this becomes a common pattern:

try:
    await asyncio.wait({task})
except asyncio.CancelledError:
    task.cancel()
    await asyncio.wait({task})
    raise
return task.result()

This is not trivial and must be done so many times in a program that spawns child tasks that it merits a helper method.

Parameters:
  • task – The asyncio.Task object to see through to completion.
  • loop – The event loop to use for awaiting. Defaults to the current event loop.
aiotk.wait_until_cancelled(*, propagate=True, loop=None)

Wait until the calling task is canceled.

Note: this function is a coroutine.

When using a context manager to complete one or more background tasks, it’s common to have the “main” task block until something cancels it (e.g. a SIGINT/CTRL-C handler).

It’s also convenient in tests that verify the behavior of cancellation to need to spawn a background task that waits forever.

This leads to the not-so-idiomatic:

await asyncio.Future()

This of often wrapped in a helper function to make the call more readable. Instead of propagating multiple variants of this, it should be placed in a library that everybody can import.

Parameters:loop – Loop in which the coroutine will block. Defaults to the current event loop.
class aiotk.AsyncExitStack

Rollback stack for asynchronous context managers.

This context manager provides the following features over direct use of contextlib.ExitStack:

  • supports asynchronous context managers (in addition to regular context managers).
import asyncio
from aiotk import AsyncExitStack

class Greeter(object):

    def __init__(self, name):
        self._name = name

    async def __aenter__(self):
        print('Hello, %s!' % self._name)

    async def __aexit__(self, *args):
        print('Bye, %s!' % self._name)

async def demo():
    async with AsyncExitStack() as stack:
        await stack.enter_context(Greeter('Alice'))
        await stack.enter_context(Greeter('Bob'))

asyncio.get_event_loop().run_until_complete(demo())
Hello, Alice!
Hello, Bob!
Bye, Bob!
Bye, Alice!
enter_context(context)

Push an (asynchronous) context manager onto the rollback stack.

Parameters:context – context manager or asynchronous context manager.
Returns:The return value of the context manager’s __enter__ or __aenter__ method.
class aiotk.EnsureDone(coro, cancel=True, loop=None)

Ensure a background task completes before leaving a code block.

This is mostly useful for spawning multiple tasks in the background using AsyncExitStack, but it can also be used directly for single tasks.

from aiotk import EnsureDone, wait_until_cancelled

async def background_task():
    await wait_until_cancelled(loop=event_loop)

async def demo():
    async with EnsureDone(background_task()) as task:
        assert not task.done()

    assert task.done()

asyncio.get_event_loop().run_until_complete(demo())
class aiotk.TaskPool(loop=None)

Collect background tasks.

This is useful for server programs that wish to spawn one task per connection. It removes the need to implement the redundant task management code required to close tasks as they complete and the need to implement graceful shutdown (with cancellation) to clean up when shutting down the server.

import asyncio
from aiotk import TaskPool, wait_until_cancelled

async def background_task(ready):
    ready.set()
    await wait_until_cancelled()

async def demo():
    ready = asyncio.Event()
    async with TaskPool() as pool:
        task = await pool.spawn(background_task, ready)
        await ready.wait()
        assert not task.done()

    assert task.done()

asyncio.get_event_loop().run_until_complete(demo())
close()

Start the shutdown sequence.

spawn(fn, *args, **kwds)

Add a new task to collect.

The pool is designed to keep memory usage as low as possible by collecting tasks as soon as possible; it keeps no references to the task once it has completed. If you want to get the task’s result, you can keep a reference to the asyncio.Task object that is returned.

Parameters:
  • fn – The coroutine function to invoke.
  • args – Positional arguments to pass to the coroutine function.
  • kwds – Keyword arguments to pass to the coroutine function.
Returns:

An asyncio.Task object.

start()

Start the collection task.

wait_busy()

Block until the task is known to be waiting for tasks to complete.

This is mostly useful for testing purposes (it avoids flaky behavior due to timing and makes the code paths more predictable).

wait_closed()

Wait until the shutdown sequence has completed.

This unblocks once all tasks in the pool have completed and that the collection task has completed.

wait_idle()

Block until all tasks have completed.

This is useful to perform fan-in: spawn a set of tasks and then wait for all of them to complete.

wait_started()

Wait until the collection task has started.

Streams

aiotk.udp_server(host, port, service, loop=None)

Simple UDP-based service.

The only examples for UDP in asyncio documentation use protocols, the callback-based APIs and are a bit confusing (connection made? connection lost?).

This helper method tries to turn the low-level UDP socket support into a stream-based API. You pass in a coroutine function to which a pair of queues will be passed. From there, you can use async/await syntax to send and receive packets.

Parameters:
  • host – Network interface on which to bind.
  • port – Port number on which to bind.
  • loop – Loop in which the service will run.
Service:

coroutine that will perform logic.

The service coroutine should have the following signature:

async def my_udp_service(*, iqueue, oqueue, loop, **kwds):
    pass

The iqueue and oqueue parameters are asyncio.Queue objects that the coroutine can use to read from and write to, respectively.

import asyncio
from aiotk import (
    cancel,
    udp_server,
    EnsureDone,
    AsyncExitStack,
)

host = '127.0.0.1'
server_port = 5555
client_port = 5556

async def echo_server(iqueue, oqueue, loop):
    """UDP echo server."""

    try:
        while True:
            peer, data = await iqueue.get()
            assert peer == (host, client_port)
            await oqueue.put((peer, data))
    except asyncio.CancelledError:
        pass

async def echo_client(iqueue, oqueue, loop):
    """UDP echo client."""

    # Repeatedly send until the server ACKs.
    item = None
    while item is None:
        try:
            item = iqueue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.5)
            await oqueue.put(((host, server_port), b'PING'))

    peer, data = item
    assert peer == (host, server_port)
    assert data == b'PING'

async def demo():
    async with AsyncExitStack() as stack:
        server = await stack.enter_context(EnsureDone(
            udp_server(host, client_port, echo_server),
        ))
        client = await stack.enter_context(EnsureDone(
            udp_server(host, server_port, echo_client),
        ))
        await asyncio.wait_for(client, timeout=5.0)
        await cancel(server)

    assert client.result() is None
    assert server.result() is None

asyncio.get_event_loop().run_until_complete(demo())
aiotk.mempipe(loop=None, limit=None)

In-memory pipe, returns a (reader, writer) pair.

New in version 0.1.

import asyncio
from aiotk import mempipe

async def demo():
    reader, writer = mempipe()
    writer.write('Hello, world!\n'.encode('utf-8'))
    rep = await reader.readline()
    print(rep.decode('utf-8').strip())
    writer.close()

asyncio.get_event_loop().run_until_complete(demo())
Hello, world!

Testing

Subprocesses

aiotk.mock_subprocess(run, loop=None)

Calls run() instead of spawning a sub-process.

Parameters:run – A coroutine function that simulates the sub-process. Can return None or 0 to simulate successful process execution or a non-zero error code to simulate sub-process terminate with a non-zero exit code. If an exception is raised, the result is 1 (non-zero). This function can accept a variable number of arguments, see below.

Dependency injection is used with the run() coroutine function to pass only arguments that are declared in the function’s signature. Omit all but the arguments you intend to use. Here are all the available arguments:

  • argv: a list of strings passed as positional arguments to asyncio.create_subprocess_exec().
  • stdin: an asyncio.StreamReader instance. When output is not redirected, this reads from the “real” sys.stdin.
  • stdout: an asyncio.StreamWriter instance. When output is not redirected, this writes to the “real” sys.stdout.
  • stderr: an asyncio.StreamWriter instance. When output is not redirected, this writes to the “real” sys.stderr.
  • env: a dict containing environment variables passed to asyncio.create_subprocess_exec().
  • signals: an asyncio.Queue object that receives integers passed to asyncio.Process.send_signal().
  • kwds: extra keyword arguments passed to asyncio.create_subprocess_exec().
import aiotk

async def echo(stdin, stdout):
    line = await stdin.readline()
    while line:
        stdout.write(line)
        line = await stdin.readline()

async def demo():
    with aiotk.mock_subprocess(echo):
        process = await asyncio.create_subprocess_exec(
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            process.communicate(input=b'Hello, world!\n'), timeout=5.0
        )
        assert stderr is None
        print(stdout.decode('utf-8').strip())

asyncio.get_event_loop().run_until_complete(demo())
Hello, world!

Compatibility helpers

asyncio backports

aiotk.monkey_patch()

Applies all monkey patches.

This is a series of backports for asyncio functions in order to use them on all Python 3.5+ versions.

import asyncio
import aiotk

aiotk.monkey_patch()

async def demo():
    reader, writer = mempipe()
    writer.write('Hello, world!'.encode('utf-8'))
    rep = await reader.readuntil(b'!')
    print(rep.decode('utf-8'))
    writer.close()

asyncio.get_event_loop().run_until_complete(demo())
Hello, world!

CTRL-C / SIGINT handler

aiotk.handle_ctrlc(f, loop=None)

Context manager that schedules a callback when SIGINT is received.

Parameters:
  • f – future to fulfill when SIGINT is received. Will only be set once even if SIGINT is received multiple times.
  • loop – event loop (defaults to global event loop).
import asyncio
import os
import signal

from aiotk import handle_ctrlc

async def demo():
    loop = asyncio.get_event_loop()
    done = asyncio.Future()
    with handle_ctrlc(done):
        loop.call_soon(os.kill, os.getpid(), signal.SIGINT)
        print('Press CTRL-C to continue...')
        await asyncio.wait_for(done, timeout=1.0)
    print('Done!')

asyncio.get_event_loop().run_until_complete(demo())
Press CTRL-C to continue...
Done!

Network facilities

UNIX socket server

class aiotk.UnixSocketServer(path, callback, loop=None)

Asynchronous context manager to accept UNIX connections.

This context manager provides the following features over direct use of asyncio.start_unix_server():

  • connection handlers are coroutines
  • prevent leaking connections when the writer is not properly closed by the connection handler
  • automatically cancel all tasks that handle connections when it’s time to shut down
  • wait until all connections are closed before shutting down the server application (includes handling of a rare race condition)
  • automatically unlink the UNIX socket when shutting down (assuming the process does not crash)
Parameters:
  • path – Path to the UNIX socket on which to listen for incoming connections.
  • callback – Coroutine function that will be used to spawn a task for each established connection. This coroutine must accept two positional arguments: (reader, writer) which allow interaction with the peer.
  • loop – Event loop in which to run the server’s asynchronous tasks. When None, the current default event loop will be used.
import asyncio

from aiotk import UnixSocketServer

async def echo(reader, writer):
    chunk = await reader.read(256)
    while chunk:
        writer.write(chunk)
        chunk = await reader.read(256)

async def demo():
    path = './echo.sock'
    async with UnixSocketServer(path, echo):
        reader, writer = await asyncio.open_unix_connection(path)
        writer.write('Hello, world!\n'.encode('utf-8'))
        rep = await reader.readline()
        print(rep.decode('utf-8').strip())
        writer.close()

asyncio.get_event_loop().run_until_complete(demo())
Hello, world!
close()

Stop accepting connections.

Note

Since connections may still be pending in the kernel’s TCP stack at the time where you call this, it’s possible that new connections seem to get established after you signal your intent to stop accepting connections.

See:

path

UNIX socket on which the server listens for incoming connections.

start()

Start accepting connections.

Only use this method if you are not using the server as an asynchronous context manager.

See:

wait_closed(timeout=None)

Wait until all connections are closed.

See:

wait_started()

Wait until the server is ready to accept connections.

See:

TCP socket server

aiotk.tcp_server(**kwds)

Run a TCP server in the foreground.

Note: this function is a coroutine.

This context manager provides the following features over direct use of asyncio.create_server():

  • runs in the foreground, blocking the calling task
  • automatically closes the server object with graceful shutdown.

There are two main difference with the related TCPServer:

  • based on asyncio.create_server() for compatibility with libraries that are based on protocols (asyncio’s callback API)
  • runs in the foreground
class aiotk.TCPServer(host, port, callback, loop=None)

Asynchronous context manager to accept TCP connections.

This context manager provides the following features over direct use of asyncio.start_server():

  • connection handlers are coroutines
  • prevent leaking connections when the writer is not properly closed by the connection handler
  • automatically cancel all tasks that handle connections when it’s time to shut down
  • wait until all connections are closed before shutting down the server application (includes handling of a rare race condition)
Parameters:
  • host – Network interface on which to listen for incoming connections.
  • port – Port number on which to listen for incoming connections.
  • callback – Coroutine function that will be used to spawn a task for each established connection. This coroutine must accept two positional arguments: (reader, writer) which allow interaction with the peer.
  • loop – Event loop in which to run the server’s asynchronous tasks. When None, the current default event loop will be used.
import asyncio
import random

from aiotk import TCPServer

async def echo(reader, writer):
    chunk = await reader.read(256)
    while chunk:
        writer.write(chunk)
        chunk = await reader.read(256)

async def demo():
    host = '127.0.0.1'
    port = random.randint(49152, 65535)
    async with TCPServer(host, port, echo):
        reader, writer = await asyncio.open_connection(host, port)
        writer.write('Hello, world!\n'.encode('utf-8'))
        rep = await reader.readline()
        print(rep.decode('utf-8').strip())
        writer.close()

asyncio.get_event_loop().run_until_complete(demo())
Hello, world!
close()

Stop accepting connections.

Note

Since connections may still be pending in the kernel’s TCP stack at the time where you call this, it’s possible that new connections seem to get established after you signal your intent to stop accepting connections.

See:

host

Network interface on which the server listens for connections.

See:

port

Port number on which the server listens for connections.

See:

start()

Start accepting connections.

Only use this method if you are not using the server as an asynchronous context manager.

See:

wait_closed(timeout=None)

Wait until all connections are closed.

See:

wait_started()

Wait until the server is ready to accept connections.

See: