# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""Models for storing counters for each pattern."""
import re
from invenio_db import db
from sqlalchemy.event import listen
from sqlalchemy.orm import validates
from sqlalchemy.orm.exc import MultipleResultsFound
from .errors import InvalidResetCall, InvalidTemplate
from .utils import double_counter, extract_placeholders
[docs]class TemplateDefinition(db.Model, object):
"""Representation of a template definition."""
__tablename__ = 'sequencegenerator_template'
COUNTER_REGEX = re.compile(r'({counter(!.)?(:.*)?})')
"""Regular expression matching the counter inside the template string."""
name = db.Column(db.String(255), primary_key=True)
"""The identifier of the template definition."""
meta_template = db.Column(db.String(255), unique=True)
"""The template generator."""
parent_name = db.Column(db.ForeignKey(name))
"""Indicate that the template depends on another one."""
start = db.Column(db.Integer, default=0)
"""The starting counter of sequences generated from ``meta_template``."""
step = db.Column(db.Integer, default=1)
"""The incremental step of sequences generated from ``meta_template``."""
children = db.relationship(
'TemplateDefinition',
backref=db.backref('parent', remote_side=name)
)
@validates('meta_template')
[docs] def counter(self, **kwargs):
"""Get counter of this template definition, based on given kwargs."""
meta_template = double_counter(self.meta_template, self.COUNTER_REGEX)
counter = Counter.get(meta_template, kwargs)
if counter is None:
with db.session.begin_nested():
counter = Counter.create(
meta_template=meta_template,
ctx=kwargs,
counter=self.start,
template_definition=self,
)
db.session.add(counter)
return counter
def __repr__(self):
"""Canonical representation of ``TemplateDefinition``."""
return ('TemplateDefinition('
'name={0.name!r}, '
'meta_template={0.meta_template!r}, '
'start={0.start!r}, '
'step={0.step!r})'
).format(self)
[docs]def derive_parent(target, value, oldvalue, initiator):
"""Automatically derive parent from template string."""
placeholders = extract_placeholders(value)
# Check if parent sequence exists
try:
parent = TemplateDefinition.query.filter(
TemplateDefinition.name.in_(placeholders)
).one_or_none() if placeholders else None
except MultipleResultsFound:
raise InvalidTemplate('More than 1 parents '
'in template "{0}".'.format(value))
target.parent = parent
listen(TemplateDefinition.meta_template, 'set', derive_parent)
[docs]class Counter(db.Model):
"""Stores generated identifiers."""
__tablename__ = 'sequencegenerator_counter'
template_instance = db.Column(db.String(255), nullable=False,
primary_key=True, index=True)
"""The template string to use."""
definition_name = db.Column(db.ForeignKey(TemplateDefinition.name))
"""Link to the template definition."""
counter = db.Column(db.Integer, nullable=False)
"""Running counter."""
template_definition = db.relationship(
TemplateDefinition,
lazy='joined',
backref=db.backref('counters', cascade='all, delete-orphan')
)
# Optimistic concurrency control
__mapper_args__ = {
'version_id_col': counter,
'version_id_generator': False
}
@classmethod
[docs] def create(cls, meta_template, ctx=None, **kwargs):
"""Initialize a counter."""
assert 'meta_template' not in kwargs
return cls(
template_instance=meta_template.format(**ctx or {}),
**kwargs
)
@classmethod
[docs] def get(cls, definition, ctx=None):
"""Get a ``Counter``."""
return cls.query.get(definition.format(**ctx or {}))
[docs] def increment(self):
"""Generate next identifier."""
next_identifier = self.template_instance.format(counter=self.counter)
with db.session.begin_nested():
self.counter += self.template_definition.step
return next_identifier
[docs] def reset(self, start=0):
"""Reset counter."""
with db.session.begin_nested():
# Ensure no children exist
for child in self.template_definition.children:
if child.counters:
raise InvalidResetCall()
self.counter = start
def __repr__(self):
"""Canonical representation of ``Counter``."""
return ('Counter('
'template_instance={0.template_instance!r}, '
'definition_name={0.definition_name!r}, '
'counter={0.counter!r}, '
'template_definition={0.template_definition!r})'
).format(self)