Internal

In addition to documenting internal classes, this section describes complex internal systems (such as Streams, atomic tracking via weakrefs) and specific parameters and error handling that Bloop employs when talking to DynamoDB (such as SessionWrapper's error inspection, and partial table validation).

SessionWrapper

class bloop.session.SessionWrapper(dynamodb=None, dynamodbstreams=None)

Provides a consistent interface to DynamoDB and DynamoDBStreams clients.

If either client is None, that client is built using boto3.client().

Parameters:
  • dynamodb -- A boto3 client for DynamoDB. Defaults to boto3.client("dynamodb").
  • dynamodbstreams -- A boto3 client for DynamoDBStreams. Defaults to boto3.client("dynamodbstreams").
create_table(model)

Create the model's table.

Does not wait for the table to be created, and does not validate an existing table. Will not raise "ResourceInUseException" if the table exists or is being created.

Parameters:model -- The BaseModel to create the table for.
delete_item(item)

Delete an object in DynamoDB.

Parameters:item -- Unpacked into kwargs for boto3.DynamoDB.Client.delete_item().
Raises:bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.
describe_stream(stream_arn, first_shard=None)

Wraps boto3.DynamoDBStreams.Client.describe_stream(), handling continuation tokens.

Parameters:
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].
  • first_shard (str) -- (Optional) If provided, only shards after this shard id will be returned.
Returns:

All shards in the stream, or a subset if first_shard is provided.

Return type:

list
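
The continuation-token handling can be sketched roughly as follows. This is a minimal illustration, not Bloop's actual implementation: list_shards and FakeStreamsClient are hypothetical names, and the fake client stands in for a boto3 DynamoDBStreams client so the example runs without AWS access. The request and response keys ("ExclusiveStartShardId", "LastEvaluatedShardId") are those of the DescribeStream API.

```python
def list_shards(client, stream_arn, first_shard=None):
    """Collect every shard by following LastEvaluatedShardId until it is absent."""
    shards = []
    request = {"StreamArn": stream_arn}
    if first_shard is not None:
        request["ExclusiveStartShardId"] = first_shard
    while True:
        description = client.describe_stream(**request)["StreamDescription"]
        shards.extend(description.get("Shards", []))
        last_shard = description.get("LastEvaluatedShardId")
        if last_shard is None:
            return shards
        # Resume the listing just after the last shard of the previous page.
        request["ExclusiveStartShardId"] = last_shard


class FakeStreamsClient:
    """Hypothetical stand-in that serves shard ids two at a time."""

    def __init__(self, shard_ids, page_size=2):
        self.shard_ids = list(shard_ids)
        self.page_size = page_size

    def describe_stream(self, StreamArn, ExclusiveStartShardId=None):
        start = 0
        if ExclusiveStartShardId is not None:
            start = self.shard_ids.index(ExclusiveStartShardId) + 1
        page = self.shard_ids[start:start + self.page_size]
        description = {"Shards": [{"ShardId": shard_id} for shard_id in page]}
        if start + self.page_size < len(self.shard_ids):
            description["LastEvaluatedShardId"] = page[-1]
        return {"StreamDescription": description}


client = FakeStreamsClient(["shard-0", "shard-1", "shard-2", "shard-3", "shard-4"])
all_ids = [s["ShardId"] for s in list_shards(client, "arn:example")]
later_ids = [s["ShardId"] for s in list_shards(client, "arn:example", first_shard="shard-2")]
```

Passing first_shard simply seeds the first request with an exclusive start id, which is why only shards after it come back.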

get_shard_iterator(*, stream_arn, shard_id, iterator_type, sequence_number=None)

Wraps boto3.DynamoDBStreams.Client.get_shard_iterator().

Parameters:
  • stream_arn (str) -- Stream arn. Usually Shard.stream_arn.
  • shard_id (str) -- Shard identifier. Usually Shard.shard_id.
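  • iterator_type (str) -- "at_sequence", "after_sequence", "trim_horizon", or "latest"
  • sequence_number -- (Optional) Sequence number to use with an "at_sequence" or "after_sequence" iterator. Default is None.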
Returns:

Iterator id, valid for 15 minutes.

Return type:

str

Raises:

bloop.exceptions.RecordsExpired -- Tried to get an iterator beyond the Trim Horizon.

get_stream_records(iterator_id)

Wraps boto3.DynamoDBStreams.Client.get_records().

Parameters:

iterator_id -- Iterator id. Usually Shard.iterator_id.

Returns:

Dict with "Records" list (may be empty) and "NextShardIterator" str (may not exist).

Return type:

dict

Raises:
load_items(items)

Loads any number of items in chunks, handling continuation tokens.

Parameters:items -- Unpacked in chunks into "RequestItems" for boto3.DynamoDB.Client.batch_get_item().
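
As a rough sketch of the chunking step: BatchGetItem accepts at most 100 keys per call, so a large set of keys has to be split before it is sent. The chunked helper and the flat list of keys below are illustrative assumptions. The real method works on a per-table request structure and also handles continuation, neither of which is shown here.

```python
from itertools import islice

BATCH_GET_LIMIT = 100  # DynamoDB's maximum number of keys per BatchGetItem call


def chunked(keys, size=BATCH_GET_LIMIT):
    """Yield lists of at most `size` keys, preserving order."""
    iterator = iter(keys)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# 250 hypothetical keys end up in three requests.
keys = [{"id": {"S": str(n)}} for n in range(250)]
chunk_sizes = [len(chunk) for chunk in chunked(keys)]
```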
query_items(request)

Wraps boto3.DynamoDB.Client.query().

Response always includes "Count" and "ScannedCount".

Parameters:request -- Unpacked into boto3.DynamoDB.Client.query()
save_item(item)

Save an object to DynamoDB.

Parameters:item -- Unpacked into kwargs for boto3.DynamoDB.Client.update_item().
Raises:bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.
scan_items(request)

Wraps boto3.DynamoDB.Client.scan().

Response always includes "Count" and "ScannedCount".

Parameters:request -- Unpacked into boto3.DynamoDB.Client.scan()
search_items(mode, request)

Invoke query/scan by name.

Response always includes "Count" and "ScannedCount".

Parameters:
  • mode (str) -- "query" or "scan"
  • request -- Unpacked into boto3.DynamoDB.Client.query() or boto3.DynamoDB.Client.scan()
validate_table(model)

Polls until a creating table is ready, then verifies the description against the model's requirements.

The model may have a subset of all GSIs and LSIs on the table, but the key structure must be exactly the same. The table must have a stream if the model expects one, but not the other way around.

Parameters:model -- The BaseModel to validate the table of.
Raises:bloop.exceptions.TableMismatch -- When the table does not meet the constraints of the model.

Modeling

ModelMetaclass

class bloop.models.ModelMetaclass

The metaclass for BaseModel. Binds model_name to each Column; validates key configuration; binds the model to each Index; populates model's Meta with modeling metadata (columns, keys, indexes, etc).

Index

class bloop.models.Index(*, projection, hash_key=None, range_key=None, name=None, **kwargs)

Abstract base class for GSIs and LSIs.

An index needs to be bound to a model by calling Index._bind(model), which lets the index compute projected columns, validate hash and range keys, etc.

Parameters:
  • projection -- Either "keys", "all", or a list of column names or objects. Included columns will be projected into the index. Key columns are always included.
  • hash_key -- The column that the index can be queried against. Always the table hash_key for LSIs.
  • range_key -- The column that the index can be sorted on. Always required for an LSI. Default is None.
  • name (str) -- (Optional) The index's name in DynamoDB. Defaults to the index's name in the model.
dynamo_name

The name of this index in DynamoDB. Defaults to the index's model_name.

hash_key

The column that the index can be queried against. (LSI's hash_key is always the table hash_key.)

model

The model this index is attached to.

model_name

The name of this index in the model. Set during _bind().

projection

Computed during _bind().

{
    "available":  # Set of columns that can be returned from a query or scan.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}
range_key

The column that the index can be sorted on.

_bind(model)

Compute attributes and resolve column names.

  • If hash and/or range keys are strings, resolve them to Column instances from the model by model_name.
  • If projection is a list of strings, resolve each to a Column instance.
  • Compute projection dict from model Metadata and Index's temporary projection attribute.
Parameters:model -- The BaseModel this Index is attached to.
Raises:bloop.exceptions.InvalidIndex -- If the hash or range keys are misconfigured.

Searching

PreparedSearch

class bloop.search.PreparedSearch

Mutable search object.

Creates SearchModelIterator objects which can be used to iterate the results of a query or scan multiple times.

prepare(engine=None, mode=None, model=None, index=None, key=None, filter=None, projection=None, consistent=None, forward=None, parallel=None)

Validates the search parameters and builds the base request dict for each Query/Scan call.

SearchIterator

class bloop.search.SearchIterator(*, session, model, index, request, projected)

Reusable search iterator.

Parameters:
  • session -- SessionWrapper to make Query, Scan calls.
  • model -- BaseModel for repr only.
  • index -- Index to search, or None.
  • request (dict) -- The base request dict for each search.
  • projected (set) -- Set of Column that should be included in each result.
count = None

Number of items that have been loaded from DynamoDB so far, including buffered items.

exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns:The first result.
Raises:bloop.exceptions.ConstraintViolation -- No results.
one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns:The unique result.
Raises:bloop.exceptions.ConstraintViolation -- Not exactly one result.
reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

scanned = None

Number of items that DynamoDB evaluated, before any filter was applied.
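
The difference between first() and one() can be illustrated with plain iterables. This is only a sketch of the documented behavior: the two functions below are free-standing stand-ins, and the ConstraintViolation class is a local placeholder for bloop.exceptions.ConstraintViolation so that the example runs without Bloop installed.

```python
class ConstraintViolation(Exception):
    """Local stand-in for bloop.exceptions.ConstraintViolation."""


def first(results):
    """Return the first item of an iterable, or raise if it is empty."""
    for item in results:
        return item
    raise ConstraintViolation("Search returned no results.")


def one(results):
    """Return the only item of an iterable, or raise unless there is exactly one."""
    iterator = iter(results)
    item = first(iterator)
    # Any further item means the result was not unique.
    for _ in iterator:
        raise ConstraintViolation("Search returned more than one result.")
    return item
```

first() is satisfied by any non-empty result, while one() has to look one item past the first to prove uniqueness.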

SearchModelIterator

class bloop.search.SearchModelIterator(*, engine, model, index, request, projected)

Reusable search iterator that unpacks result dicts into model instances.

Parameters:
  • engine -- Engine to unpack models with.
  • model -- BaseModel being searched.
  • index -- Index to search, or None.
  • request (dict) -- The base request dict for each search call.
  • projected (set) -- Set of Column that should be included in each result.
count

Number of items that have been loaded from DynamoDB so far, including buffered items.

exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

scanned

Number of items that DynamoDB evaluated, before any filter was applied.

Streaming

Coordinator

class bloop.stream.coordinator.Coordinator(*, session, stream_arn)

Encapsulates the shard-level management for a whole Stream.

Parameters:
  • session (SessionWrapper) -- Used to make DynamoDBStreams calls.
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].
advance_shards()

Poll active shards for records and insert them into the buffer. Rotate exhausted shards.

Returns immediately if the buffer isn't empty.

heartbeat()

Keep active shards with "trim_horizon" or "latest" iterators alive by advancing their iterators.

move_to(position)

Set the Coordinator to a specific endpoint or time, or load state from a token.

Parameters:position -- "trim_horizon", "latest", Arrow, or a Coordinator.token
remove_shard(shard)

Remove a Shard from the Coordinator. Drops all buffered records from the Shard.

If the Shard is active or a root, it is removed and any children promoted to those roles.

Parameters:shard (Shard) -- The shard to remove.
token

JSON-serializable representation of the current Stream state.

Use Engine.stream(YourModel, token) to create an identical stream, or stream.move_to(token) to move an existing stream to this position.

Returns:Stream state as a json-friendly dict
Return type:dict

Shard

class bloop.stream.shard.Shard(*, stream_arn, shard_id, iterator_id=None, iterator_type=None, sequence_number=None, parent=None, session=None)

Encapsulates the record-level iterator management for a single Shard.

Parameters:
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].
  • shard_id (str) -- Shard id, usually from a DescribeStream call.
  • iterator_id (str) -- (Optional) An existing Shard iterator id. Default is None.
  • iterator_type (str) -- (Optional) The shard's iterator type, usually when loading from a token. One of "trim_horizon", "at_sequence", "after_sequence", or "latest". Default is None.
  • sequence_number (str) -- (Optional) SequenceNumber for an "at_sequence" or "after_sequence" iterator type. Default is None.
  • parent (Shard) -- (Optional) This shard's parent. Default is None.
  • session (SessionWrapper) -- Used to make DynamoDBStreams calls.
exhausted

True if the shard is closed and there are no additional records to get.

get_records()

Get the next set of records in this shard. An empty list doesn't guarantee the shard is exhausted.

Returns:A list of reformatted records. May be empty.
jump_to(*, iterator_type, sequence_number=None) → None

Move to a new position in the shard using the standard parameters to GetShardIterator.

Parameters:
  • iterator_type (str) -- "trim_horizon", "at_sequence", "after_sequence", "latest"
  • sequence_number (str) -- (Optional) Sequence number to use with at/after sequence. Default is None.
load_children() → None

If the Shard doesn't have any children, tries to find some from DescribeStream.

If the Shard is open this won't find any children, so an empty response doesn't mean the Shard will never have children.

seek_to(position)

Move the Shard's iterator to the earliest record after the Arrow time.

Returns the first records at or past position. If the list is empty, the seek failed to find records, either because the Shard is exhausted or it reached the HEAD of an open Shard.

Parameters:position (Arrow) -- The position in time to move to.
Returns:A list of the first records found after position. May be empty.
token

JSON-serializable representation of the current Shard state.

The token is enough to rebuild the Shard as part of rebuilding a Stream.

Returns:Shard state as a json-friendly dict
Return type:dict
walk_tree()

Generator that yields each Shard by walking the shard's children in order.
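
A tree walk of this kind might look like the sketch below. The Node class is a hypothetical stand-in for Shard with only an id and a list of children, and the breadth-first order (each generation before the next) is an assumption. The description above only promises that children are walked "in order".

```python
from collections import deque


class Node:
    """Hypothetical stand-in for a Shard: just an id and a list of children."""

    def __init__(self, shard_id, children=()):
        self.shard_id = shard_id
        self.children = list(children)

    def walk_tree(self):
        """Yield this node, then its descendants, one generation at a time."""
        pending = deque([self])
        while pending:
            node = pending.popleft()
            yield node
            pending.extend(node.children)


root = Node("S1", [Node("S12", [Node("S121")]), Node("S13")])
order = [node.shard_id for node in root.walk_tree()]
```

Because it is a generator, a caller can stop as soon as it finds the shard it is looking for without visiting the rest of the tree.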

RecordBuffer

class bloop.stream.buffer.RecordBuffer

Maintains a total ordering for records across any number of shards.

Methods are thin wrappers around heapq. Buffer entries have the form:

(total_ordering, record, shard)

where total_ordering is a tuple of (created_at, sequence_number, monotonic_clock) created from each record as it is inserted.

clear()

Drop the entire buffer.

clock()

Returns a monotonically increasing integer.

Do not rely on the clock using a fixed increment.

>>> buffer.clock()
3
>>> buffer.clock()
40
>>> buffer.clock()
41
>>> buffer.clock()
300
Returns:A unique clock value guaranteed to be larger than every previous value
Return type:int
peek()

A pop() without removing the (record, shard) from the buffer.

Returns:Oldest (record, shard) tuple.
pop()

Pop the oldest (lowest total ordering) record and the shard it came from.

Returns:Oldest (record, shard) tuple.
push(record, shard)

Push a new record into the buffer.

Parameters:
  • record (dict) -- new record
  • shard (Shard) -- Shard the record came from
push_all(record_shard_pairs)

Push multiple (record, shard) pairs at once, with only one heapq.heapify() call to maintain order.

Parameters:record_shard_pairs -- list of (record, shard) tuples (see push()).
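
To make the heap-based ordering concrete, here is a minimal sketch of such a buffer built on heapq. It is not Bloop's implementation: SketchBuffer is a hypothetical name, the records are simplified dicts with assumed "created_at" and "sequence_number" keys, and the clock happens to use a fixed increment, which callers of the real clock() should not rely on.

```python
import heapq
from itertools import count


class SketchBuffer:
    """Illustrative heap of (total_ordering, record, shard) entries."""

    def __init__(self):
        self.heap = []
        self._clock = count()

    def clock(self):
        # Strictly increasing, so no two entries ever share a total ordering
        # and heapq never has to compare the record dicts themselves.
        return next(self._clock)

    def _entry(self, record, shard):
        # "created_at" and "sequence_number" are assumed, simplified record keys.
        ordering = (record["created_at"], record["sequence_number"], self.clock())
        return ordering, record, shard

    def push(self, record, shard):
        heapq.heappush(self.heap, self._entry(record, shard))

    def push_all(self, record_shard_pairs):
        # Extend first, then restore the heap invariant once.
        self.heap.extend(self._entry(r, s) for r, s in record_shard_pairs)
        heapq.heapify(self.heap)

    def peek(self):
        return self.heap[0][1:]

    def pop(self):
        return heapq.heappop(self.heap)[1:]


buffer = SketchBuffer()
buffer.push_all([
    ({"created_at": 20, "sequence_number": 7}, "shard-a"),
    ({"created_at": 10, "sequence_number": 9}, "shard-b"),
])
buffer.push({"created_at": 10, "sequence_number": 3}, "shard-a")
popped = [buffer.pop() for _ in range(3)]
```

The trailing clock value matters even when it never decides the order: it guarantees the comparison ends before Python tries to compare two dicts, which would raise TypeError.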

Conditions

ReferenceTracker

class bloop.conditions.ReferenceTracker(engine)

De-dupes reference names for the same path segments and generates unique placeholders for all names, paths, and values. The reference tracker can also forget references if, for example, a value fails to render but the rest of the condition should be left intact. This is primarily used when a value is unexpectedly dumped as None, or an expression uses another column as a value.

Parameters:engine (Engine) -- Used to dump column values for value refs.
any_ref(*, column, value=<Sentinel[missing]>, dumped=False, inner=False)

Returns a NamedTuple of (name, type, value) for any type of reference.

# Name ref
>>> tracker.any_ref(column=User.email)
Reference(name='email', type='name', value=None)

# Value ref
>>> tracker.any_ref(column=User.email, value='user@domain')
Reference(name='email', type='value', value={'S': 'user@domain'})

# Passed as value ref, but value is another column
>>> tracker.any_ref(column=User.email, value=User.other_column)
Reference(name='other_column', type='name', value=None)
Parameters:
  • column (ComparisonMixin) -- The column to reference. If value is None, this will render a name ref for this column.
  • value -- (Optional) If provided, this is likely a value ref. If value is also a column, this will render a name ref for that column (not the column parameter).
  • dumped (bool) -- (Optional) True if the value has already been dumped and should not be dumped through the column's typedef again. Commonly used with atomic conditions (which store the object's dumped representation). Default is False.
  • inner (bool) -- (Optional) True if this is a value ref and it should be dumped through a collection's inner type, and not the collection type itself. Default is False.
Returns:

A name or value reference

Return type:

bloop.conditions.Reference

pop_refs(*refs)

Decrement the usage of each ref by 1.

If this was the last use of a ref, remove it from attr_names or attr_values.
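
The de-duplication and usage counting described above can be sketched for name references alone. SketchTracker and its private attributes are hypothetical. The real tracker also handles value references and nested paths, and needs an engine to dump values, none of which is modeled here.

```python
class SketchTracker:
    """Illustrative name-reference tracker with usage counts."""

    def __init__(self):
        self.attr_names = {}    # placeholder -> attribute name
        self._name_index = {}   # attribute name -> placeholder
        self._counts = {}       # placeholder -> number of live uses
        self._next = 0

    def name_ref(self, name):
        """Return the placeholder for `name`, creating it on first use."""
        ref = self._name_index.get(name)
        if ref is None:
            ref = "#n{}".format(self._next)
            self._next += 1
            self._name_index[name] = ref
            self.attr_names[ref] = name
        self._counts[ref] = self._counts.get(ref, 0) + 1
        return ref

    def pop_refs(self, *refs):
        """Decrement each ref's usage; forget refs that are no longer used."""
        for ref in refs:
            self._counts[ref] -= 1
            if self._counts[ref] == 0:
                del self._counts[ref]
                del self._name_index[self.attr_names.pop(ref)]


tracker = SketchTracker()
first_ref = tracker.name_ref("email")
second_ref = tracker.name_ref("email")   # de-duped: same placeholder
other_ref = tracker.name_ref("age")
tracker.pop_refs(first_ref)              # "email" still has one use left
names_after_one_pop = dict(tracker.attr_names)
tracker.pop_refs(second_ref)             # last use: "email" is forgotten
names_after_two_pops = dict(tracker.attr_names)
```

Counting uses rather than deleting on the first pop is what lets one failed value be dropped without disturbing other parts of the condition that share the same name.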

ConditionRenderer

class bloop.conditions.ConditionRenderer(engine)

Renders collections of BaseCondition into DynamoDB's wire format for expressions, including:

  • "ConditionExpression" -- used in conditional operations
  • "FilterExpression" -- used in queries and scans to ignore results that don't match the filter
  • "KeyConditionExpression" -- used to describe a query's hash (and range) key(s)
  • "ProjectionExpression" -- used to include a subset of possible columns in the results of a query or scan
  • "UpdateExpression" -- used to save objects

Normally, you will only need to call render() to handle any combination of conditions. You can also call each individual render_* function to control how multiple conditions of each type are applied.

You can collect the rendered condition at any time through rendered.

>>> renderer.render(obj=user, atomic=True)
>>> renderer.rendered
{'ConditionExpression': '((#n0 = :v1) AND (attribute_not_exists(#n2)) AND (#n4 = :v5))',
 'ExpressionAttributeNames': {'#n0': 'age', '#n2': 'email', '#n4': 'id'},
 'ExpressionAttributeValues': {':v1': {'N': '3'}, ':v5': {'S': 'some-user-id'}}}
Parameters:engine (Engine) -- Used to dump values in conditions into the appropriate wire format.
render(obj=None, condition=None, atomic=False, update=False, filter=None, projection=None, key=None)

Main entry point for rendering multiple expressions. All parameters are optional, except obj when atomic or update are True.

Parameters:
  • obj -- (Optional) An object to render an atomic condition or update expression for. Required if update or atomic is True. Default is None.
  • condition (BaseCondition) -- (Optional) Rendered as a "ConditionExpression" for a conditional operation. If atomic is True, the two are rendered in an AND condition. Default is None.
  • atomic (bool) -- (Optional) True if an atomic condition should be created for obj and rendered as a "ConditionExpression". Default is False.
  • update (bool) -- (Optional) True if an "UpdateExpression" should be rendered for obj. Default is False.
  • filter (BaseCondition) -- (Optional) A filter condition for a query or scan, rendered as a "FilterExpression". Default is None.
  • projection (set of Column) -- (Optional) A set of Columns to include in a query or scan, rendered as a "ProjectionExpression". Default is None.
  • key (BaseCondition) -- (Optional) A key condition for queries, rendered as a "KeyConditionExpression". Default is None.
rendered

The rendered wire format for all conditions that have been rendered. Rendered conditions are never cleared. A new ConditionRenderer should be used for each operation.

Built-in Conditions

class bloop.conditions.BaseCondition(operation, *, column=None, values=None, dumped=False)
class bloop.conditions.AndCondition(*values)
class bloop.conditions.OrCondition(*values)
class bloop.conditions.NotCondition(value)
class bloop.conditions.ComparisonCondition(operation, column, value)
class bloop.conditions.BeginsWithCondition(column, value)
class bloop.conditions.BetweenCondition(column, lower, upper)
class bloop.conditions.ContainsCondition(column, value)
class bloop.conditions.InCondition(column, values)
class bloop.conditions.ComparisonMixin(*args, proxied=None, path=None, **kwargs)

Utilities

class bloop.util.Sentinel(name)

Simple string-based placeholders for missing or special values.

Names are unique, and instances are re-used for the same name:

>>> from bloop.util import Sentinel
>>> empty = Sentinel("empty")
>>> empty
<Sentinel[empty]>
>>> same_token = Sentinel("empty")
>>> empty is same_token
True

This removes the need to import the same signal or placeholder value everywhere; two modules can create Sentinel("some-value") and refer to the same object. This is especially helpful where None is a possible value, and so can't be used to indicate omission of an optional parameter.

Implements __repr__ to render nicely in function signatures. Standard object-based sentinels:

>>> missing = object()
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:

some_func(optional=<object object at 0x7f0f3f29e5d0>)

With the Sentinel class:

>>> from bloop.util import Sentinel
>>> missing = Sentinel("Missing")
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:

some_func(optional=<Sentinel[Missing]>)
Parameters:name (str) -- The name for this sentinel.
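
One way to get "same name, same instance" behavior is to cache instances in __new__. The sketch below is an assumption about how such a class could be written, not a copy of Bloop's code. NamedSentinel and lookup are hypothetical names, and the example also shows the use case mentioned above where None is a legitimate value.

```python
class NamedSentinel:
    """Illustrative sentinel that returns one shared instance per name."""

    _instances = {}

    def __new__(cls, name):
        try:
            return cls._instances[name]
        except KeyError:
            instance = cls._instances[name] = super().__new__(cls)
            instance.name = name
            return instance

    def __repr__(self):
        return "<Sentinel[{}]>".format(self.name)


missing = NamedSentinel("missing")


def lookup(mapping, key, default=missing):
    """Distinguish "no default given" from an explicit default of None."""
    if key in mapping:
        return mapping[key]
    if default is missing:
        raise KeyError(key)
    return default
```

Comparing with `is` works here precisely because every call with the same name hands back the same object.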
class bloop.util.WeakDefaultDictionary(default_factory)

The cross product of weakref.WeakKeyDictionary and collections.defaultdict.
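
A minimal sketch of that combination, assuming the simplest possible approach: subclass weakref.WeakKeyDictionary and create the default in __getitem__. SketchWeakDefaultDict and Model are hypothetical names, and Bloop's own class may differ in detail.

```python
import gc
import weakref


class SketchWeakDefaultDict(weakref.WeakKeyDictionary):
    """Illustrative WeakKeyDictionary that creates missing values on access."""

    def __init__(self, default_factory):
        super().__init__()
        self.default_factory = default_factory

    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            # Store the new default so later lookups return the same object.
            value = self[key] = self.default_factory()
            return value


class Model:
    """Any weak-referenceable object can be a key."""


tracked = SketchWeakDefaultDict(set)
obj = Model()
tracked[obj].add("email")      # the set is created on first access
size_while_alive = len(tracked)
del obj                        # drop the only strong reference to the key
gc.collect()
size_after_delete = len(tracked)
```

Entries vanish with their keys, so per-object bookkeeping stored this way does not keep the objects alive.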

Implementation Details

Models must be Hashable

By default, Python makes all user-defined classes hashable:

>>> class Dict(): pass
>>> hash(Dict())
8771845190811

Classes are unhashable in two cases:

  1. The class declares __hash__ = None.
  2. The class implements __eq__ but not __hash__

In the first case, Bloop will simply raise InvalidModel. In the second case, Bloop's ModelMetaclass manually locates a __hash__ method in the model's base classes:

for base in bases:
    hash_fn = getattr(base, "__hash__")
    if hash_fn:
        break
else:
    hash_fn = object.__hash__
attrs["__hash__"] = hash_fn

This is required because Python doesn't provide a default hash method when __eq__ is implemented, and won't fall back to a parent class's definition:

>>> class Base():
...     def __hash__(self):
...         print("Base.__hash__")
...         return 0
...
>>> class Derived(Base):
...     def __eq__(self, other):
...         return True
...

>>> hash(Base())
Base.__hash__
>>> hash(Derived())
TypeError: unhashable type: 'Derived'
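
The effect of the lookup loop can be reproduced with a small stand-alone metaclass. This is a sketch under assumptions: RestoreHash is a hypothetical name, and the guard that only acts when the class body defines __eq__ without __hash__ is added for the example and is not taken from Bloop's source.

```python
class RestoreHash(type):
    """Illustrative metaclass: reuse a base class's __hash__ when __eq__ is defined."""

    def __new__(mcs, name, bases, attrs):
        if "__eq__" in attrs and "__hash__" not in attrs:
            for base in bases:
                hash_fn = getattr(base, "__hash__", None)
                if hash_fn:
                    break
            else:
                hash_fn = object.__hash__
            # An explicit __hash__ stops Python from setting it to None.
            attrs["__hash__"] = hash_fn
        return super().__new__(mcs, name, bases, attrs)


class Base(metaclass=RestoreHash):
    def __hash__(self):
        return 0


class Derived(Base):
    def __eq__(self, other):
        return True


class PlainBase:
    def __hash__(self):
        return 0


class PlainDerived(PlainBase):
    def __eq__(self, other):
        return True


derived_hash = hash(Derived())                         # uses Base.__hash__
plain_is_hashable = PlainDerived.__hash__ is not None  # Python set it to None
```

The two Plain classes repeat the failing case from the transcript above without the metaclass, for contrast.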

Stream Ordering Guarantees

The DynamoDB Streams API exposes a limited amount of temporal information and few options for navigating within a shard. Due to these constraints, it was hard to reduce the API down to a single __next__ call without compromising performance or ordering.

The major challenges described below include:

  • Creating a plausible total ordering across shards

  • Managing an iterator:

    • Refreshing expired iterators without data loss
    • Preventing low-volume iterators without sequence numbers from expiring
    • Promoting children when a shard runs out of records
    • Distinguishing open shards from gaps between records
  • Managing multiple shards:

    • Mapping stream "trim_horizon" and "latest" to a set of shards
    • Buffering records from multiple shards and applying a total ordering
  • Loading and saving tokens:

    • Simplifying an entire stream into a human-readable json blob
    • Pruning old shards when loading
    • Inserting new shards when loading
    • Resolving TrimmedDataAccessException for old shards

The following sections use a custom notation to describe shards and records.

Sn and Rn represent shards and records, where n is an integer:

R11, R13, R32  # In general, RnX comes from Sn
S1, S12, S23   # In general, SnX is a child of Sn

< represents chronological ordering between records:

R12 < R13  # In general, RX < RY when X < Y

=> represents parent/child relationships between shards:

S1 => {}          # S1 has no children
S2 => S21         # S2 has one child
# In general, SnX and SnY are adjacent children of Sn
S3 => {S31, S32}

~ represents two shards that are not within the same lineage:

S1 ~ S2  # Not related

S1 => S12 => S13; S4 => S41
# Both child shards, but of different lineages
S12 ~ S41

: represents a set of records from a single shard:

S1: R11, R12   # no guaranteed order
S2: R23 < R24  # guaranteed order

Shards and Lineage

DynamoDB only offers three guarantees for chronological ordering:

  1. All records within a single shard are ordered.
  2. All parent shard records are before all child shard records.
  3. Changes to the same hash will always go to the same shard. When a parent splits, further changes to that hash will go to only one child of that shard, and always the same child.

Given the following:

S1 ~ S2
S1: R11 < R12 < R13
S2: R24 < R25 < R26

The first rule offers no guarantees between R1x and R2x for any x.

Given the following:

S1 => {S12, S13}
S1:  R111 < R112
S12: R124 < R125
S13: R136 < R137

The second rule guarantees both of the following:

R111 < R112 < R124 < R125
R111 < R112 < R136 < R137

but does not guarantee any ordering between R12x and R13x for any x.

Given the following:

S1 => {S2, S3}
R40, R41, R42  # all modify the same hash key
R5, R7, R9     # modify different hash keys

S1: R40, R5

The third rule guarantees that R41 and R42 will both be in either S2 or S3. Meanwhile, it offers no guarantee about where R7 and R9 will be. Both of the following are possible:

S1: R40, R5
S2: R41, R42, R7
S3: R9

S1: R40, R5
S2: R7, R9
S3: R41, R42

But the following is not possible:

S1: R40, R5
S2: R41, R7
S3: R42, R9

Merging Shards

Low-throughput tables will only have a single open shard at any time, and can rely on the first and second guarantees above for rebuilding the exact order of changes to the table.

For high-throughput tables, there can be more than one root shard, and each shard lineage can have more than one child open at once. In this case, Bloop's streaming interface can't guarantee ordering for all records in the stream, because there is no absolute chronological ordering across a partitioned table. Instead, Bloop falls back to a total ordering scheme that uses each record's ApproximateCreationDateTime and, when two records have the same creation time, a monotonically increasing integer clock to break ties.

Consider the following stream:

S0 => {S1, S2}
S0: R00
S1: R11 < R12 < R13
S2: R24 < R25 < R26

Where each record has the following (simplified) creation times:

Record ApproximateCreationDateTime
R00 7 hours ago
R11 6 hours ago
R12 4 hours ago
R13 2 hours ago
R24 4 hours ago
R25 3 hours ago
R26 3 hours ago

Bloop performs the following in one step:

  1. The second guarantee says all records in S0 are before records in that shard's children:

    R00 < (R11, R12, R13, R24, R25, R26)
    
  2. The first guarantee says all records in the same shard are ordered:

    R00 < ((R11 < R12 < R13), (R24 < R25 < R26))
    
  3. Then, ApproximateCreationDateTime is used to partially merge S1 and S2 records:

    R00 < R11 < (R12, R24) < (R25 < R26) < R13
    
  4. There were still two collisions after using ApproximateCreationDateTime: R12, R24 and R25, R26.

    1. To resolve (R12, R24) Bloop breaks the tie with an incrementing clock, and assigns R12 < R24.
    2. (R25, R26) is resolved because the records are in the same shard.

The final ordering is:

R00 < R11 < R12 < R24 < R25 < R26 < R13
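
The worked example above can be reproduced in a few lines. This is a simplified sketch: sequence numbers are left out, creation times are written as negative "hours ago" so that larger means more recent, and a counter assigned in insertion order plays the role of the tie-breaking clock. Records are inserted shard by shard, each shard in its own order, which is an assumption about how the records arrive.

```python
from itertools import count

# (name, shard, hours_ago) taken from the table above, listed in shard order.
stream = [
    ("R00", "S0", 7),
    ("R11", "S1", 6), ("R12", "S1", 4), ("R13", "S1", 2),
    ("R24", "S2", 4), ("R25", "S2", 3), ("R26", "S2", 3),
]

clock = count()
entries = []
for name, shard, hours_ago in stream:
    created_at = -hours_ago          # larger value == more recent
    entries.append(((created_at, next(clock)), name))

# Sort by creation time first, then by the clock to break ties.
ordered = [name for _, name in sorted(entries)]
```

R12 and R24 tie on creation time and are separated by the clock alone. R25 and R26 also tie, but keep their shard order because R25 was inserted first.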

Record Gaps

Bloop initially performs up to 5 "catch up" calls to GetRecords when advancing an iterator. If a GetRecords call returns a NextShardIterator but no records, the iterator is either nearly caught up to "latest" in an open shard or traversing a period of time in the shard with no activity. Endlessly polling until a record comes back would cause every open shard to hang for up to 4 hours, while calling GetRecords only once could desynchronize one shard's iterator from the others.

By retrying up to 5 times on an empty GetRecords response (one that still has a NextShardIterator), Bloop can be confident that any gaps in the shard have been traversed. This is because it takes approximately 4-5 calls to traverse an empty shard completely. In other words, the 6th empty response almost certainly indicates that the iterator is caught up to "latest" in an open shard, and it's safe to cut back to one call at a time.
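
The retry strategy can be sketched as a bounded loop. Everything here is illustrative: catch_up and make_fake_shard are hypothetical names, the fake shard stands in for calls to GetRecords, and whether the first call counts toward the limit is an assumption of this sketch rather than a statement about Bloop's code.

```python
CALLS_TO_REACH_HEAD = 5


def catch_up(get_records, iterator_id, max_calls=CALLS_TO_REACH_HEAD):
    """Call get_records until it returns records, the shard closes, or max_calls is hit.

    Returns (records, next_iterator_id, calls_made).
    """
    calls = 0
    while calls < max_calls:
        response = get_records(iterator_id)
        calls += 1
        iterator_id = response.get("NextShardIterator")
        records = response["Records"]
        # Stop on data, or when a missing iterator shows the shard is closed.
        if records or iterator_id is None:
            return records, iterator_id, calls
    return [], iterator_id, calls


def make_fake_shard(empty_pages, records):
    """Hypothetical shard: `empty_pages` empty responses, then one page of records."""
    def get_records(iterator_id):
        position = int(iterator_id)
        page = records if position == empty_pages else []
        return {"Records": page, "NextShardIterator": str(position + 1)}
    return get_records


# A short gap is crossed within the limit; a long quiet period is not.
gap_result = catch_up(make_fake_shard(3, ["record"]), "0")
head_result = catch_up(make_fake_shard(50, ["record"]), "0")
```

When the limit is reached without records, the caller keeps the returned iterator and goes back to one call per poll.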

Why only 5 calls?

This number came from extensive testing which compared the number of empty responses returned for shards with various activity cadences. It's reasonable to assume that this number would only decrease with time, as advances in software and hardware would enable DynamoDB to cover larger periods in time with the same time investment. Because each call from a customer incurs overhead of creating and indexing each new iterator id, as well as the usual expensive signature-based authentication, it's in DynamoDB's interest to minimize the number of calls a customer needs to traverse a sparsely populated shard.

At worst DynamoDB starts requiring more calls to fully traverse an empty shard, which could result in reordering between records in shards with vastly different activity patterns. Since the creation-time-based ordering is approximate, this doesn't relax the guarantees that Bloop's streaming interface provides.

Changing the Limit

In general, you should not need to worry about this value and can leave it alone. In the unlikely case that DynamoDB does increase the number of calls required to traverse an empty shard, Bloop will be updated soon after.

If you still need to tune this value:

import bloop.stream.shard
bloop.stream.shard.CALLS_TO_REACH_HEAD = 5

The exact value of this parameter will have almost no impact on performance in high-activity streams, and there are so few shards in low-activity streams that the total cost will be on par with the other calls to set up the stream.