Source code for findig.extras.redis

from ast import literal_eval
from collections.abc import Callable, Mapping
from contextlib import contextmanager
from time import time

import redis

from findig.context import ctx
from findig.resource import AbstractResource
from findig.tools.dataset import MutableDataSet, MutableRecord, FilteredDataSet


class IndexToken(Mapping):
    __slots__ = 'sz', 'fields'

    def __init__(self, fields, bytesize=4):
        self.fields = fields
        self.sz = bytesize

    def __str__(self):
        return ",".join("{}={!r}".format(k, self.fields[k])
                        for k in sorted(self.fields))

    def __hash__(self):
        return hash(str(self)) & (2**(8*self.sz - 1) - 1)

    def __iter__(self):
        yield from self.fields

    def __len__(self):
        return len(self.fields)

    def __getitem__(self, key):
        return self.fields[key]

    @property
    def value(self):
        return hash(self).to_bytes(self.sz, 'big')


class RedisObj(MutableRecord):
    def __init__(self, key, collection=None, include_id=True):
        self.itemkey = key
        self.collection = collection
        self.include_id = include_id
        self.r = (collection.r 
                  if collection is not None
                  else redis.StrictRedis())
        self.inblock = False

    def __repr__(self):
        return "<{name}({key!r}){suffix}>".format(
            name="redis-object" if self.collection is None else "item",
            key=self.itemkey,
            suffix="" if self.collection is None
                      else " of {!r}".format(self.collection)
        )

    def start_edit_block(self):
        client = self.r
        self.r = self.r.pipeline()
        self.inblock = True
        return (client, dict(self))

    def close_edit_block(self, token):
        client, old_data = token

        ret = self.r.execute()
        self.r = client
        
        data = dict(self)

        if self.collection is not None:
            self.collection.reindex(
                self.id,
                data,
                old_data
            )

        self.invalidate(new_data=data)
        self.inblock = False
        

    def patch(self, add_data, remove_fields, replace=False):
        p = self.r.pipeline()

        if not self.inblock:
            old_data = dict(self)

        if replace:
            p.delete(self.itemkey)

        elif remove_fields:
            p.hdel(self.itemkey, *remove_fields)

        self.store(add_data, self.itemkey, p)
        p.execute()

        if self.inblock:
            data = {k: old_data[k] for k in old_data 
                    if k not in remove_fields}
            data.update(add_data)

            self.invalidate(new_data=data)

            if self.collection is not None:
                self.collection.reindex(self.id, data, old_data)

        else:
            self.invalidate()

    def read(self):
        data = self.r.hgetall(self.itemkey)
        if self.include_id:
            data[b'id'] = self.id.encode("utf8")
        return {k.decode('utf8'):literal_eval(v.decode('utf8')) 
                for k,v in data.items()}

    def delete(self):
        if self.collection is not None:
            self.collection.remove_from_index(self.id, self)
            self.collection.untrack_id(self.id)

        self.r.delete(self.itemkey)

    @staticmethod
    def store(data, key, client):
        data = {k: repr(v).encode('utf8')
                for k,v in data.items()}

        return client.hmset(key, data)

    @property
    def id(self):
        return self.itemkey.rpartition(":")[-1]

            
