Source code for railroadtracks.hortator

# Copyright 2014-2015 Novartis Institutes for Biomedical Research

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Persistence/memoization for the DAG
"""

from collections import namedtuple, Counter
from importlib import import_module
from railroadtracks.core import StepAbstract
from railroadtracks import __version__
import collections
import json
import os, shutil
import subprocess
import uuid
import warnings
import itertools
import operator
import networkx
import time
from . import core
from . import unifex

import enum
if hasattr(enum, '__version__') and enum.__version__.startswith('0.4'):
    raise ImportError("""
The Python package 'enum34' is required. Unfortunately, whenever the package 'enum' is also
present it can mask 'enum34'. Make sure that both 'enum34' is installed and 'enum'
is uninstalled.
    """)
Enum = enum.Enum

#FIXME: Shouldn't the definition come from separate sources (so that the coupling between creation of,
# and access to, the persistence layer is not so tightly tied to this Python script)?
_TASK_TODO = 'To do'
_TASK_DONE = 'Done'
_TASK_INPROGRESS = 'Work-in-progress'
_TASK_FAILED = 'Failed'
_TASK_STATUS_LIST = {_TASK_TODO: None, 
                     _TASK_INPROGRESS: None,
                     _TASK_DONE: None,
                     _TASK_FAILED: None}
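# The values in _TASK_STATUS_LIST start as None and are filled in with the corresponding
# database IDs by PersistentTaskGraph._fill_tasklist() once a database connection exists.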

# Structure holding a database ID and a flag indicating whether it is new (just created)
DbID = namedtuple('DbID', 'id new')

class Step:
    """
    When used in the context of a StepGraph, a Step is a small graph, or subgraph,
    constituted of a vertex connected downstream to targets and upstream to sources.

    For more information about a StepGraph, see the documentation for it.
    """
    __slots__ = ['step', 'sources', 'targets', 'parameters', 'model']

    def __init__(self, step, sources, targets, parameters, model):
        """
        :param step: A concrete step in the database
        :type step: StepConcrete_DbEntry
        :param sources: a sequence of source assets
        :param targets: a sequence of target assets
        :param model: a model (e.g., :mod:`railroadtracks.rnaseq`)
        """
        assert isinstance(step, StepConcrete_DbEntry)
        self.step = step
        self.sources = sources
        self.targets = targets
        self.parameters = parameters
        self.model = model

    clsname = property(lambda self: self.step.clsname, None, None)
    unifiedname = property(lambda self: getattr(self.model, self.clsname)._name, None, None)

    def iscomplete(self):
        return self.step.status == _TASK_DONE

    def run(self):
        # FIXME: check matching version numbers
        # self.step.version
        sources = ['='.join((x.label, x.entityname)) for x in self.sources]
        targets = ['='.join((x.label, x.entityname)) for x in self.targets]
        uei = core.UnifiedExecInfo(self.step.executable, self.unifiedname,
                                   sources, targets, self.parameters,
                                   None, None  # logging_file, logging_level
                                   )
        res = unifex.unified_exec_run(uei, unifex._make_stepdict(self.model))
        return res
class GRAPHDISPLAY(Enum):
    STEPLEVEL = {'layout': 'dot',
                 'layoutargs': '',
                 'kwargs': {'se_nodeid': '%(clsname)s_%(label)s',
                            'se_nodelabel': '{%(clsname)s | [%(label)s]}',
                            'step_nodeid': '%(clsname)s',
                            'step_nodelabel': '%(clsname)s'}}
    TASKLEVEL = {'layout': 'dot',
                 'layoutargs': '-Goverlap=prism -Gratio=1',
                 'kwargs': {'se_nodeid': '%(id)i | %(clsname)s',
                            'se_nodelabel': '%(id)i | {%(clsname)s|[%(label)s]}',
                            'step_nodeid': '%(id)i | %(clsname)s',
                            'step_nodelabel': '%(id)i | %(clsname)s'}}

# The DAG will be built from 2 types of vertices:
# - steps
# - assets
# Edges will always link vertices of 2 different types.
# FIXME: if the underlying database is changing, this is not updated
# (have a lock ? monitor changes ?)
class StepGraph(object):
    """
    The steps to be performed are stored in a directed acyclic graph (DAG).

    This graph can be thought of as a two-level graph. The higher level represents
    the connectivity between steps (which we will call supersteps), and the lower
    level expands each step into sources, targets, and a step using the sources to
    produce the targets.

    There is a persistent representation (currently an SQLite database), and this
    class aims at isolating that implementation detail from the user.
    """

    def __init__(self, persistent_graph):
        # DAG used to resolve the order in which steps should be run
        dag = networkx.DiGraph()
        self._dag = dag
        self._persistent_graph = persistent_graph
        # cache of DB IDs
        self._cache_dbid = dict()
        self._cache_stepvariant_dbid = dict()
        # build the graph from the persistence layer
        for step in self._persistent_graph.iter_steps():
            # step contains (StepConcrete_DbEntry, sources, targets, parameters)
            self._update_graph(step.step.step_variant_id, step.step.id,
                               step.sources, step.targets)

    @staticmethod
    def stepconcrete_dirname(stepconcrete_id):
        """
        Name of the directory corresponding to an ID.

        :param stepconcrete_id: ID for a directory.
        """
        stepconcrete_dirname = 'step_%i' % stepconcrete_id.id
        return stepconcrete_dirname
[docs] def add(self, step, assets, parameters=tuple(), tag=1, use_cache=True): """ Add a step, associated assets, and optional parameters, to the StepGraph. The task graph is like a directed (presumably) acyclic multilevel graph. Asset vertices are only connected to step vertices (in other words asset vertices represent connective layers between steps). :param step: The step to be added :type step: a :class:`core.StepAbstract` (or of child classes) object :param assets: The assets linked to the step added. If :attr:`assets.target` is undefined, the method will define it with unique identifiers and these will assigned in place. :type assets: a :class:`core.AssetStep` (or of child classes) object :param parameters: Parameters for the step :type parameters: A sequence of :class:`str` elements :param tag: A tag to differentiate repetitions of the exact same task. :rtype: :class:`StepConcrete_DbEntry` as the entry added to the database """ # add assets (note: the _cache ensures uniqueness) # add step (note: the _cache ensures uniqueness) assert isinstance(step, StepAbstract) # if use_cache: task_hashdb = (step.hashdb, assets.source.hashdb, parameters, tag) dbid = self._cache_dbid.get(task_hashdb) if dbid is not None: # We are not done yet as we need to recover the asset.target values for known_target in self._persistent_graph.get_targetassets(dbid): candidate_target = getattr(assets.target, known_target.label) if candidate_target._defined: assert candidate_target.name == known_target.name, "Mismatch between target assets for '%s': previously '%s', now '%s'" % (known_target.label, known_target.entityname, candidate_target.name) else: # update the definition of the target candidate_target.name = known_target.entityname return DbID(dbid, False) # obtain the id for variant dbid = None if use_cache: stepvariant_hashdb = (step.hashdb, tuple(x.value for x in step.activities)) dbid = self._cache_stepvariant_dbid.get(stepvariant_hashdb) if dbid is None: id_stepvariant = self._persistent_graph.id_step_variant(step, step.activities).id # undefined sources is not accepted # (exceptions being the source assets that are optionally None) if any((not y.allownone and not x._defined) for x,y in zip(assets.source, assets.source._sources)): raise ValueError('All sources must be defined.') # retrieve or create the task stepconcrete_id = self._persistent_graph.id_stepconcrete(id_stepvariant, assets.source, assets.target, parameters, tag = tag) stepconcrete_dirname = self.stepconcrete_dirname(stepconcrete_id) absdirname = os.path.join(self._persistent_graph._wd, stepconcrete_dirname) if stepconcrete_id.new: if os.path.isdir(absdirname): raise IOError("The directory %s is already existing." % absdirname) else: os.mkdir(absdirname) else: if not os.path.isdir(absdirname): raise IOError("The directory %s is missing." % absdirname) # loop over targets # # targets_db = list() # # for (label, asset) in zip(targets._fields, targets): # # # each asset can represent several saved objects # # entity_ids = tuple(self.id_stored_entity(*cn).id for cn in asset.iteritems()) # # targets_db.append(entity_ids) # # is this concrete step already known ? 
# # Look for the sources labelnamepairs = list() for field, t in zip(assets.target._fields, assets.target): if not t._defined: # Generate a presumably unique ID uniquename = str(uuid.uuid1()) if isinstance(t, core.File) and t._extension is not None: uniquename += t._extension[0] uniquename = os.path.join(absdirname, uniquename) # create an entry in the database id_storedentity = self._persistent_graph.id_stored_entity(type(t), uniquename) if not id_storedentity.new: # The newly created entity should be... well, NEW. # if reaching here, we have a (serious) problem raise Exception("Bad, bad, bad... the generated unique name %s is not unique." % uniquename) t.name = uniquename labelnamepairs.append((field, t)) else: # the asset "t" is defined, and sanity checks are (should be?) in self.id_stepconcrete() pass self._persistent_graph._insert_stepconcrete2storedentities(labelnamepairs, 'target', stepconcrete_id.id) self._persistent_graph.connection.commit() sources = list() for x in assets.source: if x is not None: for y in x: sources.append(y) self._update_graph(id_stepvariant, stepconcrete_id.id, tuple(sources), tuple(y for x in assets.target for y in x)) # store in the cache self._cache_dbid[task_hashdb] = stepconcrete_id.id return stepconcrete_id
    def _update_graph(self, id_stepvariant, stepconcrete_id, sources, targets):
        # update the graph
        dag = self._dag
        v1 = '%s-%i' % (id_stepvariant, stepconcrete_id)
        if v1 not in dag:
            dag.add_node(v1)
        # add step details (sources and targets)
        steps_before = set()
        for src in sources:
            assert src is not None
            if src not in dag:
                dag.add_node(src)
            vertex = dag[src]
            # edge between the source file and the step
            dag.add_edge(src, v1)
        steps_after = set()
        for target in targets:
            assert target is not None
            if target not in dag:
                dag.add_node(target)
            vertex = dag[target]
            # edge between the step and the target file
            dag.add_edge(v1, target)

    def stepcrawler(self):
        # This is a very rudimentary way to organise tasks.
        # Bulk synchronous parallel approaches (Google's PREGEL, Apache's Giraph and Hama, Stanford's GPS...)
        # should be considered if this ever becomes a large graph.
        dag = self._dag
        # iterate through connected components
        undag = dag.to_undirected()
        for cg in networkx.connected_components(undag):
            # start from a root
            topo = networkx.topological_sort(cg)
            root = topo[0]
            # depth-first search (to get the initial products ASAP)
            for edge in dag.dfs_edges(root):
                stepnode = edge[1]
            useroot_to_do = list()
            # here the `todo` object contains a list of "immediately next"
            # steps to be performed for that one root.
            # Each step can be computed independently
            #FIXME: shouldn't the logic below be taken out of the crawler,
            # or in a callback function ?
            for step in root.iter_steps():
                if step.status == _TASK_DONE:
                    continue
                elif step.status == _TASK_INPROGRESS:
                    # FIXME: have a mechanism to recover from failure, crash, etc... ?
                    continue
                elif step.status == _TASK_TODO:
                    useroot_to_do.append(step)
                elif step.status == _TASK_FAILED:
                    raise Exception("Step previously failed: %s" % step)
                else:
                    # paranoid check
                    raise ValueError("Unknown step %s" % step)
            yield useroot_to_do
            # steps in that list should be performed before the next iteration
            pass
    def provenancewalk_storedentity(self, stored_entity,
                                    func_stored_entity,
                                    func_stored_sequence,
                                    func_step_concrete,
                                    func_storedentity_stepconcrete,
                                    func_stepconcrete_storedentity):
        """
        Walk up the path.

        :param stored_entity: the stored entity to start from
        :param func_stored_entity: a callback called with each stored entity
        :param func_stored_sequence: a callback called with each stored sequence
        :param func_step_concrete: a callback called with each step concrete
        :param func_storedentity_stepconcrete: a callback called with each link between
                                               a stored entity and a step concrete
        :param func_stepconcrete_storedentity: a callback called with each link between
                                               a step concrete and a stored entity
        """
        assert hasattr(stored_entity, 'id')
        storedentity_stack = collections.deque()
        storedentity_visited = set()
        step_stack = collections.deque()
        step_visited = set()
        if hasattr(stored_entity, 'iter_storedentities'):
            # sequence:
            for elt in stored_entity.iter_storedentities():
                storedentity_stack.append(elt)
                storedentity_visited.add(elt.id)
                func_stored_entity(elt)
            func_stored_sequence(stored_entity)
        else:
            # not a sequence
            storedentity_stack.append(stored_entity)
            storedentity_visited.add(stored_entity.id)
            func_stored_entity(stored_entity)
        while len(storedentity_stack) > 0:
            stored_entity = storedentity_stack.popleft()
            if hasattr(stored_entity, 'iter_storedentities'):
                # this is a sequence
                for elt in stored_entity.iter_storedentities():
                    storedentity_stack.append(elt)
                    storedentity_visited.add(elt.id)
                    func_stored_entity(elt)
                func_stored_sequence(stored_entity)
                stored_entity = storedentity_stack.popleft()
            step_concrete = self._persistent_graph.get_parenttask_of_storedentity(stored_entity)
            if step_concrete is not None:
                if step_concrete.id not in step_visited:
                    step_stack.append(step_concrete)
                    func_step_concrete(step_concrete)
                    step_visited.add(step_concrete.id)
            while len(step_stack) > 0:
                step_concrete = step_stack.popleft()
                func_stepconcrete_storedentity(step_concrete, stored_entity)
                storedentities = self._persistent_graph.get_srcassets(step_concrete.id)
                for entity in storedentities:
                    if entity.id not in storedentity_visited:
                        storedentity_stack.append(entity)
                        func_stored_entity(entity)
                        storedentity_visited.add(entity.id)
                    #func_stored_entity(entity)
                    func_storedentity_stepconcrete(entity, step_concrete)
    def _destination_walk(self, storedentity_stack, storedentity_visited,
                          step_stack, step_visited,
                          func_stored_entity,
                          func_step_concrete,
                          func_storedentity_stepconcrete,
                          func_stepconcrete_storedentity):
        while len(storedentity_stack) > 0:
            stored_entity = storedentity_stack.popleft()
            steps_concrete = self._persistent_graph.get_targetstepconcrete(stored_entity)
            for entity in steps_concrete:
                if entity.id not in step_visited:
                    step_stack.append(entity)
                    step_visited.add(entity.id)
            func_stored_entity(stored_entity)
            while len(step_stack) > 0:
                step_concrete = step_stack.popleft()
                func_step_concrete(step_concrete)
                func_stepconcrete_storedentity(step_concrete, stored_entity)
                storedentities = self._persistent_graph.get_targetassets(step_concrete.id)
                for entity in storedentities:
                    if entity.id not in storedentity_visited:
                        storedentity_stack.append(entity)
                        storedentity_visited.add(entity.id)
                    func_storedentity_stepconcrete(entity, step_concrete)
        return
    def destinationwalk_stepconcrete(self, step_concrete_id,
                                     func_stored_entity,
                                     func_stored_sequence,
                                     func_step_concrete,
                                     func_storedentity_stepconcrete,
                                     func_stepconcrete_storedentity):
        """ Walk down the path."""
        storedentity_stack = collections.deque()
        storedentity_visited = set()
        step_stack = collections.deque()
        step_visited = set()
        #FIXME: clean the code by taking the nested 'while' loop inside-out
        step_concrete = self._persistent_graph._get_stepconcrete(DbID(step_concrete_id, False))
        func_step_concrete(step_concrete)
        step_visited.add(step_concrete_id)
        stored_entities = self._persistent_graph.get_targetassets(step_concrete_id)
        for entity in stored_entities:
            if entity.id not in storedentity_visited:
                storedentity_stack.append(entity)
                storedentity_visited.add(entity.id)
            else:
                raise Exception('Step %s has several identical targets.' % str(step_concrete_id))
            #func_stepconcrete_storedentity(step_concrete, entity)
            func_storedentity_stepconcrete(entity, step_concrete)
        return self._destination_walk(storedentity_stack, storedentity_visited,
                                      step_stack, step_visited,
                                      func_stored_entity,
                                      func_step_concrete,
                                      func_storedentity_stepconcrete,
                                      func_stepconcrete_storedentity)
    def destinationwalk_storedentity(self, stored_entity,
                                     func_stored_entity,
                                     func_stored_sequence,
                                     func_step_concrete,
                                     func_storedentity_stepconcrete,
                                     func_stepconcrete_storedentity):
        """ Walk down the path."""
        storedentity_stack = collections.deque()
        storedentity_visited = set()
        step_stack = collections.deque()
        step_visited = set()
        storedentity_stack.append(stored_entity)
        storedentity_visited.add(stored_entity.id)
        return self._destination_walk(storedentity_stack, storedentity_visited,
                                      step_stack, step_visited,
                                      func_stored_entity,
                                      func_step_concrete,
                                      func_storedentity_stepconcrete,
                                      func_stepconcrete_storedentity)
    def _graph_storedentity(self, db_id, func, opposite, display=GRAPHDISPLAY.STEPLEVEL):
        """Make a provenance graph."""
        se_nodeid = display.value['kwargs']['se_nodeid']
        se_nodelabel = display.value['kwargs']['se_nodelabel']
        step_nodeid = display.value['kwargs']['step_nodeid']
        step_nodelabel = display.value['kwargs']['step_nodelabel']

        def gst(stored_entity, nodelabel):
            return nodelabel % dict((x, getattr(stored_entity, x)) for x in ('id', 'clsname', 'label'))

        def gsc(task, nodelabel):
            return nodelabel % dict((x, getattr(task, x)) for x in ('id', 'clsname'))

        dag = networkx.DiGraph()

        def func_stored_entity(stored_entity):
            dag.add_node(gst(stored_entity, se_nodeid),
                         label=gst(stored_entity, se_nodelabel),
                         shape='record')

        def func_stored_sequence(stored_sequence):
            #dag.add_node(gst(stored_sequence, se_nodelabel),
            #             label=gst(stored_sequence, se_nodelabel),
            #             shape='record')
            for se in stored_sequence.iter_storedentities():
                dag.add_edge(gst(se, se_nodeid), gst(stored_sequence, se_nodeid))

        def func_step_concrete(step_concrete):
            if step_concrete.status == _TASK_STATUS_LIST[_TASK_DONE]:
                style = 'filled'
            else:
                style = ''
            dag.add_node(gsc(step_concrete, step_nodeid),
                         label=gsc(step_concrete, step_nodelabel),
                         shape='box',
                         style=style,
                         xlabel=str(step_concrete.status))

        def func_storedentity_stepconcrete(stored_entity, step_concrete):
            if opposite:
                dag.add_edge(gsc(step_concrete, step_nodeid), gst(stored_entity, se_nodeid))
            else:
                dag.add_edge(gst(stored_entity, se_nodeid), gsc(step_concrete, step_nodeid))

        def func_stepconcrete_storedentity(step_concrete, stored_entity):
            if opposite:
                dag.add_edge(gst(stored_entity, se_nodeid), gsc(step_concrete, step_nodeid))
            else:
                dag.add_edge(gsc(step_concrete, step_nodeid), gst(stored_entity, se_nodeid))

        func(db_id,
             func_stored_entity,
             func_stored_sequence,
             func_step_concrete,
             func_storedentity_stepconcrete,
             func_stepconcrete_storedentity)
        return dag

    def provenancegraph_storedentity(self, stored_entity, display=GRAPHDISPLAY.STEPLEVEL):
        return self._graph_storedentity(stored_entity,
                                        self.provenancewalk_storedentity,
                                        False,  # opposite
                                        display=display)

    def destinationgraph_stepconcrete(self, step_concrete, display=GRAPHDISPLAY.STEPLEVEL):
        return self._graph_storedentity(step_concrete,
                                        self.destinationwalk_stepconcrete,
                                        True,  # opposite
                                        display=display)

    def destinationgraph_storedentity(self, stored_entity, display=GRAPHDISPLAY.STEPLEVEL):
        return self._graph_storedentity(stored_entity,
                                        self.destinationwalk_storedentity,
                                        True,  # opposite
                                        display=display)
    def cleantargets_stepconcrete(self, step_concrete_id):
        """
        Clean the targets downstream of a task (step_concrete), which means erasing
        the target files and (re)setting the tasks' status to 'TO DO'.

        :param step_concrete_id: A task
        :type step_concrete_id: a :class:`DbID` (or anything with an attribute :attr:`id`).
        """
        def func_stored_entity(stored_entity):
            Cls = getattr(self._persistent_graph._model, stored_entity.clsname)
            instance = Cls(stored_entity.entityname)
            if hasattr(instance, 'iterlistfiles'):
                nameiter = (os.path.join(os.path.dirname(instance.name), basename)
                            for elt_cls, basename in instance.iterlistfiles())
            else:
                nameiter = iter(instance)
            for pathname in nameiter:
                if os.path.isfile(pathname):
                    os.remove(pathname)
                elif os.path.isdir(pathname):
                    shutil.rmtree(pathname)
                else:
                    warnings.warn("The pathname '%s' associated with the stored entity '%s' cannot be removed." % (pathname, str(stored_entity)))

        def func_step_concrete(step_concrete):
            self._persistent_graph.step_concrete_state(step_concrete,
                                                       _TASK_STATUS_LIST[_TASK_TODO])

        def func_storedentity_stepconcrete(stored_entity, step_concrete):
            pass

        def func_stepconcrete_storedentity(step_concrete, stored_entity):
            pass

        def func_stored_sequence(step_concrete, stored_entity):
            pass

        self.destinationwalk_stepconcrete(step_concrete_id.id,
                                          func_stored_entity,
                                          func_stored_sequence,
                                          func_step_concrete,
                                          func_storedentity_stepconcrete,
                                          func_stepconcrete_storedentity)
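# Illustrative sketch (not part of the original module): the provenance/destination walks
# above are callback driven, and a caller typically builds its own summary from them. The
# `stepgraph` and `target_entity` arguments are hypothetical placeholders for a StepGraph
# and a StoredEntity obtained elsewhere.
def _example_provenance_classnames(stepgraph, target_entity):
    seen = list()
    noop = lambda *args: None
    stepgraph.provenancewalk_storedentity(target_entity,
                                          lambda entity: seen.append(entity.clsname),  # func_stored_entity
                                          noop,   # func_stored_sequence
                                          noop,   # func_step_concrete
                                          noop,   # func_storedentity_stepconcrete
                                          noop)   # func_stepconcrete_storedentity
    return seen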
import sqlite3
import os

sql_fn = os.path.join(os.path.dirname(__file__), 'cache.sql')


class StepConcrete_DbEntry(object):
    __slots__ = ['id', 'status', 'step_variant_id', 'steptype_id',
                 'executable', 'clsname', 'version', 'parameters']

    def __init__(self, id, status, step_variant_id, steptype_id,
                 executable, clsname, version, parameters=()):
        self.id = id
        self.status = status
        self.step_variant_id = step_variant_id
        self.steptype_id = steptype_id
        self.executable = executable
        self.clsname = clsname
        self.version = version
        self.parameters = parameters


TaskStatusCount = namedtuple('TaskStatus', 'id label count')


class StoredEntity(object):
    __slots__ = ('id', 'label', 'clsname', 'entityname')

    def __init__(self, id, label, clsname, entityname):
        self.id = id
        self.label = label
        self.clsname = clsname
        self.entityname = entityname

    def resurrect(self, model):
        if self.clsname == 'NoneType':
            return None
        elif '.' in self.clsname:
            module_name, cls_name = self.clsname.rsplit('.', 1)
            module = import_module(module_name)
        else:
            module = model
            cls_name = self.clsname
        Cls = getattr(module, cls_name)
        return Cls(self.entityname)

    def __str__(self):
        return '\n'.join((super(StoredEntity, self).__str__(),
                          'id: %i' % self.id))


class StoredSequence(object):
    __slots__ = ('id', 'label', 'clsname', 'storedentities')

    def __init__(self, id, label, clsname, storedentities):
        self.id = id
        self.label = label
        self.clsname = clsname
        self.storedentities = storedentities

    def resurrect(self, model):
        if '.' in self.clsname:
            module_name, cls_name = self.clsname.rsplit('.', 1)
            module = import_module(module_name)
        else:
            module = model
            cls_name = self.clsname
        Cls = getattr(module, cls_name)
        assert issubclass(Cls, core.FileSequence), 'Expected a FileSequence but got %s' % str(Cls)
        return Cls(x.resurrect(model) for x in self.storedentities)

    def iter_storedentities(self):
        return iter(self.storedentities)

    def __iter__(self):
        return self.iter_storedentities()

    def __len__(self):
        return len(self.storedentities)

    def __str__(self):
        return '\n'.join((super(StoredSequence, self).__str__(),
                          'id: %i' % self.id,
                          'length: %i' % len(self)))
class PersistentTaskGraph(object):
    """
    List of tasks stored on disk.
    """

    StoredEntityNoLabel = namedtuple('StoredEntityNoLabel', 'id clsname entityname')

    def __init__(self, db_fn, model, wd='.', force_create=False, isolation_level=None):
        """
        :param db_fn: file name for the database.
        :param wd: working directory (where derived data files are stored)
        :param force_create: recreate the database if a file with the same name is already present.
        :param isolation_level: passed to :func:`sqlite3.connect`.
        """
        # check whether the file containing the SQLite database exists.
        #
        if os.path.exists(db_fn):
            if os.path.isfile(db_fn):
                if force_create:
                    create = True
                else:
                    create = False
            else:
                raise ValueError('db_fn should be a file name, not a directory name.')
        else:
            create = True
        self._db_fn = db_fn
        self._model = model
        self._model_steplist = unifex._make_stepdict(model)
        self._wd = wd
        self.created = create
        connection = sqlite3.connect(db_fn, isolation_level=isolation_level)
        self.connection = connection
        if create:
            self._create(db_fn, wd)
        self._fill_tasklist()
        self._statuslist = None  # lazy evaluation

    def _create(self, db_fn, wd):
        with open(sql_fn, 'r') as fh:
            sql = fh.readlines()
            sql = ''.join(sql)
        connection = self.connection
        with connection:
            for block in sql.split(';'):
                connection.execute(block)
        # misc
        # version
        sql = """
        INSERT INTO misc (tag, description, val)
        VALUES (?, ?, ?)
        """
        with connection:
            res = connection.executemany(sql,
                                         (('version', 'Version for the framework', __version__),
                                          ('workingdirectory', 'Directory in which files are saved', wd)))
        # populate possible status for tasks
        sql = """
        INSERT INTO step_status (label)
        VALUES (?)
        """
        with connection:
            res = connection.executemany(sql, ((x,) for x in _TASK_STATUS_LIST.keys()))

    def _fill_tasklist(self):
        cursor = self.connection.cursor()
        sql = """
        SELECT id, label
        FROM step_status
        """
        res = cursor.execute(sql)
        for x in res:
            _TASK_STATUS_LIST[x[1]] = x[0]

    def getversion(self):
        sql = """
        SELECT val
        FROM misc
        WHERE tag=='version'
        """
        cursor = self.connection.cursor()
        res = cursor.execute(sql)
        return res.fetchone()[0]
    version = property(getversion, None, None,
                       "Version for the database and package (mixing versions comes at one's own risk)")
    def iter_steps(self):
        """ Iterate through the concrete steps """
        sql = """
        SELECT step_concrete.id AS id, step_status.label AS status,
               sv.id, sv.step_type_id, sv.executable, sv.cls, sv.version
        FROM step_concrete, step_status, step_variant AS sv
        WHERE step_concrete.step_status_id=step_status.id
        AND step_concrete.step_variant_id=sv.id
        ORDER BY step_concrete.id -- ID is autoincremented, so this is like a chronological order
        """
        # If the SQL above changes, the definition of
        # StepConcrete_DbEntry might have to be updated
        cursor = self.connection.cursor()
        steps = cursor.execute(sql)
        # loop through the steps
        for row in steps:
            # for each step, get the sources
            sc = StepConcrete_DbEntry(*row)
            sources = tuple(self.get_srcassets(sc.id))
            targets = tuple(self.get_targetassets(sc.id))
            parameters = tuple(self.get_parameters(sc.id))
            yield Step(sc, sources, targets, parameters, self._model)
    def _get_assets(self, concrete_step, kind):
        if hasattr(concrete_step, 'id'):
            concrete_step_id = concrete_step.id
        else:
            concrete_step_id = int(concrete_step)
        cursor = self.connection.cursor()
        # First yield stored entities
        sql_template = """
        SELECT stored_entity.id as id, step_concrete2%(kind)sfile.label as label,
               stored_entity.classname as classname, stored_entity.entityname as entityname
        FROM step_concrete2%(kind)sfile, stored_entity
        WHERE step_concrete_id=?
        AND stored_entity_id=stored_entity.id
        """
        sql = sql_template % {'kind': kind}
        res = cursor.execute(sql, (concrete_step_id,))
        for src in res:
            res = StoredEntity(*src)
            yield res
        # Now yield sequences (of stored entities)
        sql_template_ss = """
        SELECT stored_sequence.id, sc.label as label, stored_sequence.classname as classname
        FROM step_concrete2%(kind)sfile AS sc
        JOIN stored_sequence ON stored_sequence.id=sc.stored_sequence_id
        WHERE step_concrete_id=?
        """
        sql = sql_template_ss % {'kind': kind}
        sql_se = """
        SELECT stored_entity.id as id, stored_entity.classname as classname,
               stored_entity.entityname as entityname, pos
        FROM stored_entity2sequence AS se2s
        JOIN stored_entity ON se2s.stored_entity_id=stored_entity.id
        WHERE se2s.stored_sequence_id=?
        ORDER BY id, classname, pos
        """
        res = cursor.execute(sql, (concrete_step_id,))
        cursor2 = self.connection.cursor()
        for src in res:
            res2 = cursor2.execute(sql_se, (src[0],))
            ses = list()
            for pos, x in enumerate(res2.fetchall()):
                if x[-1] != pos:
                    raise Exception('The sequence is missing an item in position %i (and got position %i instead).' % (pos, x[-1]))
                ses.append(StoredEntity(x[0], None, x[1], x[2]))
            res = StoredSequence(src[0], src[1], src[2], ses)
            yield res

    def task_time_points(self, task_id):
        cursor = self.connection.cursor()
        sql = """
        SELECT time_creation, time_t0, time_t1
        FROM step_concrete
        WHERE step_concrete.id=?
        """
        cursor.execute(sql, (task_id.id, ))
        res = cursor.fetchone()
        return res

    def set_time_t0(self, task_id, t0):
        cursor = self.connection.cursor()
        sql = """
        UPDATE step_concrete
        SET time_t0=?
        WHERE step_concrete.id=?
        """
        cursor.execute(sql, (t0, task_id.id, ))
        self.connection.commit()

    def set_time_t1(self, task_id, t1):
        cursor = self.connection.cursor()
        sql = """
        UPDATE step_concrete
        SET time_t1=?
        WHERE step_concrete.id=?
        """
        cursor.execute(sql, (t1, task_id.id, ))
        self.connection.commit()

    def _get_stepconcrete(self, step_concrete_id):
        cursor = self.connection.cursor()
        sql = """
        SELECT step_concrete.id, step_concrete.step_status_id, step_concrete.step_variant_id,
               step_type_id, executable, cls, version, step_parameters.json
        FROM step_concrete
        INNER JOIN step_status ON step_concrete.step_status_id=step_status.id
        INNER JOIN step_variant AS sv ON step_concrete.step_variant_id=sv.id
        INNER JOIN step_concrete2parameters AS sc2p ON sc2p.step_concrete_id=step_concrete.id
        INNER JOIN step_parameters ON step_parameters_id=step_parameters.id
        WHERE step_concrete.id=?
        """
        cursor.execute(sql, (step_concrete_id.id, ))
        row = cursor.fetchone()
        if row is None:
            # Trouble. Trying to provide an informative error message
            sql = """
            SELECT *
            FROM step_concrete
            WHERE id=?
            """
            cursor.execute(sql, (step_concrete_id.id, ))
            if len(cursor.fetchall()) == 0:
                raise ValueError("The task ID %i cannot be found." % step_concrete_id.id)
            else:
                raise ValueError("Error with the database. "
                                 "Likely because of an interrupted process while entering tasks.")
        res = StepConcrete_DbEntry(*row)
        return res

    _SQL_TASK_FROMASSET_TEMPLATE = """
    SELECT step_concrete.id, step_concrete.step_status_id, step_concrete.step_variant_id,
           step_type_id, executable, cls, version, step_parameters.json
    FROM step_concrete2%(src_or_target)sfile AS sc2f
    INNER JOIN step_concrete ON sc2f.step_concrete_id=step_concrete.id
    INNER JOIN step_variant ON step_variant_id=step_variant.id
    INNER JOIN step_concrete2parameters AS sc2p ON sc2p.step_concrete_id=step_concrete.id
    INNER JOIN step_parameters ON step_parameters_id=step_parameters.id
    WHERE sc2f.stored_%(entity_or_sequence)s_id=?
    """
    _SQL_TASK_TOTARGETSEQUENCE = _SQL_TASK_FROMASSET_TEMPLATE % {'src_or_target': 'target',
                                                                 'entity_or_sequence': 'sequence'}
    _SQL_TASK_FROMSRCSEQUENCE = _SQL_TASK_FROMASSET_TEMPLATE % {'src_or_target': 'src',
                                                                'entity_or_sequence': 'sequence'}
    _SQL_TASK_TOTARGETSENTITY = _SQL_TASK_FROMASSET_TEMPLATE % {'src_or_target': 'target',
                                                                'entity_or_sequence': 'entity'}
    _SQL_TASK_FROMSRCSENTITY = _SQL_TASK_FROMASSET_TEMPLATE % {'src_or_target': 'src',
                                                               'entity_or_sequence': 'entity'}

    def _get_stepconcrete_from_storedentity(self, stored_entity_id, sql):
        """
        Retrieve the step_concrete object(s) linked with a stored_entity_id.
        Whenever the query is one of the "target" variants there will be at most one
        stored_entity (if there is more than one, the database has a consistency issue).

        :param stored_entity_id: the ID for a stored_entity
        :param sql: the SQL query to use (one of the ``_SQL_TASK_*`` constants above)
        """
        cursor = self.connection.cursor()
        res = cursor.execute(sql, (stored_entity_id,))
        for src in res:
            res = StepConcrete_DbEntry(*src)
            yield res
    def get_parenttask_of_storedentity(self, stored_entity):
        """
        Return the task producing a stored entity. There should obviously be only one
        such task, and an Exception is raised if that is not the case.

        :param stored_entity: the stored entity in the database
        :type stored_entity: :class:`StoredEntity`
        :rtype: a `StepConcrete_DbEntry` :class:`namedtuple`, or None
        """
        assert hasattr(stored_entity, 'id')
        step_concrete = None
        if hasattr(stored_entity, 'iter_storedentities'):
            raise ValueError('Only atomic stored entities are accepted.')
        else:
            sql = self._SQL_TASK_TOTARGETSENTITY
        for i, step_concrete in enumerate(self._get_stepconcrete_from_storedentity(stored_entity.id, sql)):
            if i > 0:
                raise Exception("""
                Consistency issue with the database.
                More than one step is claiming to be the source of the stored_entity_id "%s"
                """ % str(stored_entity.id))
        return step_concrete
    def get_targetstepconcrete(self, stored_entity):
        """
        Return the tasks using a given stored entity.

        :param stored_entity: the stored entity in the database.
        :type stored_entity: can be :class:`StoredEntity` or :class:`StoredSequence`
        :rtype: a `StepConcrete_DbEntry` :class:`namedtuple`, or None
        """
        if hasattr(stored_entity, 'iter_storedentities'):
            raise NotImplementedError('Handling sequences as source assets is not yet implemented.')
        else:
            sql = self._SQL_TASK_FROMSRCSENTITY
        return tuple(self._get_stepconcrete_from_storedentity(stored_entity.id, sql))
    def get_srcassets(self, concrete_step_id):
        """
        Return the source files for a given concrete step ID.

        :param concrete_step_id: ID for the concrete step in the database.
        :rtype: generator
        """
        return self._get_assets(concrete_step_id, 'src')
    def get_targetassets(self, concrete_step_id):
        """
        Return the target files for a given concrete step ID.

        :param concrete_step_id: ID for the concrete step in the database.
        :rtype: generator
        """
        return self._get_assets(concrete_step_id, 'target')
    def get_parameters(self, concrete_step_id):
        cursor = self.connection.cursor()
        sql = """
        SELECT step_parameters.id as id, step_parameters.json as json
        FROM step_concrete2parameters, step_parameters
        WHERE step_concrete_id=?
        AND step_parameters_id=step_parameters.id
        """
        res = cursor.execute(sql, (concrete_step_id,))
        return res.fetchall()

    def get_statuslist(self):
        if self._statuslist is None:
            cursor = self.connection.cursor()
            sql = """
            SELECT *
            from step_status
            """
            res = cursor.execute(sql)
            self._statuslist = tuple(res)
        return self._statuslist
    statuslist = property(get_statuslist, None, None,
                          """ Status list """)
    def id_stepparameters(self, parameters):
        """
        Conditionally add parameters (add only if not already present).

        :param parameters: sequence of parameters
        :rtype: ID for the parameters as a :class:`DbID`.
        """
        cursor = self.connection.cursor()
        param_json = json.dumps(parameters)
        sql = """
        SELECT id
        FROM step_parameters
        WHERE json=?
        """
        res = cursor.execute(sql, (param_json,))
        res = cursor.fetchone()
        if res is None:
            sql = """
            INSERT INTO step_parameters (
                json
            ) VALUES (
                ?);
            """
            cursor.execute(sql, (param_json,))
            db_id = cursor.lastrowid
            self.connection.commit()
            res = DbID(db_id, True)
        else:
            res = DbID(res[0], False)
        return res
    def id_stored_entity(self, cls, name):
        """
        Conditionally add a stored entity (add only if not already present).

        :param cls: Python class for the stored entity
        :param name: Parameter "name" for the class "cls".
        :rtype: ID for the stored entity as a :class:`DbID`.
        """
        cursor = self.connection.cursor()
        sql = """
        SELECT id
        FROM stored_entity
        WHERE classname=? AND entityname=?;
        """
        res = cursor.execute(sql, (cls.__name__, name))
        res = cursor.fetchone()
        if res is None:
            sql = """
            INSERT INTO stored_entity (
                classname, entityname
            ) VALUES (
                ?, ?);
            """
            try:
                cursor.execute(sql, (cls.__name__, name))
            except sqlite3.IntegrityError as ie:
                # print an informative message before propagating the exception
                sql = """
                SELECT classname
                FROM stored_entity
                WHERE entityname=?
                """
                res = cursor.execute(sql, (name,))
                res = cursor.fetchone()
                msg = 'ERROR: The descriptor "%s" for the type "%s" is already in use with type "%s"' % (name, cls.__name__, res[0])
                print(msg)
                raise(ie)
            db_id = cursor.lastrowid
            self.connection.commit()
            res = DbID(db_id, True)
        else:
            res = DbID(res[0], False)
        return res
    def id_stored_sequence(self, cls, clsname_sequence):
        """
        Conditionally add a sequence of stored entities (add only if not already present).

        :param cls: Python class for the sequence
        :param clsname_sequence: Sequence of pairs (Python class for the stored entity,
                                 parameter "name" for the class "cls")
        :rtype: ID for the sequence as a :class:`DbID`.
        """
        cursor = self.connection.cursor()
        sql = """
        SELECT stored_sequence_id
        FROM stored_entity2sequence
        WHERE stored_entity_id=? AND pos=?;
        """
        stored_entity_ids = list()
        candidates = None
        # iterate through the entities in the sequence
        for pos, (cls_elt, name_elt) in enumerate(clsname_sequence):
            # ensure that the entity is tracked
            se_id = self.id_stored_entity(cls_elt, name_elt)
            stored_entity_ids.append(se_id)
            res = cursor.execute(sql, (se_id.id, pos))
            if candidates is None:
                candidates = set(x[0] for x in cursor.fetchall())
            else:
                candidates = candidates.intersection(set(x[0] for x in cursor.fetchall()))
        if candidates is None or len(candidates) == 0:
            sql = """
            INSERT INTO stored_sequence (
                classname
            ) VALUES (
                ?);
            """
            cursor.execute(sql, (cls.__name__, ))
            db_id = cursor.lastrowid
            #
            sql = """
            INSERT INTO stored_entity2sequence (
                stored_sequence_id, stored_entity_id, pos
            ) VALUES (
                ?, ?, ?);
            """
            for pos, (se_id) in enumerate(stored_entity_ids):
                res = cursor.execute(sql, (db_id, se_id.id, pos))
            self.connection.commit()
            res = DbID(db_id, True)
        elif len(candidates) == 1:
            res = DbID(next(iter(candidates)), False)
        else:
            raise Exception("Consistency issue with the database.")
        return res
    def id_step_activity(self, activity):
        """
        Conditionally add an activity (add only if not already present).

        :param activity: one activity name
        :rtype: ID for the activity as a :class:`DbID`
        """
        cursor = self.connection.cursor()
        sql = """
        SELECT id
        FROM step_activity
        WHERE label=?;
        """
        res = cursor.execute(sql, (activity.value,))
        res = cursor.fetchone()
        if res is None:
            sql = """
            INSERT INTO step_activity (label)
            VALUES (?);
            """
            cursor.execute(sql, (activity.value,))
            res = cursor.lastrowid
            self.connection.commit()
            res = DbID(res, True)
        else:
            res = DbID(res[0], False)
        return res
    def id_step_type(self, activities):
        """
        Conditionally add a step type (add only if not already present).

        :param activities: sequence of activity names
        :rtype: ID for the step type as a :class:`DbID`
        """
        assert len(activities) > 0, "The number of activities must be > 0"
        cursor = self.connection.cursor()
        # first get the activity IDs
        activity_ids = tuple(self.id_step_activity(x) for x in activities)
        steptype_id = None
        if not any(x.new for x in activity_ids):
            # query whether any step_type already has all the associated activities
            # The if statement saves that trouble if any activity had to be created
            # (since then the step_type cannot possibly be already in the database)
            sql_activity = """SELECT step_type_id
            FROM step_type2activity
            WHERE step_activity_id = %i"""
            sql = """
            SELECT step_type_id
            FROM (SELECT step_type_id
                  FROM step_type2activity
                  WHERE step_activity_id IN """ + \
                ' (' + '\nINTERSECT\n'.join((sql_activity % dbid.id) for dbid in activity_ids) + ')' + \
                """
                  EXCEPT
                  SELECT step_type_id
                  FROM step_type2activity
                  WHERE step_activity_id NOT IN (%s)
                  )
            """ % ','.join(str(dbid.id) for dbid in activity_ids)
            res = cursor.execute(sql)
            steptype_id = res.fetchall()
        if steptype_id is None or len(steptype_id) == 0:
            # no matching step type was found: create one
            sql = """
            INSERT INTO step_type VALUES(null);
            """
            res = cursor.execute(sql)
            steptype_id = cursor.lastrowid
            sql = """
            INSERT INTO step_type2activity (step_type_id, step_activity_id)
            VALUES (?, ?);
            """
            for activity_id in activity_ids:
                cursor.execute(sql, (steptype_id, activity_id.id))
            self.connection.commit()
            return DbID(steptype_id, True)
        elif len(steptype_id) > 1:
            #FIXME: if more than one steptype_id is found, we have a problem
            raise Exception("Houston, we have a problem. We have several step types (%s) with all the activities." % repr(activities))
        steptype_id = steptype_id[0][0]
        return DbID(steptype_id, False)
    def _iter_step_type(self):
        sql = """
        SELECT step_type.id, activity
        FROM step_type
        INNER JOIN step_type2activity ON step_type2activity.step_type_id=step_type.id
        INNER JOIN step_activity ON step_type2activity.step_activity_id=step_activity.id
        GROUP BY step_type.id
        """
        cursor = self.connection.cursor()
        cursor.execute(sql)
        while True:
            rows = cursor.fetchmany(100)
            if not rows:
                break
            for row in rows:
                yield row
    def id_step_variant(self, step,
                        activities,  # list of activities the step is covering (step_id inferred from that)
                        ):
        """
        Return a database ID for the step variant (creating a new ID only if the
        variant is not already tracked).

        :param step: a step
        :type step: :class:`core.StepAbstract`
        :param activities: a sequence of activity names
        :rtype: ID for a step variant as a :class:`DbID`.
        """
        assert isinstance(step, StepAbstract)
        executable = step._execpath
        version = step.version
        step_type_id = self.id_step_type(activities)
        cursor = self.connection.cursor()
        step_variant_id = None
        if not step_type_id.new:
            # not a new step type ID, so maybe the step_variant is known as well
            if executable is None:
                sql = """
                SELECT id
                FROM step_variant
                WHERE step_type_id=?
                AND executable IS NULL
                AND cls=?
                AND version=?
                """
                t = type(step)
                sql_params = (step_type_id.id,
                              '.'.join((t.__module__, t.__name__)),
                              version)
            else:
                sql = """
                SELECT id
                FROM step_variant
                WHERE step_type_id=?
                AND executable=?
                AND cls=?
                AND version=?
                """
                t = type(step)
                sql_params = (step_type_id.id,
                              executable,
                              '.'.join((t.__module__, t.__name__)),
                              version)
            res = cursor.execute(sql, sql_params)
            step_variant_id = res.fetchone()
            # FIXME: test if more than one ?
            if step_variant_id is None:
                step_variant_id = None
            else:
                step_variant_id = step_variant_id[0]  # ID only
        if step_variant_id is None:
            sql = """
            INSERT INTO step_variant (step_type_id, executable, cls, version)
            VALUES (?, ?, ?, ?)
            """
            t = type(step)
            res = cursor.execute(sql, (step_type_id.id,
                                       executable,
                                       '.'.join((t.__module__, t.__name__)),
                                       version))
            step_variant_id = cursor.lastrowid
            self.connection.commit()
            return DbID(step_variant_id, True)
        else:
            return DbID(step_variant_id, False)
    @property
    def nconcrete_steps(self):
        cursor = self.connection.cursor()
        sql = """
        SELECT COUNT(id)
        FROM step_concrete
        """
        res = cursor.execute(sql)
        return res.fetchone()[0]

    @property
    def nconcrete_steps_status(self):
        cursor = self.connection.cursor()
        sql = """
        SELECT step_status.id, step_status.label, COUNT(*)
        FROM step_concrete, step_status
        WHERE step_concrete.step_status_id=step_status.id
        GROUP BY step_status.id
        """
        res = cursor.execute(sql)
        return tuple(TaskStatusCount(*x) for x in res.fetchall())
[docs] def id_stepconcrete(self, step_variant_id, sources, targets, parameters, tag = 1): """ Conditionally add a task ("concrete" step), that is a step variant (executable and parameters) to which source and target files, as well as parameters, are added. :param step_variant_id: ID for the step variant :type step_variant_id: integer :param sources: sequence of sources :type sources: :class:`AssetSet` :param targets: sequence of targets :type targets: :class:`AssetSet` :param parameters: list of parameters :type parameters: a sequence of :class:`str` :param tag: a tag, used to performed repetitions of the exact same task :type tag: a sequence of :class:`int` :rtype: :class:`DbID` """ assert isinstance(step_variant_id, int) cursor = self.connection.cursor() # Initialize at step_concrete_id to None. If still None later, # it will mean that there is no matching task already tracked. step_concrete_id = None #FIXME: transaction (in case one of the inserts fails) # DB IDs for all sources # ID for the parameters parameters_id = self.id_stepparameters(parameters) if len(sources) == 0: # special case: no sources # Query whether the task is already known sql = """ SELECT id, step_variant_id, step_status_id FROM step_concrete WHERE step_variant_id=? AND tag=? AND id NOT IN (SELECT id FROM step_concrete2srcfile) GROUP BY step_concrete.id """ cursor.execute(sql, (step_variant_id, tag)) res = cursor.fetchall() if len(res) == 1: step_concrete_id = res[0][0] elif len(res) == 0: # The task is not known (step_concrete_id is already set to None) pass elif len(res) > 1: # Several tasks are matching. This is not good. raise Exception("Serious trouble.") else: raise Exception("We should never be here.") else: # Query whether the task is already known. Information about the provenance # (including parameters) is enough. sources_db = list() sql_template = """ SELECT step_concrete.id, step_variant_id, step_status_id, stored_entity_id, stored_sequence_id FROM step_concrete, step_concrete2srcfile, step_concrete2parameters WHERE step_variant_id=? AND step_parameters_id=? AND label=? AND step_concrete2parameters.step_concrete_id=step_concrete.id AND step_concrete2srcfile.step_concrete_id=step_concrete.id AND stored_entity_id%(se)s AND stored_sequence_id%(ss)s AND step_concrete.tag=? """ sql_se = sql_template % {'se': '=?', 'ss': ' IS NULL'} sql_ss = sql_template % {'ss': '=?', 'se': ' IS NULL'} candidates = None # candidate known tasks for (label, asset, assetattr) in zip(sources._fields, sources, sources._sources): # loop over the source assets if asset is None and assetattr.allownone: continue asset_items = tuple(cn for cn in asset.iteritems()) if isinstance(asset, core.FileSequence): # if a sequence we also need to ensure that the sequence itself is tracked sequence_id = self.id_stored_sequence(type(asset), asset.iteritems()).id se_id = None etid = (step_variant_id, parameters_id.id, label, #se_id, sequence_id, tag) sql = sql_ss else: # if not a sequence we need to ensure that object (file) is tracked sequence_id = None for se_i, cn in enumerate(asset.iteritems()): if se_i > 0: # we should never reach here, because it was tested above raise Exception("Only FileSequence objects should have several saved entities.") # ensure the that the stored ID for this asset is tracked in the DB se_id = self.id_stored_entity(*cn).id # create a tuple of parameters for the query that selects stored entities. 
etid = (step_variant_id, parameters_id.id, label, se_id, #sequence_id, tag) sql = sql_se # query with parameters cursor.execute(sql, etid) # retrieve the task_id using this source asset tmp = set(x[0] for x in cursor.fetchall()) if candidates is None: # first iteration, set is whatever is retrieved candidates = tmp else: # otherwise, the candidate task_ids are the intersection of candidates so far and what was returned candidates = candidates.intersection(tmp) if len(candidates) == 0: # no candidate left: the task is not yet tracked. break if len(candidates) > 1: raise Exception("DB consistency error: several candidate tasks were found.") elif len(candidates) == 1: step_concrete_id = next(iter(candidates)) else: # step_concrete_id remains None if step_concrete_id is not None: raise Exception('') if step_concrete_id is not None: # the concrete step is already in the database res = DbID(step_concrete_id, False) # The following is a check of the targets. target_entities = tuple(self.get_targetassets(res)) if len(target_entities) != len(targets): raise ValueError(''.join(('The task was already stored but with a different number of targets ', '(%i, but now %i).' % (len(target_entities), len(targets))))) for entity in target_entities: targetasset = getattr(targets, entity.label) if targetasset._defined: # Check that this is matching the same as what is in the database # Being thorough is relatively important here if isinstance(entity, StoredSequence): raise NotImplementedError('Sequence assets in targets is not yet handled.') # check that constructor's attribute is the same: if targetasset.name != entity.entityname or type(targetasset).__name__ != entity.clsname: raise ValueError(''.join(('The task was already stored but with different values for the target "%s"' % entity.label, ' (%s(%s) instead of %s(%s)).' % (entity.clsname, entity.entityname, type(targetasset).__name__, targetasset.name)))) else: if isinstance(entity, StoredSequence): # raise NotImplementedError('Sequence assets in targets is not yet handled.') else: # set it to what is in the database targetasset.name = entity.entityname else: # the concrete step was never seen before param_json = json.dumps(parameters_id) res = self._add_stepconcrete(step_variant_id, sources, targets, parameters_id, tag = tag) return res
def _add_stepconcrete(self, step_variant_id, sources, targets, parameters_id, tag = 1): """ Add a "concrete" step, that is a step variant (executable and parameters) to which source and target files are added. :param step_variant_id: ID for the step variant :type step_variant_id: integer :param sources: sequence of sources :type sources: :class:`AssetSet` :param targets: sequence of targets :type targets: :class:`AssetSet` :param parameters_id: ID for the parameters :type tag: A tag for the task :param tag: :class:`int` :rtype: :class:`DbID` """ assert isinstance(step_variant_id, int) cursor = self.connection.cursor() #FIXME: transaction (in case one of the inserts fails) only committed # before exiting this method, but can this be bypassed by SQLite global settings ? # (I am relying on the current default) sql = """ INSERT INTO step_concrete ( step_variant_id, step_status_id, time_creation, tag ) VALUES ( ?, ?, ?, ?); """ # last update time set to NULL #FIXME: try/catch. since step_variant_id is a foreign key, # this will fail if there is no such step_variant_id cursor.execute(sql, (step_variant_id, _TASK_STATUS_LIST[_TASK_TODO], time.time(), tag)) step_concrete_id = DbID(cursor.lastrowid, True) labelnamepairs = zip(sources._fields, sources) self._insert_stepconcrete2storedentities(labelnamepairs, 'src', step_concrete_id.id) # ensure that defined target are tracked labelnamepairs = list() for field, t in zip(targets._fields, targets): if t._defined: labelnamepairs.append((field, t)) self._insert_stepconcrete2storedentities(labelnamepairs, 'target', step_concrete_id.id) self.connection.commit() self._insert_stepconcrete2parameters(parameters_id.id, step_concrete_id.id) self.connection.commit() return step_concrete_id #return Step(step_concrete_id, sources, targets) def _insert_stepconcrete2parameters(self, parameters_id, step_concrete_id): """ WARNING: add a commit after the call !!!""" cursor = self.connection.cursor() sql = """ INSERT INTO step_concrete2parameters ( step_concrete_id, step_parameters_id ) VALUES ( ?, ?); """ cursor.execute(sql, (step_concrete_id, parameters_id)) def _insert_stepconcrete2storedentities(self, labelnamepairs, what, step_concrete_id): """ WARNING: add a commit in caller after this returns !!!""" assert what in ('src', 'target') cursor = self.connection.cursor() sql_template = """ INSERT INTO step_concrete2%(what)sfile ( label, step_concrete_id, stored_%(the)s_id ) VALUES ( ?, ?, ?); """ # 2 SQL query: one for assets that are sequences and one for assets that are scalars. 
sql_ss = sql_template % {'what': what, 'the': 'sequence'} sql_se = sql_template % {'what': what, 'the': 'entity'} for (label, asset) in labelnamepairs: if isinstance(asset, core.FileSequence): # if a sequence we also need to create the sequence itself sequence_id = self.id_stored_sequence(type(asset), asset.iteritems()).id se_id = None etid = (label, step_concrete_id, # stored_entity sequence_id) sql = sql_ss else: sequence_id = None if asset is None: # if the asset is not defined here, # it can / should only be the case # because it is allowed to be None continue #se_id = self.id_stored_entity(type(asset), None).id else: for se_i, cn in enumerate(asset.iteritems()): if se_i > 0: raise Exception("Only FileSequence object should have several saved entities.") # ensure the that the stored IDs are tracked in the DB se_id = self.id_stored_entity(*cn).id etid = (label, step_concrete_id, se_id # stored_sequence ) sql = sql_se res = cursor.execute(sql, etid) #labelstepandnames = tuple( for cn in asset.iteritems()) #cursor.executemany(sql, #labelstepandnames) def step_concrete_info(self, step_concrete_id): sql = """ SELECT step_variant_id, step_status.label, time_creation FROM step_concrete, step_status WHERE step_concrete.id=? AND step_status.id = step_status_id """ cursor = self.connection.cursor() cursor.execute(sql, (step_concrete_id.id, )) res = cursor.fetchone() return res
    def step_concrete_state(self, step_concrete_id, state_id):
        """
        Set the state of a task:

        - step_concrete_id: task ID (DbID)
        - state_id: state ID
        """
        sql = """
        UPDATE step_concrete
        SET step_status_id=?
        WHERE step_concrete.id=?
        """
        cursor = self.connection.cursor()
        res = cursor.execute(sql, (state_id, step_concrete_id.id))
        self.connection.commit()
        #FIXME: return anything ?
    def step_concrete_status(self, step_concrete_id):
        sql = """
        SELECT step_status.id, step_status.label
        FROM step_concrete
        INNER JOIN step_status ON step_status_id=step_status.id
        WHERE step_concrete.id=?
        """
        cursor = self.connection.cursor()
        cursor.execute(sql, (step_concrete_id.id,))
        return cursor.fetchall()

    def _get_assetsofactivity(self, activity, what):
        """
        Find the assets associated with steps performing a specific activity.

        :param activity: an activity
        :type activity: :class:`Enum`
        """
        cursor = self.connection.cursor()
        # -- find step type IDs performing a specific activity
        sql = """
        SELECT DISTINCT step_type_id
        FROM step_activity, step_type2activity
        WHERE label=?
        AND step_activity.id=step_type2activity.step_activity_id
        """
        res = cursor.execute(sql, (activity.value,))
        #FIXME: check that an activity with that label was found ?
        step_type_ids = res.fetchall()
        # -- find concrete step IDs with that activity
        sql = """
        SELECT step_concrete.id
        FROM step_variant, step_concrete, step_concrete2%(what)sfile
        WHERE step_type_id=?
        AND step_variant.id=step_variant_id
        AND step_concrete.id=step_concrete2%(what)sfile.step_concrete_id
        """ % {'what': what}
        step_concrete_ids = list()
        for step_type_id in step_type_ids:
            res = cursor.execute(sql, step_type_id)
            step_concrete_ids.extend(res.fetchall())
        # -- find the stored entities resulting from the concrete steps above
        results = list()
        for task_dbentry in step_concrete_ids:
            res = self.get_targetassets(task_dbentry[0])
            results.extend(res)
        # --
        return tuple(results)
    def get_sourcesofactivity(self, activity):
        """
        Retrieve the sources of steps performing a specific activity.

        :param activity: an activity
        :type activity: :class:`Enum`
        """
        return self._get_assetsofactivity(activity, 'src')
    def find_targetsofactivity(self, activity):
        warnings.warn('find_targetsofactivity() is deprecated, use get_targetsofactivity().')
        return self._get_assetsofactivity(activity, 'target')
    def get_targetsofactivity(self, activity):
        """
        Retrieve the targets of steps performing a specific activity.

        :param activity: an activity
        :type activity: :class:`Enum`
        """
        cursor = self.connection.cursor()
        # -- find step type IDs performing a specific activity
        sql = """
        SELECT DISTINCT step_type_id
        FROM step_activity, step_type2activity
        WHERE label=?
        AND step_activity.id=step_type2activity.step_activity_id
        """
        res = cursor.execute(sql, (activity.value,))
        step_type_ids = res.fetchall()
        if len(step_type_ids) == 0:
            sql = """
            SELECT DISTINCT id
            FROM step_activity
            WHERE label=?
            """
            res = cursor.execute(sql, (activity.value,))
            step_activity_ids = res.fetchall()
            if len(step_activity_ids) == 0:
                raise ValueError("The activity '%s' is not in the database." % activity.value)
            else:
                return tuple()
        # -- find task IDs (concrete steps) with that activity
        sql = """
        SELECT DISTINCT step_concrete.id
        FROM step_variant, step_concrete, step_concrete2targetfile
        WHERE step_type_id=?
        AND step_variant.id=step_variant_id
        AND step_concrete.id=step_concrete2targetfile.step_concrete_id
        """
        step_concrete_ids = list()
        for step_type_id in step_type_ids:
            res = cursor.execute(sql, step_type_id)
            step_concrete_ids.extend(res.fetchall())
        # -- find the stored entities created by the tasks above
        results = list()
        for task_dbentry in step_concrete_ids:
            res = self.get_targetassets(task_dbentry[0])
            results.extend(res)
        return tuple(results)
    def get_targetsoftype(self, clsname):
        """ Return all targets of a given type. """
        cursor = self.connection.cursor()
        sql = """
        SELECT stored_entity.id, classname, entityname
        FROM stored_entity
        INNER JOIN step_concrete2targetfile ON stored_entity.id=step_concrete2targetfile.stored_entity_id
        WHERE stored_entity.classname=?
        """
        res = cursor.execute(sql, (clsname,))
        results = tuple(self.StoredEntityNoLabel(*x) for x in res.fetchall())
        return results
    def iter_finaltargets(self):
        """ Targets not used as a source anywhere else. """
        cursor = self.connection.cursor()
        sql = """
        SELECT se.id, se.classname, se.entityname, step_concrete_id, label
        FROM stored_entity AS se
        INNER JOIN (
            -- select targets not used as a source
            SELECT trg.stored_entity_id, trg.step_concrete_id
            FROM step_concrete2targetfile AS trg
            LEFT JOIN step_concrete2srcfile AS src
            ON trg.stored_entity_id=src.stored_entity_id
            WHERE src.stored_entity_id IS NULL
        ) ON se.id=stored_entity_id
        INNER JOIN step_concrete ON step_concrete.id=step_concrete_id
        INNER JOIN step_status ON step_status_id=step_status.id
        ORDER BY se.id
        """
        cursor.execute(sql)
        Target = namedtuple('Target',
                            'stored_entity_id stored_entity_classname stored_entity_entityname step_concrete_id status_label')
        for x in cursor:
            yield Target(*x)
    def finalsteps(self):
        """ Concrete steps for which all targets are final. """
        # concrete steps for which all targets are final
        raise NotImplementedError()
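# Minimal usage sketch, assuming a model module (e.g. railroadtracks.rnaseq), a concrete
# step instance, and a matching AssetStep instance prepared by the caller; none of these
# objects are defined here and the helper itself is illustrative, not part of the module.
def _example_persistent_stepgraph(model, step, assets, db_fn='tasks.db'):
    # the persistence layer is an SQLite file plus a working directory for derived files
    persistent = PersistentTaskGraph(db_fn, model, wd='.')
    # StepGraph mirrors the persisted tasks into an in-memory networkx DAG
    stepgraph = StepGraph(persistent)
    # add() returns a DbID; undefined target assets are given generated names in place
    task_id = stepgraph.add(step, assets, parameters=tuple(), tag=1)
    return task_id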
class CachedPersistentTaskGraph(PersistentTaskGraph): def __init__(self, db_fn, model, wd='.', force_create=False, isolation_level=None): super(CachedPersistentTaskGraph, self).__init__(db_fn, model, wd=wd, force_create=force_create, isolation_level=isolation_level) # cache of DB IDs self._cache_steptype_dbid = dict() self._cache_task_dbid = dict() self._cache_dbid_task_targets = dict() self._cache_stepvariant_dbid = dict() self._cache_dbid_stepvariant = dict() self._cache_stepparameters_dbid = dict() self._cache_storedentity_dbid = dict() self._cache_storedsequence_dbid = dict() def id_step_type(self, activities): hashdb = activities dbid = self._cache_steptype_dbid.get(hashdb) if dbid is not None: res = DbID(dbid, False) else: res = super(CachedPersistentTaskGraph, self).id_step_type(activities) self._cache_steptype_dbid[hashdb] = res.id return res def _cache_id_step_type(self): for dbid, activities in itertools.groupby(operator.itemgetter(0), self._iter_step_type()): hashdb = tuple(activities) self._cache_steptype_dbid[hashdb] = dbid def id_step_variant(self, step, activities): cls = type(step) #FIXME: redundancy between the full class name and the activities) step_hashdb = (cls.__module__ + '.' + cls.__name__, activities) variant_hashdb = (step.version, step._execpath) hashdb = (step_hashdb, variant_hashdb) dbid = self._cache_stepvariant_dbid.get(hashdb) if dbid is not None: res = DbID(dbid, False) else: res = super(CachedPersistentTaskGraph, self).id_step_variant(step, activities) self._cache_stepvariant_dbid[hashdb] = res.id self._cache_dbid_stepvariant[res.id] = (step_hashdb, variant_hashdb) return res def id_stepconcrete(self, step_variant_id, sources, targets, parameters, tag = 1): """ Conditionally add a task ("concrete" step), that is a step variant (executable and parameters) to which source and target files, as well as parameters, are added. :param step_variant_id: ID for the step variant :type step_variant_id: integer :param sources: sequence of sources :type sources: :class:`AssetSet` :param targets: sequence of targets :type targets: :class:`AssetSet` :param parameters: list of parameters :type parameters: a sequence of :class:`str` :param tag: a tag, used to performed repetitions of the exact same task :type tag: a sequence of :class:`int` :rtype: :class:`DbID` """ assert isinstance(step_variant_id, int) step_variant_hashdb = self._cache_dbid_stepvariant.get(step_variant_id) if step_variant_hashdb is None: raise ValueError('There is no step variant with ID %i' % step_variant_id) step_hashdb, variant_hashdb = step_variant_hashdb task_hashdb = (step_hashdb, sources.hashdb, parameters, tag) dbid = self._cache_task_dbid.get(task_hashdb) if dbid is not None: res = DbID(dbid, False) # Some of the targets might be undefined (because the user leaves it to railroadtracks to # fill the blanks). We need to populate these with the values in the database in order to # be able to compute the hash. 
if any(not getattr(targets, x.label)._defined for x in self.get_targetassets(res)): # FIXME: for now just hit the database (no use of the cache) res = super(CachedPersistentTaskGraph, self).id_stepconcrete(step_variant_id, sources, targets, parameters, tag=tag) elif dbid in self._cache_dbid_task_targets: # check that the target assets have not changed if targets.hashdb != self._cache_dbid_task_targets[dbid]: raise ValueError("Target assets not matching the assets already associated with the task.") else: self._cache_dbid_task_targets[res.id] = targets.hashdb else: res = super(CachedPersistentTaskGraph, self).id_stepconcrete(step_variant_id, sources, targets, parameters, tag=tag) self._cache_task_dbid[task_hashdb] = res.id if all(x._defined for x in targets): self._cache_dbid_task_targets[res.id] = targets.hashdb return res def id_stepparameters(self, parameters): dbid = self._cache_stepparameters_dbid.get(parameters) if dbid is not None: res = DbID(dbid, False) else: res = super(CachedPersistentTaskGraph, self).id_stepparameters(parameters) self._cache_stepparameters_dbid[parameters] = res.id return res def id_stored_entity(self, cls, name): hashdb = core.SavedEntityAbstract._hash_components(cls, name) dbid = self._cache_storedentity_dbid.get(hashdb) if dbid is not None: res = DbID(dbid, False) else: res = super(CachedPersistentTaskGraph, self).id_stored_entity(cls, name) self._cache_storedentity_dbid[hashdb] = res.id return res def id_stored_sequence(self, cls, clsname_sequence): names_hashdb = list() clsname_sequence_tpl = tuple(clsname_sequence) for (elt_cls, elt_name) in clsname_sequence_tpl: dbid = self.id_stored_entity(elt_cls, elt_name) item_hashdb = core.SavedEntityAbstract._hash_components(elt_cls, elt_name) names_hashdb.append(item_hashdb) hashdb = (cls, tuple(names_hashdb)) dbid = self._cache_storedsequence_dbid.get(hashdb) if dbid is not None: res = DbID(dbid, False) else: res = super(CachedPersistentTaskGraph, self).id_stored_sequence(cls, clsname_sequence_tpl) self._cache_storedsequence_dbid[hashdb] = res.id return res
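# Note on the caching subclass above: each id_*() override first computes an in-memory hash
# key (class name, activities, asset hashes, parameters, tag) and only falls back to the
# SQLite queries in PersistentTaskGraph on a cache miss, so rebuilding a large task graph
# does not re-issue one SELECT per asset. The caches are write-through: IDs returned by the
# parent class are stored back into the dictionaries before being returned to the caller.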