API reference¶
General utilities¶
Graceful shutdown¶
-
aiotk.
run_until_complete
(coro, loop=None)¶ Run a task through to completion.
The .run_until_complete() method on asyncio event loop objects doesn’t finish tasks when it receives a SIGINT/CTRL-C. The method simply raises a KeyboardInterrupt exception, which usually results in warnings about unfinished tasks plus some “event loop closed” RuntimeError exceptions in pending tasks.
This default behavior is a nuisance, and this function replaces it with something that ensures the task actually runs through to completion. When the KeyboardInterrupt exception is caught, the task is canceled and resumed to give it a chance to clean up properly.
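The cancel-and-resume behavior described above might look roughly like the following standard-library sketch. It is not aiotk's implementation: `run_to_completion` is a hypothetical name, returning `None` after an interrupt is an assumption, and the `_interrupt` callback merely stands in for a real CTRL-C.

```python
import asyncio


def run_to_completion(coro):
    """Run ``coro``; on CTRL-C, cancel it and let it finish cleaning up."""
    loop = asyncio.new_event_loop()
    task = loop.create_task(coro)
    try:
        try:
            return loop.run_until_complete(task)
        except KeyboardInterrupt:
            # Cancel the task, then resume the loop so cleanup code can run.
            task.cancel()
            try:
                return loop.run_until_complete(task)
            except asyncio.CancelledError:
                return None  # assumption: an interrupted run yields None
    finally:
        loop.close()


def _interrupt():
    raise KeyboardInterrupt  # stands in for a real CTRL-C


events = []


async def job():
    asyncio.get_running_loop().call_later(0.01, _interrupt)
    try:
        await asyncio.sleep(3600)
    finally:
        events.append("cleaned up")


outcome = run_to_completion(job())
```

The `finally` clause in `job()` runs because the loop is restarted after the interrupt instead of being abandoned with the task still pending.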
-
aiotk.
cancel
(task, loop=None)¶ Cancel a task and wait until it’s done.
Note: this function is a coroutine.
Canceling a child task and returning without waiting for the child task to complete is a common cause of “event loop closed” RuntimeError exceptions, especially during program shutdown. Therefore, this becomes a common pattern:

    task.cancel()
    await asyncio.wait({task})

However, if the parent task itself is also canceled, then the asyncio.wait() call will be interrupted and the child task will still not complete. To solve this, we must also manage to trap the asyncio.CancelledError exception and call asyncio.wait({task}) again and properly re-raise the asyncio.CancelledError exception. For example:

    task.cancel()
    try:
        await asyncio.wait({task})
    except asyncio.CancelledError:
        await asyncio.wait({task})
        raise

This is not trivial and must be done so many times in a program that cancels tasks that it merits a replacement API for task.cancel().
Parameters:
- task – The asyncio.Task object to cancel.
- loop – The event loop to use for awaiting. Defaults to the current event loop.
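To see why waiting matters, here is a small runnable sketch of the pattern using only the standard library. `cancel_and_wait` is a hypothetical stand-in written for this example, not aiotk's own code, and the child's short sleep simulates asynchronous cleanup.

```python
import asyncio


async def cancel_and_wait(task):
    """Cancel ``task`` and wait for it, even if the caller is canceled too."""
    task.cancel()
    try:
        await asyncio.wait({task})
    except asyncio.CancelledError:
        # The caller was canceled while waiting: finish waiting, then re-raise.
        await asyncio.wait({task})
        raise


async def main():
    cleaned_up = []

    async def child():
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            await asyncio.sleep(0.01)  # simulated asynchronous cleanup
            cleaned_up.append(True)
            raise

    task = asyncio.ensure_future(child())
    await asyncio.sleep(0)  # let the child start running
    await cancel_and_wait(task)
    return task.cancelled(), cleaned_up


result = asyncio.run(main())
```

By the time `cancel_and_wait()` returns, the child's cleanup has already run, which a bare `task.cancel()` would not guarantee.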
-
aiotk.
cancel_all
(tasks, loop=None)¶ Cancel a set of tasks and wait until they’re done.
Note: this function is a coroutine.
Canceling a set of child tasks and returning without waiting for the child tasks to complete is a common cause of “event loop closed” RuntimeError exceptions, especially during shutdown of servers with one or more tasks per connection. Therefore, this becomes a common pattern:

    for task in tasks:
        task.cancel()
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

However, if the parent task itself is also canceled, then the asyncio.wait() call will be interrupted and one or more of the child tasks will still not complete. To solve this, we must also manage to trap the asyncio.CancelledError exception and call asyncio.wait(tasks) again and properly re-raise the asyncio.CancelledError exception. For example:

    for task in tasks:
        task.cancel()
    try:
        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    except asyncio.CancelledError:
        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        raise

This is not trivial and must be done in so many server programs that cancel tasks that it merits a helper.
Parameters:
- tasks – The set of asyncio.Task objects to cancel.
- loop – The event loop to use for awaiting. Defaults to the current event loop.
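The case where the parent is canceled mid-wait is the subtle one, so the sketch below exercises exactly that path with the standard library. `cancel_all_and_wait` and `connection` are hypothetical names for this illustration, and the empty-set guard is an addition of this sketch (asyncio.wait() rejects an empty set).

```python
import asyncio


async def cancel_all_and_wait(tasks):
    """Cancel every task in ``tasks`` and wait until all of them are done."""
    tasks = set(tasks)
    if not tasks:
        return  # asyncio.wait() raises ValueError on an empty set
    for task in tasks:
        task.cancel()
    try:
        await asyncio.wait(tasks)
    except asyncio.CancelledError:
        # The parent was canceled mid-wait: keep waiting, then re-raise.
        await asyncio.wait(tasks)
        raise


async def main():
    finished = []

    async def connection(name):
        try:
            await asyncio.sleep(3600)
        finally:
            await asyncio.sleep(0.01)  # simulated slow cleanup
            finished.append(name)

    children = {asyncio.ensure_future(connection(n)) for n in ("a", "b")}
    await asyncio.sleep(0)  # let the children start

    parent = asyncio.ensure_future(cancel_all_and_wait(children))
    await asyncio.sleep(0)  # the parent is now blocked in asyncio.wait()
    parent.cancel()         # cancel the parent while it is waiting
    try:
        await parent
    except asyncio.CancelledError:
        pass
    return sorted(finished), all(t.done() for t in children)


result = asyncio.run(main())
```

Even though the parent is canceled while waiting, both children have finished their cleanup before the cancellation propagates to the caller.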
-
aiotk.
follow_through
(task, loop=None)¶ Wait for a task to complete (even if canceled while waiting).
Note: this function is a coroutine.
Not propagating cancellation to a child task and returning without waiting for the child task to complete is a common cause of “event loop closed” RuntimeError exceptions, especially during program shutdown. Therefore, this becomes a common pattern:

    try:
        await asyncio.wait({task})
    except asyncio.CancelledError:
        task.cancel()
        await asyncio.wait({task})
        raise
    return task.result()

This is not trivial and must be done so many times in a program that spawns child tasks that it merits a helper method.
Parameters:
- task – The asyncio.Task object to see through to completion.
- loop – The event loop to use for awaiting. Defaults to the current event loop.
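Both outcomes of this pattern (normal completion and cancellation of the waiter) can be checked with a short standard-library sketch. `see_through` is a hypothetical name chosen for this example; it wraps the pattern shown above and makes no claim about how aiotk implements it.

```python
import asyncio


async def see_through(task):
    """Wait for ``task``; if canceled while waiting, cancel the child too."""
    try:
        await asyncio.wait({task})
    except asyncio.CancelledError:
        task.cancel()
        await asyncio.wait({task})
        raise
    return task.result()


async def main():
    async def compute():
        await asyncio.sleep(0.01)
        return 42

    # Normal path: the child's result is returned.
    answer = await see_through(asyncio.ensure_future(compute()))

    # Cancellation path: canceling the waiter also cancels the child.
    child = asyncio.ensure_future(asyncio.sleep(3600))
    waiter = asyncio.ensure_future(see_through(child))
    await asyncio.sleep(0)  # let both tasks start
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    return answer, child.cancelled()


result = asyncio.run(main())
```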
-
aiotk.
wait_until_cancelled
(*, propagate=True, loop=None)¶ Wait until the calling task is canceled.
Note: this function is a coroutine.
When using a context manager to complete one or more background tasks, it’s common to have the “main” task block until something cancels it (e.g. a SIGINT/CTRL-C handler).
It’s also convenient in tests that verify cancellation behavior, which often need to spawn a background task that waits forever.
This leads to the not-so-idiomatic:

    await asyncio.Future()

This is often wrapped in a helper function to make the call more readable. Instead of propagating multiple variants of this, it should be placed in a library that everybody can import.
Parameters: loop – Loop in which the coroutine will block. Defaults to the current event loop.
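A helper of the kind described above can be sketched in a few lines of standard-library code. `block_until_cancelled` is a hypothetical name, and this sketch does not model the `propagate` option from the signature, whose exact semantics are not described here.

```python
import asyncio


async def block_until_cancelled():
    """Block forever; the only way out is cancellation."""
    await asyncio.get_running_loop().create_future()


async def main():
    events = []

    async def background():
        try:
            await block_until_cancelled()
        except asyncio.CancelledError:
            events.append("cancelled")
            raise

    task = asyncio.ensure_future(background())
    await asyncio.sleep(0.01)
    events.append("still running" if not task.done() else "done early")
    task.cancel()
    await asyncio.wait({task})
    return events


events = asyncio.run(main())
```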
-
class
aiotk.
AsyncExitStack
¶ Rollback stack for asynchronous context managers.
This context manager provides the following features over direct use of contextlib.ExitStack:
- supports asynchronous context managers (in addition to regular context managers).

    import asyncio
    from aiotk import AsyncExitStack

    class Greeter(object):

        def __init__(self, name):
            self._name = name

        async def __aenter__(self):
            print('Hello, %s!' % self._name)

        async def __aexit__(self, *args):
            print('Bye, %s!' % self._name)

    async def demo():
        async with AsyncExitStack() as stack:
            await stack.enter_context(Greeter('Alice'))
            await stack.enter_context(Greeter('Bob'))

    asyncio.get_event_loop().run_until_complete(demo())

    Hello, Alice!
    Hello, Bob!
    Bye, Bob!
    Bye, Alice!
-
enter_context
(context)¶ Push an (asynchronous) context manager onto the rollback stack.
Parameters: context – context manager or asynchronous context manager.
Returns: The return value of the context manager’s __enter__ or __aenter__ method.
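For comparison, Python 3.7 and later ship `contextlib.AsyncExitStack` in the standard library. The sketch below uses that standard-library class, not aiotk's: there, `enter_context()` is a plain method for regular context managers and `enter_async_context()` is the awaitable one for asynchronous context managers. The resource names are made up for illustration.

```python
import asyncio
import contextlib

log = []


@contextlib.asynccontextmanager
async def async_resource(name):
    log.append("enter " + name)
    try:
        yield name
    finally:
        log.append("exit " + name)


@contextlib.contextmanager
def sync_resource(name):
    log.append("enter " + name)
    try:
        yield name
    finally:
        log.append("exit " + name)


async def main():
    async with contextlib.AsyncExitStack() as stack:
        await stack.enter_async_context(async_resource("a"))
        stack.enter_context(sync_resource("b"))
        log.append("body")


asyncio.run(main())
```

As with the rollback stack described here, the context managers are exited in the reverse order of entry.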
-
class
aiotk.
EnsureDone
(coro, cancel=True, loop=None)¶ Ensure a background task completes before leaving a code block.
This is mostly useful for spawning multiple tasks in the background using AsyncExitStack, but it can also be used directly for single tasks.

    import asyncio
    from aiotk import EnsureDone, wait_until_cancelled

    async def background_task():
        await wait_until_cancelled()

    async def demo():
        async with EnsureDone(background_task()) as task:
            assert not task.done()
        assert task.done()

    asyncio.get_event_loop().run_until_complete(demo())
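The general shape of such a context manager can be approximated with the standard library. In this sketch `ensure_done` is a hypothetical function, and the reading of `cancel=True` as "cancel the task when leaving the block" is an assumption. It is also deliberately simple: it does not handle the caller being canceled during the final wait.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def ensure_done(coro, cancel=True):
    """Spawn ``coro`` as a task and make sure it is done when the block exits."""
    task = asyncio.ensure_future(coro)
    try:
        yield task
    finally:
        if cancel:
            task.cancel()  # assumption: cancel on exit when requested
        await asyncio.wait({task})


async def main():
    async with ensure_done(asyncio.sleep(3600)) as task:
        await asyncio.sleep(0)  # let the background task start
        running_inside = not task.done()
    return running_inside, task.done()


result = asyncio.run(main())
```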
-
class
aiotk.
TaskPool
(loop=None)¶ Collect background tasks.
This is useful for server programs that wish to spawn one task per connection. It removes the need to implement the redundant task management code required to close tasks as they complete and the need to implement graceful shutdown (with cancellation) to clean up when shutting down the server.

    import asyncio
    from aiotk import TaskPool, wait_until_cancelled

    async def background_task(ready):
        ready.set()
        await wait_until_cancelled()

    async def demo():
        ready = asyncio.Event()
        async with TaskPool() as pool:
            task = await pool.spawn(background_task, ready)
            await ready.wait()
            assert not task.done()
        assert task.done()

    asyncio.get_event_loop().run_until_complete(demo())
-
close
()¶ Start the shutdown sequence.
-
spawn
(fn, *args, **kwds)¶ Add a new task to collect.
The pool is designed to keep memory usage as low as possible by collecting tasks as soon as possible; it keeps no references to the task once it has completed. If you want to get the task’s result, you can keep a reference to the asyncio.Task object that is returned.
Parameters:
- fn – The coroutine function to invoke.
- args – Positional arguments to pass to the coroutine function.
- kwds – Keyword arguments to pass to the coroutine function.
Returns: An asyncio.Task object.
-
start
()¶ Start the collection task.
-
wait_busy
()¶ Block until the task is known to be waiting for tasks to complete.
This is mostly useful for testing purposes (it avoids flaky behavior due to timing and makes the code paths more predictable).
-
wait_closed
()¶ Wait until the shutdown sequence has completed.
This unblocks once all tasks in the pool have completed and the collection task has completed.
-
wait_idle
()¶ Block until all tasks have completed.
This is useful to perform fan-in: spawn a set of tasks and then wait for all of them to complete.
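The fan-in idea can be sketched with the standard library alone. `MiniPool` below is a hypothetical, much simplified stand-in for illustration: its `spawn()` is a plain method rather than something you await, and it implements no collection task or shutdown sequence.

```python
import asyncio


class MiniPool:
    """Tracks running tasks and forgets each one as soon as it completes."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, fn, *args, **kwds):
        task = asyncio.ensure_future(fn(*args, **kwds))
        self._tasks.add(task)
        # Drop the pool's reference once the task is done.
        task.add_done_callback(self._tasks.discard)
        return task

    async def wait_idle(self):
        # Loop in case new tasks were spawned while we were waiting.
        while self._tasks:
            await asyncio.wait(set(self._tasks))


async def main():
    pool = MiniPool()

    async def square(x):
        await asyncio.sleep(0.01)
        return x * x

    tasks = [pool.spawn(square, n) for n in range(4)]
    await pool.wait_idle()  # fan-in: wait for every spawned task
    return [t.result() for t in tasks], len(pool._tasks)


result = asyncio.run(main())
```

The caller keeps its own references to the tasks in order to read their results, since the pool itself retains nothing after completion.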
-
wait_started
()¶ Wait until the collection task has started.
-
Streams¶
-
aiotk.
udp_server
(host, port, service, loop=None)¶ Simple UDP-based service.
The only examples for UDP in the asyncio documentation use protocols (the callback-based APIs) and are a bit confusing (connection made? connection lost?).
This helper method tries to turn the low-level UDP socket support into a stream-based API. You pass in a coroutine function to which a pair of queues will be passed. From there, you can use async/await syntax to send and receive packets.
Parameters:
- host – Network interface on which to bind.
- port – Port number on which to bind.
- service – Coroutine function that will perform the service logic.
- loop – Loop in which the service will run.
The service coroutine should have the following signature:

    async def my_udp_service(*, iqueue, oqueue, loop, **kwds):
        pass

The iqueue and oqueue parameters are asyncio.Queue objects that the coroutine can use to read from and write to, respectively.

    import asyncio
    from aiotk import (
        cancel,
        udp_server,
        EnsureDone,
        AsyncExitStack,
    )

    host = '127.0.0.1'
    server_port = 5555
    client_port = 5556

    async def echo_server(iqueue, oqueue, loop):
        """UDP echo server."""
        try:
            while True:
                peer, data = await iqueue.get()
                assert peer == (host, client_port)
                await oqueue.put((peer, data))
        except asyncio.CancelledError:
            pass

    async def echo_client(iqueue, oqueue, loop):
        """UDP echo client."""
        # Repeatedly send until the server ACKs.
        item = None
        while item is None:
            try:
                item = iqueue.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.5)
                await oqueue.put(((host, server_port), b'PING'))
        peer, data = item
        assert peer == (host, server_port)
        assert data == b'PING'

    async def demo():
        async with AsyncExitStack() as stack:
            # The echo server binds to server_port, the client to client_port,
            # so that the peer addresses checked above match.
            server = await stack.enter_context(EnsureDone(
                udp_server(host, server_port, echo_server),
            ))
            client = await stack.enter_context(EnsureDone(
                udp_server(host, client_port, echo_client),
            ))
            await asyncio.wait_for(client, timeout=5.0)
            await cancel(server)

        assert client.result() is None
        assert server.result() is None

    asyncio.get_event_loop().run_until_complete(demo())
-
aiotk.
mempipe
(loop=None, limit=None)¶ In-memory pipe, returns a (reader, writer) pair.
New in version 0.1.

    import asyncio
    from aiotk import mempipe

    async def demo():
        reader, writer = mempipe()
        writer.write('Hello, world!\n'.encode('utf-8'))
        rep = await reader.readline()
        print(rep.decode('utf-8').strip())
        writer.close()

    asyncio.get_event_loop().run_until_complete(demo())

Hello, world!
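The reading half of an in-memory pipe can be imitated with a bare `asyncio.StreamReader`, which may help when reasoning about what the reader side sees. This sketch covers only that half and says nothing about how `mempipe()` is built: bytes are fed directly with `feed_data()` instead of going through a writer object.

```python
import asyncio


async def main():
    reader = asyncio.StreamReader()
    # Feed bytes by hand; a real pipe would receive them from its writer.
    reader.feed_data(b"Hello, world!\n")
    reader.feed_eof()  # equivalent of the writing side being closed
    line = await reader.readline()
    rest = await reader.read()  # empty once EOF is reached
    return line, rest


result = asyncio.run(main())
```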
Testing¶
Subprocesses¶
-
aiotk.
mock_subprocess
(run, loop=None)¶ Calls run() instead of spawning a sub-process.
Parameters: run – A coroutine function that simulates the sub-process. Can return None or 0 to simulate successful process execution, or a non-zero error code to simulate a sub-process that terminates with a non-zero exit code. If an exception is raised, the result is 1 (non-zero). This function can accept a variable number of arguments, see below.
Dependency injection is used with the run() coroutine function to pass only arguments that are declared in the function’s signature. Omit all but the arguments you intend to use. Here are all the available arguments:
- argv: a list of strings passed as positional arguments to asyncio.create_subprocess_exec().
- stdin: an asyncio.StreamReader instance. When input is not redirected, this reads from the “real” sys.stdin.
- stdout: an asyncio.StreamWriter instance. When output is not redirected, this writes to the “real” sys.stdout.
- stderr: an asyncio.StreamWriter instance. When output is not redirected, this writes to the “real” sys.stderr.
- env: a dict containing environment variables passed to asyncio.create_subprocess_exec().
- signals: an asyncio.Queue object that receives integers passed to asyncio.Process.send_signal().
- kwds: extra keyword arguments passed to asyncio.create_subprocess_exec().
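Signature-based dependency injection of this kind is commonly built on `inspect.signature`. The sketch below shows one way it could work. It is an assumption about the general technique, not a description of aiotk's internals; `call_with_declared` and the placeholder values are hypothetical.

```python
import inspect


def call_with_declared(fn, available):
    """Call ``fn`` with only the entries of ``available`` that it declares."""
    declared = inspect.signature(fn).parameters
    selected = {name: value for name, value in available.items()
                if name in declared}
    return fn(**selected)


# Everything the caller could offer (placeholder values for illustration).
available = {
    "argv": ["echo", "hi"],
    "env": {"LANG": "C"},
    "stdin": None,
    "stdout": None,
}


def wants_argv(argv):
    return argv[0]


def wants_env_and_argv(env, argv):
    return env["LANG"], len(argv)


first = call_with_declared(wants_argv, available)
second = call_with_declared(wants_env_and_argv, available)
```

Each function receives only the arguments it names, in whatever order it declares them, which is why a simulated sub-process can omit everything it does not use.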

    import asyncio
    import aiotk

    async def echo(stdin, stdout):
        line = await stdin.readline()
        while line:
            stdout.write(line)
            line = await stdin.readline()

    async def demo():
        with aiotk.mock_subprocess(echo):
            process = await asyncio.create_subprocess_exec(
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=b'Hello, world!\n'), timeout=5.0
            )
            assert stderr is None
            print(stdout.decode('utf-8').strip())

    asyncio.get_event_loop().run_until_complete(demo())

Hello, world!
Compatibility helpers¶
asyncio backports¶
-
aiotk.
monkey_patch
()¶ Applies all monkey patches.
This is a series of backports for asyncio functions in order to use them on all Python 3.5+ versions.

    import asyncio
    import aiotk

    aiotk.monkey_patch()

    async def demo():
        reader, writer = aiotk.mempipe()
        writer.write('Hello, world!'.encode('utf-8'))
        rep = await reader.readuntil(b'!')
        print(rep.decode('utf-8'))
        writer.close()

    asyncio.get_event_loop().run_until_complete(demo())

Hello, world!
CTRL-C / SIGINT handler¶
-
aiotk.
handle_ctrlc
(f, loop=None)¶ Context manager that schedules a callback when SIGINT is received.
Parameters:
- f – future to fulfill when SIGINT is received. Will only be set once even if SIGINT is received multiple times.
- loop – event loop (defaults to global event loop).

    import asyncio
    import os
    import signal
    from aiotk import handle_ctrlc

    async def demo():
        loop = asyncio.get_event_loop()
        done = asyncio.Future()
        with handle_ctrlc(done):
            loop.call_soon(os.kill, os.getpid(), signal.SIGINT)
            print('Press CTRL-C to continue...')
            await asyncio.wait_for(done, timeout=1.0)
        print('Done!')

    asyncio.get_event_loop().run_until_complete(demo())

    Press CTRL-C to continue...
    Done!
Network facilities¶
UNIX socket server¶
-
class
aiotk.
UnixSocketServer
(path, callback, loop=None)¶ Asynchronous context manager to accept UNIX connections.
This context manager provides the following features over direct use of asyncio.start_unix_server():
- connection handlers are coroutines
- prevent leaking connections when the writer is not properly closed by the connection handler
- automatically cancel all tasks that handle connections when it’s time to shut down
- wait until all connections are closed before shutting down the server application (includes handling of a rare race condition)
- automatically unlink the UNIX socket when shutting down (assuming the process does not crash)
Parameters:
- path – Path to the UNIX socket on which to listen for incoming connections.
- callback – Coroutine function that will be used to spawn a task for each established connection. This coroutine must accept two positional arguments: (reader, writer) which allow interaction with the peer.
- loop – Event loop in which to run the server’s asynchronous tasks. When None, the current default event loop will be used.

    import asyncio
    from aiotk import UnixSocketServer

    async def echo(reader, writer):
        chunk = await reader.read(256)
        while chunk:
            writer.write(chunk)
            chunk = await reader.read(256)

    async def demo():
        path = './echo.sock'
        async with UnixSocketServer(path, echo):
            reader, writer = await asyncio.open_unix_connection(path)
            writer.write('Hello, world!\n'.encode('utf-8'))
            rep = await reader.readline()
            print(rep.decode('utf-8').strip())
            writer.close()

    asyncio.get_event_loop().run_until_complete(demo())

Hello, world!
-
close
()¶ Stop accepting connections.
Note
Since connections may still be pending in the kernel’s TCP stack at the time where you call this, it’s possible that new connections seem to get established after you signal your intent to stop accepting connections.
-
path
¶ UNIX socket on which the server listens for incoming connections.
-
start
()¶ Start accepting connections.
Only use this method if you are not using the server as an asynchronous context manager.
-
wait_closed
(timeout=None)¶ Wait until all connections are closed.
-
wait_started
()¶ Wait until the server is ready to accept connections.
TCP socket server¶
-
aiotk.
tcp_server
(**kwds)¶ Run a TCP server in the foreground.
Note: this function is a coroutine.
This function provides the following features over direct use of asyncio.create_server():
- runs in the foreground, blocking the calling task
- automatically closes the server object with graceful shutdown.
There are two main differences with the related TCPServer:
- based on asyncio.create_server() for compatibility with libraries that are based on protocols (asyncio’s callback API)
- runs in the foreground
-
class
aiotk.
TCPServer
(host, port, callback, loop=None)¶ Asynchronous context manager to accept TCP connections.
This context manager provides the following features over direct use of asyncio.start_server():
- connection handlers are coroutines
- prevent leaking connections when the writer is not properly closed by the connection handler
- automatically cancel all tasks that handle connections when it’s time to shut down
- wait until all connections are closed before shutting down the server application (includes handling of a rare race condition)
Parameters:
- host – Network interface on which to listen for incoming connections.
- port – Port number on which to listen for incoming connections.
- callback – Coroutine function that will be used to spawn a task for each established connection. This coroutine must accept two positional arguments: (reader, writer) which allow interaction with the peer.
- loop – Event loop in which to run the server’s asynchronous tasks. When None, the current default event loop will be used.

    import asyncio
    import random
    from aiotk import TCPServer

    async def echo(reader, writer):
        chunk = await reader.read(256)
        while chunk:
            writer.write(chunk)
            chunk = await reader.read(256)

    async def demo():
        host = '127.0.0.1'
        port = random.randint(49152, 65535)
        async with TCPServer(host, port, echo):
            reader, writer = await asyncio.open_connection(host, port)
            writer.write('Hello, world!\n'.encode('utf-8'))
            rep = await reader.readline()
            print(rep.decode('utf-8').strip())
            writer.close()

    asyncio.get_event_loop().run_until_complete(demo())

Hello, world!
-
close
()¶ Stop accepting connections.
Note
Since connections may still be pending in the kernel’s TCP stack at the time where you call this, it’s possible that new connections seem to get established after you signal your intent to stop accepting connections.
-
host
¶ Network interface on which the server listens for connections.
-
port
¶ Port number on which the server listens for connections.
-
start
()¶ Start accepting connections.
Only use this method if you are not using the server as an asynchronous context manager.
-
wait_closed
(timeout=None)¶ Wait until all connections are closed.
-
wait_started
()¶ Wait until the server is ready to accept connections.