# Source code for piecash.kvp

import datetime
import decimal
import sys
import uuid
from importlib import import_module

from enum import Enum
from sqlalchemy import Column, VARCHAR, INTEGER, REAL, BIGINT, types, event
from sqlalchemy.orm import relation, foreign, object_session, backref

from ._common import CallableList
from ._common import hybrid_property_gncnumeric
from .sa_extra import _DateTime, DeclarativeBase, _Date

# ``str_unicode`` is the type used in isinstance checks for "text" values.
# Compare the numeric major version rather than the ``sys.version`` string:
# a lexicographic string comparison against '3' gives the wrong answer for a
# two-digit major version (e.g. '10.0' < '3').
if sys.version_info[0] >= 3:
    str_unicode = str
else:
    # ``basestring`` only exists on Python 2; this branch is never evaluated
    # on Python 3.
    str_unicode = basestring  # noqa: F821


class KVP_Type(Enum):
    """Enumeration of the value types a slot (key-value pair) can hold.

    The integer value of each member is what ``SlotType`` writes to / reads
    from the ``slot_type`` column, and the members are used as the
    ``polymorphic_identity`` of the ``Slot`` subclasses in this module.
    """
    # Note: there is no member with value 0.
    KVP_TYPE_INVALID = -1
    KVP_TYPE_GINT64 = 1
    KVP_TYPE_DOUBLE = 2
    KVP_TYPE_NUMERIC = 3
    KVP_TYPE_STRING = 4
    KVP_TYPE_GUID = 5
    KVP_TYPE_TIMESPEC = 6
    KVP_TYPE_BINARY = 7
    KVP_TYPE_GLIST = 8
    KVP_TYPE_FRAME = 9
    KVP_TYPE_GDATE = 10

# Mapping from a Python type to the KVP_Type used to store a value of that
# type.
pytype_KVPtype = {
    int: KVP_Type.KVP_TYPE_GINT64,
    float: KVP_Type.KVP_TYPE_DOUBLE,
    decimal.Decimal: KVP_Type.KVP_TYPE_NUMERIC,
    dict: KVP_Type.KVP_TYPE_FRAME,
    list: KVP_Type.KVP_TYPE_GLIST,
    # to fill: the mapping is incomplete (e.g. no entry for strings, GUIDs,
    # datetimes or dates yet)
}

# Mapping from a KVP_Type to the name of the field (or tuple of field names,
# for the numerator/denominator pair of a numeric) that holds the value.
KVPtype_fields = {
    KVP_Type.KVP_TYPE_GINT64: 'int64_val',
    KVP_Type.KVP_TYPE_DOUBLE: 'double_val',
    KVP_Type.KVP_TYPE_STRING: 'string_val',
    KVP_Type.KVP_TYPE_GUID: 'guid_val',
    KVP_Type.KVP_TYPE_TIMESPEC: 'timespec_val',
    KVP_Type.KVP_TYPE_GDATE: 'gdate_val',
    KVP_Type.KVP_TYPE_NUMERIC: ('numeric_val_num', 'numeric_val_denom'),
    # NOTE(review): frames and lists map to 'guid' here, while the column
    # declared on SlotFrame is named 'guid_val' -- confirm which is intended.
    KVP_Type.KVP_TYPE_FRAME: 'guid',
    KVP_Type.KVP_TYPE_GLIST: 'guid',
}


class SlotType(types.TypeDecorator):
    """Column type that persists a :class:`KVP_Type` member as an INTEGER.

    On the way to the database the enum member is replaced by its integer
    value; on the way back the integer is converted to the matching
    :class:`KVP_Type` member. ``None`` is passed through unchanged in both
    directions.
    """
    impl = INTEGER

    def process_bind_param(self, value, dialect):
        """Return the integer value of the ``KVP_Type`` member ``value``."""
        if value is None:
            return None
        return value.value

    def process_result_value(self, value, dialect):
        """Return the ``KVP_Type`` member for the stored integer ``value``.

        Raises ``ValueError`` (from the enum lookup) if the integer does not
        correspond to any member.
        """
        if value is None:
            return None
        return KVP_Type(value)
