Source code for stalker.models.schedulers

# -*- coding: utf-8 -*-
# Stalker a Production Asset Management System
# Copyright (C) 2009-2017 Erkan Ozgur Yilmaz
#
# This file is part of Stalker.
#
# Stalker is free software: you can redistribute it and/or modify
# it under the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# Stalker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# Lesser GNU General Public License for more details.
#
# You should have received a copy of the Lesser GNU General Public License
# along with Stalker.  If not, see <http://www.gnu.org/licenses/>

import os
import subprocess
import tempfile
import datetime
import time
import csv

import pytz

from stalker.log import logging_level

import logging

# Module-level logger; its verbosity follows the package-wide setting
# imported from stalker.log above.
logger = logging.getLogger(__name__)
logger.setLevel(logging_level)


class SchedulerBase(object):
    """Abstract base class for all Stalker schedulers.

    Every concrete scheduler derives from this class and overrides
    :meth:`.schedule`. The base class only manages the :attr:`.studio`
    attribute, validating that it is a :class:`stalker.models.studio.Studio`
    instance (or None).
    """

    def __init__(self, studio=None):
        self._studio = None
        self.studio = studio

    def _validate_studio(self, candidate):
        """Validate the given studio value.

        Returns the value unchanged when it is None or a proper Studio
        instance, raises TypeError otherwise.
        """
        if candidate is None:
            return None

        # imported lazily to avoid a circular import at module load time
        from stalker import Studio
        if isinstance(candidate, Studio):
            return candidate

        raise TypeError(
            '%s.studio should be an instance of '
            'stalker.models.studio.Studio, not %s' % (
                self.__class__.__name__,
                candidate.__class__.__name__
            )
        )

    @property
    def studio(self):
        """The :class:`.Studio` instance this scheduler works on."""
        return self._studio

    @studio.setter
    def studio(self, value):
        self._studio = self._validate_studio(value)

    def schedule(self):
        """The main scheduling entry point; must be implemented by
        derived classes.
        """
        raise NotImplementedError
class TaskJugglerScheduler(SchedulerBase):
    """This is the main scheduler for Stalker right now.

    This class prepares the data for TaskJuggler and lets it solve the
    scheduling problem, and then retrieves the solved date and resource data
    back.

    TaskJugglerScheduler needs a :class:`.Studio` instance to work with, it
    will create a .tjp file and then solve the tasks and restore the
    computed_start and computed_end dates and the computed_resources
    attributes for each task.

    Stalker will pass all its data to TaskJuggler by creating a tjp file that
    TaskJuggler can parse. This tjp file has all the Projects, Tasks, Users,
    Departments, TimeLogs, Vacations and everything that TJ needs for solving
    the tasks. With every new version of it, Stalker tries to cover more and
    more TaskJuggler directives.

    .. note::
       .. versionadded:: 0.2.5
          Alternative Resources

          Stalker is now able to pass alternative resources to TaskJuggler.
          Although, per resource alternatives are not yet possible, it will
          be implemented in future versions of Stalker.

    .. note::
       .. versionadded:: 0.2.5
          Task Dependency Relation Attributes

          Stalker now can use 'gapduration', 'gaplength', 'onstart' and
          'onend' TaskJuggler directives for each dependent task of a task.
          Use the TaskDependency instance in Task.task_dependency attribute
          to control how a particular task is depending to another task.

    .. warning::
       **Task.computed_resources Attribute Content**

       After the scheduling is finished, TaskJuggler will create a ``csv``
       report that TaskJugglerScheduler will parse. This csv file contains
       the ``id``, ``start date``, ``end date`` and ``resources`` data. The
       resources reported back by TJ will be stored in
       :attr:`.Task.computed_resources` attribute.

       TaskJuggler will put all the resources who may have entered a
       :class:`.TimeLog` previously to the csv file. But the resources from
       the csv file may not be in :attr:`.Task.resources` or
       :attr:`.Task.alternative_resources` anymore. Because of that,
       TaskJugglerScheduler will only store the resources those are both in
       csv file and in :attr:`.Task.resources` or
       :attr:`.Task.alternative_resources` attributes.

    Stalker will export each Project to tjp as the highest task in the
    hierarchy and all the projects will be combined in to the same tjp file.
    Combining all the Projects in one tjp file has a very nice side effect,
    projects using the same resources will respect their allocations to the
    resource. So that when a TaskJugglerScheduler instance is used to
    schedule the project, all projects are scheduled together.

    The following table shows which Stalker data type is converted to which
    TaskJuggler type:

      +------------+-------------+
      | Stalker    | TaskJuggler |
      +============+=============+
      | Studio     | Project     |
      +------------+-------------+
      | Project    | Task        |
      +------------+-------------+
      | Task       | Task        |
      +------------+-------------+
      | Asset      | Task        |
      +------------+-------------+
      | Shot       | Task        |
      +------------+-------------+
      | Sequence   | Task        |
      +------------+-------------+
      | Department | Resource    |
      +------------+-------------+
      | User       | Resource    |
      +------------+-------------+
      | TimeLog    | Booking     |
      +------------+-------------+
      | Vacation   | Vacation    |
      +------------+-------------+

    :param bool compute_resources: When set to True it will also consider
      :attr:`.Task.alternative_resources` attribute and will fill
      :attr:`.Task.computed_resources` attribute for each Task. With
      :class:`.TaskJugglerScheduler` when the total number of Task is around
      15k it will take around 7 minutes to generate this data, so by default
      it is False.

    :param int parsing_method: Choose between SQL (0) or Pure Python (1)
      parsing. The default is SQL.
    """
