Source code for piecash.core.session

import datetime
import os
import shutil
import socket
from collections import defaultdict

from sqlalchemy import event, Column, VARCHAR, INTEGER, Table, PrimaryKeyConstraint
from sqlalchemy.sql.ddl import DropConstraint, DropIndex
from sqlalchemy_utils import database_exists

from .book import Book
from .._common import GnucashException
from ..sa_extra import create_piecash_engine, DeclarativeBase, Session

version_supported = {u'Gnucash-Resave': 19920, u'invoices': 3, u'books': 1, u'accounts': 1, u'slots': 3,
                     u'taxtables': 2, u'lots': 2, u'orders': 1, u'vendors': 1, u'customers': 2, u'jobs': 1,
                     u'transactions': 3, u'Gnucash': 2060400, u'budget_amounts': 1, u'billterms': 2, u'recurrences': 2,
                     u'entries': 3, u'prices': 2, u'schedxactions': 1, u'splits': 4, u'taxtable_entries': 3,
                     u'employees': 2, u'commodities': 1, u'budgets': 1}

# this is not a declarative as it is used before binding the session to an engine.
gnclock = Table(u'gnclock', DeclarativeBase.metadata,
                Column('hostname', VARCHAR(length=255)),
                Column('pid', INTEGER()),
                )