class DictWrapper(object):
    """Mixin giving dict-like access to the ``slots`` collection of an object.

    Keys are slot names; a key containing ``/`` is interpreted as a path
    where the part before the first ``/`` names a slot of this object and
    the remainder is resolved recursively on that slot.
    """

    def __contains__(self, key):
        """Return True if a direct child slot is named ``key``."""
        return any(sl.name == key for sl in self.slots)

    def __getitem__(self, key):
        """Return the slot object (not its value) found at path ``key``.

        Raises ``KeyError`` if no slot matches the first path component.
        """
        assert not isinstance(key, int), "You are accessing slots with an integer (={}) while a string is expected".format(key)
        keys = key.split("/", 1)
        key = keys[0]
        for sl in self.slots:
            if sl.name == key:
                break
        else:
            raise KeyError("No slot exists with name '{}'".format(key))
        if len(keys) > 1:
            # descend into the child slot with the rest of the path
            return sl[keys[1]]
        else:
            return sl  # .value

    def __setitem__(self, key, value):
        """Set the value of the slot at path ``key``, creating it if needed.

        Missing intermediate path components are created as ``SlotFrame``
        objects. For an existing slot, ``TypeError`` is raised when ``value``
        is not an instance of the slot's ``_python_type``.
        """
        keys = key.split("/", 1)
        key = keys[0]
        for sl in self.slots:
            if sl.name == key:
                break
        else:
            # new key
            if len(keys) > 1:
                # intermediate frame: its stored name carries the full path
                # when the parent is itself a frame
                if isinstance(self, SlotFrame):
                    sf = SlotFrame(name=self._name + "/" + key)
                else:
                    sf = SlotFrame(name=key)
                sf[keys[1]] = value
                self.slots.append(sf)
            else:
                self.slots.append(slot(parent=self, name=key, value=value))
            return

        if len(keys) > 1:
            sl[keys[1]] = value
            return

        # assign if type is correct
        if isinstance(value, sl._python_type):
            sl.value = value
        else:
            raise TypeError("Type of '{}' is not one of {}".format(value, sl._python_type))

    def __delitem__(self, key):
        """Delete the slot at path ``key``; a slice deletes from ``slots`` directly.

        Raises ``KeyError`` if no slot matches the first path component.
        """
        if isinstance(key, slice):
            # delete all
            del self.slots[key]
            return

        keys = key.split("/", 1)
        for i, sl in enumerate(self.slots):
            if sl.name == keys[0]:
                break
        else:
            raise KeyError("No slot exists with name '{}'".format(key))

        if len(keys) > 1:
            del sl[keys[1]]
        else:
            del self.slots[i]

    def iteritems(self):
        """Yield ``(name, slot)`` pairs for the direct child slots."""
        for sl in self.slots:
            yield sl.name, sl

    def get(self, key, default=None):
        """Return the *value* of the slot at ``key``, or ``default`` if missing."""
        try:
            return self[key].value
        except KeyError:
            return default


class Slot(DeclarativeBase):
    """Base mapped class for a row of the ``slots`` table.

    Concrete slot kinds are subclasses selected through the ``slot_type``
    column (polymorphic mapping).
    """
    __tablename__ = 'slots'

    __table_args__ = {'sqlite_autoincrement': True}

    # column definitions
    id = Column('id', INTEGER(), primary_key=True, nullable=False)
    obj_guid = Column('obj_guid', VARCHAR(length=32), nullable=False, index=True)
    # full stored name; for nested slots this is a '/'-separated path
    _name = Column('name', VARCHAR(length=4096), nullable=False)

    @property
    def name(self):
        """Last ``/``-separated component of the stored name."""
        if self._name:
            return self._name.split("/")[-1]
        else:
            return self._name

    @name.setter
    def name(self, value):
        self._name = value

    slot_type = Column('slot_type', SlotType(), nullable=False)

    __mapper_args__ = {
        'polymorphic_on': slot_type,
    }

    def __init__(self, name, value=None):
        """Create a slot called ``name``; ``value`` is assigned only if not None."""
        self.name = name
        if value is not None:
            self.value = value

    def __unirepr__(self):
        return u"<{} {}={!r}>".format(self.__class__.__name__, self.name, self.value)


class SlotSimple(Slot):
    """Base for slots whose value lives in a single column named ``_field``.

    ``_field`` is supplied by the subclasses built with ``define_simpleslot``.
    """
    __mapper_args__ = {
        'polymorphic_identity': -1,
    }

    _python_type = ()

    @property
    def value(self):
        return getattr(self, self._field)

    @value.setter
    def value(self, value):
        setattr(self, self._field, value)


