Source code for vacca.plot

#!/usr/bin/env python

#############################################################################
##
## This file is part of Taurus, a Tango User Interface Library
## 
## http://www.tango-controls.org/static/taurus/latest/doc/html/index.html
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
## 
## Taurus is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
## 
## Taurus is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
## 
## You should have received a copy of the GNU Lesser General Public License
## along with Taurus.  If not, see <http://www.gnu.org/licenses/>.
##
#############################################################################

import traceback,time
import fandango
import PyTango
import taurus
from taurus.qt import Qt
from taurus.qt.qtgui.plot import TaurusTrend,TaurusPlot
from taurus.qt.qtgui.panel import TaurusDevicePanel
from PyQt4 import Qwt5
from fandango.qt import Draggable

try:
  from PyTangoArchiving.widget.trend import ArchivingTrend
  TREND_CLASS = TaurusTrend  #TaurusArchivingTrend
except:
  TREND_CLASS = TaurusTrend

__doc__ = """
Plots in VACCA have several setups:

 * ApplyConfig is disabled, to avoid previous configs to override trend behavior
 * Default Trend length has several hours instead of seconds.
 * Archiving is still disabled, but also Scale change warnings when re-enabled.
 
"""

[docs]class PressureTrend(TREND_CLASS):
[docs] def showEvent(self, event): if not getattr(self, '_tuned', False): setup_pressure_trend(self) setattr(self, '_tuned', True) TREND_CLASS.showEvent(self, event)
[docs]class VaccaTrend(TREND_CLASS):
[docs] def showEvent(self, event): if not getattr(self, '_tuned', False): setup_pressure_trend(self,log=False,length=4*3600) setattr(self, '_tuned', True) TREND_CLASS.showEvent(self, event)
[docs]def setup_pressure_trend(tt,log=True,length=12*3600): print '*'*80 print 'in setup_pressure_trend(%s,length=%s s)' % (tt, length) from PyQt4 import Qwt5 try: #n,w = str(qtw.tabText(qtw.count()-1)),qtw.widget(qtw.count()-1) #qtw.insertTab(0,w,n) tt.setXDynScale(True) xMax = time.time() #tt.axisScaleDiv(Qwt5.QwtPlot.xBottom).upperBound() #xMin = tt.axisScaleDiv(Qwt5.QwtPlot.xBottom).lowerBound() rg = length #abs(self.str2deltatime(str(self.ui.xRangeCB.currentText()))) xMin = xMax-rg tt.setAxisScale(Qwt5.QwtPlot.xBottom, xMin, xMax) if log: tt.setAxisScaleType(Qwt5.QwtPlot.yLeft, Qwt5.QwtScaleTransformation.Log10) #tt.setAxisScale(Qwt5.QwtPlot.yLeft,1e-11,1e-2) tt.setUseArchiving(False) tt.setModelInConfig(False) tt.disconnect(tt.axisWidget(tt.xBottom), Qt.SIGNAL("scaleDivChanged ()"),tt._scaleChangeWarning) #tt.setCurvesYAxis([tcs],tt.yRight) #: Disabling loading of configuration from file; to avoid a faulty setup to continuously crash the application. setattr(tt, 'applyConfig', (lambda *k, **kw: None)) except: print 'Exception in set_pressure_trend(%s)' % tt print traceback.format_exc() print '*'*80
[docs]class VaccaProfilePlot(Draggable(TaurusPlot)): #def getModelClass(self): #return taurus.core.TaurusDevice def __init__(self, *args, **kwargs): TaurusPlot.__init__(self, *args, **kwargs) self._profile_loaded = False self._positions = [] self._labels = [] self.setModelInConfig(False)
[docs] def setModel(self, model): print '*'*80 self.info('VaccaProfilePlot.setModel(%s)' % model) print '*'*80 try: #if self._profile_loaded: return if fandango.isSequence(model) or 'attributename' in fandango.tango.parse_tango_model(model): self.info('setting an attribute model') TaurusPlot.setModel(self, model)# model = model[0]# str( # model).rsplit('/',1)[0] else: self.info('setting a composer model') assert fandango.check_device(model) dev = taurus.Device(model) if all(a in map(str.lower, dev.get_attribute_list()) for a in ('ccgaxxis', 'ccgpressures', 'ipaxxis', 'ippressures', 'thermoaxxis', 'thermocouples', 'axxispositions', 'axxislabels')): TaurusPlot.setModel(self, []) setup_profile_plot(self, model) else: self.warning('%s has not all required attributes' % model) if len(self._positions) and len(self._labels): self.info('Setting CustomXLabels ...') self.setAxisCustomLabels(Qwt5.QwtPlot.xBottom, zip(self._positions, self._labels), 60) except Exception, e: self.warning('VaccaProfilePlot.setModel(%s) failed!: %s' % (model, e))
[docs]def setup_profile_plot(tp, composer, picker=True): #,gauges,pumps,tcs,positions=None,labels=None): """ This method configures a tau plot to show pressure/temperature profiles """ tp.info('setup_profile_plot(%s,%s)' % (composer, picker)) #tp.setModel('BO/VC/Composer/IPAxxis|BO/VC/Composer/IPPressures') #tp.setModel('BO/VC/Composer/CCGAxxis|BO/VC/Composer/CCGPressures,BO/VC/Composer/IPPressures') #positions = [0,17,31,33,49,63,80,94,111,114,116,122] #labels=['Q1A','Q1B','Q1LTB','Q2A','Q2B','Q3A','Q3B','Q4A','Q4B','Q4BT','Q4C','Q4D+RF'] gauges = '%s/CCGAxxis|%s/CCGPressures' % (composer, composer) pumps = '%s/IPAxxis|%s/IPPressures' % (composer, composer) tcs = '%s/ThermoAxxis|%s/Thermocouples' % (composer, composer) positions = '%s/AxxisPositions' % composer labels = '%s/AxxisLabels' % composer pumps, gauges, tcs = [str(s).lower() for s in (pumps, gauges, tcs)] tp.setModelInConfig(False) tp.setModifiableByUser(False) tp.setModel(pumps) tp.addModels([tcs]) tcs = tcs.split('|')[-1] tp.setCurvesYAxis([tcs], tp.yRight) tp.addModels([gauges]) gauges = gauges.split('|')[-1] Curves = {} for cname in (pumps, gauges, tcs): tp.info('set curve %s' % cname) try: Curves[cname] = tp.curves[cname] except: Curves[cname] = tp.curves[cname.split('|')[-1]] if positions and labels: tp.info('Setting CustomXLabels ...') try: tp._positions = list(int(i) for i in taurus.Attribute(positions).read().value) tp._labels = list(str(i) for i in taurus.Attribute(labels).read().value) tp.setAxisCustomLabels(Qwt5.QwtPlot.xBottom,zip(tp._positions, tp._labels), 60) except: print 'Unable to read pressure profile labels' tp._positions = [] tp._labels = [] #print traceback.format_exc() for cname, color, width in [(gauges, Qt.Qt.blue, 3), (pumps, Qt.Qt.magenta, 1), (tcs, Qt.Qt.red, 2)]: cap = Curves[cname].getAppearanceProperties() cap.lColor = color cap.lWidth = width tp.setCurveAppearanceProperties({Curves[cname].getCurveName(): cap}) Curves[gauges].setSymbol(Qwt5.QwtSymbol(Qwt5.QwtSymbol.Diamond, Qt.QBrush(Qt.Qt.yellow), Qt.QPen(Qt.Qt.green), Qt.QSize(7, 7) ) ) #f1 = Qwt5.QwtSplineCurveFitter() #f1.setSplineSize(2000) #Curves[pumps].setCurveFitter(f1) #Curves[pumps].setCurveAttribute(Curves[pumps].Fitted,True) tp.enableAxis(tp.yRight) tp.setAxisScaleType(Qwt5.QwtPlot.yLeft, Qwt5.QwtScaleTransformation.Log10) #tp.setAxisScale(Qwt5.QwtPlot.yLeft,1e-11,1e-4) tp.setCanvasBackground(Qt.Qt.white) tp.toggleDataInspectorMode(True) tp.showMaxPeaks(True) Curves[pumps].showMaxPeak(False) tp.setAllowZoomers(False) tp.toggleDataInspectorMode(True) if picker: tp.info('setting Picker ...') tp.disconnect(tp._pointPicker, Qt.SIGNAL('selected(QwtPolygon)'), tp.pickDataPoint) tp.connect( tp._pointPicker, Qt.SIGNAL('selected(QwtPolygon)'), lambda pos, s = tp: pickPlotPoint(s, pos, xlabels=(zip( tp._positions, tp._labels) if tp._positions and tp._labels else None)) ) tp._profile_loaded = True return tp
[docs]def pickPlotPoint(self, pos, scope=20, showMarker=True, targetCurveNames=None, xlabels=None): '''Finds the pyxel-wise closest data point to the given position. The valid search space is constrained by the scope and targetCurveNames parameters. :param pos: (Qt.QPoint or Qt.QPolygon) the position around which to look for a data point. The position should be passed as a Qt.QPoint (if a Qt.QPolygon is given, the first point of the polygon is used). The position is expected in pixel units, with (0,0) being the top-left corner of the plot canvas. :param scope: (int) defines the area around the given position to be considered when searching for data points. A data point is considered within scope if its manhattan distance to position (in pixels) is less than the value of the scope parameter. (default=20) :param showMarker: (bool) If True, a marker will be put on the picked data point. (default=True) :param targetCurveNames: (sequence<str>) the names of the curves to be searched. If None passed, all curves will be searched :return: (tuple<Qt.QPointF,str,int> or tuple<None,None,None>) if a point was picked within the scope, it returns a tuple containing the picked point (as a Qt.QPointF), the curve name and the index of the picked point in the curve data. If no point was found within the scope, it returns None,None,None ''' print "pickPlotPoint(...)" if isinstance(pos, Qt.QPolygon): pos = pos.first() scopeRect = Qt.QRect(0, 0, scope, scope) scopeRect.moveCenter(pos) mindist = scope picked = None pickedCurveName = None pickedIndex = None self.curves_lock.acquire() try: if targetCurveNames is None: targetCurveNames = self.curves.keys() #print '%d targetCurveNames'%len(targetCurveNames) for name in targetCurveNames: curve = self.curves.get(name, None) if curve is None: #print("Curve '%s' not found"%name) continue if not curve.isVisible(): #print("Curve '%s' not visible"%name) continue data=curve.data() #print("Curve '%s' has %d points"%(name,data.size())) for i in xrange(data.size()): point = Qt.QPoint(self.transform(curve.xAxis(), data.x(i)), self.transform(curve.yAxis(), data.y(i)) ) if scopeRect.contains(point): #print( 'Comparing %s,%s vs %s'%(pos,scopeRect,point)) dist = (pos-point).manhattanLength() if dist < mindist: mindist = dist picked = Qt.QPointF(data.x(i), data.y(i)) pickedCurveName = name pickedIndex = i pickedAxes = curve.xAxis(), curve.yAxis() found = True #if picked: print("Curve '%s' contains %s"%(name,pos)) finally: self.curves_lock.release() if picked is None: print 'pickPlotPoint(%s)> No matching point found for any curve' % pos return xlabels = xlabels or [] print("pickPlotPoint(x=%s,y=%s,xlabels=[%s])" % (picked.x(), picked.y(), len(xlabels) )) if showMarker and picked is not None: print 'showing pickedMarker' self._pickedMarker.detach() self._pickedMarker.setValue(picked) self._pickedMarker.setAxis(*pickedAxes) self._pickedMarker.attach(self) self._pickedCurveName = pickedCurveName self._pickedMarker.pickedIndex = pickedIndex self.replot() label = self._pickedMarker.label() try: xi, xl = ([(x[0], x[1]) for x in (xlabels or []) if x[ 0] <= picked.x()] or [[-1, None]])[-1] c = self.curves[pickedCurveName] #xc = [self.transform(c.xAxis(),x) for x in c.data().xData()] xc = [x for x in c.data().xData()] xx = len([x for x in xc if x >= xi and x <= picked.x()]) #print [x[0] for x in xlabels] #print 'picked point: %s'%picked.x() #print 'last label %s at %s'%(xl,xi) #print 'data point are %s'%(xc) #print 'data points between pick and label: %s'%xx index = xx tag = '%s-%02d' % (xl, index) if xl and '/ip' in pickedCurveName.lower(): try: tag = PyTango.AttributeProxy(tag.replace('-', '/VC/IP-') +'/Controller').read().value except: import traceback print traceback.format_exc() except: import traceback print traceback.format_exc() index, xl = (-1, None) tag = '' if xl: infotxt = "'%s'[%i]:\n\t (%s,%.2e)" % (pickedCurveName, pickedIndex, tag, picked.y()) elif self.getXIsTime(): infotxt = "'%s'[%i]:\n\t (x=%s,%.3g)"%(pickedCurveName, pickedIndex, datetime.fromtimestamp( picked.x()).ctime(), picked.y() ) else: infotxt = "'%s'[%i]:\n\t (x=%.3g, y=%.3g)" % (pickedCurveName, pickedIndex, picked.x(), picked.y()) label.setText(infotxt) print '\t%s' % infotxt fits = label.textSize().width() < self.size().width() if fits: label.setText(infotxt) self._pickedMarker.setLabel(Qwt5.QwtText (label)) self._pickedMarker.alignLabel() self.replot() else: popup = Qt.QWidget(self, Qt.Qt.Popup) popup.setLayout(Qt.QVBoxLayout()) popup.layout().addWidget(Qt.QLabel(infotxt)) #@todo: make the widget background semitransparent green! popup.setWindowOpacity(self._pickedMarker.labelOpacity) popup.show() popup.move(self.pos().x()-popup.size().width(), self.pos().y()) popup.move(self.pos()) Qt.QTimer.singleShot(5000, popup.hide) return picked,pickedCurveName,pickedIndex
import doc __doc__ = doc.get_autodoc(__name__,vars()) if __name__ == '__main__': import taurus.qt.qtgui.application app = taurus.qt.qtgui.application.TaurusApplication() cmps = fandango.get_matching_devices('*/vc/all') tp = VaccaProfilePlot() tp.setWindowTitle(cmps[0]) tp.setModel(cmps[0]) tp.show() app.exec_()