Source code for bart.sched.functions

#    Copyright 2015-2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Scheduler specific Functionality for the
stats framework

The Scheduler stats aggregation is based on a signal
which is generated by the combination of two triggers
from the events with the following parameters

========================= ============ =============
EVENT                       VALUE          FILTERS
========================= ============ =============
:func:`sched_switch`        1           next_pid
:func:`sched_switch`      -1           prev_pid
========================= ============ =============

Both these Triggers are provided by the event
:mod:`trappy.sched.SchedSwitch` which correspond to
the :code:`sched_switch` unique word in the trace

.. seealso:: :mod:`trappy.stats.Trigger.Trigger`

Using the above information the following signals are
generated.

**EVENT SERIES**

This is a combination of the two triggers as specified
above and has alternating +/- 1 values and is merely
a representation of the position in time when the process
started or stopped running on a CPU

**RESIDENCY SERIES**

This series is a cumulative sum of the event series and
is a representation of the continuous residency of the
process on a CPU

The pivot for the aggregators is the CPU on which the
event occurred on. If N is the number of CPUs in the
system, N signal for each CPU are generated. These signals
can then be aggregated by specifying a Topology

.. seealso:: :mod:`trappy.stats.Topology.Topology`
"""

import numpy as np
from trappy.stats.Trigger import Trigger

WINDOW_SIZE = 0.0001
"""A control config for filter events. Some analyses
may require ignoring of small interruptions"""

# Trigger Values
SCHED_SWITCH_IN = 1
"""Value of the event when a task is **switch in**
or scheduled on a CPU"""
SCHED_SWITCH_OUT = -1
"""Value of the event when a task is **switched out**
or relinquishes a CPU"""
NO_EVENT = 0
"""Signifies no event on an event trace"""

# Field Names
CPU_FIELD = "__cpu"
"""The column in the sched_switch event that
indicates the CPU on which the event occurred
"""
NEXT_PID_FIELD = "next_pid"
"""The column in the sched_switch event that
indicates the PID of the next process to be scheduled
"""
PREV_PID_FIELD = "prev_pid"
"""The column in the sched_switch event that
indicates the PID of the process that was scheduled
in
"""
TASK_RUNNING = 1
"""The column in the sched_switch event that
indicates the CPU on which the event occurred
"""
TASK_NOT_RUNNING = 0
"""In a residency series, a zero indicates
that the task is not running
"""
TIME_INVAL = -1
"""Standard Value to indicate invalid time data"""
SERIES_SANTIZED = "_sched_sanitized"
"""A memoized flag which is set when an event series
is checked for boundary conditions
"""


[docs]def sanitize_asymmetry(series, window=None): """Sanitize the cases when a :code:`SWITCH_OUT` happens before a :code:`SWITCH_IN`. (The case when a process is already running before the trace started) :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple """ if not hasattr(series, SERIES_SANTIZED): events = series[series != 0] if len(series) >= 2 and len(events): if series.values[0] == SCHED_SWITCH_OUT: series.values[0] = TASK_NOT_RUNNING elif events.values[0] == SCHED_SWITCH_OUT: series.values[0] = SCHED_SWITCH_IN if window: series.index.values[0] = window[0] if series.values[-1] == SCHED_SWITCH_IN: series.values[-1] = TASK_NOT_RUNNING elif events.values[-1] == SCHED_SWITCH_IN: series.values[-1] = SCHED_SWITCH_OUT if window: series.index.values[-1] = window[1] # No point if the series just has one value and # one event. We do not have sufficient data points # for any calculation. We should Ideally never reach # here. elif len(series) == 1: series.values[0] = 0 setattr(series, SERIES_SANTIZED, True) return series
[docs]def csum(series, window=None, filter_gaps=False): """:func:`aggfunc` for the cumulative sum of the input series data :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple :param filter_gaps: If set, a process being switched out for :mod:`bart.sched.functions.WINDOW_SIZE` is ignored. This is helpful when small interruptions need to be ignored to compare overall correlation :type filter_gaps: bool """ if filter_gaps: series = filter_small_gaps(series) series = series.cumsum() return select_window(series, window)
[docs]def filter_small_gaps(series): """A helper function that does filtering of gaps in residency series < :mod:`bart.sched.functions.WINDOW_SIZE` :param series: Input Time Series data :type series: :mod:`pandas.Series` """ start = None for index, value in series.iteritems(): if value == SCHED_SWITCH_IN: if start == None: continue if index - start < WINDOW_SIZE: series[start] = NO_EVENT series[index] = NO_EVENT start = None if value == SCHED_SWITCH_OUT: start = index return series
[docs]def first_cpu(series, window=None): """:func:`aggfunc` to calculate the time of the first switch in event in the series This is returned as a vector of unit length so that it can be aggregated and reduced across nodes to find the first cpu of a task :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple """ series = select_window(series, window) series = series[series == SCHED_SWITCH_IN] if len(series): return [series.index.values[0]] else: return [float("inf")]
[docs]def last_cpu(series, window=None): """:func:`aggfunc` to calculate the time of the last switch out event in the series This is returned as a vector of unit length so that it can be aggregated and reduced across nodes to find the last cpu of a task :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple """ series = select_window(series, window) series = series[series == SCHED_SWITCH_OUT] if len(series): return [series.index.values[-1]] else: return [0]
[docs]def select_window(series, window): """Helper Function to select a portion of pandas time series :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple """ if not window: return series start, stop = window ix = series.index selector = ((ix >= start) & (ix <= stop)) window_series = series[selector] return window_series
[docs]def residency_sum(series, window=None): """:func:`aggfunc` to calculate the total residency The input series is processed for intervals between a :mod:`bart.sched.functions.SCHED_SWITCH_OUT` and :mod:`bart.sched.functions.SCHED_SWITCH_IN` to track additive residency of a task .. math:: S_{in} = i_{1}, i_{2}...i_{N} \\\\ S_{out} = o_{1}, o_{2}...o_{N} \\\\ R_{total} = \sum_{k}^{N}\Delta_k = \sum_{k}^{N}(o_{k} - i_{k}) :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple :return: A scalar float value """ if not len(series): return 0.0 org_series = series series = select_window(series, window) series = sanitize_asymmetry(series, window) s_in = series[series == SCHED_SWITCH_IN] s_out = series[series == SCHED_SWITCH_OUT] if not (len(s_in) and len(s_out)): try: org_series = sanitize_asymmetry(org_series) running = select_window(org_series.cumsum(), window) if running.values[0] == TASK_RUNNING and running.values[-1] == TASK_RUNNING: return window[1] - window[0] except Exception,e: pass if len(s_in) != len(s_out): raise RuntimeError( "Unexpected Lengths: s_in={}, s_out={}".format( len(s_in), len(s_out))) else: return np.sum(s_out.index.values - s_in.index.values)
[docs]def first_time(series, value, window=None): """:func:`aggfunc` to: - Return the first index where the series == value - If no such index is found +inf is returned :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple :return: A vector of Unit Length """ series = select_window(series, window) series = series[series == value] if not len(series): return [float("inf")] return [series.index.values[0]]
[docs]def period(series, align="start", window=None): """This :func:`aggfunc` returns a tuple of the average duration between two triggers: - When :code:`align=start` the :code:`SCHED_IN` trigger is used - When :code:`align=end` the :code:`SCHED_OUT` trigger is used .. math:: E = e_{1}, e_{2}...e_{N} \\\\ T_p = \\frac{\sum_{j}^{\lfloor N/2 \\rfloor}(e_{2j + 1} - e_{2j})}{N} :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple :return: A list of deltas of successive starts/stops of a task """ series = select_window(series, window) series = sanitize_asymmetry(series, window) if align == "start": series = series[series == SCHED_SWITCH_IN] elif align == "end": series = series[series == SCHED_SWITCH_OUT] if len(series) % 2 == 0: series = series[:1] if not len(series): return [] return list(np.diff(series.index.values))
[docs]def last_time(series, value, window=None): """:func:`aggfunc` to: - The first index where the series == value - If no such index is found :mod:`bart.sched.functions.TIME_INVAL` is returned :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple :return: A vector of Unit Length """ series = select_window(series, window) series = series[series == value] if not len(series): return [TIME_INVAL] return [series.index.values[-1]]
[docs]def binary_correlate(series_x, series_y): """Helper function to Correlate binary Data Both the series should have same indices For binary time series data: .. math:: \\alpha_{corr} = \\frac{N_{agree} - N_{disagree}}{N} :param series_x: First time Series data :type series_x: :mod:`pandas.Series` :param series_y: Second time Series data :type series_y: :mod:`pandas.Series` """ if len(series_x) != len(series_y): raise ValueError("Cannot compute binary correlation for \ unequal vectors") agree = len(series_x[series_x == series_y]) disagree = len(series_x[series_x != series_y]) return (agree - disagree) / float(len(series_x))
[docs]def get_pids_for_process(ftrace, execname, cls=None): """Get the PIDs for a given process :param ftrace: A ftrace object with a sched_switch event :type ftrace: :mod:`trappy.ftrace.FTrace` :param execname: The name of the process :type execname: str :param cls: The SchedSwitch event class (required if a different event is to be used) :type cls: :mod:`trappy.base.Base` :return: The set of PIDs for the execname """ if not cls: try: df = ftrace.sched_switch.data_frame except AttributeError: raise ValueError("SchedSwitch event not found in ftrace") if len(df) == 0: raise ValueError("SchedSwitch event not found in ftrace") else: event = getattr(ftrace, cls.name) df = event.data_frame mask = df["next_comm"].apply(lambda x : True if x == execname else False) return list(np.unique(df[mask]["next_pid"].values))
[docs]def get_task_name(ftrace, pid, cls=None): """Returns the execname for pid :param ftrace: A ftrace object with a sched_switch event :type ftrace: :mod:`trappy.ftrace.FTrace` :param pid: The PID of the process :type pid: int :param cls: The SchedSwitch event class (required if a different event is to be used) :type cls: :mod:`trappy.base.Base` :return: The execname for the PID """ if not cls: try: df = ftrace.sched_switch.data_frame except AttributeError: raise ValueError("SchedSwitch event not found in ftrace") else: event = getattr(ftrace, cls.name) df = event.data_frame df = df[df["next_pid"] == pid] if not len(df): return "" else: return df["next_comm"].values[0]
[docs]def sched_triggers(ftrace, pid, sched_switch_class): """Returns the list of sched_switch triggers :param ftrace: A ftrace object with a sched_switch event :type ftrace: :mod:`trappy.ftrace.FTrace` :param pid: The PID of the associated process :type pid: int :param sched_switch_class: The SchedSwitch event class :type sched_switch_class: :mod:`trappy.base.Base` :return: List of triggers, such that :: triggers[0] = switch_in_trigger triggers[1] = switch_out_trigger """ if not hasattr(ftrace, "sched_switch"): raise ValueError("SchedSwitch event not found in ftrace") triggers = [] triggers.append(sched_switch_in_trigger(ftrace, pid, sched_switch_class)) triggers.append(sched_switch_out_trigger(ftrace, pid, sched_switch_class)) return triggers
[docs]def sched_switch_in_trigger(ftrace, pid, sched_switch_class): """ :param ftrace: A ftrace object with a sched_switch event :type ftrace: :mod:`trappy.ftrace.FTrace` :param pid: The PID of the associated process :type pid: int :param sched_switch_class: The SchedSwitch event class :type sched_switch_class: :mod:`trappy.base.Base` :return: :mod:`trappy.stats.Trigger.Trigger` on the SchedSwitch: IN for the given PID """ task_in = {} task_in[NEXT_PID_FIELD] = pid return Trigger(ftrace, sched_switch_class, # trappy Event Class task_in, # Filter Dictionary SCHED_SWITCH_IN, # Trigger Value CPU_FIELD) # Primary Pivot
[docs]def sched_switch_out_trigger(ftrace, pid, sched_switch_class): """ :param ftrace: A ftrace object with a sched_switch event :type ftrace: :mod:`trappy.ftrace.FTrace` :param pid: The PID of the associated process :type pid: int :param sched_switch_class: The SchedSwitch event class :type sched_switch_class: :mod:`trappy.base.Base` :return: :mod:`trappy.stats.Trigger.Trigger` on the SchedSwitch: OUT for the given PID """ task_out = {} task_out[PREV_PID_FIELD] = pid return Trigger(ftrace, sched_switch_class, # trappy Event Class task_out, # Filter Dictionary SCHED_SWITCH_OUT, # Trigger Value CPU_FIELD) # Primary Pivot
[docs]def trace_event(series, window=None): """ :func:`aggfunc` to be used for plotting the process residency data using :mod:`trappy.plotter.EventPlot` :param series: Input Time Series data :type series: :mod:`pandas.Series` :param window: A tuple indicating a time window :type window: tuple :return: A list of events of the type: :: [ [start_time_1, stop_time_1], [start_time_2, stop_time_2], # # [start_time_N, stop_time_N], ] """ rects = [] series = select_window(series, window) series = sanitize_asymmetry(series, window) s_in = series[series == SCHED_SWITCH_IN] s_out = series[series == SCHED_SWITCH_OUT] if not len(s_in): return rects if len(s_in) != len(s_out): raise RuntimeError( "Unexpected Lengths: s_in={}, s_out={}".format( len(s_in), len(s_out))) return np.column_stack((s_in.index.values, s_out.index.values))