Streams

Bloop provides a simple, pythonic interface to DynamoDB's complex Streams API. This abstracts away the minutiae of managing and refreshing iterators, tracking sequence numbers and shard splits, merging records from adjacent shards, and saving and loading processing state.

Warning

Chronological order is not guaranteed for high throughput streams.

DynamoDB guarantees ordering:

  • within any single shard
  • across shards for a single hash/range key

There is no way to exactly order records from adjacent shards. High throughput streams provide approximate ordering using each record's "ApproximateCreationDateTime".

Tables with a single partition guarantee order across all records.

See Stream Internals for details.
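If approximate ordering is acceptable, one option is to buffer records whenever the stream is non-empty and sort each batch by the record's "created_at" (derived from "ApproximateCreationDateTime", and shown under Record Structure below) before handing them off. This is only a sketch, where process is a placeholder for your record handling; the result is still approximate:

>>> import time
>>> buffer = []
>>> while True:
...     record = next(stream)
...     if record:
...         buffer.append(record)
...     elif buffer:
...         # No more records right now; order the batch by
...         # approximate creation time before processing.
...         buffer.sort(key=lambda r: r["meta"]["created_at"])
...         for record in buffer:
...             process(record)
...         buffer.clear()
...     else:
...         time.sleep(0.2)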

Enable Streaming

Add the following to a model's Meta to enable a stream with new and old objects in each record:

from bloop import BaseModel, Boolean, Column, Engine, Integer, String

engine = Engine()

class User(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True)
    email = Column(String)
    verified = Column(Boolean)

engine.bind(User)

"include" has four possible values, matching StreamViewType:

{"keys"}, {"new"}, {"old"}, {"new", "old"}

Create a Stream

Next, create a stream on the model. This example starts at "trim_horizon" to get all records from the last 24 hours, but could also be "latest" to only return records created after the stream was instantiated.

>>> stream = engine.stream(User, "trim_horizon")

To start at a specific point in time, you can also pass an arrow.arrow.Arrow datetime. Creating a stream at a specific time is very expensive: Bloop must iterate every record from the stream's trim_horizon until it reaches the target time.

>>> import arrow
>>> stream = engine.stream(User, arrow.now().shift(hours=-12))

If you are trying to resume processing from the same position as another stream, you should load from a persisted Stream.token instead of using a specific time. See Pausing and Resuming for an example of a stream token.

>>> import json
>>> original_stream = engine.stream(User, "trim_horizon")
>>> with open("/tmp/state", "w") as f:
...     json.dump(original_stream.token, f)
...
# Some time later
>>> with open("/tmp/state", "r") as f:
...     token = json.load(f)
...
>>> stream = engine.stream(User, token)

Retrieve Records

You only need to call next() on a Stream to get the next record:

>>> record = next(stream)

If there are no records at the current position, record will be None. A common pattern is to poll immediately when a record is found, but to wait a small amount when no record is found.

>>> import time
>>> while True:
...     record = next(stream)
...     if not record:
...         time.sleep(0.2)
...     else:
...         process(record)

Record Structure

Each record is a dict with instances of the model in one or more of "key", "old", and "new". These are populated according to the stream's "include" above, as well as the event type. A keys-only stream will never have new or old objects; a stream that includes new and old objects will have new set to None when the event type is delete.
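The placeholder process function used above will usually branch on the event type and guard against the missing objects just described. A minimal sketch, where handle_insert, handle_modify, and handle_delete are hypothetical callbacks:

>>> def process(record):
...     event = record["meta"]["event"]["type"]
...     if event == "insert":
...         handle_insert(record["new"])
...     elif event == "modify":
...         handle_modify(record["old"], record["new"])
...     elif event == "delete":
...         handle_delete(record["old"])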

Save a new user, and then update the email address:

>>> user = User(id=3, email="user@domain.com")
>>> engine.save(user)
>>> user.email = "admin@domain.com"
>>> engine.save(user)

The first record won't have an old value, since it was the first time this item was saved:

>>> next(stream)
{'key': None,
 'old': None,
 'new': User(email='user@domain.com', id=3, verified=None),
 'meta': {
     'created_at': <Arrow [2016-10-23T00:28:00-07:00]>,
     'event': {
         'id': '3fe6d339b7cb19a1474b3d853972c12a',
         'type': 'insert',
         'version': '1.1'},
     'sequence_number': '700000000007366876916'}
}

The second record shows the change to email, and has both old and new objects:

>>> next(stream)
{'key': None,
 'old': User(email='user@domain.com', id=3, verified=None),
 'new': User(email='admin@domain.com', id=3, verified=None),
 'meta': {
     'created_at': <Arrow [2016-10-23T00:28:00-07:00]>,
     'event': {
         'id': '73a4b8568a85a0bcac25799f806df239',
         'type': 'modify',
         'version': '1.1'},
     'sequence_number': '800000000007366876936'}
}

Periodic Heartbeats

You should call Stream.heartbeat() at least every 14 minutes in your processing loop.

Iterators only last 15 minutes, and need to be refreshed periodically. There's no way to safely refresh an iterator that hasn't found a record: for example, recreating an expired iterator at "latest" could miss any records created after the previous iterator was opened. If you only call heartbeat every 15 minutes, an iterator may still expire due to clock skew or processing time.

Only iterators without sequence numbers will be refreshed. Once a shard finds a record it's skipped on every subsequent heartbeat. For a moderately active stream, heartbeat will make about one call per shard.

The following pattern will call heartbeat every 12 minutes (if record processing is quick):

>>> next_heartbeat = arrow.now()
>>> while True:
...     record = next(stream)
...     if record:
...         process(record)
...     if arrow.now() > next_heartbeat:
...         next_heartbeat = arrow.now().shift(minutes=12)
...         stream.heartbeat()

Pausing and Resuming

Use Stream.token to save the current state and resume processing later:

>>> with open("/tmp/stream-token", "r" as f):
...     token = json.load(f)
...
>>> stream = engine.stream(User, token)

When reloading from a token, Bloop will automatically prune shards that have expired, and extend the state to include new shards. Any iterators that fell behind the current trim_horizon will be moved to each of their children's trim_horizons.
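Since the token is a plain dict, it can also be persisted periodically while processing, so a crashed or restarted worker resumes close to where it stopped. A sketch that checkpoints roughly once a minute, reusing the placeholder process function from above:

>>> next_checkpoint = arrow.now()
>>> while True:
...     record = next(stream)
...     if record:
...         process(record)
...     if arrow.now() > next_checkpoint:
...         next_checkpoint = arrow.now().shift(minutes=1)
...         with open("/tmp/stream-token", "w") as f:
...             json.dump(stream.token, f)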

Here's a token from a new stream. After 8-12 hours there will be one active shard, but also a few closed shards that form the lineage of the stream.

{
    "active": [
        "shardId-00000001477207595861-d35d208d"
    ],
    "shards": [
        {
            "iterator_type": "after_sequence",
            "sequence_number": "800000000007366876936",
            "shard_id": "shardId-00000001477207595861-d35d208d"
        }
    ],
    "stream_arn": "arn:.../stream/2016-10-23T07:26:33.312"
}

Moving Around

Stream.move_to takes the same position argument as Engine.stream:

# Any stream token; this one rebuilds the
# stream in its current location
>>> stream.move_to(stream.token)

# Jump back in time 2 hours
>>> stream.move_to(arrow.now().shift(hours=-2))

# Move to the oldest record in the stream
>>> stream.move_to("trim_horizon")

As noted above, moving to a specific time is very expensive.