# Copyright 2011 Canonical Ltd.
#
# This file is part of u1db.
#
# u1db is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# u1db is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with u1db. If not, see <http://www.gnu.org/licenses/>.
"""U1DB"""
try:
import simplejson as json
except ImportError:
import json # noqa
from u1db.errors import InvalidJSON, InvalidContent
# Package version: a tuple for programmatic comparison, and the same
# components joined with dots for display.
__version_info__ = (0, 1, 4)
__version__ = '.'.join(str(component) for component in __version_info__)
def open(path, create, document_factory=None):
    """Open a database at the given location.

    Will raise u1db.errors.DatabaseDoesNotExist if create=False and the
    database does not already exist.

    Note that, inside this module, this public name shadows the builtin
    ``open``.

    :param path: The filesystem path for the database to open.
    :param create: True/False, should the database be created if it doesn't
        already exist?
    :param document_factory: A function that will be called with the same
        parameters as Document.__init__.
    :return: An instance of Database.
    """
    # Function-scope import: the SQLite backend is only loaded when a
    # database is actually opened.
    from u1db.backends import sqlite_backend
    return sqlite_backend.SQLiteDatabase.open_database(
        path, create=create, document_factory=document_factory)
# Constraints on database names (relevant for remote access), as a regex:
# one ASCII letter or digit, followed by any number of ASCII letters,
# digits, dots or hyphens.
DBNAME_CONSTRAINTS = r"[a-zA-Z0-9][a-zA-Z0-9.-]*"
# Constraints on doc ids, as a regex: one or more ASCII letters, digits,
# '.', '%', '_' or '-' (so no slashes, and no characters outside the ascii
# range).
# NOTE(review): neither pattern is anchored; presumably callers anchor them
# or embed them in a larger expression -- confirm at the call sites.
DOC_ID_CONSTRAINTS = r"[a-zA-Z0-9.%_-]+"
class Database(object):
    """A JSON Document data store.

    This data store can be synchronized with other u1db.Database instances.

    This class only defines the interface: apart from sync(), every method
    raises NotImplementedError and is expected to be provided by a backend.
    """

    def set_document_factory(self, factory):
        """Set the document factory that will be used to create objects to be
        returned as documents by the database.

        :param factory: A function that returns an object which at minimum
            must satisfy the same interface as does the class DocumentBase.
            Subclassing that class is the easiest way to create such
            a function.
        """
        raise NotImplementedError(self.set_document_factory)

    def set_document_size_limit(self, limit):
        """Set the maximum allowed document size for this database.

        :param limit: Maximum allowed document size in bytes.
        """
        raise NotImplementedError(self.set_document_size_limit)

    def whats_changed(self, old_generation=0):
        """Return a list of documents that have changed since old_generation.

        This allows APPS to only store a db generation before going
        'offline', and then when coming back online they can use this
        data to update whatever extra data they are storing.

        :param old_generation: The generation of the database in the old
            state.
        :return: (generation, trans_id, [(doc_id, generation, trans_id),...])
            The current generation of the database, its associated
            transaction id, and a list of changed documents since
            old_generation, represented by tuples with for each document its
            doc_id and the generation and transaction id corresponding to
            the last intervening change and sorted by generation (old
            changes first)
        """
        raise NotImplementedError(self.whats_changed)

    def get_doc(self, doc_id, include_deleted=False):
        """Get the document with the given identifier.

        :param doc_id: The unique document identifier
        :param include_deleted: If set to True, deleted documents will be
            returned with empty content. Otherwise asking for a deleted
            document will return None.
        :return: a Document object.
        """
        raise NotImplementedError(self.get_doc)

    def get_docs(self, doc_ids, check_for_conflicts=True,
                 include_deleted=False):
        """Get the JSON content for many documents.

        :param doc_ids: A list of document identifiers.
        :param check_for_conflicts: If set to False, then the conflict check
            will be skipped, and 'None' will be returned instead of
            True/False.
        :param include_deleted: If set to True, deleted documents will be
            returned with empty content. Otherwise deleted documents will not
            be included in the results.
        :return: iterable giving the Document object for each document id
            in matching doc_ids order.
        """
        raise NotImplementedError(self.get_docs)

    def get_all_docs(self, include_deleted=False):
        """Get the JSON content for all documents in the database.

        :param include_deleted: If set to True, deleted documents will be
            returned with empty content. Otherwise deleted documents will not
            be included in the results.
        :return: (generation, [Document])
            The current generation of the database, followed by a list of all
            the documents in the database.
        """
        raise NotImplementedError(self.get_all_docs)

    def create_doc(self, content, doc_id=None):
        """Create a new document.

        You can optionally specify the document identifier, but the document
        must not already exist. See 'put_doc' if you want to override an
        existing document.
        If the database specifies a maximum document size and the document
        exceeds it, create will fail and raise a DocumentTooBig exception.

        :param content: A Python dictionary.
        :param doc_id: An optional identifier specifying the document id.
        :return: Document
        """
        raise NotImplementedError(self.create_doc)

    def create_doc_from_json(self, json, doc_id=None):
        """Create a new document from its JSON serialization.

        You can optionally specify the document identifier, but the document
        must not already exist. See 'put_doc' if you want to override an
        existing document.
        If the database specifies a maximum document size and the document
        exceeds it, create will fail and raise a DocumentTooBig exception.

        :param json: The JSON document string
        :param doc_id: An optional identifier specifying the document id.
        :return: Document
        """
        raise NotImplementedError(self.create_doc_from_json)

    def put_doc(self, doc):
        """Update a document.

        If the document currently has conflicts, put will fail.
        If the database specifies a maximum document size and the document
        exceeds it, put will fail and raise a DocumentTooBig exception.

        :param doc: A Document with new content.
        :return: new_doc_rev - The new revision identifier for the document.
            The Document object will also be updated.
        """
        raise NotImplementedError(self.put_doc)

    def delete_doc(self, doc):
        """Mark a document as deleted.

        Will abort if the current revision doesn't match doc.rev.
        This will also set doc.content to None.

        :param doc: The Document to mark as deleted.
        """
        raise NotImplementedError(self.delete_doc)

    def create_index(self, index_name, *index_expressions):
        """Create a named index, which can then be queried for future lookups.

        Creating an index which already exists is not an error, and is cheap.
        Creating an index which does not match the index_expressions of the
        existing index is an error.
        Creating an index will block until the expressions have been evaluated
        and the index generated.

        :param index_name: A unique name which can be used as a key prefix
        :param index_expressions: index expressions defining the index
            information.

            Examples:

            "fieldname", or "fieldname.subfieldname" to index alphabetically
            sorted on the contents of a field.

            "number(fieldname, width)", "lower(fieldname)"
        """
        raise NotImplementedError(self.create_index)

    def delete_index(self, index_name):
        """Remove a named index.

        :param index_name: The name of the index we are removing
        """
        raise NotImplementedError(self.delete_index)

    def list_indexes(self):
        """List the definitions of all known indexes.

        :return: A list of [('index-name', ['field', 'field2'])] definitions.
        """
        raise NotImplementedError(self.list_indexes)

    def get_from_index(self, index_name, *key_values):
        """Return documents that match the keys supplied.

        You must supply exactly the same number of values as have been defined
        in the index. It is possible to do a prefix match by using '*' to
        indicate a wildcard match. You can only supply '*' to trailing entries,
        (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
        It is also possible to append a '*' to the last supplied value (eg
        'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')

        :param index_name: The index to query
        :param key_values: values to match. eg, if you have
            an index with 3 fields then you would have:
            get_from_index(index_name, val1, val2, val3)
        :return: List of [Document]
        """
        raise NotImplementedError(self.get_from_index)

    def get_range_from_index(self, index_name, start_value, end_value):
        """Return documents that fall within the specified range.

        Both ends of the range are inclusive. For both start_value and
        end_value, one must supply exactly the same number of values as have
        been defined in the index, or pass None. In case of a single column
        index, a string is accepted as an alternative for a tuple with a single
        value. It is possible to do a prefix match by using '*' to indicate
        a wildcard match. You can only supply '*' to trailing entries, (eg
        'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
        possible to append a '*' to the last supplied value (eg 'val*', '*',
        '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')

        :param index_name: The index to query
        :param start_value: tuple of values that define the lower bound of
            the range. eg, if you have an index with 3 fields then you would
            have: (val1, val2, val3)
        :param end_value: tuple of values that define the upper bound of the
            range. eg, if you have an index with 3 fields then you would have:
            (val1, val2, val3)
        :return: List of [Document]
        """
        raise NotImplementedError(self.get_range_from_index)

    def get_index_keys(self, index_name):
        """Return all keys under which documents are indexed in this index.

        :param index_name: The index to query
        :return: [] A list of tuples of indexed keys.
        """
        raise NotImplementedError(self.get_index_keys)

    def get_doc_conflicts(self, doc_id):
        """Get the list of conflicts for the given document.

        The order of the conflicts is such that the first entry is the value
        that would be returned by "get_doc".

        :param doc_id: The identifier of the document to inspect.
        :return: [doc] A list of the Document entries that are conflicted.
        """
        raise NotImplementedError(self.get_doc_conflicts)

    def resolve_doc(self, doc, conflicted_doc_revs):
        """Mark a document as no longer conflicted.

        We take the list of revisions that the client knows about that it is
        superseding. This may be a different list from the actual current
        conflicts, in which case only those are removed as conflicted. This
        may fail if the conflict list is significantly different from the
        supplied information. (sync could have happened in the background from
        the time you GET_DOC_CONFLICTS until the point where you RESOLVE)

        :param doc: A Document with the new content to be inserted.
        :param conflicted_doc_revs: A list of revisions that the new content
            supersedes.
        """
        raise NotImplementedError(self.resolve_doc)

    def get_sync_target(self):
        """Return a SyncTarget object, for another u1db to synchronize with.

        :return: An instance of SyncTarget.
        """
        raise NotImplementedError(self.get_sync_target)

    def close(self):
        """Release any resources associated with this database."""
        raise NotImplementedError(self.close)

    def sync(self, url, creds=None, autocreate=True):
        """Synchronize documents with remote replica exposed at url.

        :param url: the url of the target replica to sync with.
        :param creds: optional dictionary giving credentials
            to authorize the operation with the server. For using OAuth
            the form of creds is:
                {'oauth': {
                    'consumer_key': ...,
                    'consumer_secret': ...,
                    'token_key': ...,
                    'token_secret': ...
                    }}
        :param autocreate: ask the target to create the db if non-existent.
        :return: local_gen_before_sync The local generation before the
            synchronisation was performed. This is useful to pass into
            whats_changed, if an application wants to know which documents
            were affected by a synchronisation.
        """
        # Function-scope imports: the sync machinery is only loaded when a
        # synchronisation is actually requested.
        from u1db.sync import Synchronizer
        from u1db.remote.http_target import HTTPSyncTarget
        return Synchronizer(self, HTTPSyncTarget(url, creds=creds)).sync(
            autocreate=autocreate)

    def _get_replica_gen_and_trans_id(self, other_replica_uid):
        """Return the last known generation and transaction id for the other db
        replica.

        When you do a synchronization with another replica, the Database keeps
        track of what generation the other database replica was at, and what
        the associated transaction id was. This is used to determine what data
        needs to be sent, and if two databases are claiming to be the same
        replica.

        :param other_replica_uid: The identifier for the other replica.
        :return: (gen, trans_id) The generation and transaction id we
            encountered during synchronization. If we've never synchronized
            with the replica, this is (0, '').
        """
        raise NotImplementedError(self._get_replica_gen_and_trans_id)

    def _set_replica_gen_and_trans_id(self, other_replica_uid,
                                      other_generation, other_transaction_id):
        """Set the last-known generation and transaction id for the other
        database replica.

        We have just performed some synchronization, and we want to track what
        generation the other replica was at. See also
        _get_replica_gen_and_trans_id.

        :param other_replica_uid: The U1DB identifier for the other replica.
        :param other_generation: The generation number for the other replica.
        :param other_transaction_id: The transaction id associated with the
            generation.
        """
        raise NotImplementedError(self._set_replica_gen_and_trans_id)

    def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
                          replica_trans_id=''):
        """Insert/update document into the database with a given revision.

        This api is used during synchronization operations.

        If a document would conflict and save_conflict is set to True, the
        content will be selected as the 'current' content for doc.doc_id,
        even though doc.rev doesn't supersede the currently stored revision.
        The currently stored document will be added to the list of conflict
        alternatives for the given doc_id.

        This forces the new content to be 'current' so that we get convergence
        after synchronizing, even if people don't resolve conflicts. Users can
        then notice that their content is out of date, update it, and
        synchronize again. (The alternative is that users could synchronize and
        think the data has propagated, but their local copy looks fine, and the
        remote copy is never updated again.)

        :param doc: A Document object
        :param save_conflict: If this document is a conflict, do you want to
            save it as a conflict, or just ignore it.
        :param replica_uid: A unique replica identifier.
        :param replica_gen: The generation of the replica corresponding to
            this document. The replica arguments are optional, but are used
            during synchronization.
        :param replica_trans_id: The transaction_id associated with the
            generation.
        :return: (state, at_gen) - If we don't have doc_id already,
            or if doc_rev supersedes the existing document revision,
            then the content will be inserted, and state is 'inserted'.
            If doc_rev is less than or equal to the existing revision,
            then the put is ignored and state is respectively 'superseded'
            or 'converged'.
            If doc_rev is not strictly superseded or supersedes, then
            state is 'conflicted'. The document will not be inserted if
            save_conflict is False.
            For 'inserted' or 'converged', at_gen is the insertion/current
            generation.
        """
        raise NotImplementedError(self._put_doc_if_newer)
