Source code for couchbase.bucketmanager

# Copyright 2013, Couchbase, Inc.
# All Rights Reserved
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import time

import couchbase._libcouchbase as _LCB
import couchbase.exceptions as exceptions
from couchbase.exceptions import CouchbaseError, ArgumentError
from couchbase.views.params import Query, SpatialQuery, STALE_OK
from couchbase._pyport import single_dict_key
from couchbase._ixmgmt import IxmgmtRequest, N1qlIndex, N1QL_PRIMARY_INDEX

[docs]class BucketManager(object): """ The `BucketManager` class allows access to common maintenance APIs related to a :class:`~couchbase.bucket.Bucket` object. It is normally returned via the :meth:`~couchbase.bucket.Bucket.bucket_manager` method """ def __init__(self, cb): self._cb = cb def _http_request(self, **kwargs): return self._cb._http_request(**kwargs) def _mk_devmode(self, *args): return self._cb._mk_devmode(*args) def _view(self, *args, **kwargs): return self._cb._view(*args, **kwargs) def _doc_rev(self, res): """ Returns the rev id from the header """ jstr = res.headers['X-Couchbase-Meta'] jobj = json.loads(jstr) return jobj['rev'] def _poll_vq_single(self, dname, use_devmode, ddresp): """ Initiate a view query for a view located in a design document :param ddresp: The design document to poll (as JSON) :return: True if successful, False if no views. """ vname = None query = None v_mr = ddresp.get('views', {}) v_spatial = ddresp.get('spatial', {}) if v_mr: vname = single_dict_key(v_mr) query = Query() elif v_spatial: vname = single_dict_key(v_spatial) query = SpatialQuery() if not vname: return False query.stale = STALE_OK query.limit = 1 for r in self._cb.query(dname, vname, use_devmode=use_devmode, query=query): pass return True def _design_poll(self, name, mode, oldres, timeout=5, use_devmode=False): """ Poll for an 'async' action to be complete. :param string name: The name of the design document :param string mode: One of ``add`` or ``del`` to indicate whether we should check for addition or deletion of the document :param oldres: The old result from the document's previous state, if any :param float timeout: How long to poll for. If this is 0 then this function returns immediately :type oldres: :class:`~couchbase.result.HttpResult` """ if not timeout: return True if timeout < 0: raise ArgumentError.pyexc("Interval must not be negative") t_end = time.time() + timeout old_rev = None if oldres: old_rev = self._doc_rev(oldres) while time.time() < t_end: try: cur_resp = self.design_get(name, use_devmode=use_devmode) if old_rev and self._doc_rev(cur_resp) == old_rev: continue try: if not self._poll_vq_single( name, use_devmode, cur_resp.value): continue return True except CouchbaseError: continue except CouchbaseError: if mode == 'del': # Deleted, whopee! return True raise exceptions.TimeoutError.pyexc( "Wait time for design action completion exceeded")
[docs] def design_create(self, name, ddoc, use_devmode=True, syncwait=0): """ Store a design document :param string name: The name of the design :param ddoc: The actual contents of the design document :type ddoc: string or dict If ``ddoc`` is a string, it is passed, as-is, to the server. Otherwise it is serialized as JSON, and its ``_id`` field is set to ``_design/{name}``. :param bool use_devmode: Whether a *development* mode view should be used. Development-mode views are less resource demanding with the caveat that by default they only operate on a subset of the data. Normally a view will initially be created in 'development mode', and then published using :meth:`design_publish` :param float syncwait: How long to poll for the action to complete. Server side design operations are scheduled and thus this function may return before the operation is actually completed. Specifying the timeout here ensures the client polls during this interval to ensure the operation has completed. :raise: :exc:`couchbase.exceptions.TimeoutError` if ``syncwait`` was specified and the operation could not be verified within the interval specified. :return: An :class:`~couchbase.result.HttpResult` object. .. seealso:: :meth:`design_get`, :meth:`design_delete`, :meth:`design_publish` """ name = self._cb._mk_devmode(name, use_devmode) fqname = "_design/{0}".format(name) if not isinstance(ddoc, dict): ddoc = json.loads(ddoc) ddoc = ddoc.copy() ddoc['_id'] = fqname ddoc = json.dumps(ddoc) existing = None if syncwait: try: existing = self.design_get(name, use_devmode=False) except CouchbaseError: pass ret = self._cb._http_request( type=_LCB.LCB_HTTP_TYPE_VIEW, path=fqname, method=_LCB.LCB_HTTP_METHOD_PUT, post_data=ddoc, content_type="application/json") self._design_poll(name, 'add', existing, syncwait, use_devmode=use_devmode) return ret
[docs] def design_get(self, name, use_devmode=True): """ Retrieve a design document :param string name: The name of the design document :param bool use_devmode: Whether this design document is still in "development" mode :return: A :class:`~couchbase.result.HttpResult` containing a dict representing the format of the design document :raise: :exc:`couchbase.exceptions.HTTPError` if the design does not exist. .. seealso:: :meth:`design_create` """ name = self._mk_devmode(name, use_devmode) existing = self._http_request(type=_LCB.LCB_HTTP_TYPE_VIEW, path="_design/" + name, method=_LCB.LCB_HTTP_METHOD_GET, content_type="application/json") return existing
[docs] def design_publish(self, name, syncwait=0): """ Convert a development mode view into a production mode views. Production mode views, as opposed to development views, operate on the entire cluster data (rather than a restricted subset thereof). :param string name: The name of the view to convert. Once the view has been converted, ensure that all functions (such as :meth:`design_get`) have the ``use_devmode`` parameter disabled, otherwise an error will be raised when those functions are used. Note that the ``use_devmode`` option is missing. This is intentional as the design document must currently be a development view. :return: An :class:`~couchbase.result.HttpResult` object. :raise: :exc:`couchbase.exceptions.HTTPError` if the design does not exist .. seealso:: :meth:`design_create`, :meth:`design_delete`, :meth:`design_get` """ existing = self.design_get(name, use_devmode=True) rv = self.design_create(name, existing.value, use_devmode=False, syncwait=syncwait) self.design_delete(name, use_devmode=True, syncwait=syncwait) self._design_poll(name, 'add', None, timeout=syncwait, use_devmode=False) return rv
[docs] def design_delete(self, name, use_devmode=True, syncwait=0): """ Delete a design document :param string name: The name of the design document to delete :param bool use_devmode: Whether the design to delete is a development mode design doc. :param float syncwait: Timeout for operation verification. See :meth:`design_create` for more information on this parameter. :return: An :class:`HttpResult` object. :raise: :exc:`couchbase.exceptions.HTTPError` if the design does not exist :raise: :exc:`couchbase.exceptions.TimeoutError` if ``syncwait`` was specified and the operation could not be verified within the specified interval. .. seealso:: :meth:`design_create`, :meth:`design_get` """ name = self._mk_devmode(name, use_devmode) existing = None if syncwait: try: existing = self.design_get(name, use_devmode=False) except CouchbaseError: pass ret = self._http_request(type=_LCB.LCB_HTTP_TYPE_VIEW, path="_design/" + name, method=_LCB.LCB_HTTP_METHOD_DELETE) self._design_poll(name, 'del', existing, syncwait) return ret
def _mk_index_def(self, ix, primary=False): if isinstance(ix, N1qlIndex): return N1qlIndex(ix) info = N1qlIndex() info.keyspace = self._cb.bucket info.primary = primary if ix: = ix elif not primary: raise ValueError('Missing name for non-primary index') return info
[docs] def n1ql_index_create(self, ix, **kwargs): """ Create an index for use with N1QL. :param str ix: The name of the index to create :param bool defer: Whether the building of indexes should be deferred. If creating multiple indexes on an existing dataset, using the `defer` option in conjunction with :meth:`build_deferred_indexes` and :meth:`watch_indexes` may result in substantially reduced build times. :param bool ignore_exists: Do not throw an exception if the index already exists. :param list fields: A list of fields that should be supplied as keys for the index. For non-primary indexes, this must be specified and must contain at least one field name. :param bool primary: Whether this is a primary index. If creating a primary index, the name may be an empty string and `fields` must be empty. :param str condition: Specify a condition for indexing. Using a condition reduces an index size :raise: :exc:`~.KeyExistsError` if the index already exists .. seealso:: :meth:`n1ql_index_create_primary` """ defer = kwargs.pop('defer', False) ignore_exists = kwargs.pop('ignore_exists', False) primary = kwargs.pop('primary', False) fields = kwargs.pop('fields', []) cond = kwargs.pop('condition', None) if kwargs: raise TypeError('Unknown keyword arguments', kwargs) info = self._mk_index_def(ix, primary) if primary and fields: raise TypeError('Cannot create primary index with explicit fields') elif not primary and not fields: raise ValueError('Fields required for non-primary index') if fields: info.fields = fields if primary and is N1QL_PRIMARY_INDEX: del if cond: if primary: raise ValueError('cannot specify condition for primary index') info.condition = cond options = { 'ignore_exists': ignore_exists, 'defer': defer } # Now actually create the indexes return IxmgmtRequest(self._cb, 'create', info, **options).execute()
[docs] def n1ql_index_create_primary(self, defer=False, ignore_exists=False): """ Create the primary index on the bucket. Equivalent to:: n1ql_index_create('', primary=True, **kwargs) :param bool defer: :param bool ignore_exists: .. seealso:: :meth:`create_index` """ return self.n1ql_index_create( '', defer=defer, primary=True, ignore_exists=ignore_exists)
[docs] def n1ql_index_drop(self, ix, primary=False, **kwargs): """ Delete an index from the cluster. :param str ix: the name of the index :param bool primary: if this index is a primary index :param bool ignore_missing: Do not raise an exception if the index does not exist :raise: :exc:`~.NotFoundError` if the index does not exist and `ignore_missing` was not specified """ info = self._mk_index_def(ix, primary) return IxmgmtRequest(self._cb, 'drop', info, **kwargs).execute()
[docs] def n1ql_index_drop_primary(self, **kwargs): """ Remove the primary index Equivalent to ``n1ql_index_drop('', primary=True, **kwargs)`` """ return self.n1ql_index_drop('', primary=True, **kwargs)
[docs] def n1ql_index_list(self, other_buckets=False): """ List indexes in the cluster. :param bool other_buckets: Whether to also include indexes belonging to other buckets (i.e. buckets other than the current `Bucket` object) :return: list[couchbase._ixmgmt.Index] objects """ info = N1qlIndex() if not other_buckets: info.keyspace = self._cb.bucket return IxmgmtRequest(self._cb, 'list', info).execute()
[docs] def n1ql_index_build_deferred(self, other_buckets=False): """ Instruct the server to begin building any previously deferred index definitions. This method will gather a list of all pending indexes in the cluster (including those created using the `defer` option with :meth:`create_index`) and start building them in an efficient manner. :param bool other_buckets: Whether to also build indexes found in other buckets, if possible :return: list[couchbase._ixmgmt.Index] objects. This list contains the indexes which are being built and may be passed to :meth:`n1ql_index_watch` to poll their build statuses. You can use the :meth:`n1ql_index_watch` method to wait until all indexes have been built:: mgr.n1ql_index_create('ix_fld1', fields=['field1'], defer=True) mgr.n1ql_index_create('ix_fld2', fields['field2'], defer=True) mgr.n1ql_index_create('ix_fld3', fields=['field3'], defer=True) indexes = mgr.n1ql_index_build_deferred() # [IndexInfo('field1'), IndexInfo('field2'), IndexInfo('field3')] mgr.n1ql_index_watch(indexes, timeout=30, interval=1) """ info = N1qlIndex() if not other_buckets: info.keyspace = self._cb.bucket return IxmgmtRequest(self._cb, 'build', info).execute()
[docs] def n1ql_index_watch(self, indexes, timeout=30, interval=1, watch_primary=False): """ Await completion of index building This method will wait up to `timeout` seconds for every index in `indexes` to have been built. It will poll the cluster every `interval` seconds. :param list indexes: A list of indexes to check. This is returned by :meth:`build_deferred_indexes` :param float timeout: How long to wait for the indexes to become ready. :param float interval: How often to poll the cluster. :param bool watch_primary: Whether to also watch the primary index. This parameter should only be used when manually constructing a list of string indexes :raise: :exc:`~.TimeoutError` if the timeout was reached before all indexes were built :raise: :exc:`~.NotFoundError` if one of the indexes passed no longer exists. """ kwargs = { 'timeout_us': int(timeout * 1000000), 'interval_us': int(interval * 1000000) } ixlist = [N1qlIndex.from_any(x, self._cb.bucket) for x in indexes] if watch_primary: ixlist.append( N1qlIndex.from_any(N1QL_PRIMARY_INDEX, self._cb.bucket)) return IxmgmtRequest(self._cb, 'watch', ixlist, **kwargs).execute()
create_n1ql_index = n1ql_index_create create_n1ql_primary_index = n1ql_index_create_primary list_n1ql_indexes = n1ql_index_list drop_n1ql_index = n1ql_index_drop drop_n1ql_primary_index = n1ql_index_drop_primary build_n1ql_deferred_indexes = n1ql_index_build_deferred watch_n1ql_indexes = n1ql_index_watch