Source code for turberfield.dialogue.model

#!/usr/bin/env python3
# encoding: UTF-8

# This file is part of turberfield.
#
# Turberfield is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Turberfield is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with turberfield.  If not, see <http://www.gnu.org/licenses/>.


import argparse
from collections import defaultdict
from collections import namedtuple
from collections import OrderedDict
import itertools
import logging
import operator
import os.path
import sys

from turberfield.dialogue.directives import Entity as EntityDirective
from turberfield.dialogue.directives import FX as FXDirective
from turberfield.dialogue.directives import Property as PropertyDirective
from turberfield.dialogue.directives import Memory as MemoryDirective
from turberfield.utils.assembly import Assembly
from turberfield.utils.misc import group_by_type

import pkg_resources
import docutils


class Model(docutils.nodes.GenericNodeVisitor):
    """This class registers the necessary extensions to the docutils document model.

    It also defines the types which are returned on iterating over a scene
    script file.

    The model is a docutils node visitor. Walking a parsed scene script with
    it records first-level section titles as scene names and second-level
    section titles as :py:class:`Model.Shot` objects. Paragraphs and
    directives found along the way are appended to the ``items`` of the most
    recent shot.

    Iterating over the model yields ``(shot, item)`` pairs.

    """

    Shot = namedtuple("Shot", ["name", "scene", "items"])
    Property = namedtuple("Property", ["entity", "object", "attr", "val"])
    Audio = namedtuple("Audio", ["package", "resource", "offset", "duration", "loop"])
    Memory = namedtuple("Memory", ["subject", "object", "state", "text", "html"])
    Line = namedtuple("Line", ["persona", "text", "html"])

    def __init__(self, fP, document):
        """Prepare an empty model.

        :param str fP: The path of the scene script file. Only its base name
            is used here, to name the logger.
        :param document: The docutils document which is to be walked.

        """
        super().__init__(document)
        self.fP = fP
        # Class names of the custom directive nodes. docutils' NodeVisitor
        # consults `optional` before complaining about node classes it has
        # no visit/depart method for (see the docutils NodeVisitor docs).
        self.optional = tuple(
            i.__name__ for i in (
                EntityDirective.Declaration,
                MemoryDirective.Definition,
                PropertyDirective.Getter,
                PropertyDirective.Setter,
                FXDirective.Cue
            )
        )
        self.log = logging.getLogger(
            "turberfield.dialogue.{0}".format(os.path.basename(self.fP))
        )
        self.section_level = 0
        self.scenes = []
        self.shots = []
        self.speaker = None  # Persona of the most recent citation reference
        self.memory = None   # Memory definition awaiting its paragraph

    def __iter__(self):
        """Generate a ``(shot, item)`` pair for every item of every shot."""
        for shot in self.shots:
            for item in shot.items:
                yield shot, item

    def get_entity(self, ref):
        """Find the entity declared under a name.

        :param str ref: The name of the entity. Matching is done on the
            lower-cased name.
        :return: The first of the document's citations which carries the
            name, or None if `ref` is empty or no citation matches.

        """
        if not ref:
            return None
        name = ref.lower()
        return next(
            (entity for entity in self.document.citations
             if name in entity.attributes["names"]),
            None
        )

    def default_visit(self, node):
        """Log at debug level any node which has no specific visit method."""
        self.log.debug(node)

    def default_departure(self, node):
        """Do nothing on leaving a node which has no specific depart method."""
        pass

    def visit_section(self, node):
        """Track the nesting depth of sections on the way in."""
        self.section_level += 1

    def depart_section(self, node):
        """Track the nesting depth of sections on the way out."""
        self.section_level -= 1

    def visit_Setter(self, node):
        """Add a :py:class:`Model.Property` item to the current shot.

        The first argument of the directive is ``<entity>.<attribute>``. The
        second is the value; a string of digits becomes an int, anything else
        is resolved by the node's own ``string_import`` method.

        A setter which names an unknown entity is reported as a warning and
        dropped.

        """
        ref, attr = node["arguments"][0].split(".")
        entity = self.get_entity(ref)
        s = node["arguments"][1]
        val = int(s) if s.isdigit() else node.string_import(s)
        if entity is None:
            # Formerly an opaque AttributeError on None.
            self.log.warning(
                "Property setter refers to unknown entity '{0}'".format(ref)
            )
            return

        # A citation which was never cast has no persona attribute; treat it
        # the same way as visit_paragraph does.
        persona = getattr(entity, "persona", None)
        self.shots[-1].items.append(
            Model.Property(self.speaker, persona, attr, val)
        )

    def visit_Definition(self, node):
        """Hold a :py:class:`Model.Memory` until its text has been visited.

        The memory is stored without text or HTML; the next paragraph
        visited supplies both and appends the finished item to the shot.

        """
        state = node.string_import(node["arguments"][0])
        subj = self.get_entity(node["options"].get("subject"))
        obj = self.get_entity(node["options"].get("object"))
        self.memory = Model.Memory(
            subj and subj.persona, obj and obj.persona, state, None, None
        )

    def visit_Cue(self, node):
        """Add a :py:class:`Model.Audio` item to the current shot.

        The package and resource come from the directive's two arguments.
        Offset, duration and loop are options, and are None when not given.

        """
        pkg = node["arguments"][0]
        rsrc = node["arguments"][1]
        offset = node["options"].get("offset")
        duration = node["options"].get("duration")
        loop = node["options"].get("loop")
        item = Model.Audio(pkg, rsrc, offset, duration, loop)
        self.shots[-1].items.append(item)

    def visit_title(self, node):
        """Record a scene or shot from the title of a section.

        A first-level section names a scene. A second-level section begins a
        new, empty shot within the most recent scene. Titles which do not
        belong to a section, or which lie deeper, are ignored.

        """
        self.log.debug(self.section_level)
        if isinstance(node.parent, docutils.nodes.section):
            if self.section_level == 1:
                self.scenes.append(node.parent.attributes["names"][0])
            elif self.section_level == 2:
                self.shots.append(Model.Shot(
                    node.parent.attributes["names"][0],
                    self.scenes[-1],
                    []))

    def visit_paragraph(self, node):
        """Turn a paragraph into a line of dialogue or the text of a memory.

        Plain text, strong text and substitution references are gathered as
        both plain text and HTML fragments. Other kinds of child node
        contribute nothing.

        If a memory definition is pending, the paragraph completes it.
        Otherwise a non-empty paragraph directly inside a shot becomes a
        :py:class:`Model.Line` spoken by the current speaker.

        :raises KeyError: if a substitution reference has no definition.

        """
        # NOTE(review): raw source text is placed in the HTML fragments
        # without escaping -- confirm that scripts are always trusted input.
        text = []
        html = []
        for c in node.children:
            if isinstance(c, docutils.nodes.substitution_reference):
                try:
                    defn = self.document.substitution_defs[c.attributes["refname"]]
                except KeyError:
                    self.log.warning(
                        "Bad substitution ref before line {0}: {1.rawsource}".format(node.line, c)
                    )
                    raise

                for tgt in defn.children:
                    if isinstance(tgt, PropertyDirective.Getter):
                        ref, dot, attr = tgt["arguments"][0].partition(".")
                        entity = self.get_entity(ref)
                        if getattr(entity, "persona", None) is not None:
                            val = operator.attrgetter(attr)(entity.persona)
                            # Format the value once so that a non-string
                            # attribute cannot break the join below.
                            rendered = "{0}".format(val)
                            text.append(rendered)
                            html.append('<span class="ref">{0}</span>'.format(rendered))
            elif isinstance(c, docutils.nodes.strong):
                text.append(c.rawsource)
                html.append(
                    '<strong class="text">{0}</strong>'.format(c.rawsource.replace("*", ""))
                )
            elif isinstance(c, docutils.nodes.Text):
                text.append(c.rawsource)
                html.append('<span class="text">{0}</span>'.format(c.rawsource))

        if self.memory:
            self.shots[-1].items.append(
                self.memory._replace(text=" ".join(text), html="\n".join(html))
            )
            self.memory = None
        elif (text or html) and self.section_level == 2:
            self.shots[-1].items.append(
                Model.Line(self.speaker, " ".join(text), "\n".join(html))
            )

    def visit_citation_reference(self, node):
        """Make the referenced entity's persona the current speaker.

        A reference to an unknown entity is reported as a warning and leaves
        no speaker set, so that the lines which follow are not attributed to
        whoever spoke last.

        """
        ref = node.attributes["refname"]
        entity = self.get_entity(ref)
        if entity is None:
            # Formerly an opaque AttributeError on None.
            self.log.warning(
                "Reference to unknown entity '{0}'".format(ref)
            )
        self.speaker = getattr(entity, "persona", None)