class DocumentBase(object):
    """Container for handling a single document.

    :ivar doc_id: Unique identifier for this document.
    :ivar rev: The revision identifier of the document.
    :ivar json_string: The JSON string for this document.
    :ivar has_conflicts: Boolean indicating if this document has conflicts
    """

    def __init__(self, doc_id, rev, json_string, has_conflicts=False):
        """Create a document.

        :param doc_id: Unique identifier for this document.
        :param rev: The revision identifier of the document.
        :param json_string: The JSON string for this document, or None for
            a tombstone. It must decode to a JSON object.
        :param has_conflicts: Boolean indicating if this document has
            conflicts.
        :raise InvalidJSON: if json_string is not None and is either not
            valid JSON or does not decode to a JSON object.
        """
        self.doc_id = doc_id
        self.rev = rev
        self._check_json_object(json_string)
        self._json = json_string
        self.has_conflicts = has_conflicts

    @staticmethod
    def _check_json_object(json_string):
        """Raise InvalidJSON unless json_string is None or a JSON object."""
        if json_string is None:
            return
        try:
            value = json.loads(json_string)
        except ValueError:
            # JSONDecodeError derives from ValueError in both simplejson and
            # the Python 3 stdlib, while older stdlib json modules raise a
            # plain ValueError and have no JSONDecodeError attribute at all.
            raise InvalidJSON
        if not isinstance(value, dict):
            raise InvalidJSON

    def same_content_as(self, other):
        """Compare the content of two documents.

        The JSON strings are decoded before comparing, so formatting
        differences do not matter. An empty or missing JSON string counts as
        None.
        """
        if self._json:
            c1 = json.loads(self._json)
        else:
            c1 = None
        if other._json:
            c2 = json.loads(other._json)
        else:
            c2 = None
        return c1 == c2

    def __repr__(self):
        if self.has_conflicts:
            extra = ', conflicted'
        else:
            extra = ''
        return '%s(%s, %s%s, %r)' % (self.__class__.__name__, self.doc_id,
                                     self.rev, extra, self.get_json())

    def __hash__(self):
        raise NotImplementedError(self.__hash__)

    def __eq__(self, other):
        # Test against this class rather than a particular subclass, so that
        # any two documents sharing this interface can be compared.
        if not isinstance(other, DocumentBase):
            return NotImplemented
        return (
            self.doc_id == other.doc_id and self.rev == other.rev and
            self.same_content_as(other) and self.has_conflicts ==
            other.has_conflicts)

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep the two consistent.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        """This is meant for testing, not part of the official api.

        It is implemented so that sorted([Document, Document]) can be used.
        It doesn't imply that users would want their documents to be sorted in
        this order.
        """
        # Since this is just for testing, we don't worry about comparing
        # against things that aren't a Document.
        return ((self.doc_id, self.rev, self.get_json())
                < (other.doc_id, other.rev, other.get_json()))

    def get_json(self):
        """Get the json serialization of this document.

        :return: The stored JSON string, or None for a tombstone.
        """
        return self._json

    def get_size(self):
        """Calculate the total size of the document.

        :return: The summed lengths of the JSON string, the revision and the
            document id; empty or missing parts contribute nothing.
        """
        size = 0
        serialized = self.get_json()
        if serialized:
            size += len(serialized)
        if self.rev:
            size += len(self.rev)
        if self.doc_id:
            size += len(self.doc_id)
        return size

    def set_json(self, json_string):
        """Set the json serialization of this document.

        :param json_string: The new JSON string, or None. It must decode to
            a JSON object.
        :raise InvalidJSON: if json_string is not None and is either not
            valid JSON or does not decode to a JSON object.
        """
        self._check_json_object(json_string)
        self._json = json_string

    def make_tombstone(self):
        """Make this document into a tombstone."""
        self._json = None

    def is_tombstone(self):
        """Return True if the document is a tombstone, False otherwise."""
        return self._json is None