[docs] def __init__(self, studio=None, compute_resources=False, parsing_method=0, projects=None): super(TaskJugglerScheduler, self).__init__(studio) self.tjp_content = '' self.temp_file_full_path = None self.temp_file_path = None self.temp_file_name = None self.tjp_file_full_path = None self.tjp_file = None self.csv_file_full_path = None self.csv_file = None self.compute_resources = compute_resources self.parsing_method = parsing_method self._projects = [] self.projects = projects
    def _create_tjp_file(self):
        """creates the tjp file

        Only generates the temp file *paths* (tjp and csv share the same
        random basename); no file is written to disk here.
        """
        self.temp_file_full_path = tempfile.mktemp(prefix='Stalker_')
        self.temp_file_path = os.path.dirname(self.temp_file_full_path)
        self.temp_file_name = os.path.basename(self.temp_file_full_path)
        self.tjp_file_full_path = self.temp_file_full_path + ".tjp"
        self.csv_file_full_path = self.temp_file_full_path + ".csv"

    def _create_tjp_file_content(self):
        """creates the tjp file content

        Pulls the whole task hierarchy per project with one recursive SQL
        query and renders the tjp task tree into ``self.tjp_content`` using
        the ``tjp_main_template2`` Jinja2 template.

        NOTE(review): the SQL uses PostgreSQL-only features (recursive CTEs
        with arrays, ``array_agg``, ``::text`` casts) — presumably requires a
        PostgreSQL backend; confirm.
        """
        from jinja2 import Template

        start = time.time()

        # use new way of doing it, it will just work with PostgreSQL
        import json
        from stalker import defaults
        from stalker.db.session import DBSession

        template = Template(defaults.tjp_main_template2)

        if not self.projects:
            # no explicit project list given: schedule every project
            project_ids = DBSession.connection().execute(
                'select id, code from "Projects"'
            ).fetchall()
        else:
            project_ids = [[project.id] for project in self.projects]

        sql_query = """
        select
            "Tasks".id,
            tasks.path,
            coalesce("Tasks".parent_id, "Tasks".project_id) as parent_id,
            tasks.entity_type,
            tasks.name,
            "Tasks".priority,
            "Tasks".schedule_timing,
            "Tasks".schedule_unit,
            "Tasks".schedule_model,
            "Tasks".allocation_strategy,
            "Tasks".persistent_allocation,
            tasks.depth,
            task_resources.resource_ids,
            task_alternative_resources.resource_ids as alternative_resource_ids,
            time_logs.time_log_array,
            task_dependencies.dependency_info,
            not exists (
                select 1
                from "Tasks" as "Child_Tasks"
                where "Child_Tasks".parent_id = "Tasks".id
            ) as is_leaf
        from "Tasks"
        join (
            with recursive recursive_task(id, parent_id, path_as_text, path, depth) as (
                select
                    id,
                    parent_id,
                    id::text as path_as_text,
                    array[project_id] as path,
                    0
                from "Tasks"
                where parent_id is NULL and project_id = :id
                union all
                select
                    task.id,
                    task.parent_id,
                    (parent.path_as_text || '-' || task.id) as path_as_text,
                    (parent.path || task.parent_id) as path,
                    parent.depth + 1 as depth
                from "Tasks" as task
                join recursive_task as parent on task.parent_id = parent.id
            )
            select
                recursive_task.id,
                recursive_task.parent_id,
                recursive_task.path_as_text,
                recursive_task.path,
                "SimpleEntities".name as name,
                "SimpleEntities".entity_type,
                recursive_task.depth
            from recursive_task
            join "SimpleEntities" on recursive_task.id = "SimpleEntities".id
            --order by path_as_text
        ) as tasks on "Tasks".id = tasks.id
        -- resources
        left outer join (
            select
                task_id,
                array_agg(resource_id order by resource_id) as resource_ids
            from "Task_Resources"
            group by task_id
        ) as task_resources on "Tasks".id = task_resources.task_id
        -- alternative resources
        left outer join (
            select
                task_id,
                array_agg(resource_id order by resource_id) as resource_ids
            from "Task_Alternative_Resources"
            group by task_id
        ) as task_alternative_resources on "Tasks".id = task_alternative_resources.task_id
        -- time logs
        left outer join (
            select
                "TimeLogs".task_id,
                array_agg(
                    ('User_' || "TimeLogs".resource_id,
                     to_char(cast("TimeLogs".start at time zone 'utc' as timestamp), 'YYYY-MM-DD-HH24:MI:00'),
                     to_char(cast("TimeLogs".end at time zone 'utc' as timestamp), 'YYYY-MM-DD-HH24:MI:00'))
                ) as time_log_array
            from "TimeLogs"
            group by task_id
        ) as time_logs on "Tasks".id = time_logs.task_id
        -- dependencies
        left outer join (
            select
                task_id,
                array_agg(
                    (tasks.alt_path, dependency_target, gap_timing, gap_unit, gap_model)
                ) dependency_info
            from "Task_Dependencies"
            join (
                with recursive recursive_task(id, parent_id, alt_path) as (
                    select
                        id,
                        parent_id,
                        project_id::text as alt_path
                    from "Tasks"
                    where parent_id is NULL
                    union all
                    select
                        task.id,
                        task.parent_id,
                        (parent.alt_path || '-' || task.parent_id) as alt_path
                    from "Tasks" as task
                    join recursive_task as parent on task.parent_id = parent.id
                )
                select
                    recursive_task.id,
                    recursive_task.parent_id,
                    recursive_task.alt_path || '-' || recursive_task.id as alt_path
                from recursive_task
                join "SimpleEntities" on recursive_task.id = "SimpleEntities".id
            ) as tasks on "Task_Dependencies".depends_to_id = tasks.id
            group by task_id
        ) as task_dependencies on "Tasks".id = task_dependencies.task_id
        --order by "Tasks".id
        order by path_as_text
        """

        result_buffer = []
        num_of_records = 0

        # run it per project
        from sqlalchemy import text
        for pr in project_ids:
            p_id = pr[0]
            #p_code = pr[1]
            result = DBSession.connection().execute(text(sql_query), id=p_id)

            # start by adding the project first
            result_buffer.append(
                'task Project_%s "Project_%s" {' % (p_id, p_id)
            )

            # now start jumping around
            previous_level = 0
            for r in result.fetchall():
                # start by appending task tjp id first
                task_id = r[0]
                # path = r[1]
                # parent_id = r[2]
                # entity_type = r[3]
                #name = r[4]
                priority = r[5]
                schedule_timing = r[6]
                schedule_unit = r[7]
                schedule_model = r[8]
                allocation_strategy = r[9]
                persistent_allocation = r[10]
                # +1 because the project task itself occupies depth 0
                depth = r[11] + 1
                resource_ids = r[12]
                alternative_resource_ids = r[13]
                time_log_array = r[14]
                dependency_info = r[15]
                is_leaf = r[16]

                tab = ' ' * depth

                # close the previous level if necessary (emit one closing
                # brace per level we just came up from)
                for i in range(previous_level - depth + 1):
                    i_tab = ' ' * (previous_level - i)
                    result_buffer.append('%s}' % i_tab)

                result_buffer.append(
                    """%(tab)stask Task_%(id)s "Task_%(id)s" {""" % {
                        'tab': tab,
                        'id': task_id
                    }
                )

                # append priority if it is different then 500
                # (500 is TaskJuggler's default priority)
                if priority != 500:
                    result_buffer.append('%s priority %s' % (tab, priority))

                # append dependency information
                if dependency_info:
                    dep_buffer = ['%s depends ' % tab]
                    # PostgreSQL returns the array of row-tuples as a
                    # '{(...),(...)}' string; massage it into JSON first
                    json_data = json.loads(
                        dependency_info.replace('{', '[')
                        .replace('}', ']')
                        .replace('(', '')
                        .replace(')', '')
                    )
                    # it is an array of string
                    for i, dep in enumerate(json_data):
                        if i > 0:
                            dep_buffer.append(', ')
                        # gap_timing/gap_unit/gap_model are parsed but not
                        # emitted into the tjp output here
                        dep_full_ids, \
                            dependency_target, \
                            gap_timing, \
                            gap_unit, \
                            gap_model = dep.split(',')

                        dep_full_path = '.'.join(
                            map(lambda x: 'Task_%s' % x,
                                dep_full_ids.split('-'))
                        )
                        # fix for Project id: the first path element is a
                        # project, so swap its 'Task_' prefix for 'Project_'
                        dep_full_path = 'Project_%s' % dep_full_path[5:]

                        dep_string = '%s {%s}' % (
                            dep_full_path, dependency_target)
                        dep_buffer.append(dep_string)
                    result_buffer.append(''.join(dep_buffer))

                # append schedule model and timing information
                # if this is a leaf task and has resources
                if is_leaf and resource_ids:
                    result_buffer.append(
                        '%s %s %s%s' % (
                            tab, schedule_model, schedule_timing,
                            schedule_unit
                        )
                    )

                    resource_buffer = ['%s allocate ' % tab]
                    for i, resource_id in enumerate(resource_ids):
                        if i > 0:
                            resource_buffer.append(', ')
                        resource_buffer.append('User_%s' % resource_id)

                        # now go through alternatives
                        if alternative_resource_ids:
                            resource_buffer.append(' { alternative ')
                            for j, alt_resource_id in \
                                    enumerate(alternative_resource_ids):
                                if j > 0:
                                    resource_buffer.append(', ')
                                resource_buffer.append(
                                    'User_%s' % alt_resource_id)

                            # set the allocation strategy
                            resource_buffer.append(
                                ' select %s' % allocation_strategy)

                            # is is persistent
                            if persistent_allocation:
                                resource_buffer.append(' persistent')

                            resource_buffer.append(' }')
                    result_buffer.append(''.join(resource_buffer))

                # append any time log information
                if time_log_array:
                    json_data = json.loads(
                        time_log_array.replace('{', '[')
                        .replace('}', ']')
                        .replace('(', '')
                        .replace(')', '')
                    )
                    # it is an array of string
                    for tlog in json_data:
                        user_id, t_start, t_end = tlog.split(',')
                        result_buffer.append(
                            '%s booking %s %s - %s { overtime 2 }' % (
                                tab, user_id, t_start, t_end
                            )
                        )

                previous_level = depth
                num_of_records += 1

            # and close the brackets per project
            depth = 0  # current depth is 0 (Project)
            # previous_level is the last task
            for i in range(previous_level - depth + 1):
                i_tab = ' ' * (previous_level - i)
                result_buffer.append('%s}' % i_tab)

        tasks_buffer = '\n'.join(result_buffer)

        import stalker
        self.tjp_content = template.render({
            'stalker': stalker,
            'studio': self.studio,
            'csv_file_name': self.temp_file_name,
            'csv_file_full_path': self.temp_file_full_path,
            'compute_resources': self.compute_resources,
            'tasks_buffer': tasks_buffer
        })

        logger.debug('total number of records: %s' % num_of_records)

        end = time.time()
        logger.debug(
            'rendering the whole tjp file took : %s seconds' % (end - start)
        )

    def _fill_tjp_file(self):
        """fills the tjp file with content
        """
        with open(self.tjp_file_full_path, 'w+') as self.tjp_file:
            self.tjp_file.write(self.tjp_content)

    def _delete_tjp_file(self):
        """deletes the temp tjp file

        Missing files are silently ignored.
        """
        try:
            os.remove(self.tjp_file_full_path)
        except OSError:
            pass

    def _delete_csv_file(self):
        """deletes the temp csv file

        Missing files are silently ignored.
        """
        try:
            os.remove(self.csv_file_full_path)
        except OSError:
            pass

    def _clean_up(self):
        """removes the temp files
        """
        self._delete_tjp_file()
        self._delete_csv_file()

    def _parse_csv_file(self):
        """parses back the csv file and fills the tasks with computed_start
        and computed_end values

        Reads the semicolon-delimited csv report written by TaskJuggler and
        bulk-updates the Task/Project date columns (and optionally the
        Task_Computed_Resources table) straight through SQLAlchemy core,
        bypassing the ORM for speed.
        """
        parsing_start = time.time()

        logger.debug('csv_file_full_path : %s' % self.csv_file_full_path)

        if not os.path.exists(self.csv_file_full_path):
            # TaskJuggler did not produce a report; nothing to write back
            logger.debug('could not find CSV file, '
                         'returning without updating db!')
            return

        from stalker import Task, Project
        from stalker.models.task import Task_Computed_Resources

        entity_ids = []
        update_data = []
        update_user_data = []
        with open(self.csv_file_full_path, 'r') as self.csv_file:
            csv_content = csv.reader(self.csv_file, delimiter=';')
            lines = [line for line in csv_content]
            # drop the header row
            lines.pop(0)
            for data in lines:
                # the id column looks like 'Project_X.Task_Y...Task_Z';
                # the numeric entity id is the last '_'-separated piece
                id_line = data[0]
                entity_id = int(id_line.split('.')[-1].split('_')[-1])
                if entity_id:
                    entity_ids.append(entity_id)
                start_date = datetime.datetime.strptime(
                    data[1], "%Y-%m-%d-%H:%M"
                )
                end_date = datetime.datetime.strptime(
                    data[2], "%Y-%m-%d-%H:%M"
                )
                # implement time zone info (TJ dates are emitted in UTC)
                start_date = start_date.replace(tzinfo=pytz.utc)
                end_date = end_date.replace(tzinfo=pytz.utc)

                # computed_resources
                if self.compute_resources:
                    if data[3] != '':
                        # each entry looks like '(User_<id>)'; extract <id>
                        resources_data = map(
                            lambda x: x.split('_')[-1].split(')')[0],
                            data[3].split(',')
                        )
                        for rid in resources_data:
                            update_user_data.append({
                                'task_id': entity_id,
                                'resource_id': rid
                            })

                update_data.append({
                    'b_id': entity_id,
                    'start': start_date,
                    'end': end_date,
                    'computed_start': start_date,
                    'computed_end': end_date
                })

        from sqlalchemy import bindparam

        # update date values
        update_statement = Task.__table__.update()\
            .where(Task.__table__.c.id == bindparam('b_id'))\
            .values(
                start=bindparam('start'),
                end=bindparam('end'),
                computed_start=bindparam('computed_start'),
                computed_end=bindparam('computed_end')
            )

        from stalker.db.session import DBSession
        DBSession.connection().execute(update_statement, update_data)

        # update project dates (project rows share ids with the csv data,
        # so the same parameter list is reused)
        update_project_statement = Project.__table__.update()\
            .where(Project.__table__.c.id == bindparam('b_id'))\
            .values(
                start=bindparam('start'),
                end=bindparam('end'),
                computed_start=bindparam('computed_start'),
                computed_end=bindparam('computed_end')
            )
        DBSession.connection().execute(update_project_statement, update_data)

        # update computed resources data
        # first delete everything
        if self.compute_resources:
            delete_resources_statement = Task_Computed_Resources.delete()
            update_resources_statement = Task_Computed_Resources.insert()\
                .values(
                    task_id=bindparam('task_id'),
                    resource_id=bindparam('resource_id')
                )
            DBSession.connection().execute(delete_resources_statement)
            DBSession.connection().execute(
                update_resources_statement, update_user_data
            )

        parsing_end = time.time()
        logger.debug(
            'completed parsing csv file in (SQL): %s seconds' %
            (parsing_end - parsing_start)
        )
