fireworks.core package

Submodules

fireworks.core.firework module

class fireworks.core.firework.FWAction(stored_data=None, exit=False, update_spec=None, mod_spec=None, additions=None, detours=None, defuse_children=False, defuse_workflow=False)

Bases: fireworks.utilities.fw_serializers.FWSerializable

A FWAction encapsulates the output of a FireTask (it is returned by a FireTask after the FireTask completes). The FWAction allows a user to store rudimentary output data as well as return commands that alter the workflow.
__init__(stored_data=None, exit=False, update_spec=None, mod_spec=None, additions=None, detours=None, defuse_children=False, defuse_workflow=False)
Parameters:
  • stored_data – (dict) data to store from the run. Does not affect the operation of FireWorks.
  • exit – (bool) if set to True, any remaining FireTasks within the same Firework are skipped.
  • update_spec – (dict) specifies how to update the child FW’s spec
  • mod_spec – ([dict]) update the child FW’s spec using the DictMod language (more flexible than update_spec)
  • additions – ([Workflow]) a list of WFs/FWs to add as children
  • detours – ([Workflow]) a list of WFs/FWs to add as children (they will inherit the current FW’s children)
  • defuse_children – (bool) defuse all the original children of this Firework
  • defuse_workflow – (bool) defuse all incomplete steps of this workflow

classmethod from_dict(*args, **kwargs)
skip_remaining_tasks

If the FWAction gives any dynamic action, we skip the subsequent FireTasks

Returns:(bool)
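
As a rough illustration of the rule above, the sketch below models a FWAction as a plain dict of its constructor arguments. Treating exit, additions, detours, defuse_children and defuse_workflow as the "dynamic" fields is an assumption drawn from the parameter list, not a statement of how the library implements the property, and would_skip_remaining_tasks is a hypothetical helper.

```python
# Hypothetical stand-in: a FWAction is modelled as a plain dict of its
# constructor arguments. Which fields count as "dynamic" is an assumption.
DYNAMIC_FIELDS = ("exit", "additions", "detours", "defuse_children", "defuse_workflow")


def would_skip_remaining_tasks(action):
    """Return True if any assumed dynamic field of the action is set."""
    return any(bool(action.get(field)) for field in DYNAMIC_FIELDS)


plain_output = {"stored_data": {"energy": -1.5}}
early_exit = {"stored_data": {}, "exit": True}
with_detour = {"detours": ["placeholder workflow"]}

print(would_skip_remaining_tasks(plain_output))  # False
print(would_skip_remaining_tasks(early_exit))    # True
print(would_skip_remaining_tasks(with_detour))   # True
```

Under this reading, an action that only stores data or updates the child spec lets the remaining FireTasks run.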
to_dict(*args, **kwargs)
class fireworks.core.firework.FireTaskBase(*args, **kwargs)

Bases: collections.defaultdict, fireworks.utilities.fw_serializers.FWSerializable

FireTaskBase is used like an abstract class that defines a computing task (FireTask). All FireTasks should inherit from FireTaskBase.

You can set parameters of a FireTask like you’d use a dict.

__init__(*args, **kwargs)
classmethod from_dict(*args, **kwargs)
required_params = []
run_task(fw_spec)

This method gets called when the FireTask is run. It can take in a Firework spec, perform some task using that data, and then return an output in the form of a FWAction.

Args:
fw_spec (dict): A Firework spec. This comes from the master spec.
In addition, this spec contains a special "_fw_env" key that contains the env settings of the FWorker calling this method. This provides for abstracting out certain commands or settings. For example, "foo" may be named "foo1" in resource 1 and "foo2" in resource 2. The FWorker env can specify {"foo": "foo1"}, which maps an abstract variable "foo" to the relevant "foo1" or "foo2". You can then write a task that uses fw_spec["_fw_env"]["foo"] and works across all of these resources.
Returns:
(FWAction)
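
The "_fw_env" lookup described above can be sketched without FireWorks itself. In the snippet below the task is an ordinary function rather than a FireTaskBase subclass, the two env dicts are hypothetical FWorker settings, and the "input_file" key is invented for the example.

```python
# Plain-Python sketch: no FireWorks import, the task is an ordinary function.
# "foo", "foo1" and "foo2" are the placeholder names used in the text above.
def run_task(fw_spec):
    """Build a command using the abstract name "foo" from the worker env."""
    command = fw_spec["_fw_env"]["foo"]
    return "{} --input {}".format(command, fw_spec["input_file"])


# Hypothetical env settings for two FWorkers on different resources.
env_resource_1 = {"foo": "foo1"}
env_resource_2 = {"foo": "foo2"}

master_spec = {"input_file": "data.txt"}  # "input_file" is an invented key

for env in (env_resource_1, env_resource_2):
    # Assumption: the spec handed to the task is the master spec plus "_fw_env".
    fw_spec = dict(master_spec, _fw_env=env)
    print(run_task(fw_spec))
```

The task body never mentions "foo1" or "foo2", so the same code can run on either resource.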
to_dict(*args, **kwargs)
class fireworks.core.firework.FireTaskMeta

Bases: abc.ABCMeta

class fireworks.core.firework.Firework(tasks, spec=None, name=None, launches=None, archived_launches=None, state=u'WAITING', created_on=None, fw_id=None, parents=None, updated_on=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

A Firework is a workflow step and may contain several FireTasks.

STATE_RANKS = {u'RUNNING': 4, u'ARCHIVED': -2, u'WAITING': 1, u'FIZZLED': -1, u'READY': 2, u'RESERVED': 3, u'COMPLETED': 5, u'DEFUSED': 0}
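
Because each state maps to an integer, states can be ordered and compared numerically, for example to express a condition such as the "WAITING or below" note given for Workflow.append_wf. The sketch below copies the ranks listed above into a local dict; at_or_below is a hypothetical helper, not part of the library.

```python
# Ranks copied from the STATE_RANKS attribute listed above.
STATE_RANKS = {
    "ARCHIVED": -2, "FIZZLED": -1, "DEFUSED": 0, "WAITING": 1,
    "READY": 2, "RESERVED": 3, "RUNNING": 4, "COMPLETED": 5,
}


def at_or_below(state, limit="WAITING"):
    """Hypothetical helper: True if `state` ranks no higher than `limit`."""
    return STATE_RANKS[state] <= STATE_RANKS[limit]


states = ["RUNNING", "WAITING", "FIZZLED", "COMPLETED"]
print(sorted(states, key=STATE_RANKS.get))
print([s for s in states if at_or_below(s)])
```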
__init__(tasks, spec=None, name=None, launches=None, archived_launches=None, state=u'WAITING', created_on=None, fw_id=None, parents=None, updated_on=None)
Parameters:
  • tasks – ([FireTask]) a list of FireTasks to run in sequence
  • spec – (dict) specification of the job to run. Used by the FireTask
  • launches – ([Launch]) a list of Launch objects of this Firework
  • archived_launches – ([Launch]) a list of archived Launch objects of this Firework
  • state – (str) the state of the FW (e.g. WAITING, RUNNING, COMPLETED, ARCHIVED)
  • created_on – (datetime) time of creation
  • fw_id – (int) an identification number for this Firework
  • parents – (Firework or [Firework]) list of parent FWs this FW depends on
  • updated_on – (datetime) last time the STATE was updated

classmethod from_dict(*args, **kwargs)
state
Returns:(str) The current state of the Firework
to_db_dict()
to_dict(*args, **kwargs)
class fireworks.core.firework.Launch(state, launch_dir, fworker=None, host=None, ip=None, trackers=None, action=None, state_history=None, launch_id=None, fw_id=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable, object

A Launch encapsulates data about a specific run of a Firework on a computing resource

__init__(state, launch_dir, fworker=None, host=None, ip=None, trackers=None, action=None, state_history=None, launch_id=None, fw_id=None)
Parameters:
  • state – (str) the state of the Launch (e.g. RUNNING, COMPLETED)
  • launch_dir – (str) the directory where the Launch takes place
  • fworker – (FWorker) The FireWorker running the Launch
  • host – (str) the hostname where the launch took place (set automatically if None)
  • ip – (str) the IP address where the launch took place (set automatically if None)
  • trackers – ([Tracker]) File Trackers for this Launch
  • action – (FWAction) the output of the Launch
  • state_history – ([dict]) a history of all states of the Launch and when they occurred
  • launch_id – (int) launch_id set by the LaunchPad
  • fw_id – (int) id of the Firework this Launch is running

classmethod from_dict(*args, **kwargs)
last_pinged
Returns:(datetime) the time the Launch last pinged a heartbeat that it was still running

reservedtime_secs
Returns:(int) number of seconds the Launch was stuck as RESERVED in a queue

runtime_secs
Returns:(int) the number of seconds that the Launch ran for
set_reservation_id(reservation_id)

Adds the job_id to the reservation

Parameters:reservation_id – (str) the id of the reservation (e.g., queue reservation)

state
Returns:(str) The current state of the Launch.
time_end
Returns:(datetime) the time the Launch was COMPLETED or FIZZLED
time_reserved
Returns:(datetime) the time the Launch was RESERVED in the queue
time_start
Returns:(datetime) the time the Launch started RUNNING
to_db_dict(*args, **kwargs)
to_dict(*args, **kwargs)
touch_history(update_time=None)

Updates the update_at field of the state history of a Launch. Used to ping that a Launch is still alive.

class fireworks.core.firework.Tracker(filename, nlines=25, content=u'', allow_zipped=False)

Bases: fireworks.utilities.fw_serializers.FWSerializable, object

A Tracker monitors a file and returns the last N lines for updating the Launch object

MAX_TRACKER_LINES = 1000
__init__(filename, nlines=25, content=u'', allow_zipped=False)
classmethod from_dict(m_dict)
to_dict()
track_file(launch_dir=None)

Reads the monitored file and returns the last N lines.

Parameters:launch_dir – directory where the job was launched, in case of a relative filename
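
The idea of returning the last N lines of a monitored file can be sketched with the standard library alone. This is not the Tracker implementation: last_lines is a hypothetical function, and it ignores allow_zipped and MAX_TRACKER_LINES.

```python
import collections
import os
import tempfile


def last_lines(filename, nlines=25, launch_dir=None):
    """Return the last `nlines` lines of a file as one string.

    A relative `filename` is resolved against `launch_dir` when given.
    """
    if launch_dir is not None and not os.path.isabs(filename):
        filename = os.path.join(launch_dir, filename)
    with open(filename) as handle:
        # A bounded deque keeps only the newest `nlines` lines in memory.
        tail = collections.deque(handle, maxlen=nlines)
    return "".join(tail)


# Usage with a throwaway file standing in for a job's output log.
launch_dir = tempfile.mkdtemp()
with open(os.path.join(launch_dir, "job.out"), "w") as handle:
    handle.write("".join("step {}\n".format(i) for i in range(100)))

print(last_lines("job.out", nlines=3, launch_dir=launch_dir))
```

The bounded deque means a long log file is streamed once without holding all of its lines in memory.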

class fireworks.core.firework.Workflow(fireworks, links_dict=None, name=None, metadata=None, created_on=None, updated_on=None, fw_states=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

A Workflow connects a group of FireWorks in an execution order

class Workflow.Links(*args, **kwargs)

Bases: dict, fireworks.utilities.fw_serializers.FWSerializable

An inner class for storing the DAG links between FireWorks

__init__(*args, **kwargs)
classmethod from_dict(m_dict)
nodes
to_db_dict()
to_dict()
Workflow.__init__(fireworks, links_dict=None, name=None, metadata=None, created_on=None, updated_on=None, fw_states=None)
Parameters:
  • fireworks – ([Firework]) - all FireWorks in this workflow
  • links_dict – (dict) links between the FWs as (parent_id):[(child_id1, child_id2)]
  • name – (str) name of workflow
  • metadata – (dict) metadata for this Workflow
  • created_on – (datetime)
  • updated_on – (datetime)
  • fw_states – (dict) leave alone unless you are purposefully creating a Lazy-style WF

Workflow.append_wf(new_wf, fw_ids, detour=False, pull_spec_mods=False)

Method to add a workflow as a child to a Firework.

Note: detours must have children whose STATE_RANK is WAITING or below.

Parameters:
  • new_wf – (Workflow) New Workflow to add
  • fw_ids – ([int]) ids of the parent Fireworks on which to add the Workflow
  • detour – (bool) add children of the current Firework to the Workflow’s leaves
  • pull_spec_mods – (bool) pull spec mods of COMPLETED parents, refreshes the WF states.
Returns:

([int]) list of Firework ids that were updated or new
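
The difference between a plain addition and a detour can be pictured on a bare parent-to-children dict. The sketch below is only a model of the link rewiring described above: append_links is a hypothetical function, ids in the new sub-graph are assumed not to collide with existing ones, and every Firework (including leaves) is assumed to appear as a key.

```python
def append_links(links, parent_id, new_links, detour=False):
    """Attach a new sub-graph below `parent_id` in a parent -> children dict.

    With detour=True the parent's original children are moved to the leaves
    of the new sub-graph; otherwise they stay attached to the parent.
    """
    merged = {fw_id: list(children) for fw_id, children in links.items()}
    merged.update({fw_id: list(children) for fw_id, children in new_links.items()})

    # Roots and leaves of the new sub-graph.
    new_children = {c for children in new_links.values() for c in children}
    new_roots = [fw_id for fw_id in new_links if fw_id not in new_children]
    new_leaves = [fw_id for fw_id, children in new_links.items() if not children]

    original_children = list(links.get(parent_id, []))
    if detour:
        merged[parent_id] = new_roots
        for leaf in new_leaves:
            merged[leaf] = merged[leaf] + original_children
    else:
        merged[parent_id] = original_children + new_roots
    return merged


links = {1: [2], 2: []}          # Firework 1 -> Firework 2
new_links = {10: [11], 11: []}   # new workflow: 10 -> 11

print(append_links(links, 1, new_links))
print(append_links(links, 1, new_links, detour=True))
```

In the detour case Firework 2 ends up downstream of Firework 11, so it waits for the inserted workflow to finish.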

Workflow.apply_action(action, fw_id)

Apply a FWAction on a Firework in the Workflow

Parameters:
  • action – (FWAction) action to apply
  • fw_id – (int) id of Firework on which to apply the action
Returns:

([int]) list of Firework ids that were updated or new

classmethod Workflow.from_Firework(fw, name=None, metadata=None)
classmethod Workflow.from_dict(m_dict)
classmethod Workflow.from_wflow(wflow)

Create a fresh Workflow from an existing one.

Workflow.fws
Workflow.leaf_fw_ids

Gets leaf FireWorks of this workflow (those with no children)

Returns:([int]) Firework ids of leaf FWs
Workflow.refresh(fw_id, updated_ids=None)

Refreshes the state of a Firework and any affected children.

Parameters:
  • fw_id – (int) id of the Firework on which to perform the refresh
  • updated_ids – ([int])
Returns:

(set(int)) set of Firework ids that were updated

Workflow.rerun_fw(fw_id, updated_ids=None)

Archives the launches of a Firework so that it can be re-run.

Parameters:fw_id – (int)
Returns:([int]) list of Firework ids that were updated

Workflow.reset(reset_ids=True)

Reset the states of all Fireworks in this workflow to ‘WAITING’.

Parameters:reset_ids – (bool) if True, give each Firework a new id.
Workflow.root_fw_ids

Gets root FireWorks of this workflow (those with no parents)

Returns:([int]) Firework ids of root FWs
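
Roots and leaves follow directly from the parent-to-children links. The sketch below computes both from a plain dict shaped like links_dict; root_and_leaf_ids is a hypothetical helper, and every Firework id is assumed to appear as a key.

```python
def root_and_leaf_ids(links):
    """Return (root ids, leaf ids) for a parent -> children dict."""
    all_children = {c for children in links.values() for c in children}
    roots = sorted(fw_id for fw_id in links if fw_id not in all_children)
    leaves = sorted(fw_id for fw_id, children in links.items() if not children)
    return roots, leaves


# Diamond-shaped workflow: 1 -> (2, 3) -> 4
links = {1: [2, 3], 2: [4], 3: [4], 4: []}
print(root_and_leaf_ids(links))  # ([1], [4])
```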
Workflow.state
Workflow.to_db_dict()
Workflow.to_dict()
Workflow.to_display_dict()

fireworks.core.fworker module

class fireworks.core.fworker.FWorker(name=u'Automatically generated Worker', category=u'', query=None, env=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

__init__(name=u'Automatically generated Worker', category=u'', query=None, env=None)
Args:
name: the name of the resource, should be unique
category: a String describing the computing resource, does not need to be unique
query: a dict query that restricts the type of Firework this resource will run
env: a dict of special environment variables for the resource. This env is passed to running FireTasks as a _fw_env in the fw_spec, which provides for abstraction of resource-specific commands or settings. See fireworks.core.firework.FireTaskBase for information on how to use this env variable in FireTasks.
classmethod auto_load()
classmethod from_dict(*args, **kwargs)
query
to_dict(*args, **kwargs)

fireworks.core.launchpad module

class fireworks.core.launchpad.LaunchPad(host=u'localhost', port=27017, name=u'fireworks', username=None, password=None, logdir=None, strm_lvl=None, user_indices=None, wf_user_indices=None, ssl_ca_file=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

The LaunchPad manages the FireWorks database.

__init__(host=u'localhost', port=27017, name=u'fireworks', username=None, password=None, logdir=None, strm_lvl=None, user_indices=None, wf_user_indices=None, ssl_ca_file=None)
Parameters:
  • host
  • port
  • name
  • username
  • password
  • logdir
  • strm_lvl
  • user_indices
  • wf_user_indices
  • ssl_ca_file
add_offline_run(launch_id, fw_id, name)
add_wf(wf, reassign_all=True)
Parameters:wf – a Workflow object.
append_wf(new_wf, fw_ids, detour=False, pull_spec_mods=True)

Append a new workflow on top of an existing workflow

Parameters:
  • new_wf – (Workflow) The new workflow to append
  • fw_ids – ([int]) The parent fw_ids at which to append the workflow
  • detour – (bool) Whether to connect the new Workflow in a “detour” style, i.e., move original children of the parent fw_ids to the new_wf
  • pull_spec_mods – (bool) Whether the new Workflow should pull the FWActions of the parent fw_ids
archive_wf(fw_id)
classmethod auto_load()
cancel_reservation(launch_id)
cancel_reservation_by_reservation_id(reservation_id)
change_launch_dir(launch_id, launch_dir)
checkout_fw(fworker, launch_dir, fw_id=None, host=None, ip=None)

(internal method) Finds a Firework that’s ready to be run, marks it as running, and returns it to the caller. The caller is responsible for running the Firework.

Parameters:
  • fworker – A FWorker instance
  • host – the host making the request (for creating a Launch object)
  • ip – the ip making the request (for creating a Launch object)
  • launch_dir – the dir the FW will be run in (for creating a Launch object)
Returns:

a Firework, launch_id tuple

complete_launch(launch_id, action, state=u'COMPLETED')

(internal method) Used to mark a Firework’s Launch as completed.

Parameters:
  • launch_id
  • action – the FWAction of what to do next

defuse_fw(fw_id, rerun_duplicates=True)
defuse_wf(fw_id, defuse_all_states=True)
delete_wf(fw_id)
detect_lostruns(expiration_secs=14400, fizzle=False, rerun=False, max_runtime=None, min_runtime=None, refresh=False)
detect_unreserved(expiration_secs=1209600, rerun=False)
forget_offline(fw_id)
classmethod from_dict(d)
get_fw_by_id(fw_id)

Given a Firework id, give back a Firework object

Parameters:fw_id – Firework id (int)
Returns:Firework object
get_fw_dict_by_id(fw_id)
get_fw_ids(query=None, sort=None, limit=0, count_only=False, launches_mode=False)

Return all the fw ids that match a query.

Parameters:
  • query – (dict) representing a Mongo query
  • sort – [(str,str)] sort argument in Pymongo format
  • limit – (int) limit the results
  • count_only – (bool) only return the count rather than explicit ids
  • launches_mode – (bool) query the launches collection instead of fireworks

get_fw_ids_from_reservation_id(reservation_id)
get_launch_by_id(launch_id)

Given a Launch id, return details of the Launch

Parameters:launch_id – launch id
Returns:Launch object
get_logdir()
get_new_fw_id()

Checkout the next Firework id

get_new_launch_id()

Checkout the next Launch id

get_reservation_id_from_fw_id(fw_id)
get_tracker_data(fw_id)
get_wf_by_fw_id(fw_id)

Given a Firework id, give back the Workflow containing that Firework

Parameters:fw_id
Returns:A Workflow object

get_wf_by_fw_id_lzyfw(fw_id)

Given a Firework id, give back the Workflow containing that Firework

Parameters:fw_id
Returns:A Workflow object

get_wf_ids(query=None, sort=None, limit=0, count_only=False)

Return one fw id for all workflows that match a query.

Parameters:
  • query – (dict) representing a Mongo query
  • sort – [(str,str)] sort argument in Pymongo format
  • limit – (int) limit the results
  • count_only – (bool) only return the count rather than explicit ids

get_wf_summary_dict(fw_id, mode=u'more')

A much faster way to get summary information about a Workflow by querying only for needed information.

Args:
fw_id (int): A Firework id.
mode (str): Choose between “more”, “less” and “all” in terms of quantity of information.
Returns:
(dict) of information about Workflow.
log_message(level, message)
maintain(infinite=True, maintain_interval=None)
mark_fizzled(launch_id)
ping_launch(launch_id, ptime=None)
recover_offline(launch_id, ignore_errors=False, print_errors=False)
reignite_fw(fw_id)
reignite_wf(fw_id)
rerun_fw(fw_id, rerun_duplicates=True)
rerun_fws_task_level(fw_id, rerun_duplicates=True, launch_id=None, recover_mode=None)

Rerun a fw at the task level

Parameters:
  • fw_id – (int) fw_id to rerun
  • rerun_duplicates – (bool) also rerun duplicate FWs
  • launch_id – (int) launch id to rerun, if known. Otherwise the last launch_id will be used
  • recover_mode – (str) use “prev_dir” to run again in previous dir, “cp” to try to copy data to new dir, or None to start from scratch
Returns:

([int]) list of rerun fw_ids

reserve_fw(fworker, launch_dir, host=None, ip=None)
reset(password, require_password=True, max_reset_wo_password=25)

Create a new FireWorks database. This will overwrite the existing FireWorks database! To safeguard against accidentally erasing an existing database, a password must be entered.

Parameters:
  • password – A String representing today’s date, e.g. ‘2012-12-31’
  • require_password – Whether a password is required to reset the DB. Setting to false is dangerous because running code unintentionally could clear your DB - use max_reset_wo_password to minimize risk.
  • max_reset_wo_password – A failsafe: when require_password is set to False, FWS will not clear DBs that contain more workflows than this parameter
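
Since the password is simply today's date as a string, it can be generated rather than typed. The snippet below is a small sketch using only the standard library; the commented call assumes an existing LaunchPad instance named lp, which is not created here.

```python
import datetime

# The password is today's date in the same YYYY-MM-DD form as the example above.
todays_password = datetime.date.today().strftime("%Y-%m-%d")
print(todays_password)

# With a LaunchPad instance `lp` (not created here), the call would look like:
# lp.reset(todays_password)
```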
restore_backup_data(launch_id, fw_id)
run_exists(fworker=None)

Checks to see if the database contains any FireWorks that are ready to run

Returns:(bool)

set_priority(fw_id, priority)
set_reservation_id(launch_id, reservation_id)
to_dict()

Note: usernames/passwords are exported as unencrypted Strings!

tuneup(bkground=True)
update_spec(fw_ids, spec_document)

Update fireworks with a spec. Sometimes you need to modify a firework in progress.

Args:
fw_ids: All fw_ids to modify.
spec_document: The spec document. Note that only modifications to the spec key are allowed. So if you supply {"_tasks.1.parameter": "hello"}, you are effectively modifying spec._tasks.1.parameter in the actual fireworks collection.
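
The key mapping described above can be imitated in memory. The sketch below prefixes each supplied key with "spec." and then applies the dotted path to a nested document; both helpers and the sample fw_doc are hypothetical, and the code mimics dotted-path addressing on plain Python objects rather than showing what the library sends to the database.

```python
def to_collection_update(spec_document):
    """Prefix every key with "spec." as described for update_spec."""
    return {"spec." + key: value for key, value in spec_document.items()}


def apply_dotted(document, dotted_key, value):
    """Set a value in nested dicts/lists addressed by a dotted path."""
    *parents, last = dotted_key.split(".")
    target = document
    for part in parents:
        target = target[int(part)] if isinstance(target, list) else target[part]
    if isinstance(target, list):
        target[int(last)] = value
    else:
        target[last] = value


# Hypothetical stored Firework document with two tasks.
fw_doc = {"spec": {"_tasks": [{"parameter": "a"}, {"parameter": "b"}]}}

update = to_collection_update({"_tasks.1.parameter": "hello"})
print(update)  # {'spec._tasks.1.parameter': 'hello'}

for dotted_key, value in update.items():
    apply_dotted(fw_doc, dotted_key, value)
print(fw_doc["spec"]["_tasks"])
```

Only the second task's parameter changes; everything outside the addressed path is left as it was.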
class fireworks.core.launchpad.LazyFirework(fw_id, fw_coll, launch_coll)

Bases: object

A LazyFirework only has the fw_id, and grabs other data just-in-time. This representation can speed up Workflow loading as only “important” FWs need to be fully loaded.

Parameters:
  • fw_id
  • fw_coll
  • launch_coll

__init__(fw_id, fw_coll, launch_coll)
archived_launches
created_on
db_fields = (u'name', u'fw_id', u'spec', u'created_on', u'state')
db_launch_fields = (u'launches', u'archived_launches')
full_fw
launches
name
parents
partial_fw
spec
state
tasks
to_db_dict()
to_dict()
updated_on
exception fireworks.core.launchpad.LockedWorkflowError

Bases: exceptions.ValueError

Error raised if the context manager WFLock cannot acquire the lock on the WF within the selected time interval (WFLOCK_EXPIRATION_SECS) and killing of the lock is disabled (WFLOCK_EXPIRATION_KILL).

class fireworks.core.launchpad.WFLock(lp, fw_id, expire_secs=300, kill=False)

Bases: object

Lock a Workflow, e.g. for performing update operations. Raises a LockedWorkflowError if the lock couldn’t be acquired within expire_secs and kill==False. Calling functions are responsible for handling the error in order to avoid database inconsistencies.

__init__(lp, fw_id, expire_secs=300, kill=False)
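
The acquire, expire and kill behaviour described above can be sketched as an in-memory context manager. Everything in the snippet is a stand-in: SketchLock keeps its locks in a Python set instead of the database, the local LockedWorkflowError only mirrors the name and base class of the real exception, and poll_secs is an invented parameter.

```python
import time


class LockedWorkflowError(ValueError):
    """Local stand-in for the exception of the same name."""


class SketchLock(object):
    """Hypothetical in-memory lock keyed by fw_id, with expiry and kill."""

    _locked = set()  # ids currently locked (shared by all instances)

    def __init__(self, fw_id, expire_secs=300, kill=False, poll_secs=0.01):
        self.fw_id = fw_id
        self.expire_secs = expire_secs
        self.kill = kill
        self.poll_secs = poll_secs

    def __enter__(self):
        deadline = time.time() + self.expire_secs
        while self.fw_id in self._locked:
            if time.time() >= deadline:
                if not self.kill:
                    raise LockedWorkflowError("could not lock %s" % self.fw_id)
                break  # kill=True: take over the stale lock
            time.sleep(self.poll_secs)
        self._locked.add(self.fw_id)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._locked.discard(self.fw_id)


with SketchLock(fw_id=1):
    try:
        # A second attempt on the same id times out and raises.
        with SketchLock(fw_id=1, expire_secs=0.05):
            pass
    except LockedWorkflowError as error:
        print("handled:", error)
```

The try/except around the inner block reflects the point made above: the caller, not the lock, decides what to do when the lock cannot be obtained.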

fireworks.core.rocket module

class fireworks.core.rocket.Rocket(launchpad, fworker, fw_id)

The Rocket fetches a workflow step from the FireWorks database and executes it.

__init__(launchpad, fworker, fw_id)
Parameters:
  • launchpad – (LaunchPad) A LaunchPad object for interacting with the FW database. If None, reads FireWorks from FW.json and writes to FWAction.json
  • fworker – (FWorker) A FWorker object describing the computing resource
  • fw_id – (int) id of a specific Firework to run (quit if it cannot be found)
decorate_fwaction(fwaction, my_spec, m_fw, launch_dir)
run()

Run the rocket (check out a job from the database and execute it)

fireworks.core.rocket.background_task(btask, spec, stop_event, master_thread)
fireworks.core.rocket.do_ping(launchpad, launch_id)
fireworks.core.rocket.ping_launch(launchpad, launch_id, stop_event, master_thread)
fireworks.core.rocket.start_background_task(btask, spec)
fireworks.core.rocket.start_ping_launch(launchpad, launch_id)
fireworks.core.rocket.stop_backgrounds(ping_stop, btask_stops)

fireworks.core.rocket_launcher module

fireworks.core.rocket_launcher.get_fworker(fworker)
fireworks.core.rocket_launcher.launch_rocket(launchpad, fworker=None, fw_id=None, strm_lvl=u'INFO')

Run a single rocket in the current directory

Parameters:
  • launchpad – (LaunchPad)
  • fworker – (FWorker)
  • fw_id – (int) if set, a particular Firework to run
  • strm_lvl – (str) level at which to output logs to stdout

fireworks.core.rocket_launcher.rapidfire(launchpad, fworker=None, m_dir=None, nlaunches=0, max_loops=-1, sleep_time=None, strm_lvl=u'INFO', timeout=None)

Keeps running Rockets in m_dir until we reach an error. Automatically creates subdirectories for each Rocket. Usually stops when we run out of FireWorks from the LaunchPad.

Parameters:
  • launchpad – (LaunchPad)
  • fworker – (FWorker object)
  • m_dir – (str) the directory in which to loop Rocket running
  • nlaunches – (int) 0 means ‘until completion’, -1 or “infinite” means to loop until max_loops
  • max_loops – (int) maximum number of loops (default -1 is infinite)
  • sleep_time – (int) secs to sleep between rapidfire loop iterations
  • strm_lvl – (str) level at which to output logs to stdout
  • timeout – (int) # of seconds after which to stop the rapidfire process
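
How nlaunches, max_loops, sleep_time and timeout interact can be pictured with a loop skeleton. The sketch below is one reading of the parameter descriptions above, not the library's code: run_exists and launch_one are hypothetical callables standing in for the LaunchPad check and a single Rocket run, directory handling is omitted, and the string value “infinite” for nlaunches is not handled.

```python
import time


def rapidfire_sketch(run_exists, launch_one, nlaunches=0, max_loops=-1,
                     sleep_time=0, timeout=None):
    """Loop skeleton only; `run_exists` and `launch_one` are hypothetical
    callables standing in for the LaunchPad check and a single Rocket run."""
    start = time.time()

    def time_ok():
        return timeout is None or time.time() - start < timeout

    launched = 0
    loops = 0
    while loops != max_loops and time_ok():
        # Drain everything that is ready right now.
        while run_exists() and time_ok():
            launch_one()
            launched += 1
            if launched == nlaunches:
                return launched
        if nlaunches == 0:
            return launched  # 0 means "until completion": stop when drained
        time.sleep(sleep_time)  # otherwise wait for new work and loop again
        loops += 1
    return launched


queue = ["fw1", "fw2", "fw3"]  # stand-in for ready Fireworks
done = []
count = rapidfire_sketch(lambda: bool(queue), lambda: done.append(queue.pop(0)))
print(count, done)  # 3 ['fw1', 'fw2', 'fw3']
```

With the default nlaunches=0 the sketch stops as soon as nothing is ready, while nlaunches=-1 keeps polling until max_loops or timeout ends the loop.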

Module contents