class Document(DocumentBase):
    """Container for handling a single document.

    At any time the document holds either its JSON serialization (_json) or
    the decoded dictionary (_content), never both: reading ``content``
    decodes and drops the JSON string, assigning ``content`` encodes and
    drops the dictionary.

    :ivar doc_id: Unique identifier for this document.
    :ivar rev: The revision identifier of the document.
    :ivar json: The JSON string for this document.
    :ivar has_conflicts: Boolean indicating if this document has conflicts
    """

    # The following part of the API is optional: no implementation is forced to
    # have it but if the language supports dictionaries/hashtables, it makes
    # Documents a lot more user friendly.

    def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False):
        # TODO: We convert the json in the superclass to check its validity so
        # we might as well set _content here directly since the price is
        # already being paid.
        super(Document, self).__init__(doc_id, rev, json, has_conflicts)
        self._content = None

    def same_content_as(self, other):
        """Compare the content of two documents.

        Each side is represented by its decoded JSON string when it has one,
        and by its content dictionary otherwise.
        """
        if self._json:
            c1 = json.loads(self._json)
        else:
            c1 = self._content
        if other._json:
            c2 = json.loads(other._json)
        else:
            # The other document may not carry a content dictionary at all
            # (e.g. a plain DocumentBase); treat that as having no content.
            c2 = getattr(other, '_content', None)
        return c1 == c2

    def get_json(self):
        """Get the json serialization of this document.

        :return: The stored JSON string if there is one, else the content
            dictionary encoded as JSON, else None.
        """
        json_string = super(Document, self).get_json()
        if json_string is not None:
            return json_string
        if self._content is not None:
            return json.dumps(self._content)
        return None

    def set_json(self, json):
        """Set the json serialization of this document."""
        # The decoded dictionary is dropped before the superclass validates
        # and stores the new JSON string.
        self._content = None
        super(Document, self).set_json(json)

    def make_tombstone(self):
        """Make this document into a tombstone."""
        self._content = None
        super(Document, self).make_tombstone()

    def is_tombstone(self):
        """Return True if the document is a tombstone, False otherwise."""
        if self._content is not None:
            return False
        return super(Document, self).is_tombstone()

    def _get_content(self):
        """Get the dictionary representing this document.

        Decodes the stored JSON string on first access and drops it, so the
        dictionary (which the caller may mutate) becomes the only copy.

        :return: The content dictionary, or None for a tombstone.
        """
        if self._json is not None:
            self._content = json.loads(self._json)
            self._json = None
        return self._content

    def _set_content(self, content):
        """Set the dictionary representing this document.

        :param content: The new content; it must encode to a JSON object.
        :raise InvalidContent: if content cannot be encoded as JSON, or does
            not encode to a JSON object.
        """
        try:
            tmp = json.dumps(content)
        except (TypeError, ValueError):
            # TypeError: a value of an unserializable type; ValueError: e.g.
            # a circular reference. Either way the content is unusable.
            raise InvalidContent(
                "Can not be converted to JSON: %r" % (content,))
        if not tmp.startswith('{'):
            raise InvalidContent(
                "Can not be converted to a JSON object: %r." % (content,))
        # We might as well store the JSON at this point since we did the work
        # of encoding it, and it doesn't lose any information.
        self._json = tmp
        self._content = None

    content = property(
        _get_content, _set_content, doc="Content of the Document.")

    # End of optional part.
