fireworks.core package

Submodules

fireworks.core.firework module

class fireworks.core.firework.FWAction(stored_data=None, exit=False, update_spec=None, mod_spec=None, additions=None, detours=None, defuse_children=False, defuse_workflow=False)

Bases: fireworks.utilities.fw_serializers.FWSerializable

A FWAction encapsulates the output of a Firetask (it is returned by a Firetask after the Firetask completes). The FWAction allows a user to store rudimentary output data as well as return commands that alter the workflow.

classmethod from_dict(*args, **kwargs)
skip_remaining_tasks

If the FWAction contains any dynamic action, the subsequent Firetasks are skipped.

Returns:bool
to_dict(*args, **kwargs)
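The skip rule can be illustrated with a standalone model. This is a hypothetical stand-in (the class name `ActionSketch` is invented), and it assumes that `skip_remaining_tasks` is true whenever `exit`, `detours`, `additions`, `defuse_children`, or `defuse_workflow` is set, while `stored_data`, `update_spec`, and `mod_spec` alone do not trigger a skip. Check the FireWorks source for the authoritative rule.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ActionSketch:
    """Hypothetical stand-in for FWAction, used only to illustrate the skip rule."""
    stored_data: Dict[str, Any] = field(default_factory=dict)
    exit: bool = False
    update_spec: Dict[str, Any] = field(default_factory=dict)
    mod_spec: List[Dict[str, Any]] = field(default_factory=list)
    additions: List[Any] = field(default_factory=list)
    detours: List[Any] = field(default_factory=list)
    defuse_children: bool = False
    defuse_workflow: bool = False

    @property
    def skip_remaining_tasks(self) -> bool:
        # Assumed rule: any action that changes the control flow skips later Firetasks.
        return bool(self.exit or self.detours or self.additions
                    or self.defuse_children or self.defuse_workflow)


# Storing data or updating the spec does not skip later Firetasks...
plain = ActionSketch(stored_data={"energy": -1.5}, update_spec={"x": 1})
# ...but requesting an exit does.
stop = ActionSketch(exit=True)
print(plain.skip_remaining_tasks, stop.skip_remaining_tasks)  # False True
```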
class fireworks.core.firework.FireTaskBase(*args, **kwargs)

Bases: fireworks.core.firework.FiretaskBase

class fireworks.core.firework.FiretaskBase(*args, **kwargs)

Bases: collections.defaultdict, fireworks.utilities.fw_serializers.FWSerializable

FiretaskBase is used like an abstract class that defines a computing task (Firetask). All Firetasks should inherit from FiretaskBase.

You can set parameters of a Firetask like you’d use a dict.

classmethod from_dict(*args, **kwargs)
required_params = []
run_task(fw_spec)

This method gets called when the Firetask is run. It can take in a Firework spec, perform some task using that data, and then return an output in the form of a FWAction.

Parameters:fw_spec (dict) – A Firework spec. This comes from the master spec. In addition, this spec contains a special “_fw_env” key that contains the env settings of the FWorker calling this method. This provides for abstracting out certain commands or settings. For example, “foo” may be named “foo1” in resource 1 and “foo2” in resource 2. The FWorker env can specify { “foo”: “foo1”}, which maps an abstract variable “foo” to the relevant “foo1” or “foo2”. You can then write a task that uses fw_spec[“_fw_env”][“foo”] that will work across all these multiple resources.
Returns:(FWAction)
to_dict(*args, **kwargs)
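The `_fw_env` abstraction described for `run_task` can be sketched without FireWorks installed. A real Firetask would subclass FiretaskBase and return an FWAction; here a plain `dict` subclass stands in for the parameter storage and a plain dict stands in for the FWAction. The class name, the `input_file` parameter, and the required-parameter check inside `run_task` are all illustrative assumptions.

```python
from typing import Any, Dict


class EnvAwareTaskSketch(dict):
    """Hypothetical Firetask-like class: parameters are set like dict entries."""
    required_params = ["input_file"]

    def run_task(self, fw_spec: Dict[str, Any]) -> Dict[str, Any]:
        # This sketch validates its own required parameters.
        missing = [p for p in self.required_params if p not in self]
        if missing:
            raise ValueError(f"missing required params: {missing}")
        # Resolve the abstract name "foo" to the resource-specific value
        # provided by the FWorker env (e.g. "foo1" on resource 1).
        executable = fw_spec["_fw_env"]["foo"]
        command = f"{executable} {self['input_file']}"
        # A real task would run the command and return an FWAction;
        # a plain dict stands in for the FWAction's stored_data here.
        return {"stored_data": {"command": command}}


task = EnvAwareTaskSketch(input_file="in.dat")
print(task.run_task({"_fw_env": {"foo": "foo1"}}))
# {'stored_data': {'command': 'foo1 in.dat'}}
```

The same task definition works on another resource simply because that resource's FWorker env maps "foo" to "foo2".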
class fireworks.core.firework.FiretaskMeta

Bases: abc.ABCMeta

class fireworks.core.firework.Firework(tasks, spec=None, name=None, launches=None, archived_launches=None, state=u'WAITING', created_on=None, fw_id=None, parents=None, updated_on=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

A Firework is a workflow step and may contain several Firetasks.

STATE_RANKS = {u'PAUSED': 0, u'RUNNING': 4, u'ARCHIVED': -2, u'WAITING': 1, u'FIZZLED': -1, u'READY': 2, u'RESERVED': 3, u'COMPLETED': 5, u'DEFUSED': 0}
classmethod from_dict(*args, **kwargs)
state

Returns:str – the current state of the Firework

to_db_dict()

Return firework dict with updated launches and state.

to_dict(*args, **kwargs)
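STATE_RANKS orders the Firework states numerically. The snippet below copies the table above (dropping the Python 2 `u''` prefixes) to list the states from lowest to highest rank and to test the "STATE_RANK that is WAITING or below" condition mentioned for detours in Workflow.append_wf. The helper name `at_or_below` is hypothetical.

```python
STATE_RANKS = {'PAUSED': 0, 'RUNNING': 4, 'ARCHIVED': -2, 'WAITING': 1,
               'FIZZLED': -1, 'READY': 2, 'RESERVED': 3, 'COMPLETED': 5,
               'DEFUSED': 0}

# Lowest to highest rank; sorted() is stable, so ties keep dict order.
by_rank = sorted(STATE_RANKS, key=STATE_RANKS.get)


def at_or_below(state, limit="WAITING"):
    """True if `state` ranks at or below `limit`."""
    return STATE_RANKS[state] <= STATE_RANKS[limit]


eligible = sorted(s for s in STATE_RANKS if at_or_below(s))
print(by_rank)
# ['ARCHIVED', 'FIZZLED', 'PAUSED', 'DEFUSED', 'WAITING', 'READY', 'RESERVED', 'RUNNING', 'COMPLETED']
print(eligible)
# ['ARCHIVED', 'DEFUSED', 'FIZZLED', 'PAUSED', 'WAITING']
```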
class fireworks.core.firework.Launch(state, launch_dir, fworker=None, host=None, ip=None, trackers=None, action=None, state_history=None, launch_id=None, fw_id=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable, object

A Launch encapsulates data about a specific run of a Firework on a computing resource.

classmethod from_dict(*args, **kwargs)
last_pinged

Returns:datetime – the time the Launch last sent a heartbeat ping indicating that it was still running

reservedtime_secs

Returns:int – the number of seconds the Launch was stuck as RESERVED in a queue

runtime_secs

Returns:int – the number of seconds that the Launch ran for

set_reservation_id(reservation_id)

Records the given reservation id (e.g., the queue job id) for this Launch.

Parameters:reservation_id (int or str) – the id of the reservation (e.g., queue reservation)
state

Returns:str – the current state of the Launch

time_end

Returns:datetime – the time the Launch was COMPLETED or FIZZLED

time_reserved

Returns:datetime – the time the Launch was RESERVED in the queue

time_start

Returns:datetime – the time the Launch started RUNNING

to_db_dict(*args, **kwargs)
to_dict(*args, **kwargs)
touch_history(update_time=None)

Updates the updated_on field of the state history of a Launch. Used to ping that a Launch is still alive.

Parameters:update_time (datetime) – the update time to record (the current time if omitted)
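The timing properties of a Launch can be derived from its state history. This sketch assumes each state-history entry is a dict with "state" and "created_on" keys, and that the run time spans RUNNING to COMPLETED/FIZZLED while the reserved time spans RESERVED to RUNNING; the function names mirror the properties above but are hypothetical free functions, not the Launch implementation.

```python
from datetime import datetime, timedelta


def first_time(state_history, *states):
    """'created_on' of the first entry whose state is in `states`, or None."""
    for entry in state_history:
        if entry["state"] in states:
            return entry["created_on"]
    return None


def runtime_secs(state_history):
    start = first_time(state_history, "RUNNING")
    end = first_time(state_history, "COMPLETED", "FIZZLED")  # a Launch ends in one of these
    if start is None or end is None:
        return None
    return int((end - start).total_seconds())


def reservedtime_secs(state_history):
    reserved = first_time(state_history, "RESERVED")
    start = first_time(state_history, "RUNNING")
    if reserved is None or start is None:
        return None
    return int((start - reserved).total_seconds())


t0 = datetime(2024, 1, 1, 12, 0, 0)
history = [
    {"state": "RESERVED", "created_on": t0},
    {"state": "RUNNING", "created_on": t0 + timedelta(minutes=30)},
    {"state": "COMPLETED", "created_on": t0 + timedelta(minutes=90)},
]
print(reservedtime_secs(history), runtime_secs(history))  # 1800 3600
```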
class fireworks.core.firework.Tracker(filename, nlines=25, content=u'', allow_zipped=False)

Bases: fireworks.utilities.fw_serializers.FWSerializable, object

A Tracker monitors a file and returns the last N lines for updating the Launch object.

MAX_TRACKER_LINES = 1000
classmethod from_dict(m_dict)
to_dict()
track_file(launch_dir=None)

Reads the monitored file and returns the last N lines.

Parameters:launch_dir (str) – directory where the job was launched, used to resolve a relative filename
Returns:the content (last N lines)
Return type:str
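Returning the last N lines of a file is a classic tail operation. A minimal sketch, assuming relative filenames are joined to `launch_dir` and that a missing file yields an empty string (an assumption, not documented behavior); `last_n_lines` is a hypothetical helper that reuses the MAX_TRACKER_LINES cap shown above.

```python
import os
import tempfile
from collections import deque

MAX_TRACKER_LINES = 1000  # same cap as Tracker.MAX_TRACKER_LINES


def last_n_lines(filename, nlines=25, launch_dir=None):
    """Return the last `nlines` lines of a file as a single string."""
    nlines = min(nlines, MAX_TRACKER_LINES)
    path = filename
    if launch_dir and not os.path.isabs(filename):
        # Relative filenames are resolved against the launch directory.
        path = os.path.join(launch_dir, filename)
    if not os.path.exists(path):
        return ""
    with open(path) as f:
        # deque(maxlen=n) keeps only the final n lines while streaming the file.
        tail = deque(f, maxlen=nlines)
    return "".join(tail)


with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "out.log"), "w") as f:
        f.writelines(f"line {i}\n" for i in range(100))
    print(last_n_lines("out.log", nlines=3, launch_dir=d), end="")
```

Streaming through a bounded deque keeps memory proportional to N rather than to the file size.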
class fireworks.core.firework.Workflow(fireworks, links_dict=None, name=None, metadata=None, created_on=None, updated_on=None, fw_states=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

A Workflow connects a group of FireWorks in an execution order.

class Workflow.Links

Bases: dict, fireworks.utilities.fw_serializers.FWSerializable

An inner class for storing the DAG links between FireWorks.

classmethod from_dict(m_dict)
nodes

Return list of all nodes

parent_links

Return a dict mapping each child to the list of its parents.

Note: if the performance of parent_links becomes an issue, override __delitem__/__setitem__ to keep parent_links up to date.

to_db_dict()

Convert to str form for Mongo, which cannot have int keys.

Returns:dict
to_dict()

Convert to str form for Mongo, which cannot have int keys.

Returns:dict
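The Links bookkeeping can be sketched on a plain dict. This assumes, as the links_dict argument of Workflow suggests, that links map each parent Firework id to a list of child ids; the three functions are hypothetical illustrations of nodes, parent_links, and the int-to-str key conversion, not the library code.

```python
from collections import defaultdict


def parent_links(links):
    """Invert {parent_id: [child_ids]} into {child_id: [parent_ids]}."""
    parents = defaultdict(list)
    for parent, children in links.items():
        for child in children:
            parents[child].append(parent)
    return dict(parents)


def nodes(links):
    """All ids that appear as a parent or as a child."""
    all_ids = set(links)
    for children in links.values():
        all_ids.update(children)
    return sorted(all_ids)


def to_mongo_form(links):
    """Mongo documents cannot have int keys, so the keys become strings."""
    return {str(k): v for k, v in links.items()}


links = {1: [2, 3], 2: [4], 3: [4], 4: []}
print(parent_links(links))   # {2: [1], 3: [1], 4: [2, 3]}
print(nodes(links))          # [1, 2, 3, 4]
print(to_mongo_form(links))  # {'1': [2, 3], '2': [4], '3': [4], '4': []}
```

Roots (here id 1) have no parents, so they do not appear as keys of the inverted mapping.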
Workflow.append_wf(new_wf, fw_ids, detour=False, pull_spec_mods=False)

Add a workflow as a child of one or more Fireworks. Note: detours must have children whose STATE_RANK is WAITING or below.

Parameters:
  • new_wf (Workflow) – New Workflow to add.
  • fw_ids ([int]) – ids of the parent Fireworks on which to add the Workflow.
  • detour (bool) – add children of the current Firework to the Workflow’s leaves.
  • pull_spec_mods (bool) – if True, pull the spec mods of COMPLETED parents and refresh the WF states.
Returns:

list of Firework ids that were updated or new

Return type:

[int]
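The difference between a plain append and a detour comes down to how links are rewired. The sketch below works only on {parent: [children]} dicts, assumes the new workflow's ids do not collide with existing ones, and ignores states, specs, and pull_spec_mods; `append_links` is a hypothetical name.

```python
import copy


def append_links(links, new_links, parent_ids, detour=False):
    """Return a new {parent: [children]} DAG with new_links attached below parent_ids."""
    merged = copy.deepcopy(links)
    for node, children in new_links.items():
        merged[node] = list(children)
    new_children = {c for cs in new_links.values() for c in cs}
    new_roots = [n for n in new_links if n not in new_children]
    new_leaves = [n for n, cs in new_links.items() if not cs]

    for pid in parent_ids:
        old_children = links.get(pid, [])
        if detour:
            # Detour: the parent's original children now run after the new leaves.
            for leaf in new_leaves:
                for child in old_children:
                    if child not in merged[leaf]:
                        merged[leaf].append(child)
            merged[pid] = list(new_roots)
        else:
            # Plain append: the new roots become extra children of the parent.
            merged[pid] = list(old_children) + [r for r in new_roots if r not in old_children]
    return merged


links = {1: [2], 2: []}
new = {10: [11], 11: []}
print(append_links(links, new, [1]))               # {1: [2, 10], 2: [], 10: [11], 11: []}
print(append_links(links, new, [1], detour=True))  # {1: [10], 2: [], 10: [11], 11: [2]}
```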

Workflow.apply_action(action, fw_id)

Apply a FWAction on a Firework in the Workflow.

Parameters:
  • action (FWAction) – action to apply
  • fw_id (int) – id of Firework on which to apply the action
Returns:

list of Firework ids that were updated or new

Return type:

[int]

classmethod Workflow.from_Firework(fw, name=None, metadata=None)

Return Workflow from the given Firework.

Parameters:
  • fw (Firework) – the Firework to wrap in a new Workflow
  • name (str) – New workflow’s name. If not provided, the firework name is used
  • metadata (dict) – New workflow’s metadata.
Returns:

Workflow

classmethod Workflow.from_dict(m_dict)

Return Workflow from its dict representation.

Parameters:m_dict (dict) – either a Workflow dict or a Firework dict
Returns:Workflow
classmethod Workflow.from_wflow(wflow)

Create a fresh Workflow from an existing one.

Parameters:wflow (Workflow) – the existing Workflow to copy
Returns:Workflow
Workflow.fws

Return list of all fireworks

Workflow.leaf_fw_ids

Gets leaf FireWorks of this workflow (those with no children).

Returns:Firework ids of leaf FWs
Return type:[int]
Workflow.refresh(fw_id, updated_ids=None)

Refreshes the state of a Firework and any affected children.

Parameters:
  • fw_id (int) – id of the Firework on which to perform the refresh
  • updated_ids ([int]) –
Returns:

set of Firework ids that were updated

Return type:

set(int)

Workflow.rerun_fw(fw_id, updated_ids=None)

Archives the launches of a Firework so that it can be re-run.

Parameters:
  • fw_id (int) – id of the Firework to be rerun
  • updated_ids (set(int)) – set of Firework ids to rerun
Returns:

list of Firework ids that were updated

Return type:

[int]

Workflow.reset(reset_ids=True)

Reset the states of all Fireworks in this workflow to ‘WAITING’.

Parameters:reset_ids (bool) – if True, give each Firework a new id.
Workflow.root_fw_ids

Gets root FireWorks of this workflow (those with no parents).

Returns:Firework ids of root FWs
Return type:[int]
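Root and leaf ids follow directly from the links. This sketch assumes every Firework id is present as a key of a {parent: [children]} dict, with an empty list for leaves; both functions are hypothetical stand-ins for the properties above.

```python
def root_fw_ids(links):
    """Ids that never appear as a child (no parents)."""
    children = {c for cs in links.values() for c in cs}
    return sorted(n for n in links if n not in children)


def leaf_fw_ids(links):
    """Ids with an empty child list (no children)."""
    return sorted(n for n, cs in links.items() if not cs)


links = {1: [2, 3], 2: [4], 3: [4], 4: []}
print(root_fw_ids(links), leaf_fw_ids(links))  # [1] [4]
```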
Workflow.state

Returns:state (str) – the state of the workflow

Workflow.to_db_dict()
Workflow.to_dict()
Workflow.to_display_dict()

fireworks.core.fworker module

class fireworks.core.fworker.FWorker(name=u'Automatically generated Worker', category=u'', query=None, env=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

classmethod auto_load()

Returns an FWorker object loaded from the settings file (my_fworker.yaml).

classmethod from_dict(*args, **kwargs)
query

Returns updated query dict.

to_dict(*args, **kwargs)

fireworks.core.launchpad module

class fireworks.core.launchpad.LaunchPad(host=u'localhost', port=27017, name=u'fireworks', username=None, password=None, logdir=None, strm_lvl=None, user_indices=None, wf_user_indices=None, ssl_ca_file=None)

Bases: fireworks.utilities.fw_serializers.FWSerializable

The LaunchPad manages the FireWorks database.

add_offline_run(launch_id, fw_id, name)

Add the launch and firework to the offline_run collection.

Parameters:
  • launch_id (int) – launch id
  • fw_id (int) – firework id
  • name (str) –
add_wf(wf, reassign_all=True)

Add a workflow (or a single Firework) to the LaunchPad. The Firework ids will be reassigned.

Parameters:wf (Workflow/Firework) – the workflow or Firework to add
Returns:mapping between old and new Firework ids
Return type:dict
append_wf(new_wf, fw_ids, detour=False, pull_spec_mods=True)

Append a new workflow on top of an existing workflow.

Parameters:
  • new_wf (Workflow) – The new workflow to append
  • fw_ids ([int]) – The parent fw_ids at which to append the workflow
  • detour (bool) – Whether to connect the new Workflow in a “detour” style, i.e., move the original children of the parent fw_ids to the new_wf
  • pull_spec_mods (bool) – Whether the new Workflow should pull the FWActions of the parent fw_ids
archive_wf(fw_id)

Archive the workflow containing the given firework id.

Parameters:fw_id (int) – firework id
classmethod auto_load()
cancel_reservation(launch_id)

Given the launch id, cancel the reservation and rerun the Fireworks.

cancel_reservation_by_reservation_id(reservation_id)

Given the reservation id, cancel the reservation and rerun the corresponding fireworks.

change_launch_dir(launch_id, launch_dir)

Change the launch directory corresponding to the given launch id.

Parameters:
  • launch_id (int) –
  • launch_dir (str) – path to the new launch directory.
checkout_fw(fworker, launch_dir, fw_id=None, host=None, ip=None, state=u'RUNNING')

Check out the next ready Firework, mark it with the given state (RESERVED or RUNNING), and return it to the caller. The caller is responsible for running the Firework.

Parameters:
  • fworker (FWorker) – A FWorker instance
  • launch_dir (str) – the dir the FW will be run in (for creating a Launch object)
  • fw_id (int) – Firework id
  • host (str) – the host making the request (for creating a Launch object)
  • ip (str) – the ip making the request (for creating a Launch object)
  • state (str) – RESERVED or RUNNING, the fetched firework’s state will be set to this value.
Returns:

firework and the new launch id

Return type:

(Firework, int)

complete_launch(launch_id, action=None, state=u'COMPLETED')

Internal method used to mark a Firework’s Launch as completed.

Parameters:
  • launch_id (int) –
  • action (FWAction) – the FWAction of what to do next
  • state (str) – COMPLETED or FIZZLED
Returns:

updated launch

Return type:

dict

defuse_fw(fw_id, rerun_duplicates=True)

Given the firework id, defuse the firework and refresh the workflow.

Parameters:
  • fw_id (int) – firework id
  • rerun_duplicates (bool) – if True, duplicate Fireworks (ones sharing the same launch) are marked for rerun and then defused.
defuse_wf(fw_id, defuse_all_states=True)

Defuse the workflow containing the given firework id.

Parameters:
  • fw_id (int) – firework id
  • defuse_all_states (bool) –
delete_wf(fw_id)

Delete the workflow containing firework with the given id.

Parameters:fw_id (int) – Firework id
detect_lostruns(expiration_secs=14400, fizzle=False, rerun=False, max_runtime=None, min_runtime=None, refresh=False)

Detect lost runs, i.e., RUNNING Fireworks that haven’t been updated within the specified time limit, or RUNNING Fireworks whose launch has been marked FIZZLED or COMPLETED.

Parameters:
  • expiration_secs (seconds) – expiration time in seconds
  • fizzle (bool) – if True, mark the lost runs FIZZLED
  • rerun (bool) – if True, mark the lost runs FIZZLED and rerun them
  • max_runtime (seconds) – maximum run time
  • min_runtime (seconds) – minimum run time
  • refresh (bool) – if True, refresh the workflow with inconsistent fireworks.
Returns:

a tuple of lists: the lost launch ids, the lost firework ids, and the inconsistent firework ids.

Return type:

([int], [int], [int])
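The core of lost-run detection is a timestamp cutoff. For scale, the default expiration_secs of 14400 s is 4 hours, and detect_unreserved's default of 1209600 s is 14 days. The sketch below assumes each launch is a dict with hypothetical "launch_id", "state", and "last_pinged" keys; the real method queries MongoDB and also handles min/max runtime, fizzling, rerunning, and refresh.

```python
from datetime import datetime, timedelta


def find_lost_launches(launches, expiration_secs=14400, now=None):
    """Return ids of RUNNING launches whose last ping is older than expiration_secs."""
    now = now or datetime.now()
    cutoff = now - timedelta(seconds=expiration_secs)
    return [launch["launch_id"] for launch in launches
            if launch["state"] == "RUNNING" and launch["last_pinged"] < cutoff]


now = datetime(2024, 1, 1, 12, 0)
launches = [
    {"launch_id": 1, "state": "RUNNING", "last_pinged": now - timedelta(hours=5)},
    {"launch_id": 2, "state": "RUNNING", "last_pinged": now - timedelta(minutes=10)},
    {"launch_id": 3, "state": "COMPLETED", "last_pinged": now - timedelta(days=2)},
]
print(find_lost_launches(launches, now=now))  # [1]
```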

detect_unreserved(expiration_secs=1209600, rerun=False)

Return the ids of RESERVED launches that have not been updated within expiration_secs.

Parameters:
  • expiration_secs (seconds) – time limit
  • rerun (bool) – if True, the expired reservations are cancelled and the fireworks rerun.
Returns:

list of expired launch ids

Return type:

[int]

forget_offline(launchid_or_fwid, launch_mode=True)

Unmark the offline run for the given launch or firework id.

Parameters:
  • launchid_or_fwid (int) – launch id or firework id
  • launch_mode (bool) – if True then launch id is given.
classmethod from_dict(d)
get_fw_by_id(fw_id)

Given a Firework id, give back a Firework object.

Parameters:fw_id (int) – Firework id.
Returns:Firework object
get_fw_dict_by_id(fw_id)

Given firework id, return firework dict.

Parameters:fw_id (int) – firework id
Returns:dict
get_fw_ids(query=None, sort=None, limit=0, count_only=False, launches_mode=False)

Return all the fw ids that match a query.

Parameters:
  • query (dict) – representing a Mongo query
  • sort ([(str, str)]) – sort argument in PyMongo format
  • limit (int) – limit the results
  • count_only (bool) – only return the count rather than explicit ids
  • launches_mode (bool) – query the launches collection instead of fireworks
Returns:

list of firework ids matching the query

Return type:

list

get_fw_ids_from_reservation_id(reservation_id)

Given the reservation id, return the list of firework ids.

Parameters:reservation_id (int) –
Returns:list of firework ids.
Return type:[int]
get_launch_by_id(launch_id)

Given a Launch id, return details of the Launch.

Parameters:launch_id (int) – launch id
Returns:Launch object
get_logdir()

Return the log directory.

Note (AJ): This is needed for job packing because Proxy objects are not fully featured.

get_new_fw_id()

Check out the next Firework id.

get_new_launch_id()

Check out the next Launch id.

get_reservation_id_from_fw_id(fw_id)

Given the firework id, return the reservation id

get_tracker_data(fw_id)

Return the tracker data of the given Firework.

Parameters:fw_id (int) – firework id
Returns:list of tracker dicts
Return type:[dict]
get_wf_by_fw_id(fw_id)

Given a Firework id, give back the Workflow containing that Firework.

Parameters:fw_id (int) –
Returns:A Workflow object
get_wf_by_fw_id_lzyfw(fw_id)

Given a Firework id, give back the Workflow containing that Firework, with its Fireworks loaded as LazyFirework objects.

Parameters:fw_id (int) –
Returns:A Workflow object
get_wf_ids(query=None, sort=None, limit=0, count_only=False)

Return one fw id for each workflow that matches a query.

Parameters:
  • query (dict) – representing a Mongo query
  • sort ([(str, str)]) – sort argument in PyMongo format
  • limit (int) – limit the results
  • count_only (bool) – only return the count rather than explicit ids
Returns:

list of firework ids

Return type:

list

get_wf_summary_dict(fw_id, mode=u'more')

A much faster way to get summary information about a Workflow by querying only for needed information.

Parameters:
  • fw_id (int) – A Firework id.
  • mode (str) – Choose between “more”, “less” and “all” in terms of quantity of information.
Returns:

information about Workflow.

Return type:

dict

log_message(level, message)

Support for job packing

Parameters:
  • level (str) –
  • message (str) –
maintain(infinite=True, maintain_interval=None)

Perform LaunchPad maintenance: detect lost runs and unreserved RESERVED launches.

Parameters:
  • infinite (bool) –
  • maintain_interval (seconds) – sleep time
mark_fizzled(launch_id)

Mark the launch corresponding to the given id as FIZZLED.

Parameters:launch_id (int) – launch id
Returns:updated launch
Return type:dict
pause_fw(fw_id)

Given the firework id, pause the firework and refresh the workflow.

Parameters:fw_id (int) – firework id
pause_wf(fw_id)

Pause the workflow containing the given firework id.

Parameters:fw_id (int) – firework id
ping_launch(launch_id, ptime=None)

Ping that a Launch is still alive: updates the ‘updated_on’ field of the state history of a Launch.

Parameters:
  • launch_id (int) – launch id
  • ptime (datetime) – the ping time
recover_offline(launch_id, ignore_errors=False, print_errors=False)

Update the launch state using the offline data in the FW_offline.json file.

Parameters:
  • launch_id (int) – launch id
  • ignore_errors (bool) –
  • print_errors (bool) –
Returns:

firework id if the recovery fails, otherwise None

reignite_fw(fw_id)

Given the firework id, re-ignite (set state=WAITING) the defused firework.

Parameters:fw_id (int) – firework id
reignite_wf(fw_id)

Reignite the workflow containing the given firework id.

Parameters:fw_id (int) – firework id
rerun_fw(fw_id, rerun_duplicates=True, clear_recovery=False)

Rerun the firework corresponding to the given id.

Parameters:
  • fw_id (int) – firework id
  • rerun_duplicates (bool) –
Returns:

list of firework ids that were rerun

Return type:

[int]

rerun_fws_task_level(fw_id, rerun_duplicates=True, launch_id=None, recover_mode=None)

Rerun a fw at the task level.

Parameters:
  • fw_id (int) – fw_id to rerun
  • rerun_duplicates (bool) – also rerun duplicate FWs
  • launch_id (int) – launch id to rerun, if known; otherwise the last launch_id will be used
  • recover_mode (str) – use “prev_dir” to run again in previous dir, “cp” to try to copy data to new dir, or None to start from scratch
Returns:

list of rerun firework ids.

Return type:

[int]

reserve_fw(fworker, launch_dir, host=None, ip=None, fw_id=None)

Check out the next ready Firework and mark the launch RESERVED.

Parameters:
  • fworker (FWorker) –
  • launch_dir (str) – path to the launch directory.
  • host (str) – hostname
  • ip (str) – ip address
  • fw_id (int) – fw_id to be reserved, if desired
Returns:

the checked out firework and the new launch id

Return type:

(Firework, int)

reset(password, require_password=True, max_reset_wo_password=25)

Create a new FireWorks database. This will overwrite the existing FireWorks database! To safeguard against accidentally erasing an existing database, a password must be entered.

Parameters:
  • password (str) – A String representing today’s date, e.g. ‘2012-12-31’
  • require_password (bool) – Whether a password is required to reset the DB. Setting it to False is dangerous because running code unintentionally could clear your DB; use max_reset_wo_password to minimize the risk.
  • max_reset_wo_password (int) – A failsafe: when require_password is set to False, FireWorks will not clear DBs that contain more workflows than this parameter
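The reset safeguard can be expressed as a predicate. This is a sketch of the documented rules only (the password must equal today's date as 'YYYY-MM-DD'; without a password, DBs holding more than max_reset_wo_password workflows are refused); `reset_allowed` and its `n_workflows`/`today` parameters are hypothetical, and the real method raises or reports errors instead of returning a bool.

```python
import datetime


def reset_allowed(password, require_password=True, n_workflows=0,
                  max_reset_wo_password=25, today=None):
    """Predicate model of the LaunchPad.reset safeguards."""
    today = today or datetime.date.today()
    if require_password:
        # The password must be today's date, e.g. '2012-12-31'.
        return password == today.strftime("%Y-%m-%d")
    # No password: only allow wiping small databases.
    return n_workflows <= max_reset_wo_password


d = datetime.date(2012, 12, 31)
print(reset_allowed("2012-12-31", today=d))                         # True
print(reset_allowed("2012-12-30", today=d))                         # False
print(reset_allowed(None, require_password=False, n_workflows=30))  # False
```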
restore_backup_data(launch_id, fw_id)

For the given launch id and firework id, restore the backup data.

resume_fw(fw_id)

Given the firework id, resume (set state=WAITING) the paused firework.

Parameters:fw_id (int) – firework id
run_exists(fworker=None)

Checks to see if the database contains any FireWorks that are ready to run.

Returns:True if the database contains any FireWorks that are ready to run.
Return type:bool
set_priority(fw_id, priority)

Set the priority of the firework with the given id.

Parameters:
  • fw_id (int) – firework id
  • priority
set_reservation_id(launch_id, reservation_id)

Set the reservation id of the launch corresponding to the given launch id.

Parameters:
  • launch_id (int) –
  • reservation_id (int) –
to_dict()

Note: usernames/passwords are exported as unencrypted Strings!

tuneup(bkground=True)

Database tuneup: build indexes

update_spec(fw_ids, spec_document, mongo=False)

Update fireworks with a spec. Sometimes you need to modify a firework in progress.

Parameters:
  • fw_ids ([int]) – All fw_ids to modify.
  • spec_document (dict) – The spec document. Note that only modifications to the spec key are allowed. So if you supply {“_tasks.1.parameter”: “hello”}, you are effectively modifying spec._tasks.1.parameter in the actual fireworks collection.
  • mongo (bool) – spec_document uses mongo syntax to directly update the spec
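A dotted key such as "_tasks.1.parameter" addresses a nested path inside the spec, with numeric components indexing into lists as in Mongo dot notation. The sketch below applies such keys to an in-memory spec; it is not the LaunchPad implementation (which issues a MongoDB update), it only handles paths that already exist, and `apply_dotted_update` is a hypothetical name.

```python
import copy


def apply_dotted_update(spec, spec_document):
    """Apply {"a.b.0.c": value} style keys to a nested dict/list spec."""
    spec = copy.deepcopy(spec)
    for dotted_key, value in spec_document.items():
        *path, last = dotted_key.split(".")
        target = spec
        for part in path:
            # Numeric components index into lists, as in Mongo dot notation.
            target = target[int(part)] if isinstance(target, list) else target[part]
        if isinstance(target, list):
            target[int(last)] = value
        else:
            target[last] = value
    return spec


spec = {"_tasks": [{"script": "echo a"}, {"parameter": "old"}]}
print(apply_dotted_update(spec, {"_tasks.1.parameter": "hello"}))
# {'_tasks': [{'script': 'echo a'}, {'parameter': 'hello'}]}
```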
class fireworks.core.launchpad.LazyFirework(fw_id, fw_coll, launch_coll)

Bases: object

A LazyFirework only has the fw_id, and grabs other data just-in-time. This representation can speed up Workflow loading as only “important” FWs need to be fully loaded.

archived_launches
created_on
db_fields = (u'name', u'fw_id', u'spec', u'created_on', u'state')
db_launch_fields = (u'launches', u'archived_launches')
full_fw
launches
name
parents
partial_fw
spec
state
tasks
to_db_dict()
to_dict()
updated_on
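Lazy loading means holding only the id and fetching the rest on first access. This single-level sketch uses `functools.cached_property` and a caller-supplied `fetch` function standing in for a database lookup; the class `LazyRecord` is hypothetical, and the real LazyFirework distinguishes a partial load (db_fields) from a full load that includes launches.

```python
from functools import cached_property


class LazyRecord:
    """Hypothetical lazy wrapper: holds only an id and fetches fields on first access."""

    def __init__(self, fw_id, fetch):
        self.fw_id = fw_id
        self._fetch = fetch  # callable(fw_id) -> full dict, e.g. a database lookup
        self.fetch_count = 0

    @cached_property
    def _data(self):
        # Runs once; the result is cached on the instance afterwards.
        self.fetch_count += 1
        return self._fetch(self.fw_id)

    @property
    def name(self):
        return self._data["name"]

    @property
    def state(self):
        return self._data["state"]


store = {7: {"name": "relax", "state": "READY"}}
lazy = LazyRecord(7, store.__getitem__)
print(lazy.fetch_count)       # 0: nothing loaded yet
print(lazy.name, lazy.state)  # relax READY
print(lazy.fetch_count)       # 1: loaded once, then cached
```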
exception fireworks.core.launchpad.LockedWorkflowError

Bases: exceptions.ValueError

Error raised if the context manager WFLock can’t acquire the lock on the WF within the selected time interval (WFLOCK_EXPIRATION_SECS) and killing of the lock is disabled (WFLOCK_EXPIRATION_KILL).

class fireworks.core.launchpad.WFLock(lp, fw_id, expire_secs=300, kill=False)

Bases: object

Lock a Workflow, e.g., for performing update operations. Raises a LockedWorkflowError if the lock couldn’t be acquired within expire_secs and kill==False. Calling functions are responsible for handling the error in order to avoid database inconsistencies.

fireworks.core.rocket module

class fireworks.core.rocket.Rocket(launchpad, fworker, fw_id)

The Rocket fetches a workflow step from the FireWorks database and executes it.

decorate_fwaction(fwaction, my_spec, m_fw, launch_dir)
run()

Run the rocket (check out a job from the database and execute it)

fireworks.core.rocket.background_task(btask, spec, stop_event, master_thread)
fireworks.core.rocket.do_ping(launchpad, launch_id)
fireworks.core.rocket.ping_launch(launchpad, launch_id, stop_event, master_thread)
fireworks.core.rocket.start_background_task(btask, spec)
fireworks.core.rocket.start_ping_launch(launchpad, launch_id)
fireworks.core.rocket.stop_backgrounds(ping_stop, btask_stops)
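The ping helpers follow a common pattern: a daemon thread pings periodically until a stop event is set or the main thread dies. The sketch below assumes that pattern; `ping_loop` and `start_pinger` are hypothetical, `ping` is a placeholder for a call such as LaunchPad.ping_launch(launch_id), and the interval is arbitrary.

```python
import threading
import time


def ping_loop(ping, stop_event, master_thread, interval_secs):
    # Ping while not told to stop and while the main thread is alive, so an
    # orphaned pinger cannot keep a dead Launch looking alive.
    while not stop_event.is_set() and master_thread.is_alive():
        ping()
        stop_event.wait(interval_secs)  # returns early once stop_event is set


def start_pinger(ping, interval_secs=0.01):
    stop_event = threading.Event()
    thread = threading.Thread(
        target=ping_loop,
        args=(ping, stop_event, threading.current_thread(), interval_secs),
        daemon=True,
    )
    thread.start()
    return stop_event, thread


pings = []
stop, thread = start_pinger(lambda: pings.append(time.monotonic()))
time.sleep(0.1)
stop.set()     # what a stop_backgrounds-style helper would do
thread.join()
print(len(pings) > 0)
```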

fireworks.core.rocket_launcher module

fireworks.core.rocket_launcher.get_fworker(fworker)
fireworks.core.rocket_launcher.launch_rocket(launchpad, fworker=None, fw_id=None, strm_lvl=u'INFO')

Run a single rocket in the current directory.

Parameters:
  • launchpad (LaunchPad) –
  • fworker (FWorker) –
  • fw_id (int) – if set, a particular Firework to run
  • strm_lvl (str) – level at which to output logs to stdout
Returns:

bool

fireworks.core.rocket_launcher.rapidfire(launchpad, fworker=None, m_dir=None, nlaunches=0, max_loops=-1, sleep_time=None, strm_lvl=u'INFO', timeout=None)

Keeps running Rockets in m_dir until we reach an error. Automatically creates subdirectories for each Rocket. Usually stops when we run out of FireWorks from the LaunchPad.

Parameters:
  • launchpad (LaunchPad) –
  • fworker (FWorker object) –
  • m_dir (str) – the directory in which to loop Rocket running
  • nlaunches (int) – 0 means ‘until completion’, -1 or “infinite” means to loop until max_loops
  • max_loops (int) – maximum number of loops (default -1 is infinite)
  • sleep_time (int) – secs to sleep between rapidfire loop iterations
  • strm_lvl (str) – level at which to output logs to stdout
  • timeout (int) – number of seconds after which to stop the rapidfire process
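The interplay of nlaunches and max_loops can be modeled with two callables in place of the LaunchPad: `run_exists()` reports whether ready work remains and `launch_one()` runs one Rocket. This is a simplified sketch under those assumptions (no directories, logging, or FWorker; nlaunches="infinite" is not handled), and `rapidfire_sketch` is a hypothetical name.

```python
import time


def rapidfire_sketch(run_exists, launch_one, nlaunches=0, max_loops=-1,
                     sleep_time=0.0, timeout=None):
    """Return the number of launches performed."""
    start = time.monotonic()
    num_launched = 0
    num_loops = 0
    while num_loops != max_loops:  # max_loops=-1 never matches: loop forever
        # Inner loop: keep launching while ready work exists.
        while run_exists():
            if timeout is not None and time.monotonic() - start > timeout:
                return num_launched
            launch_one()
            num_launched += 1
            if num_launched == nlaunches:
                return num_launched
        # nlaunches == 0 means "until completion": stop once nothing is ready.
        if nlaunches == 0:
            break
        num_loops += 1
        time.sleep(sleep_time)
    return num_launched


queue = ["fw1", "fw2", "fw3"]
done = []
n = rapidfire_sketch(lambda: bool(queue), lambda: done.append(queue.pop(0)))
print(n, done)  # 3 ['fw1', 'fw2', 'fw3']
```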

Module contents