# Source code for pymu.client

import socket
import sys
import os
import time

class Client:
    """
    Client class that creates a client and provides simple functions for
    connecting to PMUs or PDCs without needing to directly use Python's
    socket library. Supports INET and UNIX sockets.

    The socket is created as soon as the object is constructed. For TCP the
    constructor also connects to the destination; for UDP no connection is
    made and the destination is only used when sending.

    The object can be used as a context manager; the socket is closed when
    the ``with`` block exits.

    :param theDestIp: IP address to connect to. If using unix socket this is
        the file name to connect to
    :type theDestIp: str
    :param theDestPort: Port to connect to
    :type theDestPort: int
    :param proto: Protocol to use. Accepts TCP or UDP (case-insensitive);
        any value other than UDP selects a stream socket
    :type proto: str
    :param sockType: Type of socket to create. INET or UNIX
        (case-insensitive); any value other than UNIX selects INET
    :type sockType: str
    """

    def __init__(self, theDestIp, theDestPort, proto="TCP", sockType="INET"):
        # Attributes that are initialised here but never assigned elsewhere
        # in this class; kept because they are part of the public surface.
        self.srcIp = None
        self.srcPort = None
        self.srcAddr = None
        self.theConnection = None

        self.theSocket = None
        self.destIp = theDestIp
        self.destPort = theDestPort
        self.destAddr = (theDestIp, theDestPort)

        self.useUdp = proto.upper() == "UDP"
        self.unixSock = sockType.upper() == "UNIX"

        self.createSocket()
        try:
            self.connectToDest()
        except BaseException:
            # The caller never receives the half-built object, so nobody else
            # could close the socket; release it here and re-raise unchanged.
            self.theSocket.close()
            raise

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, excType, excValue, excTraceback):
        """Close the socket when leaving a ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.stop()
        return False

    def createSocket(self):
        """Create socket based on constructor arguments"""
        # The AF_UNIX constant is only looked up when a UNIX socket was
        # requested, so platforms without it can still use INET sockets.
        family = socket.AF_UNIX if self.unixSock else socket.AF_INET
        kind = socket.SOCK_DGRAM if self.useUdp else socket.SOCK_STREAM
        self.theSocket = socket.socket(family, kind)

    def connectToDest(self):
        """Connect socket to destination IP:Port. If UNIX socket then use destIP

        Does nothing for UDP: datagram sockets are left unconnected and the
        destination is supplied on every send instead.
        """
        if self.useUdp:
            return

        if self.unixSock:
            # For UNIX sockets the "IP" is the socket file name
            self.theSocket.connect(self.destIp)
        else:
            self.theSocket.connect(self.destAddr)

    def readSample(self, bytesToRead):
        """
        Read a sample from the socket

        :param bytesToRead: Number of bytes to read from socket
        :type bytesToRead: int

        :return: For TCP, the data returned by ``recv``. For UDP, the
            ``(data, address)`` pair returned by ``recvfrom``. If the socket
            times out, ``"Socket Timeout"`` is printed and an empty string
            is returned.
        """
        try:
            if self.useUdp:
                return self.theSocket.recvfrom(bytesToRead)
            return self.theSocket.recv(bytesToRead)
        except socket.timeout:
            print("Socket Timeout")
            # Kept as "" (not b"") so existing callers comparing against the
            # empty string keep working.
            return ""

    def sendData(self, bytesToSend):
        """Send bytes to destination

        :param bytesToSend: The data to send
        :type bytesToSend: bytes
        """
        if self.useUdp:
            # Unconnected datagram socket: the target must be given per send
            target = self.destIp if self.unixSock else self.destAddr
            self.theSocket.sendto(bytesToSend, target)
        else:
            # sendall keeps writing until the whole buffer is handed to the
            # OS; a plain send() may stop early and silently drop the rest.
            self.theSocket.sendall(bytesToSend)

    def stop(self):
        """Close the socket connection

        Safe to call more than once, and when no socket has been created.
        """
        if self.theSocket is not None:
            self.theSocket.close()

    def setTimeout(self, numOfSecs):
        """Set socket timeout

        :param numOfSecs: Time to wait for socket action to complete before
            throwing timeout exception
        :type numOfSecs: int
        """
        self.theSocket.settimeout(numOfSecs)

    # NOTE(review): defining ``__class__`` as a method shadows the built-in
    # attribute, so ``obj.__class__`` evaluates to this bound method instead
    # of the class. Left in place because callers may rely on
    # ``obj.__class__()`` returning "client"; use ``type(obj)`` to get the
    # real class.
    def __class__(self):
        return "client"