class SyncTarget(object):
    """Functionality for using a Database as a synchronization target.

    This class only defines the interface: every method either raises
    NotImplementedError or delegates to one that does.
    """

    def get_sync_info(self, source_replica_uid):
        """Return information about known state.

        Return the replica_uid and the current database generation of this
        database, and the last-seen database generation for source_replica_uid

        :param source_replica_uid: Another replica which we might have
            synchronized with in the past.
        :return: (target_replica_uid, target_replica_generation,
            target_trans_id, source_replica_last_known_generation,
            source_replica_last_known_transaction_id)
        """
        raise NotImplementedError(self.get_sync_info)

    def record_sync_info(self, source_replica_uid, source_replica_generation,
                         source_replica_transaction_id):
        """Record tip information for another replica.

        After sync_exchange has been processed, the caller will have
        received new content from this replica. This call allows the
        source replica instigating the sync to inform us what their
        generation became after applying the documents we returned.

        This is used to allow future sync operations to not need to repeat data
        that we just talked about. It also means that if this is called at the
        wrong time, there can be database records that will never be
        synchronized.

        :param source_replica_uid: The identifier for the source replica.
        :param source_replica_generation:
            The database generation for the source replica.
        :param source_replica_transaction_id: The transaction id associated
            with the source replica generation.
        """
        raise NotImplementedError(self.record_sync_info)

    def sync_exchange(self, docs_by_generation, source_replica_uid,
                      last_known_generation, last_known_trans_id,
                      return_doc_cb, ensure_callback=None):
        """Incorporate the documents sent from the source replica.

        This is not meant to be called by client code directly, but is used as
        part of sync().

        This adds docs to the local store, and determines documents that need
        to be returned to the source replica.

        Documents must be supplied in docs_by_generation paired with
        the generation of their latest change in order from the oldest
        change to the newest, that means from the oldest generation to
        the newest.

        Documents are also returned paired with the generation of
        their latest change in order from the oldest change to the
        newest.

        :param docs_by_generation: A list of [(Document, generation,
            transaction_id)] tuples indicating documents which should be
            updated on this replica paired with the generation and transaction
            id of their latest change.
        :param source_replica_uid: The source replica's identifier
        :param last_known_generation: The last generation that the source
            replica knows about this target replica
        :param last_known_trans_id: The last transaction id that the source
            replica knows about this target replica
        :param return_doc_cb: a callback, return_doc_cb(doc, gen),
            used to return documents to the source replica, it will
            be invoked in turn with Documents that have changed since
            last_known_generation together with the generation of
            their last change.
        :param ensure_callback: a callback, ensure_callback(replica_uid);
            if set the target may create the target db if not yet existent,
            the callback can then be used to inform of the created db
            replica uid.
        :return: new_generation - After applying docs_by_generation, this is
            the current generation for this replica
        """
        raise NotImplementedError(self.sync_exchange)

    def _set_trace_hook(self, cb):
        """Set a callback that will be invoked to trace database actions.

        The callback will be passed a string indicating the current state, and
        the sync target object. Implementations do not have to implement this
        api, it is used by the test suite.

        :param cb: A callable that takes cb(state)
        """
        # NOTE(review): the description above mentions two arguments (state
        # and sync target) while the :param: line shows cb(state); confirm
        # the actual call signature against an implementation.
        raise NotImplementedError(self._set_trace_hook)

    def _set_trace_hook_shallow(self, cb):
        """Set a callback that will be invoked to trace database actions.

        Similar to _set_trace_hook, for implementations that don't offer
        state changes from the inner working of sync_exchange().

        :param cb: A callable that takes cb(state)
        """
        # Default behaviour: install the callback through _set_trace_hook.
        self._set_trace_hook(cb)