Source code for pymu.pmuDataFrame

import math
import struct
import codecs
from datetime import datetime
from .pmuLib import *
from .pmuFrame import PMUFrame
from .pmuEnum import *

[docs]class DataFrame(PMUFrame): """ Class for creating a Data Frame based on C37.118-2011 :param frameInHexStr: Data frame bytes as hex str :type frameInHexStr: str :param theConfigFrame: Config frame describing the data frame :type theConfigFrame: ConfigFrame :param debug: Print debug statements :type debug: bool """ def __init__(self, frameInHexStr, theConfigFrame, debug=False): self.stat = None self.pmus = None self.freq = None self.dfreq = None self.analog = None self.digital = None self.configFrame = theConfigFrame self.dbg = debug super().__init__(frameInHexStr, self.dbg) super().finishParsing() self.parsePmus() self.updateSOC()
[docs] def parsePmus(self): """Parses each PMU present in the data frame""" print("***** PHASORS *****") if self.dbg else None nextPmuStartingPos = 28 self.pmus = [None]*self.configFrame.num_pmu for i in range(0, len(self.pmus)): print("*** ", self.configFrame.stations[i].stn.strip(), " ***", sep="") if self.dbg else None self.pmus[i] = PMU(self.frame[nextPmuStartingPos:], self.configFrame.stations[i]) print("Len =", self.pmus[i].length) if self.dbg else None print(self.frame[nextPmuStartingPos:(nextPmuStartingPos+self.pmus[i].length)]) if self.dbg else None nextPmuStartingPos = nextPmuStartingPos + self.pmus[i].length
[docs] def updateSOC(self): self.soc.ff = self.fracsec / self.configFrame.time_base.baseDecStr self.soc.formatted = "{:0>4}/{:0>2}/{:0>2} {:0>2}:{:0>2}:{:0>2}{}".format(self.soc.yyyy, self.soc.mm, self.soc.dd, self.soc.hh, self.soc.mi, self.soc.ss, "{:f}".format(self.soc.ff).lstrip('0')) dt = datetime(self.soc.yyyy, self.soc.mm, self.soc.dd, self.soc.hh, self.soc.mi, self.soc.ss, int(self.soc.ff * 10 ** 6)) self.soc.utcSec = (dt - datetime(1970, 1, 1)).total_seconds()
[docs]class PMU: """Class for a PMU in a data frame :param pmuHexStr: Bytes of PMU fields in hex str format :type pmuHexStr: str :param theStationFrame: Station fields from config frame describing PMU data :type theStationFrame: Station :param debug: Print debug statements :type debug: bool """ def __init__(self, pmuHexStr, theStationFrame, debug=False): self.stat = None self.phasors = None self.freq = None self.dfreq = None self.analogs = None self.digitals = None self.length = 0 self.dbg = debug self.stationFrame = theStationFrame self.numOfPhsrs = self.stationFrame.phnmr self.fmtOfPhsrs = self.stationFrame.phsrFmt self.typeOfPhsrs = self.stationFrame.phsrType self.numOfAnlg = self.stationFrame.annmr self.numOfDgtl = self.stationFrame.dgnmr print("DIG:", self.numOfDgtl) if self.dbg else None self.pmuHex = pmuHexStr print(pmuHexStr) if self.dbg else None self.parseStat() self.parsePhasors() self.parseFreq() self.parseDfreq() self.parseAnalog() self.parseDigital()
[docs] def updateLength(self, sizeToAdd): """Keeps track of length for PMU frame only""" self.length = self.length + sizeToAdd
[docs] def parseStat(self): """Parse bit mapped flags field""" l = 4 print("STAT:", self.pmuHex[self.length:self.length+l]) if self.dbg else None self.stat = Stat(self.pmuHex[self.length:self.length+l]) self.updateLength(l)
[docs] def parsePhasors(self): """Parse phasor estimates from PMU""" print("Phasors") if self.dbg else None self.phasors = [None]*self.numOfPhsrs print("NumOfPhsrs:", self.numOfPhsrs) if self.dbg else None for i in range(0, self.numOfPhsrs): self.phasors[i] = Phasor(self.pmuHex[self.length:], self.stationFrame, self.stationFrame.channels[i]) print("PHASOR:", self.pmuHex[self.length:self.length+self.phasors[i].length]) if self.dbg else None self.updateLength(self.phasors[i].length)
[docs] def parseFreq(self): """Parse frequency""" l = 4 if self.stationFrame.freqType == "INTEGER" else 8 print("FREQ:", self.pmuHex[self.length:self.length+l]) if self.dbg else None unpackStr = '!h' if l == 4 else '!f' self.freq = struct.unpack(unpackStr, bytes.fromhex(self.pmuHex[self.length:self.length+l]))[0] self.updateLength(l) print("FREQ:", self.freq) if self.dbg else None
[docs] def parseDfreq(self): """Parse rate of change of frequency (ROCOF)""" l = 4 if self.stationFrame.freqType == "INTEGER" else 8 print("DFREQ: ", self.pmuHex[self.length:self.length+l]) if self.dbg else None unpackStr = '!h' if l == 4 else '!f' self.dfreq = (struct.unpack(unpackStr, bytes.fromhex(self.pmuHex[self.length:self.length+l]))[0]) / 100 self.updateLength(l) print("DFREQ:", self.dfreq) if self.dbg else None
[docs] def parseAnalog(self): """Parse analog data""" self.analogs = [None]*self.numOfAnlg l = 4 if self.stationFrame.anlgType == "INTEGER" else 8 unpackStr = "!h" if l == 4 else "!f" for i in range(0, self.numOfAnlg): name = self.stationFrame.channels[self.numOfPhsrs+i].strip() print("ANALOG:", self.pmuHex[self.length:self.length+l]) if self.dbg else None val = struct.unpack(unpackStr, bytes.fromhex(self.pmuHex[self.length:self.length+l]))[0] print (name, "=", val) if self.dbg else None self.analogs[i] = (name, val) self.updateLength(l)
[docs] def parseDigital(self): """Parse digital data""" self.digitals = [None]*self.numOfDgtl l = 4 totValBin = hexToBin(self.pmuHex[self.length:self.length+l], 16) for i in range(0, self.numOfDgtl): name = self.stationFrame.channels[self.numOfPhsrs+self.numOfAnlg+i].strip() print("DIGITAL:", self.pmuHex[self.length:self.length+l]) if self.dbg else None val = totValBin[i] print (name, "=", val) if self.dbg else None self.digitals[i] = (name, val) self.updateLength(l)
[docs]class Phasor: """Class for holding phasor information :param thePhsrValHex: Phasor values in hex str format :type thePhsrValHex: str :param theStationFrame: Station frame which describe data format :type theStationFrame: Station :param theName: Name of phasor channel :type theName: str :param debug: Print debug statements :type debug: bool """ def __init__(self, thePhsrValHex, theStationFrame, theName, debug=False): self.phsrFmt = None self.phsrType = None self.real = None self.imag = None self.mag = None self.deg = None self.rad = None self.name = None self.length = 0 self.dbg = debug self.phsrValHex = thePhsrValHex self.stationFrame = theStationFrame self.voltORCurr = self.stationFrame.phunits self.name = theName print("*", theName.strip(), "*") if self.dbg else None self.parseFmt() self.parseVal()
[docs] def parseFmt(self): """Parse format and type of phasor""" self.phsrFmt = self.stationFrame.phsrFmt self.phsrType = self.stationFrame.phsrType self.length = 8 if self.phsrType == "INTEGER" else 16
[docs] def parseVal(self): """Parse phasor value""" if self.phsrFmt == "RECT": self.toRect(self.phsrValHex[:self.length]) else: self.toPolar(self.phsrValHex[:self.length])
[docs] def toRect(self, hexVal): """Convert bytes to rectangular values""" hex1 = hexVal[:int(self.length/2)] hex2 = hexVal[int(self.length/2):] unpackStr = "!h" if self.phsrType == "INTEGER" else "!f" self.real = struct.unpack(unpackStr, bytes.fromhex(hex1))[0] self.imag = struct.unpack(unpackStr, bytes.fromhex(hex2))[0] self.mag = math.hypot(self.real, self.imag) self.rad = math.atan2(self.imag, self.real) self.deg = math.degrees(self.rad) print("Real:", hex1, "=", self.real) if self.dbg else None print("Imag:", hex2, "=", self.imag) if self.dbg else None print("Mag:", "=", self.mag) if self.dbg else None print("Rad:", "=", self.rad) if self.dbg else None print("Deg:", "=", self.deg) if self.dbg else None
[docs] def toPolar(self, hexVal): """Convert bytes to polar values""" hex1 = hexVal[:int(self.length/2)] hex2 = hexVal[int(self.length/2):] unpackStr = "!h" if self.phsrType == "INTEGER" else "!f" self.mag = struct.unpack(unpackStr, bytes.fromhex(hex1))[0] self.rad = struct.unpack(unpackStr, bytes.fromhex(hex2))[0] if unpackStr == '!h': self.rad = self.rad / 10000 self.deg = math.degrees(self.rad) self.real = self.mag * math.cos(self.deg) self.imag = self.mag * math.sin(self.deg) print("Real:", hex1, "=", self.real) if self.dbg else None print("Imag:", hex2, "=", self.imag) if self.dbg else None print("Mag:", "=", self.mag) if self.dbg else None print("Rad:", "=", self.rad) if self.dbg else None print("Deg:", "=", self.deg) if self.dbg else None
[docs]class Stat: """Class for foling bit mapped flags :param statHexStr: Stat field in hex string format :type statHexStr: str :param debug: Print debug statements :type debug: bool """ def __init__(self, statHexStr, debug=False): self.dataError = None self.pmuSync = None self.sorting = None self.pmuTrigger = None self.configChange = None self.dataModified = None self.timeQuality = None self.unlockedTime = None self.triggerReason = None self.dbg = debug self.statHex = statHexStr print(statHexStr) if self.dbg else None self.parseDataError() self.parsePmuSync() self.parseSorting() self.parsePmuTrigger() self.parseConfigChange() self.parseDataModified() self.parseTimeQuality() self.parseUnlockTime() self.parseTriggerReason()
[docs] def parseDataError(self): """Parse data error bits""" self.dataError = DataError(int(hexToBin(self.statHex[0], 4)[:2], 2)).name print("STAT: ", self.dataError) if self.dbg else None
[docs] def parsePmuSync(self): """Parse PMU sync bit""" self.pmuSync = PmuSync(int(hexToBin(self.statHex[0], 4)[2], 2)).name print("PMUSYNC: ", self.pmuSync) if self.dbg else None
[docs] def parseSorting(self): """Parse data sorting bit""" self.sorting = Sorting(int(hexToBin(self.statHex[0], 4)[3], 2)).name print("SORTING: ", self.sorting) if self.dbg else None
[docs] def parsePmuTrigger(self): """Parse PMU trigger bit""" self.pmuTrigger = Trigger(int(hexToBin(self.statHex[1], 4)[0], 2)).name print("PMUTrigger: ", self.pmuTrigger) if self.dbg else None
[docs] def parseConfigChange(self): """Parse config change bit""" self.configChange = ConfigChange(int(hexToBin(self.statHex[1], 4)[1], 2)).name print("ConfigChange: ", self.configChange) if self.dbg else None
[docs] def parseDataModified(self): """Parse data modified bit""" self.dataModified = DataModified(int(hexToBin(self.statHex[1], 4)[2], 2)).name print("DataModified: ", self.dataModified) if self.dbg else None
[docs] def parseTimeQuality(self): """Parse time quality bits""" self.unlockedTime = TimeQuality(int(hexToBin(self.statHex[1:3], 8)[3:6], 2)).name print("TimeQuality: ", self.unlockedTime) if self.dbg else None
[docs] def parseUnlockTime(self): """Parse unlocked time bits""" self.unlockedTime = UnlockedTime(int(hexToBin(self.statHex[2], 4)[2:], 2)).name print("UnlockTime: ", self.unlockedTime) if self.dbg else None
[docs] def parseTriggerReason(self): """Parse trigger reason bits""" self.triggerReason = TriggerReason(int(hexToBin(self.statHex[3], 4), 2)).name print("TriggerReason: ", self.triggerReason) if self.dbg else None