Source code for gcouchbase.bucket

from gevent import GreenletExit
from gevent.event import AsyncResult, Event
from gevent.hub import get_hub, getcurrent, Waiter

from couchbase.async.bucket import AsyncBucket
from couchbase.async.view import AsyncViewBase
from couchbase.async.n1ql import AsyncN1QLRequest
from couchbase.views.iterator import AlreadyQueriedError
try:
    from gcouchbase.iops_gevent0x import IOPS
except ImportError:
    from gcouchbase.iops_gevent10 import IOPS


class GRowsHandler(object):
    def __init__(self):
        """
        Subclass of :class:`~.AsyncViewBase`
        This doesn't expose an API different from the normal
        synchronous view API. It's just implemented differently
        """

        # We use __double_underscore to mangle names. This is because
        # the views class has quite a bit of data attached to it.
        self.__waiter = Waiter()
        self.__raw_rows = []
        self.__done_called = False
        self.start()
        self.raw.rows_per_call = 100000

    def _callback(self, *args):
        # This method overridden from the parent. Rather than do the processing
        # on demand, we must defer it for later. This is done by copying the
        # rows to a list. In the typical case we shouldn't accumulate all
        # the rows in the buffer (since .switch() will typically have something
        # waiting for us). However if the view is destroyed prematurely,
        # or if the user is not actively iterating over us, or if something
        # else happens (such as more rows arriving during a get request with
        # include_docs), we simply accumulate the rows here.
        self.__raw_rows.append(self.raw.rows)
        if self.raw.done:
            self._clear()
        self.__waiter.switch()

    def _errback(self, mres, *args):
        self._clear()
        self.__waiter.throw(*args)

    def __iter__(self):
        if not self._do_iter:
            raise AlreadyQueriedError.pyexc("Already queried")

        while self._do_iter and not self.__done_called:
            self.__waiter.get()

            rowset_list = self.__raw_rows
            self.__raw_rows = []
            for rowset in rowset_list:
                for row in self._process_payload(rowset):
                    yield row

        self._do_iter = False


class GView(GRowsHandler, AsyncViewBase):
    def __init__(self, *args, **kwargs):
        AsyncViewBase.__init__(self, *args, **kwargs)
        GRowsHandler.__init__(self)


class GN1QLRequest(GRowsHandler, AsyncN1QLRequest):
    def __init__(self, *args, **kwargs):
        AsyncN1QLRequest.__init__(self, *args, **kwargs)
        GRowsHandler.__init__(self)


def dummy_callback(*args):
    pass


[docs]class Bucket(AsyncBucket): def __init__(self, *args, **kwargs): """ This class is a 'GEvent'-optimized subclass of libcouchbase which utilizes the underlying IOPS structures and the gevent event primitives to efficiently utilize couroutine switching. """ super(Bucket, self).__init__(IOPS(), *args, **kwargs) def _do_ctor_connect(self): if self.connected: return self._connect() self._evconn = AsyncResult() self._conncb = self._on_connected self._evconn.get() self._evconn = None def _on_connected(self, err): if err: self._evconn.set_exception(err) else: self._evconn.set(None) def _waitwrap(self, cbasync): cur_thread = getcurrent() errback = lambda r, x, y, z: cur_thread.throw(x, y, z) cbasync.set_callbacks(cur_thread.switch, errback) try: return get_hub().switch() finally: # Deregister callbacks to prevent another request on the same # greenlet to get the result from this context. cbasync.set_callbacks(dummy_callback, dummy_callback) def _meth_factory(meth, name): def ret(self, *args, **kwargs): return self._waitwrap(meth(self, *args, **kwargs)) return ret def _http_request(self, **kwargs): res = super(Bucket, self)._http_request(**kwargs) w = Waiter() res.callback = lambda x: w.switch(x) res.errback = lambda x, c, o, b: w.throw(c, o, b) return w.get() def query(self, *args, **kwargs): kwargs['itercls'] = GView return super(Bucket, self).query(*args, **kwargs) def n1ql_query(self, query, *args, **kwargs): kwargs['itercls'] = GN1QLRequest return super(Bucket, self).n1ql_query(query, *args, **kwargs) def _get_close_future(self): ev = Event() def _dtor_cb(*args): ev.set() self._dtorcb = _dtor_cb return ev locals().update(AsyncBucket._gen_memd_wrappers(_meth_factory))