[docs]class RedisSet(MutableDataSet): """ RedisSet(key=None, client=None, index_size=4) A RedisSet is an :class:`AbstractDataSet` that stores its items in a Redis database (using a Sorted Set to represent the collection, and a sorted set to represent items). :param key: The base key that should be used for the sorted set. If not given, one is deterministically generated based on the current resource. :param client: A :class:`redis.StrictRedis` instance that should be used to communicate with the redis server. If not given, a default instance is used. :param index_size: The number of bytes to use to index items in the set (per item). """ def __init__(self, key=None, client=None, **args): if key is None: key = ctx.resource if isinstance(key, AbstractResource): key = "findig:resource:{}".format(key.name) self.colkey = key self.itemkey = self.colkey + ':item:{id}' self.indkey = self.colkey + ':index' self.incrkey = self.colkey + ':next-id' self.genid = args.pop( 'generate_id', lambda d: self.r.incr(self.incrkey) ) self.indsize = args.pop('index_size', 4) self.filterby = args.pop('filterby', {}) self.indexby = args.pop('candidate_keys', [('id',)]) self.include_ids = args.pop('include_ids', True) self.r = redis.StrictRedis() if client is None else client def __repr__(self): if self.filterby: name = "filtered-redis-view" suffix = "|{}".format( ",".join("{}={!r}".format(k,v) for k,v in self.filterby.items()) ) else: name = "redis-set" suffix = "" return "<{name}({key!r}){suffix}>".format( name=name, suffix=suffix, key=self.colkey ) def __iter__(self): """Query the set and iterate through the elements.""" # If there is a filter, and it is completely encapsulated by # our index, we can use that to iter through the items tokens = self.__buildindextokens(self.filterby, raise_err=False) if tokens: # Pick an index to scan token = random.choice(tokens) id_blobs = self.r.zrangebylex(self.indkey, token.value, token.value) ids = [bs[self.indsize:] for bs in id_blobs] else: ids = self.r.zrange(self.colkey, 0, -1) for id in map(lambda bs: bs.decode('ascii'), ids): itemkey = self.itemkey.format(id=id) if self.filterby: # Check the items against the filter if it was # specified data = RedisObj(itemkey, self, self.include_ids) if FilteredDataSet.check_match(data, self.filterby): yield data else: yield RedisObj(itemkey, self, self.include_ids) def add(self, data): id = str(data['id'] if 'id' in data else self.genid(data)) itemkey = self.itemkey.format(id=id) with self.group_redis_commands(): tokens = self.add_to_index(id, data) self.track_id(id) RedisObj.store(data, itemkey, self.r) return tokens[0] def fetch_now(self, **spec): if list(spec) == ['id']: # Fetching by ID only; just lookup the item according to its # key itemkey = self.itemkey.format(id=spec['id']) if not self.r.exists(itemkey): raise LookupError("No matching item found.") else: return RedisObj(itemkey, self) else: return super(RedisSet, self).fetch_now(**spec) def track_id(self, id): self.r.zadd(self.colkey, time(), id) def untrack_id(self, id): self.r.zrem(self.colkey, id) def remove_from_index(self, id, data): tokens = self.__buildindextokens(data, id, False) for token in tokens: self.r.zrem( self.indkey, token.value + id.encode('ascii') ) def add_to_index(self, id, data): tokens = self.__buildindextokens(data, id) for token in tokens: self.r.zadd( self.indkey, 0, token.value + id.encode('ascii') ) return tokens def reindex(self, id, data, old_data): with self.group_redis_commands(): self.remove_from_index(id, data) self.add_to_index(id, data) def clear(self): # Remove all the child objects for_removal = list(self) with self.group_redis_commands(): for obj in for_removal: obj.delete() self.r.delete(self.incrkey) # Delete all the redis structures # Technically this step shouldn't be necessary; # Redis should clean up the other data structures def filtered(self, **spec): filter = dict(self.filterby) filter.update(spec) args = { 'key': self.colkey, 'candidate_keys': self.indexby, 'index_size': self.indsize, 'filterby': filter, 'client': self.r, } return RedisSet(**args) @contextmanager def group_redis_commands(self): client = self.r self.r = client.pipeline() yield self.r.execute() self.r = client def __buildindextokens(self, data, generated_id=None, raise_err=True): index = [] for ind in self.indexby: mapping = {} for field in ind: if field in data: mapping[field] = data[field] elif field == 'id' and generated_id is not None: # special case mapping[field] = generated_id else: # Can't use this index break else: index.append(IndexToken(mapping, self.indsize)) if not index: if raise_err: raise ValueError("Could not index this data. " "This may be due to insuffient index keys " "or incomplete data." ) else: return [] else: return index
__all__ = ["RedisSet"]