def define_simpleslot(postfix, pytype, KVPtype, field, col_type, col_default):
    """Build and return a ``SlotSimple`` subclass named ``Slot<postfix>``.

    :param postfix: suffix of the generated class name
    :param pytype: tuple of Python types accepted as value (``_python_type``)
    :param KVPtype: ``KVP_Type`` member used as polymorphic identity
    :param field: name of the attribute/column holding the value
    :param col_type: SQLAlchemy type of that column
    :param col_default: default of that column
    """
    cls = type(
        'Slot{}'.format(postfix),
        (SlotSimple,),
        {
            "__mapper_args__": {'polymorphic_identity': KVPtype},
            field: Column(field, col_type, default=col_default),
            "_field": field,
            "_python_type": pytype,
        }
    )
    return cls


SlotInt = define_simpleslot(postfix="Int",
                            pytype=(int,),
                            KVPtype=KVP_Type.KVP_TYPE_GINT64,
                            field="int64_val",
                            col_type=BIGINT(),
                            col_default=0,
                            )
SlotString = define_simpleslot(postfix="String",
                               pytype=(str_unicode,),
                               KVPtype=KVP_Type.KVP_TYPE_STRING,
                               field="string_val",
                               col_type=VARCHAR(length=4096),
                               col_default=None,
                               )
SlotDouble = define_simpleslot(postfix="Double",
                               pytype=(float,),
                               KVPtype=KVP_Type.KVP_TYPE_DOUBLE,
                               field="double_val",
                               col_type=REAL(),
                               col_default=0,
                               )
# The accepted Python type is datetime.datetime (not datetime.time): slot()
# creates SlotTime objects from datetime.datetime values, so the type check
# done in DictWrapper.__setitem__ when re-assigning an existing slot must
# accept the same type.
SlotTime = define_simpleslot(postfix="Time",
                             pytype=(datetime.datetime,),
                             KVPtype=KVP_Type.KVP_TYPE_TIMESPEC,
                             field="timespec_val",
                             col_type=_DateTime(),
                             col_default=None,
                             )


class SlotFrame(DictWrapper, Slot):
    """Slot holding child slots, exposed as a dict ``{name: value}``.

    Children are the ``Slot`` rows whose ``obj_guid`` equals this frame's
    ``guid_val``.
    """
    __mapper_args__ = {
        'polymorphic_identity': KVP_Type.KVP_TYPE_FRAME
    }
    _python_type = (dict,)

    guid_val = Column('guid_val', VARCHAR(length=32))

    slots = relation('Slot',
                     primaryjoin=foreign(Slot.obj_guid) == guid_val,
                     cascade='all, delete-orphan',
                     collection_class=CallableList,
                     single_parent=True,
                     backref=backref("parent", remote_side=guid_val),
                     )

    @property
    def value(self):
        # convert to dict
        return {sl.name: sl.value for sl in self.slots}

    @value.setter
    def value(self, value):
        self.slots = [slot(parent=self, name=k, value=v) for k, v in value.items()]

    def __init__(self, **kwargs):
        # the guid must exist before Slot.__init__ possibly assigns a value,
        # as children are linked to the frame through it
        self.guid_val = uuid.uuid4().hex
        super(SlotFrame, self).__init__(**kwargs)


class SlotList(SlotFrame):
    """Frame-like slot exposed as a list; children are named by their index."""
    __mapper_args__ = {
        'polymorphic_identity': KVP_Type.KVP_TYPE_GLIST
    }
    _python_type = (list,)

    @property
    def value(self):
        # convert to list
        return [sl.value for sl in self.slots]

    @value.setter
    def value(self, value):
        self.slots = [slot(parent=self, name=str(i), value=v) for i, v in enumerate(value)]

    def __init__(self, **kwargs):
        # SlotFrame.__init__ generates guid_val and then runs Slot.__init__
        super(SlotList, self).__init__(**kwargs)


@event.listens_for(SlotFrame.slots, 'remove')
def remove_slot(target, value, initiator):
    """Drop a slot from its session when it is removed from a frame.

    A slot that is still pending (in ``session.new``) is expunged, otherwise
    it is marked for deletion. A slot not attached to any session needs no
    action.
    """
    s = object_session(value)
    if s is None:
        # object_session() returns None for objects not in a session
        return
    if value in s.new:
        s.expunge(value)
    else:
        s.delete(value)


