# Source code for bart.sched.SchedMultiAssert

#    Copyright 2015-2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A library for asserting scheduler scenarios based on the
statistics aggregation framework"""

import re
import inspect
import trappy
from bart.sched import functions as sched_funcs
from bart.sched.SchedAssert import SchedAssert
from bart.common import Utils

class SchedMultiAssert(object):
    """This is vector assertion class built on top of
    :mod:`bart.sched.SchedAssert.SchedAssert`

    :param ftrace: A single trappy.FTrace object
        or a path that can be passed to trappy.FTrace
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param topology: A topology that describes the arrangement of
        CPU's on a system. This is useful for multi-cluster systems
        where data needs to be aggregated at different topological
        levels
    :type topology: :mod:`trappy.stats.Topology.Topology`

    :param execnames: The execnames of the task to be analysed

        A single execname or a list of execnames can be passed.
        There can be multiple processes associated with a single
        execname parameter. The execnames are searched using a prefix
        match.
    :type execnames: list, str

    :param pids: The process IDs of the tasks to be analysed.
        A single PID or a list of PIDs can be passed.
    :type pids: list, int

    :raises ValueError: If both ``execnames`` and ``pids`` are given,
        or if neither of them is given.

    Consider the following processes which need to be analysed

        ===== ==============
         PID    execname
        ===== ==============
         11     task_1
         22     task_2
         33     task_3
        ===== ==============

    A :mod:`bart.sched.SchedMultiAssert.SchedMultiAssert` instance can
    be created in the following different ways:

        - Using execname prefix match
          ::

            SchedMultiAssert(ftrace, topology, execnames="task_")

        - Individual Task names
          ::

            SchedMultiAssert(ftrace, topology,
                             execnames=["task_1", "task_2", "task_3"])

        - Using Process IDs
          ::

            SchedMultiAssert(ftrace, topology, pids=[11, 22, 33])

    All the functionality provided in
    :mod:`bart.sched.SchedAssert.SchedAssert` is available in this
    class with the addition of handling vector assertions.

    For example consider the use of :func:`getDutyCycle`
    ::

        >>> s = SchedMultiAssert(ftrace, topology, execnames="task_")
        >>> s.getDutyCycle(window=(start, end))
        {
            "11": {
                "task_name": "task_1",
                "dutycycle": 10.0
            },
            "22": {
                "task_name": "task_2",
                "dutycycle": 20.0
            },
            "33": {
                "task_name": "task_3",
                "dutycycle": 30.0
            },
        }

    The assertions can be used in a similar way
    ::

        >>> import operator as op
        >>> s = SchedMultiAssert(ftrace, topology, execnames="task_")
        >>> s.assertDutyCycle(15, op.ge, window=(start, end))
        {
            "11": {
                "task_name": "task_1",
                "dutycycle": False
            },
            "22": {
                "task_name": "task_2",
                "dutycycle": True
            },
            "33": {
                "task_name": "task_3",
                "dutycycle": True
            },
        }

    The above result can be coalesced using a :code:`rank` parameter.
    As we know that only 2 processes have duty cycles greater than 15%
    we can do the following:
    ::

        >>> import operator as op
        >>> s = SchedMultiAssert(ftrace, topology, execnames="task_")
        >>> s.assertDutyCycle(15, op.ge, window=(start, end), rank=2)
        True

    See :mod:`bart.sched.SchedAssert.SchedAssert` for the available
    functionality
    """

    def __init__(self, ftrace, topology, execnames=None, pids=None):

        self._ftrace = Utils.init_ftrace(ftrace)
        self._topology = topology

        if execnames and pids:
            raise ValueError('Either pids or execnames must be specified')
        if execnames:
            self._execnames = Utils.listify(execnames)
            self._pids = self._populate_pids()
        elif pids:
            # The documented type is "list, int": wrap a bare PID so that
            # the rest of the class can always iterate over self._pids.
            try:
                self._pids = list(pids)
            except TypeError:
                self._pids = [pids]
        else:
            raise ValueError('One of PIDs or execnames must be specified')

        self._asserts = self._populate_asserts()
        self._populate_methods()

    def _populate_asserts(self):
        """Populate SchedAsserts for the PIDs

        :return: A dictionary mapping each PID to its own
            :mod:`bart.sched.SchedAssert.SchedAssert` instance
        """

        return {
            pid: SchedAssert(self._ftrace, self._topology, pid=pid)
            for pid in self._pids
        }

    def _populate_pids(self):
        """Map the input execnames to PIDs

        :return: The list of PIDs matching the execnames. When several
            execnames are given, PIDs matched more than once are
            reported only once.
        """

        if len(self._execnames) == 1:
            return sched_funcs.get_pids_for_process(
                self._ftrace, self._execnames[0])

        pids = []
        for proc in self._execnames:
            pids += sched_funcs.get_pids_for_process(self._ftrace, proc)

        return list(set(pids))

    def _create_method(self, attr_name):
        """A wrapper function to create a dispatch function

        Going through a function call (instead of building the lambda
        inside the loop of :func:`_populate_methods`) binds ``attr_name``
        per method and avoids the late-binding closure pitfall.
        """

        return lambda *args, **kwargs: self._dispatch(attr_name, *args, **kwargs)

    def _populate_methods(self):
        """Populate Methods from SchedAssert

        Every ``get*`` and ``assert*`` method of SchedAssert is exposed
        on this instance as a function that dispatches the call to the
        SchedAssert of each PID.
        """

        for attr_name in dir(SchedAssert):
            if not attr_name.startswith(("get", "assert")):
                continue

            attr = getattr(SchedAssert, attr_name)

            # Looked up on the class, a method is an unbound method on
            # Python 2 but a plain function on Python 3: accept both.
            if inspect.ismethod(attr) or inspect.isfunction(attr):
                setattr(self, attr_name, self._create_method(attr_name))

    def get_task_name(self, pid):
        """Get task name for the PID"""
        return self._asserts[pid].execname

    def _dispatch(self, func_name, *args, **kwargs):
        """The dispatch function to call into the SchedAssert Method

        The following keyword arguments are consumed here and are not
        forwarded to the SchedAssert method:

        :param rank: For ``assert*`` methods only. When given, the
            return value is a single boolean which is True if exactly
            ``rank`` tasks satisfy the assertion (``rank=0`` asserts
            that no task does).
        :type rank: int

        :param result: An existing result dictionary to be updated
            in place instead of creating a new one
        :type result: dict

        :param param: The key under which the per-task value is stored.
            Defaults to the lower-cased method name stripped of its
            first "assert" or "get"
        :type param: str

        :return: A boolean when ``rank`` is used with an ``assert*``
            method, otherwise the result dictionary keyed by PID
        """

        assert_func = func_name.startswith("assert")
        num_true = 0

        rank = kwargs.pop("rank", None)
        result = kwargs.pop("result", {})
        param = kwargs.pop(
            "param", re.sub(r"assert|get", "", func_name, count=1).lower())

        for pid in self._pids:

            if pid not in result:
                result[pid] = {}
                result[pid]["task_name"] = self.get_task_name(pid)

            attr = getattr(self._asserts[pid], func_name)
            result[pid][param] = attr(*args, **kwargs)

            if assert_func and result[pid][param]:
                num_true += 1

        # Compare against None (not truthiness) so that rank=0 is honoured
        if assert_func and rank is not None:
            return num_true == rank

        return result

    def getCPUBusyTime(self, level, node, window=None, percent=False):
        """Get the amount of time the cpus in the system were busy executing the
        tasks

        :param level: The topological level to which the group belongs
        :type level: string

        :param node: The group of CPUs for which to calculate busy time
        :type node: list

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        :param percent: If True the result is normalized to the total
            time of the period, either the window or the full length of
            the trace.
        :type percent: bool

        .. math::

            R = \\frac{T_{busy} \\times 100}{T_{total}}

        """
        residencies = self.getResidency(level, node, window=window)

        # .values() exists on both Python 2 and 3, unlike .itervalues()
        busy_time = sum(v["residency"] for v in residencies.values())

        if not percent:
            return busy_time

        if window:
            total_time = window[1] - window[0]
        else:
            total_time = self._ftrace.get_duration()

        num_cpus = len(node)
        # float() keeps the ratio exact even for integer inputs on Python 2
        return busy_time / float(total_time * num_cpus) * 100

    def generate_events(self, level, window=None):
        """Generate Events for the trace plot

        .. note::
            This is an internal function for plotting data
        """

        return {
            s_assert.name: s_assert.generate_events(level, window=window)
            for s_assert in self._asserts.values()
        }

    def plot(self, level="cpu", window=None, xlim=None):
        """
        :return: :mod:`trappy.plotter.AbstractDataPlotter` instance
            Call :func:`view` to draw the graph
        """

        if not xlim:
            if not window:
                xlim = [0, self._ftrace.get_duration()]
            else:
                xlim = list(window)

        events = self.generate_events(level, window)
        names = [s.name for s in self._asserts.values()]
        num_lanes = self._topology.level_span(level)
        lane_prefix = level.upper() + ": "
        return trappy.EventPlot(events, names, xlim,
                                lane_prefix=lane_prefix,
                                num_lanes=num_lanes)