Using the Engine¶
The Engine is the main way you'll interact with DynamoDB (and DynamoDBStreams). Once you've defined some models, you're ready to start loading, saving, and querying.
Attention
This section uses the same User model from the previous section. If you haven't already done so, go back and set that up.
Bind¶
As noted in the previous section, every model must first be bound to a backing table with Engine.bind before we can interact with instances in DynamoDB.
When an engine binds a model, it also binds all non-abstract subclasses. This means you can bind all models in one call, centralizing any error handling or table correction. For example, you may have specialized models for users, notifications, and impressions. Each of these can be grouped with an abstract base, and then all specialized models created at once:
class BaseUser(BaseModel):
    class Meta:
        abstract = True

class BaseNotification(BaseModel):
    class Meta:
        abstract = True

...

class Admin(BaseUser):
    ...

class Moderator(BaseUser):
    ...

class PriorityNotification(BaseNotification):
    ...

class EmailNotification(BaseNotification):
    ...

try:
    engine.bind(BaseUser)
except TableMismatch:
    print("Failed to bind all user models")

try:
    engine.bind(BaseNotification)
except TableMismatch:
    print("Failed to bind all notification models")
Now you can import a single base (BaseModel or a subclass) from your models.py module and automatically bind any dynamic models created from that base.
Save¶
Save is performed with UpdateItem since absolute overwrites (such as PutItem) are rarely desired in a distributed, optimistic concurrency system. This is the central decision that enables a table to back multiple models. A partial save allows a model to update an item in the table without accidentally clearing the columns that model doesn't know about.
Saving an item or items is very simple:
>>> user = User(...)
>>> engine.save(user)
>>> tweet = Tweet(...)
>>> user.last_activity = arrow.now()
>>> engine.save(user, tweet)
You can perform optimistic saves with a condition. If a condition is not met when DynamoDB tries to apply the update, the update fails and bloop immediately raises ConstraintViolation. Conditions are specified on columns using the standard <, >=, ==, ... operators, as well as begins_with, between, contains, in_. Conditions can be chained together and combined with the bitwise operators &, |, ~:
>>> user = User(username="numberoverzero")
>>> username_available = User.username.is_(None)
>>> engine.save(user, condition=username_available)
# Success
>>> engine.save(user, condition=username_available)
Traceback (most recent call last):
...
ConstraintViolation: The condition was not met.
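Under the hood, those operators build a condition object rather than evaluating anything immediately. The classes below are a minimal, illustrative sketch of the idea (not bloop's actual implementation) showing how &, |, and ~ compose conditions into a single expression tree:

```python
# Illustrative only: how bitwise operators can chain condition objects
# the way bloop's column conditions do, deferring rendering until the
# final ConditionExpression is needed.
class Condition:
    def __init__(self, expression):
        self.expression = expression

    def __and__(self, other):
        return Condition(f"({self.expression}) AND ({other.expression})")

    def __or__(self, other):
        return Condition(f"({self.expression}) OR ({other.expression})")

    def __invert__(self):
        return Condition(f"NOT ({self.expression})")

is_verified = Condition("verified = :v")
no_profile = Condition("attribute_not_exists(profile)")
combined = is_verified & ~no_profile
print(combined.expression)
# (verified = :v) AND (NOT (attribute_not_exists(profile)))
```

Because each operator returns a new object, partial conditions like `username_available` above can be stored and reused across multiple saves.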
A common use for conditions is performing atomic updates. Save provides a shorthand for this, atomic=True. By default, saves are not atomic. Bloop's specific definition of atomic is "only if the state in DynamoDB at the time of save is the same as the local state was aware of". If you create a new User and perform an atomic save, it will fail if there was any previous state for that hash/range key (since the expected state before the save was non-existent). If you fetch an object from a query which doesn't project all columns, only the columns that are projected will be part of the atomic condition (not loading a column doesn't say whether we should expect it to have a value or not).
See also
Atomic conditions can be tricky, and there are subtle edge cases. See the Atomic Conditions section of the User Guide for detailed examples of generated atomic conditions.
If you provide a condition and atomic is True, the atomic condition will be ANDed with the condition to form a single ConditionExpression.
>>> is_verified = User.verified.is_(True)
>>> no_profile = User.profile.is_(None)
>>> engine.save(
... user,
... condition=(is_verified & no_profile),
... atomic=True)
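To build intuition for what the atomic condition contains, here is a simplified stand-in (bloop generates the real condition internally from its tracking data): each column in the last-loaded snapshot becomes an equality expectation, and a column loaded as missing becomes an attribute_not_exists check.

```python
# Illustrative only: a simplified sketch of deriving an "atomic"
# condition from the state a model was last loaded with.
def atomic_condition(snapshot):
    """Build (expression, values) expecting DynamoDB to match `snapshot`.

    Columns loaded as None are expected to be missing entirely.
    """
    clauses, values = [], {}
    for name, value in sorted(snapshot.items()):
        if value is None:
            clauses.append(f"attribute_not_exists({name})")
        else:
            placeholder = f":{name}"
            clauses.append(f"{name} = {placeholder}")
            values[placeholder] = value
    return " AND ".join(clauses), values

expr, values = atomic_condition(
    {"username": "numberoverzero", "profile": None})
print(expr)
# attribute_not_exists(profile) AND username = :username
```

This also shows why columns absent from a query's projection can't appear in the atomic condition: they never enter the snapshot, so no expectation can be formed for them.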
Delete¶
Delete has the same signature as save(). Both operations are mutations on an object that may or may not exist, and simply map to two different APIs (Delete calls DeleteItem). You can delete multiple objects at once, specify a condition, and use the atomic=True shorthand to only delete objects unchanged since you last loaded them from DynamoDB.
>>> engine.delete(user, tweet)
>>> engine.delete(tps_report, atomic=True)
>>> cutoff = arrow.now().replace(years=-2)
>>> engine.delete(
... account,
... condition=Account.last_login < cutoff)
Load¶
Unlike most existing DynamoDB object mappers, Bloop does not create new instances when loading objects. This improves performance, makes atomic tracking much easier, and lets you use thick or thin models by minimizing how many times the constructor is invoked for effectively the same object (same hash/range keys).
Like save() and delete() above, Engine.load takes a variable number of objects to load from DynamoDB:
>>> user = User(id="some-id")
>>> tweet = Tweet(user="some-id", id="some-tweet")
>>> engine.load(user, tweet)
If consistent is True, then strongly consistent reads will be used:
>>> objs = user, tweet
>>> engine.load(*objs, consistent=True)
If any objects aren't loaded, Bloop raises MissingObjects:
>>> user = User(username="not-real")
>>> engine.load(user)
Traceback (most recent call last):
...
MissingObjects: Failed to load some objects.
You can access MissingObjects.objects to see which objects failed to load.
Query¶
This section defines a new model to demonstrate the various filtering and conditions available:
class Account(BaseModel):
    name = Column(String, hash_key=True)
    number = Column(Integer, range_key=True)
    created_on = Column(DateTime)
    balance = Column(Number)
    email = Column(String)
    level = Column(Integer)

    by_level = GlobalSecondaryIndex(
        projection="all", hash_key="level")

    by_email = GlobalSecondaryIndex(
        projection="keys", hash_key="email")

    by_balance = LocalSecondaryIndex(
        projection=["created_on"], range_key="balance")

engine = Engine()
engine.bind(Account)
First¶
Often, you'll only need a single result from a query; with the correct sorting and indexes, the first result can be used to get a maximum or minimum. Use first() to get the first result, if it exists. If there are no results, first() raises ConstraintViolation.
>>> q = engine.query(Account,
... key=Account.name == "numberoverzero")
>>> q.first()
Account(name='numberoverzero', number=21623)
One¶
Similar to first(), you can get the unique result of a query with one(). If there are no results, or more than one result, one() raises ConstraintViolation.
>>> q = engine.query(Account,
... key=Account.name == "numberoverzero")
>>> q.one()
Traceback (most recent call last):
...
ConstraintViolation: Query found more than one result.
Key Conditions¶
Queries can be performed against a Model or an Index. You must specify at least a hash key equality condition; a range key condition is optional.
>>> owned_by_stacy = Account.name == "Stacy"
>>> q = engine.query(Account, key=owned_by_stacy)
>>> for account in q:
... print(account)
...
Here, the query uses the Index's range_key to narrow the range of accounts to find:
>>> owned_by_stacy = Account.name == "Stacy"
>>> at_least_one_mil = Account.balance >= 1000000
>>> q = engine.query(Account.by_balance,
... key=owned_by_stacy & at_least_one_mil)
>>> for account in q:
... print(account.balance)
Note
A query must always include an equality check == or is_ against the model or index's hash key. If you want to include a condition on the range key, it can be one of ==, <, <=, >, >=, between, begins_with. See the KeyConditionExpression parameter of the Query operation in the Developer's Guide.
Filtering¶
If you provide a filter condition, DynamoDB only returns items that match the filter. For queries against an Index, conditions can be on any column projected into the Index, except the hash and range key being queried. All non-key columns are available for queries against a model. A filter condition can use any of the condition operations.
Here is the same LSI query as above, but now excluding accounts created in the last 30 days:
>>> key_condition = owned_by_stacy & at_least_one_mil
>>> exclude_recent = Account.created_on < arrow.now().replace(days=-30)
>>> q = engine.query(Account.by_balance,
... key=key_condition,
... filter=exclude_recent)
Warning
Trying to use a column that's not part of an Index's projection will raise InvalidFilterCondition, since the value can't be loaded. This does not apply to queries against an LSI with strict=False, which will consume additional reads to apply the filter.
>>> q = engine.query(Account.by_balance,
... key=key_condition,
... filter=Account.level == 3)
Traceback (most recent call last):
...
InvalidFilterCondition: <Column[Account.level]> is not available for the projection.
Projections¶
By default, queries return all columns projected into the index or model. You can use the projection parameter to control which columns are returned for each object. This must be "all" to include everything in the index or model's projection, or a list of columns or column model names to include. Use "count" to get only the number of results that match the query.
>>> q = engine.query(Account,
... key=key_condition,
... projection=["email", "balance"])
>>> account = q.first()
>>> account.email
'user@domain.com'
>>> account.balance
Decimal('3400')
>>> account.level
Traceback (most recent call last):
...
AttributeError: ...
Because the projection did not include Account.level, it was not loaded on the account object.
Configuration Options¶
The remaining options are consistent and forward. When consistent is True, strongly consistent reads are used. By default, consistent is False. Use forward to query ascending or descending. By default, forward is True, or ascending.
Iterator State¶
The QueryIterator
exposes a number of properties to inspect its current progress:
count -- the number of items loaded from DynamoDB so far, including buffered items.
exhausted -- True if there are no more results.
scanned -- the number of items DynamoDB evaluated, before applying any filter condition.
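The distinction between count and scanned matters once a filter is involved: DynamoDB evaluates items server-side, then drops the ones the filter rejects. A toy iterator (not bloop's QueryIterator) makes the two counters diverge visibly:

```python
# Toy stand-in for an iterator with a server-side filter: `scanned`
# tracks items evaluated, `count` tracks items actually returned.
class ToyIterator:
    def __init__(self, items, keep):
        self.items, self.keep = items, keep
        self.scanned = 0   # items evaluated, before the filter
        self.count = 0     # items returned, after the filter

    def __iter__(self):
        for item in self.items:
            self.scanned += 1
            if self.keep(item):
                self.count += 1
                yield item

it = ToyIterator(range(10), keep=lambda n: n % 2 == 0)
results = list(it)
print(it.scanned, it.count)  # 10 5
```

A large gap between scanned and count is a sign the filter is discarding most of what DynamoDB read, and you're still paying for those reads.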
To restart a query, use QueryIterator.reset():
>>> query = engine.query(...)
>>> unique = query.one()
>>> query.exhausted
True
>>> query.reset()
>>> query.exhausted
False
>>> same = query.one()
>>> unique == same # Assume we implemented __eq__
True
Scan¶
Scan and Query share a very similar interface. Unlike Query, Scan does not have a key condition and can't be performed in descending order. Scans can be performed in parallel, however.
Using the same model from Query, we can scan the model or an index:
>>> for account in engine.scan(Account):
... print(account.email)
...
>>> for account in engine.scan(Account.by_email):
... print(account.email)
And get the first, or unique result:
>>> some_account = engine.scan(Account).first()
>>> one_account = engine.scan(Account).one()
Traceback (most recent call last):
...
ConstraintViolation: Scan found more than one result.
Use filter and projection to exclude items and control which columns are included in results:
>>> scan = engine.scan(Account,
... filter=Account.email.contains("@"),
... projection=["level", "email"])
And consistent to use strongly consistent reads:
>>> scan = engine.scan(Account.by_balance, consistent=True)
Parallel Scans¶
Scans can be performed in parallel, using the parallel parameter. To specify which segment you are constructing the scan for, pass a tuple of (Segment, TotalSegments):
>>> first_segment = engine.scan(Account, parallel=(0, 2))
>>> second_segment = engine.scan(Account, parallel=(1, 2))
You can easily construct a parallel scan with s segments by calling engine.scan in a loop:
def parallelize(s, engine, *args, **kwargs):
    for i in range(s):
        kwargs["parallel"] = (i, s)
        yield engine.scan(*args, **kwargs)

# scan_workers is assumed to return worker objects with a .process method
workers = scan_workers(n=10)
scans = parallelize(10, engine, Account, filter=...)
for worker, scan in zip(workers, scans):
    worker.process(scan)
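With a real table, each segment's iterator can then be drained by its own thread. A runnable sketch using only the standard library, where fake_scan is a stand-in for engine.scan(..., parallel=(segment, total)):

```python
# Runnable sketch of fanning scan segments out to a thread pool.
# fake_scan stands in for engine.scan with a parallel=(segment, total)
# argument; a real worker would iterate its segment's QueryIterator.
from concurrent.futures import ThreadPoolExecutor

ITEMS = [f"account-{i}" for i in range(10)]

def fake_scan(segment, total):
    # DynamoDB assigns each item to exactly one of `total` segments;
    # modulo bucketing stands in for that server-side partitioning.
    return [item for i, item in enumerate(ITEMS) if i % total == segment]

def process(segment, total=3):
    # Each worker drains its own segment; here it just counts items.
    return len(fake_scan(segment, total))

with ThreadPoolExecutor(max_workers=3) as pool:
    counts = list(pool.map(process, range(3)))

print(counts)  # [4, 3, 3]
```

Every item lands in exactly one segment, so the segment results together cover the whole table without overlap.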
Stream¶
Note
Before you can create a stream on a model, you need to enable it in the model's Meta. For a detailed guide to using streams, head over to the Streams section of the User Guide.
To start from the beginning or end of the stream, use "trim_horizon" and "latest":
>>> stream = engine.stream(User, position="trim_horizon")
>>> stream = engine.stream(Account, "latest")
Alternatively, you can use an existing stream token to reload its previous state:
>>> same_stream = engine.stream(
... Impression, previous_stream.token)
Lastly, you can use an arrow datetime. This is an expensive call, and walks the entire stream from the trim horizon until it finds the first record in each shard after the target datetime.
>>> yesterday = arrow.now().replace(days=-1)
>>> stream = engine.stream(User, yesterday)
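The seek itself is conceptually simple, which is why its cost comes entirely from walking the shards. The sketch below is an illustrative stand-in (not bloop's implementation), using in-memory shards of (timestamp, record) pairs in place of real stream shards:

```python
# Illustrative only: for each shard, walk forward from the trim horizon
# (the start of the list) to the first record at or after the target
# time. With real shards, every step is a paged network read.
def seek(shards, target):
    positions = {}
    for shard_id, records in shards.items():
        positions[shard_id] = next(
            (i for i, (ts, _) in enumerate(records) if ts >= target),
            len(records))  # past the end: no record after target yet
    return positions

shards = {
    "shard-0": [(100, "a"), (205, "b"), (320, "c")],
    "shard-1": [(150, "d"), (400, "e")],
}
print(seek(shards, target=200))  # {'shard-0': 1, 'shard-1': 1}
```

Because records before the target still have to be read and discarded in every shard, a datetime far from the trim horizon makes this call expensive.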