# Source code for txcouchbase.bucket

# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file contains the twisted-specific bits for the Couchbase client.
"""

from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

from couchbase.async.bucket import AsyncBucket
from couchbase.async.view import AsyncViewBase
from couchbase.async.n1ql import AsyncN1QLRequest
from couchbase.async.events import EventQueue
from couchbase.exceptions import CouchbaseError
from txcouchbase.iops import v0Iops


class BatchedRowMixin(object):
    def __init__(self, *args, **kwargs):
        """
        Iterator/Container object for a single-call row-based results.

        This functions as an iterator over all results of the query, once the
        query has been completed.

        Additional metadata may be obtained by examining the object. See
        :class:`~couchbase.views.iterator.Views` for more details.

        You will normally not need to construct this object manually.
        """
        # Fired exactly once (with ``self`` or with a failure); reset to
        # ``None`` afterwards so later notifications are ignored.
        self._d = Deferred()
        self.__rows = []  # likely a superclass might have this?

    def _getDeferred(self):
        """
        Return the :class:`Deferred` tied to this request, or ``None`` once
        it has already been fired.
        """
        return self._d

    def start(self):
        """
        Start the underlying request and return ``self``.

        ``rows_per_call`` is set to ``-1`` on the raw request after the
        parent's ``start()`` has run.
        """
        super(BatchedRowMixin, self).start()
        # NOTE(review): -1 presumably asks for all rows in a single
        # on_rows() call -- confirm against the couchbase async row API.
        self.raw.rows_per_call = -1
        return self

    def on_rows(self, rowiter):
        """
        Reimplemented from :meth:`~AsyncViewBase.on_rows`

        Stores the rows and fires the deferred with this object.
        """
        self.__rows = rowiter
        self._d.callback(self)
        self._d = None

    def on_error(self, ex):
        """
        Reimplemented from :meth:`~AsyncViewBase.on_error`

        Fires the deferred's errback, unless it has already been fired.

        :param ex: The error which caused the request to fail. It is used
            for the failure only when no exception is currently being
            handled; otherwise the active exception is used.
        """
        import sys

        if self._d is not None:
            if sys.exc_info()[0] is None:
                # No active exception: a bare errback() would raise
                # NoCurrentExceptionError and the deferred would never
                # fire, so hand over the exception we were given.
                self._d.errback(ex)
            else:
                # Called from inside an ``except`` block: let Twisted build
                # the Failure from the active exception.
                self._d.errback()
            self._d = None

    def on_done(self):
        """
        Reimplemented from :meth:`~AsyncViewBase.on_done`

        Fires the deferred with this object if nothing has fired it yet
        (for instance when no rows were delivered).
        """
        if self._d is not None:
            self._d.callback(self)
            self._d = None

    def __iter__(self):
        """
        Iterate over the rows in this resultset
        """
        return iter(self.__rows)


class BatchedView(BatchedRowMixin, AsyncViewBase):
    def __init__(self, *args, **kwargs):
        """
        View query combined with :class:`BatchedRowMixin`.

        Both base initialisers receive the same arguments; the view base is
        initialised first, then the mixin.
        """
        for base in (AsyncViewBase, BatchedRowMixin):
            base.__init__(self, *args, **kwargs)
class BatchedN1QLRequest(BatchedRowMixin, AsyncN1QLRequest):
    def __init__(self, *args, **kwargs):
        """
        N1QL request combined with :class:`BatchedRowMixin`.

        Both base initialisers receive the same arguments; the N1QL base is
        initialised first, then the mixin.
        """
        for base in (AsyncN1QLRequest, BatchedRowMixin):
            base.__init__(self, *args, **kwargs)


class TxEventQueue(EventQueue):
    """
    Subclass of EventQueue. This implements the relevant firing methods,
    treating an 'Event' as a 'Deferred'
    """

    def fire_async(self, event):
        """Schedule ``event.callback(None)`` on the reactor with zero delay."""
        reactor.callLater(0, event.callback, None)

    def call_single_success(self, event, *args, **kwargs):
        """Fire ``event`` successfully; extra arguments are ignored."""
        event.callback(None)

    def call_single_failure(self, event, *args, **kwargs):
        """
        Fire the errback of ``event``; extra arguments are ignored.

        ``None`` is passed, so Twisted builds the failure from the exception
        currently being handled.
        """
        # NOTE(review): presumably only invoked while an exception is
        # active -- confirm against EventQueue.
        event.errback(None)


class ConnectionEventQueue(TxEventQueue):
    """
    For events fired upon connect
    """

    def maybe_raise(self, err, *args, **kwargs):
        """Raise ``err`` when it is truthy; otherwise do nothing."""
        if err:
            raise err
class RawBucket(AsyncBucket):
    def __init__(self, connstr=None, **kwargs):
        """
        Bucket subclass for Twisted. This inherits from the 'AsyncBucket'
        class, but also adds some twisted-specific logic for hooking on
        a connection.

        :param connstr: Connection string; only used when ``kwargs`` does
            not already carry a ``connstr`` entry.
        """
        if connstr and 'connstr' not in kwargs:
            kwargs['connstr'] = connstr
        iops = v0Iops(reactor)
        super(RawBucket, self).__init__(iops=iops, **kwargs)

        # One queue per supported event name; see registerDeferred().
        self._evq = {
            'connect': ConnectionEventQueue(),
            '_dtor': TxEventQueue()
        }

        self._conncb = self._evq['connect']
        self._dtorcb = self._evq['_dtor']

    def registerDeferred(self, event, d):
        """
        Register a deferred to be fired at the firing of a specific event.

        :param string event: Currently supported values are `connect`.
            Another value may be `_dtor` which will register an event to
            fire when this object has been completely destroyed.

        :param d: The deferred to fire when the event succeeds or fails
        :type d: :class:`Deferred`

        If this event has already fired, the deferred will be triggered
        asynchronously.

        Example::

          def on_connect(*args):
              print("I'm connected")
          def on_connect_err(*args):
              print("Connection failed")

          d = Deferred()
          cb.registerDeferred('connect', d)
          d.addCallback(on_connect)
          d.addErrback(on_connect_err)

        :raise: :exc:`ValueError` if the event name is unrecognized
        """
        # Only the lookup is guarded, so a KeyError raised from inside
        # schedule() is not misreported as an unknown event name.
        try:
            queue = self._evq[event]
        except KeyError:
            raise ValueError("No such event type", event)
        queue.schedule(d)

    def connect(self):
        """
        Short-hand for the following idiom::

            d = Deferred()
            cb.registerDeferred('connect', d)
            return d

        :return: A :class:`Deferred`
        """
        d = Deferred()
        self.registerDeferred('connect', d)
        return d

    def defer(self, opres):
        """
        Converts a raw :class:`couchbase.results.AsyncResult` object
        into a :class:`Deferred`.

        This is shorthand for the following "non-idiom"::

          d = Deferred()
          opres = cb.upsert("foo", "bar")
          opres.callback = d.callback

          def d_err(res, ex_type, ex_val, ex_tb):
              d.errback(opres, ex_type, ex_val, ex_tb)

          opres.errback = d_err
          return d

        :param opres: The operation to wrap
        :type opres: :class:`couchbase.results.AsyncResult`

        :return: a :class:`Deferred` object.

        Example::

          opres = cb.upsert("foo", "bar")
          d = cb.defer(opres)
          def on_ok(res):
              print("Result OK. Cas: {0}".format(res.cas))
          d.addCallback(on_ok)

        """
        d = Deferred()
        opres.callback = d.callback

        def _on_err(mres, ex_type, ex_val, ex_tb):
            # Re-raise so the bare errback() below can build its failure
            # from the active exception.
            # NOTE(review): anything that is not a CouchbaseError escapes
            # this handler and leaves ``d`` unfired -- confirm intended.
            try:
                raise ex_type(ex_val)
            except CouchbaseError:
                d.errback()
        opres.errback = _on_err
        return d

    def queryEx(self, viewcls, *args, **kwargs):
        """
        Query a view, with the ``viewcls`` instance receiving events
        of the query as they arrive.

        :param type viewcls: A class (derived from :class:`AsyncViewBase`)
          to instantiate

        Other arguments are passed to the standard `query` method.

        This functions exactly like the :meth:`~couchbase.async.AsyncBucket.query`
        method, except it automatically schedules operations if the connection
        has not yet been negotiated.
        """

        kwargs['itercls'] = viewcls
        # NOTE(review): super(AsyncBucket, self) skips AsyncBucket's own
        # query(); queryAll() uses super(RawBucket, self) -- confirm the
        # difference is intentional.
        o = super(AsyncBucket, self).query(*args, **kwargs)
        if not self.connected:
            self.connect().addCallback(lambda x: o.start())
        else:
            o.start()

        return o

    def queryAll(self, *args, **kwargs):
        """
        Returns a :class:`Deferred` object which will have its callback invoked
        with a :class:`BatchedView` when the results are complete.

        Parameters follow conventions of
        :meth:`~couchbase.bucket.Bucket.query`.

        Example::

          d = cb.queryAll("beer", "brewery_beers")
          def on_all_rows(rows):
              for row in rows:
                 print("Got row {0}".format(row))

          d.addCallback(on_all_rows)

        """

        if not self.connected:
            # Retry once connected; the Deferred returned by the retry is
            # chained onto the connect Deferred by returning it.
            cb = lambda x: self.queryAll(*args, **kwargs)
            return self.connect().addCallback(cb)

        kwargs['itercls'] = BatchedView
        o = super(RawBucket, self).query(*args, **kwargs)
        o.start()
        return o._getDeferred()

    def n1qlQueryEx(self, cls, *args, **kwargs):
        """
        Execute a N1QL statement providing a custom handler for rows.

        This method allows you to define your own subclass (of
        :class:`~AsyncN1QLRequest`) which can handle rows as they are
        received from the network.

        :param cls: The subclass (not instance) to use
        :param args: Positional arguments for the class constructor
        :param kwargs: Keyword arguments for the class constructor

        .. seealso:: :meth:`queryEx`, around which this method wraps
        """
        kwargs['itercls'] = cls
        o = super(AsyncBucket, self).n1ql_query(*args, **kwargs)
        if not self.connected:
            self.connect().addCallback(lambda x: o.start())
        else:
            o.start()
        return o

    def n1qlQueryAll(self, *args, **kwargs):
        """
        Execute a N1QL query, retrieving all rows.

        This method returns a :class:`Deferred` object which is executed
        with a :class:`~.N1QLRequest` object. The object may be iterated
        over to yield the rows in the result set.

        This method is similar to :meth:`~couchbase.bucket.Bucket.n1ql_query`
        in its arguments.

        Example::

            def handler(req):
                for row in req:
                    # ... handle row

            d = cb.n1qlQueryAll('SELECT * from `travel-sample` WHERE city=$1',
                                'Reno')
            d.addCallback(handler)

        :return: A :class:`Deferred`

        .. seealso:: :meth:`~couchbase.bucket.Bucket.n1ql_query`
        """
        if not self.connected:
            # Retry once connected, as in queryAll().
            cb = lambda x: self.n1qlQueryAll(*args, **kwargs)
            return self.connect().addCallback(cb)

        kwargs['itercls'] = BatchedN1QLRequest
        o = super(RawBucket, self).n1ql_query(*args, **kwargs)
        o.start()
        return o._getDeferred()
class Bucket(RawBucket):
    def __init__(self, *args, **kwargs):
        """
        This class inherits from :class:`RawBucket`.
        In addition to the connection methods, this class' data access methods
        return :class:`Deferreds` instead of :class:`AsyncResult` objects.

        Operations such as :meth:`get` or :meth:`set` will invoke the
        :attr:`Deferred.callback` with the result object when the result is
        complete, or they will invoke the :attr:`Deferred.errback` with an
        exception (or :class:`Failure`) in case of an error. The rules of the
        :attr:`~couchbase.connection.Connection.quiet` attribute for raising
        exceptions apply to the invocation of the ``errback``. This means that
        in the case where the synchronous client would raise an exception,
        the Deferred API will have its ``errback`` invoked. Otherwise, the
        result's :attr:`~couchbase.result.Result.success` field should be
        inspected.

        Likewise multi operations will be invoked with a
        :class:`~couchbase.result.MultiResult` compatible object.

        Some examples:

        Using single items::

          d_set = cb.upsert("foo", "bar")
          d_get = cb.get("foo")

          def on_err_common(*args):
              print("Got an error: {0}".format(args))
          def on_set_ok(res):
              print("Successfully set key with CAS {0}".format(res.cas))
          def on_get_ok(res):
              print("Successfully got key with value {0}".format(res.value))

          d_set.addCallback(on_set_ok).addErrback(on_err_common)
          d_get.addCallback(on_get_ok).addErrback(on_err_common)

          # Note that it is safe to do this as operations performed on the
          # same key are *always* performed in the order they were scheduled.

        Using multiple items::

          d_get = cb.get_multi(("Foo", "bar", "baz"))
          def on_mres(mres):
              for k, v in mres.items():
                  print("Got result for key {0}: {1}".format(k, v.value))
          d_get.addCallback(on_mres)

        """
        super(Bucket, self).__init__(*args, **kwargs)

    def _connectSchedule(self, f, meth, *args, **kwargs):
        """
        Queue ``f(meth, *args, **kwargs)`` to run when the ``connect`` event
        fires.

        :return: The :class:`Deferred` scheduled on the connect queue; the
            value returned by ``f`` becomes its result.
        """
        def _invoke(_ignored):
            return f(meth, *args, **kwargs)

        pending = Deferred()
        pending.addCallback(_invoke)
        self._evq['connect'].schedule(pending)
        return pending

    def _wrap(self, meth, *args, **kwargs):
        """
        Calls a given method with the appropriate arguments, or defers such
        a call until the instance has been connected
        """
        if self.connected:
            return self.defer(meth(self, *args, **kwargs))
        return self._connectSchedule(self._wrap, meth, *args, **kwargs)

    ### Generate the methods
    # ``_meth_factory`` is passed to ``RawBucket._gen_memd_wrappers``; the
    # mapping that call returns is installed into the class namespace.
    def _meth_factory(meth, name):
        def ret(self, *args, **kwargs):
            return self._wrap(meth, *args, **kwargs)
        return ret

    locals().update(RawBucket._gen_memd_wrappers(_meth_factory))

    # Add a camelCase ``<op>Multi`` alias for every ``<op>_multi`` method
    # that is present in the namespace.
    for x in RawBucket._MEMCACHED_OPERATIONS:
        if not locals().get(x + '_multi', None):
            continue
        locals()[x + "Multi"] = locals()[x + "_multi"]