Source code for pymu.pmuFrame

from datetime import datetime, timedelta

from .pmuEnum import *
from .pmuLib import *

class PMUFrame:
    """
    Super class for all C37.118-2005/C37.118-2011 frames

    Parses the header fields common to every frame out of a hex string.
    ``self.length`` is the running parse offset into ``self.frame``, counted
    in hex characters (two characters per byte).

    :param frameInHexStr: Bytes from frame (any frame type) in hex str format
    :type frameInHexStr: str
    :param debug: Print debug statements
    :type debug: bool
    """

    def __init__(self, frameInHexStr, debug=False):
        """
        PMUFrame constructor

        Only the SYNC and FRAMESIZE fields are parsed here; the remaining
        fields are parsed by :meth:`finishParsing`.

        :param frameInHexStr: Hex representation of frame bytes
        :type frameInHexStr: str
        :param debug: Print debug statements or not
        :type debug: bool
        """
        self.length = 0
        self.dbg = debug
        # Normalise to upper case so stored fields (e.g. ``chk``) are uniform.
        self.frame = frameInHexStr.upper()
        self.parseSYNC()
        self.parseFRAMESIZE()

    def finishParsing(self):
        """
        When getting the config frame, the size is unknown. After creating a
        PMUFrame with the first 4 bytes, the remaining frame bytes are read
        and added to self.frame. Once that is populated the remaining fields
        can be parsed
        """
        self.parseIDCODE()
        self.parseSOC()
        self.parseFRACSEC()
        self.parseCHK()

    def parseSYNC(self):
        """Parse frame synchronization word (first 4 hex characters)"""
        # Forward the debug flag so the SYNC debug output is actually emitted.
        self.sync = SYNC(self.frame[:4], debug=self.dbg)
        self.updateLength(4)

    def parseFRAMESIZE(self):
        """Parse frame size (4 hex characters at the current offset)"""
        framesizeSize = 4
        start = self.length
        self.framesize = int(self.frame[start:start + framesizeSize], 16)
        self.updateLength(framesizeSize)
        if self.dbg:
            print("FRAMESIZE: ", self.framesize)

    def parseIDCODE(self):
        """Parse data stream ID number (4 hex characters)"""
        idcodeSize = 4
        start = self.length
        self.idcode = int(self.frame[start:start + idcodeSize], 16)
        self.updateLength(idcodeSize)
        if self.dbg:
            print("IDCODE: ", self.idcode)

    def parseSOC(self):
        """Parse second-of-century timestamp (8 hex characters)"""
        socSize = 8
        start = self.length
        # Forward the debug flag so the SOC debug output is actually emitted.
        self.soc = SOC(self.frame[start:start + socSize], debug=self.dbg)
        self.updateLength(socSize)

    def parseFRACSEC(self):
        """Parse fraction of second and time quality word

        The 8 hex characters are stored as one integer in ``self.fracsec``;
        they are not split into separate sub-fields here.
        """
        fracsecSize = 8
        start = self.length
        self.fracsec = int(self.frame[start:start + fracsecSize], 16)
        self.updateLength(fracsecSize)
        if self.dbg:
            print("FRACSEC: ", self.fracsec)

    def parseCHK(self):
        """Parse CRC-CCITT word

        Taken from the last 4 hex characters of the frame rather than from
        the running offset, and kept as a hex string. Does not advance
        ``self.length``.
        """
        chkSize = 4
        self.chk = self.frame[-chkSize:]
        if self.dbg:
            print("CHK: ", self.chk)

    def updateLength(self, sizeToAdd):
        """Keeps track of index for overall frame

        :param sizeToAdd: Number of hex characters just consumed
        :type sizeToAdd: int
        """
        self.length = self.length + sizeToAdd
class SYNC:
    """Class for describing the frame synchronization word

    Exposes ``frameType`` (the name of the matching ``FrameType`` member)
    and ``frameVers`` (an integer) once constructed.

    :param syncHexStr: Sync byte array in hex str format
    :type syncHexStr: str
    :param debug: Print debug statements
    :type debug: bool
    """

    def __init__(self, syncHexStr, debug=False):
        self.dbg = debug
        self.syncHex = syncHexStr
        self.parseType()
        self.parseVers()

    def parseType(self):
        """Parse frame type from the third hex character of the sync word"""
        # hexToBin presumably renders the hex digit as a bit string of the
        # requested width (3 here) -- confirm against pmuLib.
        typeBits = hexToBin(self.syncHex[2], 3)
        self.frameType = FrameType(int(typeBits, 2)).name
        if self.dbg:
            print("Type: ", self.frameType)

    def parseVers(self):
        """Parse frame version from the fourth hex character of the sync word"""
        versionDigit = self.syncHex[3]
        self.frameVers = int(hexToBin(versionDigit, 4), 2)
        if self.dbg:
            print("Vers: ", self.frameVers)
class SOC:
    """Class for second-of-century (SOC) word (32 bit unsigned)

    The hex string is decoded as a count of seconds since
    1970-01-01 00:00:00 and broken down into UTC calendar fields.

    :param socHexStr: Second-of-century byte array in hex str format
    :type socHexStr: str
    :param debug: Print debug statements
    :type debug: bool
    :raises ValueError: If ``socHexStr`` is not a valid hexadecimal string
    """

    def __init__(self, socHexStr, debug=False):
        self.dbg = debug
        self.socHex = socHexStr
        self.secCount = int(socHexStr, 16)
        self.parseSecCount()
        if self.dbg:
            print("SOC: ", self.secCount, " - ", self.formatted)

    def parseSecCount(self):
        """Parse SOC into UTC timestamp and pretty formatted timestamp

        Sets ``yyyy``, ``mm``, ``dd``, ``hh``, ``mi``, ``ss`` (UTC calendar
        fields), ``ff`` (always 0), ``formatted``
        (``YYYY/MM/DD HH:MM:SS``) and ``utcSec`` (seconds since the epoch as
        a float).
        """
        epoch = datetime(1970, 1, 1)
        # Add the offset to the epoch instead of calling
        # datetime.fromtimestamp(): that converts to the machine's local time
        # zone, which shifted every field (and utcSec) by the local UTC
        # offset. This form is time-zone independent and not subject to
        # platform limits on large timestamps.
        parsedDate = epoch + timedelta(seconds=self.secCount)

        self.yyyy = parsedDate.year
        self.mm = parsedDate.month
        self.dd = parsedDate.day
        self.hh = parsedDate.hour
        self.mi = parsedDate.minute
        self.ss = parsedDate.second
        self.ff = 0
        self.formatted = "{:0>4}/{:0>2}/{:0>2} {:0>2}:{:0>2}:{:0>2}".format(
            self.yyyy, self.mm, self.dd, self.hh, self.mi, self.ss
        )
        self.utcSec = (parsedDate - epoch).total_seconds()