[docs]class Version(DeclarativeBase): """The declarative class for the 'versions' table. """ __tablename__ = 'versions' __table_args__ = {} # column definitions # : The name of the table table_name = Column('table_name', VARCHAR(length=50), primary_key=True, nullable=False) #: The version for the table table_version = Column('table_version', INTEGER(), nullable=False) def __init__(self, table_name, table_version): self.table_name = table_name self.table_version = table_version def __unirepr__(self): return u"Version<{}={}>".format(self.table_name, self.table_version)
[docs]def build_uri(sqlite_file=None, uri_conn=None, db_type=None, db_user=None, db_password=None, db_name=None, db_host=None, db_port=None, ): """Create the connection string in function of some choices. :param str sqlite_file: a path to an sqlite3 file (only used if uri_conn is None) :param str uri_conn: a sqlalchemy connection string :param str db_type: type of database in ["postgres","mysql"] :param str db_user: username of database :param str db_password: password for the use of database :param str db_name: name of database :param str db_host: host of database :param str db_port: port of database :return: the connection string :rtype: str """ db_config = (db_type, db_host, db_port, db_name, db_user, db_password) db_config_isdefined = map(lambda x: x is not None, db_config[:-1]) if any(db_config_isdefined): if not all(db_config_isdefined): raise ValueError("When using db_* arguments, all must be specified : {}".format(db_config)) uri_conn = {"postgres": "postgresql://{username}:{password}@{host}:{port}/{name}", "mysql": "mysql+pymysql://{username}:{password}@{host}:{port}/{name}?charset=utf8", }[db_type].format(username=db_user, password=db_password, host=db_host, port=db_port, name=db_name) # db_postgres_uri = "postgresql://postgres:{pwd}@localhost:5432/foo".format(pwd=pg_password) # db_mysql_uri = "mysql+pymysql://travis:@localhost/foo?charset=utf8" # db_sqlite_uri = "sqlite:///{}".format(db_sqlite) if sqlite_file and uri_conn: raise ValueError("Only one of 'sqlite_file' or 'uri_conn' argument can be defined") if uri_conn is None: if sqlite_file: uri_conn = "sqlite:///{}".format(sqlite_file) else: uri_conn = "sqlite:///:memory:" return uri_conn
[docs]def create_book(sqlite_file=None, uri_conn=None, currency="EUR", overwrite=False, keep_foreign_keys=False, db_type=None, db_user=None, db_password=None, db_name=None, db_host=None, db_port=None, **kwargs): """Create a new empty GnuCash book. If both sqlite_file and uri_conn are None, then an "in memory" sqlite book is created. :param str sqlite_file: a path to an sqlite3 file (only used if uri_conn is None) :param str uri_conn: a sqlalchemy connection string :param str currency: the ISO symbol of the default currency of the book :param bool overwrite: True if book should be deleted and recreated if it exists already :param bool keep_foreign_keys: True if the foreign keys should be kept (may not work at all with GnuCash) :param str db_type: type of database in ["postgres","mysql"] :param str db_user: username of database :param str db_password: password for the use of database :param str db_name: name of database :param str db_host: host of database :param str db_port: port of database :return: the document as a gnucash session :rtype: :class:`GncSession` :raises GnucashException: if document already exists and overwrite is False """ from sqlalchemy_utils.functions import database_exists, create_database, drop_database uri_conn = build_uri(sqlite_file, uri_conn, db_type, db_user, db_password, db_name, db_host, db_port) # create database (if DB is not a sqlite in memory) if uri_conn != "sqlite:///:memory:": if database_exists(uri_conn): if overwrite: drop_database(uri_conn) else: raise GnucashException("'{}' db already exists".format(uri_conn)) create_database(uri_conn) engine = create_piecash_engine(uri_conn, **kwargs) # drop constraints if we de not want to keep them (keep_foreign_keys=False), the default if not keep_foreign_keys: for n, tbl in DeclarativeBase.metadata.tables.items(): # drop index constraints for idx in tbl.indexes: event.listen(tbl, "after_create", DropIndex(idx), once=True) # drop FK constraints for cstr in tbl.constraints: if isinstance(cstr, PrimaryKeyConstraint): continue else: event.listen(tbl, "before_drop", DropConstraint(cstr), once=True) # # create all (tables, fk, ...) DeclarativeBase.metadata.create_all(engine) s = Session(bind=engine) # create all rows in version table for table_name, table_version in version_supported.items(): s.add(Version(table_name=table_name, table_version=table_version)) # create book and merge with session b = Book() s.add(b) adapt_session(s, book=b, readonly=False) # create commodities and initial accounts from .account import Account b.root_account = Account(name="Root Account", type="ROOT", commodity=None, book=b) b.root_template = Account(name="Template Root", type="ROOT", commodity=None, book=b) b["default-currency"] = b.currencies(mnemonic=currency) b.save() return b
[docs]def open_book(sqlite_file=None, uri_conn=None, readonly=True, open_if_lock=False, do_backup=True, db_type=None, db_user=None, db_password=None, db_name=None, db_host=None, db_port=None, **kwargs): """Open an existing GnuCash book :param str sqlite_file: a path to an sqlite3 file (only used if uri_conn is None) :param str uri_conn: a sqlalchemy connection string :param bool readonly: open the file as readonly (useful to play with and avoid any unwanted save) :param bool open_if_lock: open the file even if it is locked by another user (using open_if_lock=True with readonly=False is not recommended) :param bool do_backup: do a backup if the file written in RW (i.e. readonly=False) (this only works with the sqlite backend and copy the file with .{:%Y%m%d%H%M%S}.gnucash appended to it) :return: the document as a gnucash session :rtype: :class:`GncSession` :raises GnucashException: if the document does not exist :raises GnucashException: if there is a lock on the file and open_if_lock is False """ uri_conn = build_uri(sqlite_file, uri_conn, db_type, db_user, db_password, db_name, db_host, db_port) if uri_conn == "sqlite:///:memory:": raise ValueError("An in memory sqlite gnucash databook cannot be opened, it should be created") # create database (if not sqlite in memory if not database_exists(uri_conn): raise GnucashException("Database '{}' does not exist (please use create_book to create " \ "GnuCash books from scratch)".format(uri_conn)) engine = create_piecash_engine(uri_conn, **kwargs) # backup database if readonly=False and do_backup=True if not readonly and do_backup: if engine.name != "sqlite": raise GnucashException( "Cannot do a backup for engine '{}'. Do yourself a backup and then specify do_backup=False".format( engine.name)) url = uri_conn[len("sqlite:///"):] url_backup = url + ".{:%Y%m%d%H%M%S}.gnucash".format(datetime.datetime.now()) shutil.copyfile(url, url_backup) locks = list(engine.execute(gnclock.select())) # ensure the file is not locked by GnuCash itself if locks and not open_if_lock: raise GnucashException("Lock on the file") s = Session(bind=engine) # check the versions in the table versions is consistent with the API # TODO: improve this in the future to allow more than 1 version version_book = {v.table_name: v.table_version for v in s.query(Version).all()} for k, v in version_book.items(): # skip GnuCash if k in ("Gnucash"): continue assert version_supported[k] == v, "Unsupported version for table {} : got {}, supported {}".format(k, v, version_supported[ k]) book = s.query(Book).one() adapt_session(s, book=book, readonly=readonly) return book
[docs]def adapt_session(session, book, readonly): """ Change the SA session object to add some features. :param session: the SA session object that will be modified in place :param book: the gnucash singleton book linked to the SA session :param readonly: True if the session should not allow commits. :return: """ # link session and book together book.session = session session.book = book book.uri = session.bind.url # def new_flush(*args, **kwargs): # if session.dirty or session.new or session.deleted: # session.rollback() # raise GnucashException("You cannot change the DB, it is locked !") # add logic to make session readonly def readonly_commit(*args, **kwargs): # session.rollback() raise GnucashException("You cannot change the DB, it is locked !") if readonly: session.commit = readonly_commit # add logic to create/delete GnuCash locks def delete_lock(): session.execute(gnclock.delete(whereclause=(gnclock.c.hostname == socket.gethostname()) and (gnclock.c.pid == os.getpid()))) session.commit() session.delete_lock = delete_lock def create_lock(): session.execute(gnclock.insert(values=dict(hostname=socket.gethostname(), pid=os.getpid()))) session.commit() session.create_lock = create_lock # add logic to track if a session has been modified or not session._is_modified = False session._all_changes = {} @event.listens_for(session, 'after_flush') def receive_after_flush(session, flush_context): session._is_modified = not session.is_saved @event.listens_for(session, 'after_commit') @event.listens_for(session, 'after_rollback') def init_session_status(session, *args, **kwargs): session._is_modified = False session._all_changes.clear() session.book.session_changes = defaultdict(list) session.__class__.is_saved = property( fget=lambda self: not (self._is_modified or self.dirty or self.deleted or self.new), doc="True if nothing has yet been changed (False otherwise)")
event.listen(Session, 'before_commit', Book.validate_book) event.listen(Session, 'before_flush', Book.track_dirty)