[docs] def schedule(self): """Does the scheduling. """ # check the studio attribute from stalker import Studio if not isinstance(self.studio, Studio): raise TypeError( '%s.studio should be an instance of ' 'stalker.models.studio.Studio, not %s' % (self.__class__.__name__, self.studio.__class__.__name__) ) # create a tjp file self._create_tjp_file() # create tjp file content self._create_tjp_file_content() # fill it with data self._fill_tjp_file() logger.debug('tjp_file_full_path: %s' % self.tjp_file_full_path) # pass it to tj3 from stalker import defaults if os.name == 'nt': logger.debug('tj3 using fallback mode for Windows!') command = '%s %s -o %s' % ( defaults.tj_command, self.tjp_file_full_path, self.temp_file_path, ) logger.debug('tj3 command: %s' % command) returncode = os.system(command) stderr_buffer = '' else: process = subprocess.Popen( [defaults.tj_command, self.tjp_file_full_path, '-o', self.temp_file_path], stderr=subprocess.PIPE ) # loop until process finishes and capture stderr output stderr_buffer = [] while True: stderr = process.stderr.readline() if stderr == b'' and process.poll() is not None: break if stderr != b'': stderr = stderr.strip() stderr_buffer.append(stderr) logger.debug(stderr.strip()) # flatten the buffer stderr_buffer = '\n'.join(stderr_buffer) returncode = process.returncode if returncode: # there is an error raise RuntimeError(stderr_buffer) # read back the csv file self._parse_csv_file() logger.debug('tj3 return code: %s' % returncode) # remove the tjp file self._clean_up() return stderr_buffer
def _validate_projects(self, projects): """validates the given projects value """ if projects is None: projects = [] msg = '%(class)s.projects should be a list of ' \ 'stalker.models.project.Project instances, not ' \ '%(projects_class)s' if not isinstance(projects, list): raise TypeError( msg % { 'class': self.__class__.__name__, 'projects_class': projects.__class__.__name__ } ) from stalker import Project for item in projects: if not isinstance(item, Project): raise TypeError( msg % { 'class': self.__class__.__name__, 'projects_class': item.__class__.__name__ } ) return projects @property def projects(self): """getter for the _project attribute """ return self._projects @projects.setter def projects(self, projects): """setter for the _project attribute """ self._projects = self._validate_projects(projects)