Source code for trappy.stats.Correlator

#    Copyright 2015-2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""The module responsible for correlation
and related functionality
"""
from trappy.stats import StatConf
from trappy.stats.Indexer import get_unified_indexer
import numpy as np
import math


class Correlator(object):
    """Align and correlate the traces produced by two aggregators.

    On construction both aggregators are aggregated at the ``"all"`` level
    and the shift that best aligns them is computed once; that same shift
    is then applied to every group in :meth:`correlate` and :meth:`plot`.

    :param first: First Aggregator
    :type first: :mod:`trappy.stats.Aggregator`

    :param second: Second Aggregator
    :type second: :mod:`trappy.stats.Aggregator`

    :param corrfunc: Optional callable ``f(s_x, s_y)`` used to correlate
        two series instead of :meth:`pandas.Series.corr`
    :type corrfunc: function

    Any other keyword argument is forwarded unchanged to the ``aggregate``
    calls made by :meth:`correlate` and :meth:`plot`.
    """

    def __init__(self, first, second, **kwargs):
        self._first_agg = first
        self._second_agg = second
        self.indexer = get_unified_indexer([first.indexer, second.indexer])
        # "corrfunc" is consumed here; whatever remains in kwargs is
        # passed through to the aggregators.
        self._corrfunc = kwargs.pop("corrfunc", None)
        self._agg_kwargs = kwargs
        self.corr_graphs = {}
        # Must run last: it relies on the aggregators, the indexer and
        # the correlation function set above.
        self._shift = self._align_top_level()

    def _resample(self, series, delta=StatConf.DELTA_DEFAULT):
        """Internal method to resample the series
        to a uniformly spaced index

        :param series: Series to be resampled
        :type series: :mod:`pandas.Series`

        :param delta: spacing between indices
        :type delta: float

        :return: resampled :mod:`pandas.Series`
        """
        new_index = self.indexer.get_uniform(delta)
        # "pad" carries the last known value forward onto the new index.
        return series.reindex(index=new_index, method="pad")

    def correlate(self, level, resample=True):
        """This function returns the correlation between two traces

        :param level: The level at which the correlation is required
        :type level: str

        :param resample: Resample data
        :type resample: bool

        :return: A tuple ``(corr_output, total)`` where ``corr_output``
            is the list of correlation values, one per group in the
            level, and ``total`` is their weighted sum. Each group is
            weighted by the number of samples that compare unequal to
            zero in both of its series, and groups whose correlation is
            NaN contribute nothing to ``total``.
        """
        result_1 = self._first_agg.aggregate(level=level, **self._agg_kwargs)
        result_2 = self._second_agg.aggregate(level=level, **self._agg_kwargs)
        corr_output = []
        weights = []

        for group_id, series_x in enumerate(result_1):
            series_y = result_2[group_id]

            if resample:
                series_x = self._resample(series_x)
                series_y = self._resample(series_y)

            series_x, series_y = shift_series(series_x, series_y, self._shift)
            corr_output.append(self._correlate(series_x, series_y))
            weights.append(
                len(series_x[series_x != 0]) + len(series_y[series_y != 0]))

        # The normalisation factor is the same for every group, so it is
        # computed once rather than on each loop iteration.
        total_weight = sum(weights)
        total = 0
        for weight, corr in zip(weights, corr_output):
            if math.isnan(corr):
                continue
            total += (weight * corr) / total_weight

        return corr_output, total

    def plot(self, level, per_line=3):
        """Temporary function to plot data. Expected to be
        implemented in plotter

        Both series of every group are resampled, shifted by the
        alignment computed at construction and drawn on the same axis.

        :param level: Topological Level (level in
            :mod:`trappy.stats.Topology`)
        :type level: str

        :param per_line: Number of plots per line
        :type per_line: int
        """
        # Imported here rather than at module level, as in the original.
        from trappy.plotter.PlotLayout import PlotLayout

        num_plots = self._first_agg.topology.level_span(level)
        result_1 = self._first_agg.aggregate(level=level, **self._agg_kwargs)
        result_2 = self._second_agg.aggregate(level=level, **self._agg_kwargs)
        layout = PlotLayout(per_line, num_plots)

        # Every axis uses the same fixed y-range. The data-derived limits
        # that used to be computed here were always overwritten by this
        # range, so that dead computation has been removed.
        ylim = [-1, 3]

        plot_index = 0
        for group_id, s_x in enumerate(result_1):
            s_y = result_2[group_id]
            s_x = self._resample(s_x)
            s_y = self._resample(s_y)
            s_x, s_y = shift_series(s_x, s_y, self._shift)

            axis = layout.get_axis(plot_index)
            axis.plot(s_x.index, s_x.values)
            axis.plot(s_y.index, s_y.values)
            axis.set_ylim(ylim)
            plot_index += 1

        layout.finish(plot_index)

    def _correlate(self, s_x, s_y):
        """Correlate two series

        Uses the ``corrfunc`` supplied at construction when there is
        one, otherwise falls back to ``s_x.corr(s_y)``.

        :param s_x: First Series
        :type s_x: :mod:`pandas.Series`

        :param s_y: Second Series
        :type s_y: :mod:`pandas.Series`

        :return: The value returned by the correlation function
        """
        if self._corrfunc is not None:
            return self._corrfunc(s_x, s_y)
        return s_x.corr(s_y)

    def _align_top_level(self):
        """Compute the shift that best aligns the two traces

        Both aggregators are aggregated at the ``"all"`` level and the
        first resulting series of each is resampled. The pair is then
        aligned on the "front" and on the "back" (see :func:`align`) and
        each alignment is correlated, a NaN correlation counting as 0.

        :return: The shift of the alignment with the higher correlation;
            the front shift is returned when both are equal
        """
        result_1 = self._first_agg.aggregate(level="all")
        result_2 = self._second_agg.aggregate(level="all")

        s_x = self._resample(result_1[0])
        s_y = self._resample(result_2[0])

        front_x, front_y, front_shift = align(s_x, s_y, mode="front")
        front_corr = self._correlate(front_x, front_y)

        back_x, back_y, back_shift = align(s_x, s_y, mode="back")
        back_corr = self._correlate(back_x, back_y)

        # NaN never compares as greater or equal, so map it to 0 to keep
        # the comparison below meaningful.
        if math.isnan(back_corr):
            back_corr = 0

        if math.isnan(front_corr):
            front_corr = 0

        if front_corr >= back_corr:
            return front_shift
        return back_shift
def align(s_x, s_y, mode="front"):
    """Function to align the input series

    The series are aligned on the position of their first
    (``mode="front"``) or last (``mode="back"``) non-zero sample.

    :param s_x: First Series
    :type s_x: :mod:`pandas.Series`

    :param s_y: Second Series
    :type s_y: :mod:`pandas.Series`

    :param mode: Align Front/Back, either ``"front"`` or ``"back"``
    :type mode: str

    :return: A tuple ``(s_x, s_y, shift)`` holding the two series after
        shifting and the shift applied, i.e. the anchor position in
        ``s_x`` minus the anchor position in ``s_y``. When either series
        has no non-zero sample the inputs are returned untouched with a
        shift of 0.

    :raises ValueError: if both series have non-zero samples and
        ``mode`` is neither ``"front"`` nor ``"back"``
    """
    p_x = np.flatnonzero(s_x)
    p_y = np.flatnonzero(s_y)

    # Without a non-zero sample in both series there is nothing to
    # anchor the alignment on.
    if not len(p_x) or not len(p_y):
        return s_x, s_y, 0

    if mode == "front":
        anchor = 0
    elif mode == "back":
        anchor = -1
    else:
        # Previously an unknown mode fell through with p_x/p_y still
        # being arrays and failed later with an unrelated error.
        raise ValueError(
            'mode must be "front" or "back", got {!r}'.format(mode))

    shift = p_x[anchor] - p_y[anchor]
    s_x, s_y = shift_series(s_x, s_y, shift)
    return s_x, s_y, shift
def shift_series(s_x, s_y, shift):
    """Shift one of the two series so that they line up

    A positive ``shift`` moves ``s_y`` by ``shift`` positions and leaves
    ``s_x`` alone; any other value moves ``s_x`` by ``-shift`` positions
    and leaves ``s_y`` alone.

    :param s_x: First Series
    :type s_x: :mod:`pandas.Series`

    :param s_y: Second Series
    :type s_y: :mod:`pandas.Series`

    :param shift: The number of index positions to be shifted
    :type shift: int

    :return: The tuple ``(s_x, s_y)`` after shifting
    """
    if shift > 0:
        return s_x, s_y.shift(shift)

    moved_x = s_x.shift(-1 * shift)
    return moved_x, s_y