class SceneScript:
    """Gives access to a Turberfield scene script (.rst) file.

    This class allows discovery and classification of scene files prior to
    loading them in memory.

    Once loaded, it allows entity selection based on the role definitions in
    the file. Casting a selection permits the script to be iterated as a
    sequence of dialogue items.

    The object is a context manager; entering it reads and parses the file.

    """

    Folder = namedtuple("Folder", ["pkg", "description", "metadata", "paths", "interludes"])

    log = logging.getLogger("turberfield.dialogue.model.scenescript")

    # Settings handed to docutils for every document this class creates.
    settings = argparse.Namespace(
        character_level_inline_markup=False,
        debug=False,
        error_encoding="utf-8",
        error_encoding_error_handler="backslashreplace",
        halt_level=4,
        auto_id_prefix="",
        id_prefix="",
        language_code="en",
        pep_references=1,
        report_level=2,
        rfc_references=1,
        strict_visitor=False,
        tab_width=4,
        warning_stream=sys.stderr
    )

    # Teach the reStructuredText parser the custom directives. This happens
    # once, as a side effect of creating the class.
    docutils.parsers.rst.directives.register_directive(
        "entity", EntityDirective
    )

    docutils.parsers.rst.directives.register_directive(
        "property", PropertyDirective
    )

    docutils.parsers.rst.directives.register_directive(
        "fx", FXDirective
    )

    docutils.parsers.rst.directives.register_directive(
        "memory", MemoryDirective
    )

    @classmethod
    def scripts(cls, pkg, metadata, paths=None, **kwargs):
        """This class method is the preferred way to create SceneScript objects.

        :param str pkg: The dotted name of the package containing the scripts.
        :param metadata: A mapping or data object. This parameter permits
            searching among scripts against particular criteria. Its use is
            application specific.
        :param list(str) paths: A sequence of file paths to the scripts
            relative to the package. When omitted, nothing is generated.

        You can satisfy all parameter requirements by passing in a
        :py:class:`~turberfield.dialogue.model.SceneScript.Folder` object
        like this::

            SceneScript.scripts(**folder._asdict())

        The method generates a sequence of
        :py:class:`~turberfield.dialogue.model.SceneScript` objects.
        A package which cannot be imported, or a path which is not a file,
        is logged as a warning and skipped.

        """
        # `paths` defaults to None rather than a shared mutable list.
        for path in paths or ():
            try:
                fP = pkg_resources.resource_filename(pkg, path)
            except ImportError:
                cls.log.warning(
                    "No package called {}".format(pkg)
                )
            else:
                if not os.path.isfile(fP):
                    cls.log.warning(
                        "No script file at {}".format(
                            os.path.join(*pkg.split(".") + [path])
                        )
                    )
                else:
                    yield cls(fP, metadata)

    @staticmethod
    def read(text, name=None):
        """Read a block of text as a docutils document.

        :param str text: Scene script text.
        :param str name: An optional name for the document.
        :return: A document object.

        """
        doc = docutils.utils.new_document(name, SceneScript.settings)
        parser = docutils.parsers.rst.Parser()
        parser.parse(text, doc)
        return doc

    def __init__(self, fP, metadata=None, doc=None):
        """Create a handle on a script file without reading it.

        :param str fP: The path to the scene script file.
        :param metadata: Application-specific data about the script.
        :param doc: An already-parsed docutils document, if there is one.

        """
        self.fP = fP
        self.metadata = metadata
        self.doc = doc

    def __enter__(self):
        """Read and parse the script file, replacing any existing document."""
        # Decode as UTF-8 explicitly so that a packaged script reads the
        # same way whatever the locale of the machine.
        with open(self.fP, "r", encoding="utf-8") as script:
            self.doc = self.read(script.read())
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the context without suppressing any exception."""
        return False

    def select(self, personae, relative=False, roles=1):
        """Select a persona for each entity declared in the scene.

        :param personae: A sequence of Personae.
        :param bool relative: Affects imports from namespace packages.
            Used for testing only.
        :param int roles: The maximum number of roles allocated to each
            persona.
        :return: An OrderedDict of {Entity: Persona}. An entity for which
            no persona qualifies is mapped to None.

        """

        def constrained(entity):
            # The number of type and state requirements an entity declares.
            return (
                len(entity["options"].get("types", [])) +
                len(entity["options"].get("states", []))
            )

        rv = OrderedDict()
        performing = defaultdict(set)  # persona -> names of entities played
        pool = list(personae)
        self.log.debug(pool)

        # The most constrained entities get first pick of the pool.
        entities = OrderedDict([
            ("".join(entity.attributes["names"]), entity)
            for entity in sorted(
                group_by_type(self.doc)[EntityDirective.Declaration],
                key=constrained,
                reverse=True
            )
        ])

        for e in entities.values():
            types = tuple(filter(
                None,
                (e.string_import(t, relative)
                 for t in e["options"].get("types", []))
            ))
            states = tuple(filter(
                None,
                (int(t) if t.isdigit() else e.string_import(t, relative)
                 for t in e["options"].get("states", []))
            ))
            otherRoles = {i.lower() for i in e["options"].get("roles", [])}
            typ = types or object

            # Take the first persona of a required type whose states match
            # and whose existing roles are all permitted alongside this one.
            persona = next(
                (i for i in pool
                 if isinstance(i, typ) and
                 all(str(i.get_state(type(s))).startswith(str(s)) for s in states) and
                 (performing[i].issubset(otherRoles) or not otherRoles)),
                None
            )
            rv[e] = persona
            performing[persona].update(set(e.attributes["names"]))

            # Retire the persona once it may take on no further roles.
            if not otherRoles or list(rv.values()).count(persona) == roles:
                try:
                    pool.remove(persona)
                except ValueError:
                    # Nothing to remove; in particular, no persona was found.
                    self.log.debug(
                        "No persona for type {0} and states {1} with {2} {3}.".format(
                            typ, states, roles, "role" if roles == 1 else "roles"
                        )
                    )
        return rv

    def cast(self, mapping):
        """Allocate the scene script a cast of personae for each of its entities.

        Each entity is noted in the document as a citation and an explicit
        target, and is given a ``persona`` attribute.

        :param mapping: A dictionary of {Entity: Persona}
        :return: The SceneScript object.

        """
        # See 'citation' method in
        # http://docutils.sourceforge.net/docutils/parsers/rst/states.py
        for c, p in mapping.items():
            self.doc.note_citation(c)
            self.doc.note_explicit_target(c, c)
            c.persona = p
            self.log.debug("{0} to be played by {1}".format(
                c["names"][0].capitalize(), p)
            )
        return self

    def run(self):
        """Parse the script file.

        The document must already have been loaded, for example by entering
        the object as a context manager.

        :rtype: :py:class:`~turberfield.dialogue.model.Model`

        """
        model = Model(self.fP, self.doc)
        self.doc.walkabout(model)
        return model
# Register the dialogue item types with Assembly.
Assembly.register(
    Model.Audio,
    Model.Line,
    Model.Memory,
    Model.Property,
)