class SlotGUID(SlotFrame):
    """Slot whose ``guid_val`` references another object by guid.

    The class of the referenced object is deduced from the slot name through
    ``_mapping_name_class``.
    """
    __mapper_args__ = {
        'polymorphic_identity': KVP_Type.KVP_TYPE_GUID
    }
    _python_type = (DeclarativeBase,)

    # add
    _mapping_name_class = {
        'from-sched-xaction': 'piecash.core.transaction.ScheduledTransaction',
        'account': 'piecash.core.account.Account',
        'invoice-guid': 'piecash.business.invoice.Invoice',
        'peer_guid': 'piecash.core.transaction.Split',
        'gains-split': 'piecash.core.transaction.Split',
        'gains-source': 'piecash.core.transaction.Split',
        'default-currency': 'piecash.core.commodity.Commodity',
    }

    @property
    def Class(self):
        """Class of the object referenced by this slot, based on the slot name.

        Raises ``ValueError`` when the slot name is not a known one.
        """
        name = self.name
        if name.startswith('CURRENCY::'):
            # handle capital gain account
            class_to_retrieve = 'piecash.core.account.Account'
        else:
            class_to_retrieve = self._mapping_name_class.get(name, None)

        if class_to_retrieve is None:
            raise ValueError("Smart retrieval of GUID slot with name '{}' is not yet supported. "
                             "Need to retrieve proper object type in kvp module (add in SlotGUID._mapping_name_class)".format(name))

        # import lazily from the dotted path
        class_module, class_name = class_to_retrieve.rsplit('.', 1)
        mod = import_module(class_module)
        Class = getattr(mod, class_name)
        return Class

    @property
    def value(self):
        """Object of type ``self.Class`` whose guid equals ``guid_val``."""
        return object_session(self).query(self.Class).filter_by(guid=self.guid_val).one()

    @value.setter
    def value(self, value):
        assert isinstance(value, self.Class)
        self.guid_val = value.guid


def get_all_subclasses(cls):
    """Return the list of all direct and indirect subclasses of ``cls``."""
    all_subclasses = []

    direct_subclasses = cls.__subclasses__()

    all_subclasses.extend(direct_subclasses)

    for subclass in direct_subclasses:
        all_subclasses.extend(get_all_subclasses(subclass))

    return all_subclasses


def slot(parent, name, value):
    """Create the ``Slot`` object suited to the Python type of ``value``.

    :param parent: object that will own the slot; when it is a ``SlotFrame``
        the stored name is prefixed with the parent's full name
    :param name: name of the slot within its parent
    :param value: value to store; dicts and lists are converted recursively
        to ``SlotFrame`` / ``SlotList``
    :raises ValueError: if no slot class handles the type of ``value``
    """
    if isinstance(parent, SlotFrame):
        name = parent._name + "/" + name

    # handle datetime before others (as otherwise can be mixed with date)
    if isinstance(value, datetime.datetime):
        return SlotTime(name=name, value=value)

    for cls in get_all_subclasses(Slot):
        # frames and lists are built explicitly below
        if isinstance(value, cls._python_type) and cls != SlotFrame and cls != SlotList:
            return cls(name=name, value=value)

    if isinstance(value, dict):
        # transform a dict to Frame/Slots
        sf = SlotFrame(name=name)
        for k, v in value.items():
            sl = slot(parent=sf, name=k, value=v)
            sl.parent = sf
        return sf

    if isinstance(value, list):
        # transform a list to List/Slots
        sf = SlotList(name=name)
        for i, v in enumerate(value):
            sl = slot(parent=sf, name=str(i), value=v)
            sl.parent = sf
        return sf

    raise ValueError("Cannot handle type of '{}'".format(value))


class SlotNumeric(Slot):
    """Slot whose value is stored as a numerator/denominator pair of columns."""
    __mapper_args__ = {
        'polymorphic_identity': KVP_Type.KVP_TYPE_NUMERIC
    }
    _python_type = (tuple, decimal.Decimal)

    _numeric_val_num = Column('numeric_val_num', BIGINT(), nullable=False, default=0)
    _numeric_val_denom = Column('numeric_val_denom', BIGINT(), nullable=False, default=1)
    value = hybrid_property_gncnumeric(_numeric_val_num, _numeric_val_denom)


SlotDate = define_simpleslot(postfix="Date",
                             pytype=(datetime.date,),
                             KVPtype=KVP_Type.KVP_TYPE_GDATE,
                             field="gdate_val",
                             col_type=_Date(),
                             col_default=None,
                             )