Using the Engine

The Engine is the main way you'll interact with DynamoDB (and DynamoDBStreams). Once you've defined some models, you're ready to start loading, saving and querying.

Attention

This section uses the same User model from the previous section. If you haven't already done so, go back and set that up.

Bind

As noted in the previous section, every model must first be bound to a backing table with Engine.bind before we can interact with instances in DynamoDB.

When an engine binds a model, it also binds all non-abstract subclasses. This means you can bind all models in one call, centralizing any error handling or table correction. For example, you may have specialized models for users, notifications, and impressions. Each of these can be grouped with an abstract base, and then all specialized models created at once:

class BaseUser(BaseModel):
    class Meta:
        abstract = True

class BaseNotification(BaseModel):
    class Meta:
        abstract = True

...

class Admin(BaseUser):
    ...

class Moderator(BaseUser):
    ...

class PriorityNotification(BaseNotification):
    ...

class EmailNotification(BaseNotification):
    ...


try:
    engine.bind(BaseUser)
except TableMismatch:
    print("Failed to bind all user models")

try:
    engine.bind(BaseNotification)
except TableMismatch:
    print("Failed to bind all notification models")

Now you can import a single base (BaseModel or a subclass) from your models.py module and automatically bind any dynamic models created from that base.
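To make the subclass-binding behavior concrete, here is an illustrative sketch (not bloop's internals) of how binding a base can reach every concrete subclass: walk __subclasses__() recursively and skip anything marked abstract. Plain classes stand in for bloop models so the example is self-contained.

```python
# Illustrative only -- Engine.bind performs this discovery for you.
class BaseModel:
    class Meta:
        abstract = True

class BaseUser(BaseModel):
    class Meta:
        abstract = True

class Admin(BaseUser):
    class Meta:
        abstract = False

class Moderator(BaseUser):
    class Meta:
        abstract = False

def concrete_models(base):
    """Yield every non-abstract model below this base."""
    for cls in base.__subclasses__():
        if not getattr(cls.Meta, "abstract", False):
            yield cls
        yield from concrete_models(cls)

print(sorted(m.__name__ for m in concrete_models(BaseUser)))
# ['Admin', 'Moderator']
```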

Save

Save is performed with UpdateItem since absolute overwrites (such as PutItem) are rarely desired in a distributed, optimistic concurrency system. This is the central decision that enables a table to back multiple models. A partial save allows a model to update an item in the table without accidentally clearing the columns that model doesn't know about.

Saving an item or items is very simple:

>>> user = User(...)
>>> engine.save(user)
>>> tweet = Tweet(...)
>>> user.last_activity = arrow.now()
>>> engine.save(user, tweet)

You can perform optimistic saves with a condition. If a condition is not met when DynamoDB tries to apply the update, the update fails and bloop immediately raises ConstraintViolation. Conditions are specified on columns using the standard <, >=, ==, ... operators, as well as begins_with, between, contains, in_. Conditions can be chained together and combined with bitwise operators &, |, ~:

>>> user = User(username="numberoverzero")
>>> username_available = User.username.is_(None)
>>> engine.save(user, condition=username_available)
# Success
>>> engine.save(user, condition=username_available)
Traceback (most recent call last):
  ...
ConstraintViolation: The condition was not met.
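The bitwise operators compose conditions into a single expression tree. Here is an illustrative sketch of that mechanic (not bloop's actual implementation); a tiny Condition class stands in for bloop's column conditions, which chain the same way.

```python
# Hypothetical Condition class: &, |, ~ build up a combined expression.
class Condition:
    def __init__(self, expr):
        self.expr = expr

    def __and__(self, other):
        return Condition(f"({self.expr} AND {other.expr})")

    def __or__(self, other):
        return Condition(f"({self.expr} OR {other.expr})")

    def __invert__(self):
        return Condition(f"(NOT {self.expr})")

is_verified = Condition("verified = true")
no_profile = Condition("profile IS NULL")
print((is_verified & ~no_profile).expr)
# (verified = true AND (NOT profile IS NULL))
```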

A common use for conditions is performing atomic updates. Save provides a shorthand for this, atomic=True. By default saves are not atomic. Bloop's specific definition of atomic is "only if the state in DynamoDB at time of save is the same as the local state was aware of". If you create a new User and perform an atomic save, it will fail if there was any previous state for that hash/range key (since the expected state before the save was non-existent). If you fetch an object from a query which doesn't project all columns, only the columns that are projected will be part of the atomic condition (not loading a column doesn't say whether we should expect it to have a value or not).

See also

Atomic conditions can be tricky, and there are subtle edge cases. See the Atomic Conditions section of the User Guide for detailed examples of generated atomic conditions.

If you provide a condition and atomic is True, the atomic condition will be ANDed with the condition to form a single ConditionExpression.

>>> is_verified = User.verified.is_(True)
>>> no_profile = User.profile.is_(None)
>>> engine.save(
...     user,
...     condition=(is_verified & no_profile),
...     atomic=True)

Delete

Delete has the same signature as save(). Both operations are mutations on an object that may or may not exist; they simply map to two different APIs (delete calls DeleteItem). You can delete multiple objects at once, specify a condition, and use the atomic=True shorthand to only delete objects unchanged since you last loaded them from DynamoDB.

>>> engine.delete(user, tweet)
>>> engine.delete(tps_report, atomic=True)
>>> cutoff = arrow.now().replace(years=-2)
>>> engine.delete(
...     account,
...     condition=Account.last_login < cutoff)

Load

Unlike most existing DynamoDB object mappers, Bloop does not create new instances when loading objects. This improves performance, simplifies atomic tracking, and lets you use thick or thin models by minimizing how many times the constructor is invoked for effectively the same object (same hash/range keys).

Like save() and delete() above, Engine.load takes a variable number of objects to load from DynamoDB:

>>> user = User(id="some-id")
>>> tweet = Tweet(user="some-id", id="some-tweet")
>>> engine.load(user, tweet)

If consistent is True, then strongly consistent reads will be used:

>>> objs = user, tweet
>>> engine.load(*objs, consistent=True)

If any objects aren't loaded, Bloop raises MissingObjects:

>>> user = User(username="not-real")
>>> engine.load(user)
Traceback (most recent call last):
  ...
MissingObjects: Failed to load some objects.

You can access MissingObjects.objects to see which objects failed to load.

Query

This section defines a new model to demonstrate the various filtering and conditions available:

class Account(BaseModel):
    name = Column(String, hash_key=True)
    number = Column(Integer, range_key=True)
    created_on = Column(DateTime)
    balance = Column(Number)
    level = Column(Integer)
    email = Column(String)

    by_level = GlobalSecondaryIndex(
        projection="all", hash_key="level")

    by_balance = LocalSecondaryIndex(
        projection=["created_on"], range_key="balance")

engine = Engine()
engine.bind(Account)

First

Often, you'll only need a single result from a query; with the correct sorting and indexes, the first result can be used to get a maximum or minimum. Use first() to get the first result, if one exists. If there are no results, first() raises ConstraintViolation.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.first()
Account(name='numberoverzero', number=21623)

One

Similar to first(), you can get the unique result of a query with one(). If there are no results, or more than one result, one() raises ConstraintViolation.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.one()
Traceback (most recent call last):
    ...
ConstraintViolation: Query found more than one result.
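The semantics of first() and one() apply to any sequence of results, so here is a self-contained sketch of both behaviors; bloop raises ConstraintViolation, and ValueError stands in for it here.

```python
# Illustrative semantics only -- bloop's QueryIterator implements these.
def first(results):
    """Return the first result; fail if there are none."""
    for item in results:
        return item
    raise ValueError("no results")

def one(results):
    """Return the unique result; fail on zero or multiple results."""
    items = list(results)
    if len(items) != 1:
        raise ValueError(f"expected exactly one result, found {len(items)}")
    return items[0]

print(first([3, 1, 2]))  # 3
print(one([42]))         # 42
```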

Key Conditions

Queries can be performed against a Model or an Index. You must specify at least a hash key equality condition; a range key condition is optional.

>>> owned_by_stacy = Account.name == "Stacy"
>>> q = engine.query(Account, key=owned_by_stacy)
>>> for account in q:
...     print(account)
...

Here, the query uses the Index's range_key to narrow the range of accounts to find:

>>> owned_by_stacy = Account.name == "Stacy"
>>> at_least_one_mil = Account.balance >= 1000000
>>> q = engine.query(Account.by_balance,
...     key=owned_by_stacy & at_least_one_mil)
>>> for account in q:
...     print(account.balance)

Note

A query must always include an equality check == or is_ against the model or index's hash key. If you want to include a condition on the range key, it can be one of ==, <, <=, >, >=, between, begins_with.

See the KeyConditionExpression parameter of the Query operation in the Developer's Guide.

Filtering

If you provide a filter condition, DynamoDB only returns items that match the filter. For a query against an Index, conditions can use any column projected into the Index, except the hash and range key being queried. For a query against a model, all non-key columns are available. A filter condition can use any of the condition operations. Here is the same LSI query as above, but now excluding accounts created in the last 30 days:

>>> key_condition = owned_by_stacy & at_least_one_mil
>>> exclude_recent = Account.created_on < arrow.now().replace(days=-30)
>>> q = engine.query(Account.by_balance,
...     key=key_condition,
...     filter=exclude_recent)

Warning

Trying to use a column that's not part of an Index's projection will raise InvalidFilterCondition, since the value can't be loaded. This does not apply to queries against an LSI with strict=False, which will consume additional reads to apply the filter.

>>> q = engine.query(Account.by_balance,
...     key=key_condition,
...     filter=Account.level == 3)
Traceback (most recent call last):
  ...
InvalidFilterCondition: <Column[Account.level]> is not available for the projection.

Projections

By default, queries return all columns projected into the index or model. You can use the projection parameter to control which columns are returned for each object. This must be "all" to include everything in the index or model's projection, or a list of columns (or their model names) to include. Use "count" to get the number of results that match the query.

>>> q = engine.query(Account,
...     key=key_condition,
...     projection=["email", "balance"])
>>> account = q.first()
>>> account.email
'user@domain.com'
>>> account.balance
Decimal('3400')
>>> account.level
Traceback (most recent call last):
    ...
AttributeError: ...

Because the projection did not include Account.level, it was not loaded on the account object.

Configuration Options

The remaining options are consistent and forward. When consistent is True, strongly consistent reads are used; by default, consistent is False. Use forward to query in ascending or descending order; by default, forward is True (ascending).

Iterator State

The QueryIterator exposes a number of properties to inspect its current progress:

  • count -- the number of items loaded from DynamoDB so far, including buffered items.
  • exhausted -- True if there are no more results
  • scanned -- the number of items DynamoDB evaluated, before applying any filter condition.

To restart a query, use QueryIterator.reset():

>>> query = engine.query(...)
>>> unique = query.one()
>>> query.exhausted
True
>>> query.reset()
>>> query.exhausted
False
>>> same = query.one()
>>> unique == same  # Assume we implemented __eq__
True

Scan

Scan and Query share a very similar interface. Unlike Query, Scan does not have a key condition and can't be performed in descending order. Scans can be performed in parallel, however.

Using the same model from Query, we can scan the model or an index:

>>> for account in engine.scan(Account):
...     print(account.email)
...
>>> for account in engine.scan(Account.by_level):
...     print(account.email)

And get the first, or unique result:

>>> some_account = engine.scan(Account).first()
>>> one_account = engine.scan(Account).one()
Traceback (most recent call last):
    ...
ConstraintViolation: Scan found more than one result.

Use filter and projection to exclude items and control which columns are included in results:

>>> scan = engine.scan(Account,
...     filter=Account.email.contains("@"),
...     projection=["level", "email"])

And consistent to use strongly consistent reads:

>>> scan = engine.scan(Account.by_balance, consistent=True)

Parallel Scans

Scans can be performed in parallel, using the parallel parameter. To specify which segment you are constructing the scan for, pass a tuple of (Segment, TotalSegments):

>>> first_segment = engine.scan(Account, parallel=(0, 2))
>>> second_segment = engine.scan(Account, parallel=(1, 2))

You can easily construct a parallel scan with s segments by calling engine.scan in a loop:

def parallelize(s, engine, *args, **kwargs):
    for i in range(s):
        kwargs["parallel"] = (i, s)
        yield engine.scan(*args, **kwargs)

workers = scan_workers(n=10)
scans = parallelize(10, engine, Account, filter=...)
for worker, scan in zip(workers, scans):
    worker.process(scan)
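The fan-out above can also be driven by a thread pool. The sketch below is self-contained: FakeEngine is a hypothetical stand-in for a real bloop Engine so the example runs without DynamoDB, and each segment pretends to return its own items.

```python
from concurrent.futures import ThreadPoolExecutor

class FakeEngine:
    """Stand-in for bloop's Engine; scan returns fake segment items."""
    def scan(self, model, parallel):
        segment, total = parallel
        return [f"{model}:segment-{segment}-of-{total}"]

def parallelize(segments, engine, model):
    # Build one scan per (Segment, TotalSegments) pair.
    for i in range(segments):
        yield engine.scan(model, parallel=(i, segments))

engine = FakeEngine()
with ThreadPoolExecutor(max_workers=4) as pool:
    # Each worker drains one segment's iterator.
    results = list(pool.map(list, parallelize(4, engine, "Account")))
print(results)
```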

Stream

Note

Before you can create a stream on a model, you need to enable it in the model's Meta. For a detailed guide to using streams, head over to the Streams section of the User Guide.

To start from the beginning or end of the stream, use "trim_horizon" or "latest":

>>> stream = engine.stream(User, position="trim_horizon")
>>> stream = engine.stream(Account, "latest")

Alternatively, you can use an existing stream token to reload its previous state:

>>> same_stream = engine.stream(
...     Impression, previous_stream.token)

Lastly, you can use an arrow datetime. This is an expensive call, and walks the entire stream from the trim horizon until it finds the first record in each shard after the target datetime.

>>> yesterday = arrow.now().replace(hours=-12)
>>> stream = engine.stream(User, yesterday)
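The per-shard search a datetime position implies can be sketched as follows: walk records from the trim horizon and stop at the first record at or after the target time. This is illustrative only; plain ints stand in for timestamps and dicts for stream records.

```python
def first_after(records, target):
    """Return the first record at or after the target time, if any."""
    for record in records:
        if record["ts"] >= target:
            return record
    return None  # shard has no records past the target

shard = [{"ts": 100}, {"ts": 205}, {"ts": 230}]
print(first_after(shard, 200))  # {'ts': 205}
```

Because every shard must be walked from its trim horizon, this is why positioning a stream by datetime is an expensive call.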