# -*- coding: utf-8 -*-
from __future__ import print_function
from __future__ import absolute_import
from builtins import filter
from builtins import range
from past.builtins import basestring
from builtins import object
import socket
import threading
from errno import ECONNREFUSED
from functools import partial
from multiprocessing import Pool
from time import time
import numpy as np
from .root import TimeOutException, TransferExeption
from .config import FLAG_DEBUG, serializer
# Default endpoint used when a caller does not pass an explicit address.
host = "localhost"
port = 50007
addr = host, port  # (host, port) pair, the form the socket API expects
# Number of worker processes used when scanning ports.
NUM_CORES = 4
def ping(host, port):
    """
    Check whether a TCP port accepts connections.

    :param host: IP address or host name
    :param port: port number
    :return: ``port`` if the connection succeeded, ``False`` if it was
        refused. Any other socket error is re-raised.
    """
    sock = socket.socket()
    try:
        sock.connect((host, port))
        return port
    except socket.error as err:
        if err.errno == ECONNREFUSED:
            return False
        raise
    finally:
        # Always release the descriptor: a port scan calls this tens of
        # thousands of times and must not rely on garbage collection.
        sock.close()
def scan_ports(host):
    """
    Scan opened ports in address.

    Every port from 1 to 65535 is probed with :func:`ping` using a pool of
    ``NUM_CORES`` worker processes.

    :param host: host IP to filter opened ports.
    :return: list with the port numbers that accepted a connection.
    """
    # http://codereview.stackexchange.com/questions/38452/python-port-scanner
    pool = Pool(NUM_CORES)
    try:
        ping_host = partial(ping, host)
        results = pool.map(ping_host, range(1, 65536))
    finally:
        # Release the worker processes even when a probe raised; map() has
        # already collected every result by the time it returns.
        pool.terminate()
        pool.join()
    # ping() returns False for refused ports, so keep only the truthy entries.
    return [open_port for open_port in results if open_port]
class Conection(object):
    """
    Represent a connection to interchange objects between servers and clients.

    Before a payload is transferred both ends agree on its length with a
    small text handshake: the sender transmits ``"(<length>,)"`` and the
    receiver answers ``"True"`` (accepted) or ``"False"`` (please resend).
    All handshake messages travel as ASCII bytes so the class behaves the
    same on Python 2 and Python 3.
    """

    def __init__(self, conn):
        """
        :param conn: connected socket used for every transfer.
        """
        self.conn = conn
        # Number of payload bytes still to be read by recvall(); set by getLen().
        self.len = 0

    def sendLen(self, length, timeout=None):
        """
        Announce the payload length and wait until the peer accepts it.

        :param length: number of bytes that will be sent afterwards.
        :param timeout: seconds to keep trying, None to try forever.
        :raises TimeOutException: if the peer did not accept in time.
        :raises socket.error: if the peer closed the connection.
        """
        dest = self.conn
        ans = b"False"
        t1 = time()
        while True:
            if ans == b"False":
                # (Re)send the length whenever the peer rejected the last try.
                txt = "({},)".format(length)
                dest.sendall(txt.encode("ascii"))
                if FLAG_DEBUG:
                    print("size", txt, "sent")
            ans = dest.recvfrom(5)[0]  # get True or False
            if FLAG_DEBUG:
                print("received", ans)
            if ans == b"True":
                # A completed handshake is never reported as a timeout.
                return
            if not ans:
                # Empty read on a stream socket: the peer is gone and will
                # never answer, so do not spin forever.
                raise socket.error("connection closed while sending length")
            if timeout is not None and time() - t1 > timeout:
                raise TimeOutException("Timeout sending length")

    def getLen(self, timeout=None):
        """
        Wait for the peer to announce the payload length and acknowledge it.

        The accepted length is stored in ``self.len`` for :meth:`recvall`.

        :param timeout: seconds to keep trying, None to try forever.
        :return: announced payload length.
        :raises TimeOutException: if no valid length arrived in time.
        """
        import ast  # local import: only needed to parse the length message

        source = self.conn
        t1 = time()
        while True:
            size = None
            try:
                if FLAG_DEBUG:
                    print("waiting size...")
                raw = source.recvfrom(1024)[0]
                # SECURITY: this text comes from the network. It used to be
                # passed to eval(), which lets a peer run arbitrary code;
                # literal_eval only accepts Python literals such as "(123,)".
                size = ast.literal_eval(raw.decode("ascii"))
                if FLAG_DEBUG:
                    print("received size", size)
            except Exception as e:
                # Malformed or missing message: report it and ask for a resend.
                print(e)
            if isinstance(size, tuple) and size:
                source.sendall(b"True")
                break
            source.sendall(b"False")
            if timeout is not None and time() - t1 > timeout:
                raise TimeOutException("Timeout of {} receiving length".format(timeout))
        self.len = size[0]
        return size[0]

    def recvall(self):
        """
        Read the ``self.len`` bytes announced by the last :meth:`getLen`.

        ``self.len`` is decreased as data arrives, so it holds the number of
        missing bytes if the peer closes the connection early.

        :return: received bytes (possibly fewer than announced).
        """
        chunks = []
        remaining = self.len
        while remaining:
            chunk = self.conn.recv(remaining)
            if not chunk:
                break  # peer closed the connection
            chunks.append(chunk)
            remaining = self.len = remaining - len(chunk)
        # recv() yields bytes, so the pieces must be joined as bytes.
        return b"".join(chunks)

    def send(self, obj):
        """
        Placeholder, not implemented: does nothing and returns None.
        """
        pass
def initServer(addr):
    """
    Inits a simple server from address.

    The returned socket is bound and listening; call ``accept()`` on it to
    obtain a connection.

    :param addr: (host, port), e.g. ('', 50007). An empty host means all
        available interfaces.
    :return: listening socket
    :raises socket.error: if the address cannot be bound.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(addr)
        s.listen(1)
    except Exception:
        # The caller never receives the socket on failure, so release it here.
        s.close()
        raise
    return s
def initClient(addr, timeout=None):
    """
    Inits a simple client from address.

    :param addr: (host, port) of the server, e.g. ('localhost', 50007).
    :param timeout: None to try once and propagate any error; otherwise the
        number of seconds to keep retrying while the connection is refused.
    :return: connected socket
    :raises TimeOutException: if the server kept refusing for ``timeout`` seconds.
    :raises socket.error: for any error other than a refused connection, or
        for a refused connection when ``timeout`` is None.
    """
    from time import sleep  # local import: the module only imports time.time

    t1 = time()
    while True:
        # A socket whose connect() failed is in an unspecified state, so each
        # attempt uses a fresh one.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(addr)
            return s
        except socket.error as e:
            s.close()
            if timeout is None or e.errno != ECONNREFUSED:
                raise
        except Exception:
            s.close()
            raise
        if time() - t1 > timeout:
            raise TimeOutException("Timeout connecting to server in {}".format(addr))
        # Short pause so waiting for the server does not spin a CPU core.
        sleep(0.01)
def send_from(viewable, socket):
    """
    Send from viewable object.

    The whole buffer is transmitted, calling ``socket.send`` as many times
    as needed.

    :param viewable: object supporting the buffer protocol (bytes,
        bytearray, array, ...). Multi-byte or multi-dimensional buffers
        need ``memoryview.cast`` (Python 3.3+).
    :param socket: destine socket
    :return: None
    """
    view = memoryview(viewable)
    if view.ndim != 1 or view.itemsize != 1:
        # send() reports BYTES sent, while slicing a typed view counts
        # ELEMENTS; work on a flat byte view so a partial send resumes at
        # the right offset.
        view = view.cast("B")
    while len(view):
        nsent = socket.send(view)
        view = view[nsent:]
def recv_into(viewable, socket):
    """
    Receive from socket into viewable object.

    Keeps reading until the buffer is completely filled.

    :param viewable: writable object supporting the buffer protocol
        (bytearray, array, ...). Multi-byte or multi-dimensional buffers
        need ``memoryview.cast`` (Python 3.3+).
    :param socket: source socket
    :return: None
    :raises EOFError: if the peer closes the connection before the buffer
        is full; the bytes received so far stay in ``viewable``.
    """
    view = memoryview(viewable)
    if view.ndim != 1 or view.itemsize != 1:
        # recv_into() reports BYTES read, while slicing a typed view counts
        # ELEMENTS; work on a flat byte view so partial reads line up.
        view = view.cast("B")
    while len(view):
        nrecv = socket.recv_into(view)
        if not nrecv:
            # A zero-byte read means the peer closed; without this check the
            # loop would never make progress and never end.
            raise EOFError(
                "connection closed with {} bytes still expected".format(len(view)))
        view = view[nrecv:]
def generateServer(host=host, to=63342):
    """
    generates a simple Server in available address.

    Random ports are tried until one can be bound. Ports below 1024 are
    skipped when ``to`` allows it, and port 0 is never chosen, because with
    port 0 the system assigns a port that would not match the returned
    address.

    :param host: host to bind the server to.
    :param to: until port (exclusive upper bound for the random port).
    :return: socket, address
    :raises socket.error: for any bind failure other than "address in use"
        or "permission denied", which a different port cannot fix.
    """
    from errno import EACCES, EADDRINUSE  # local import: only used here

    low = 1024 if to > 1024 else 1
    span = max(int(to) - low, 1)
    while True:
        port = low + int(np.random.rand() * span)
        addr = (host, port)
        try:
            return initServer(addr), addr
        except socket.error as err:
            # Retry only when another port may succeed; anything else (bad
            # host, no permissions at all, ...) would otherwise loop forever.
            if err.errno not in (EADDRINUSE, EACCES):
                raise
def sendPickle(obj, addr=addr, timeout=None, threaded=False):
    """
    Send potentially any data using sockets.

    Waits for one client to connect, performs the length handshake and
    sends the serialized object.

    :param obj: packable object.
    :param addr: listening socket, or (host, port) address where a server is
        created. A server created here is closed afterwards; a socket passed
        by the caller is left open.
    :param timeout: seconds to wait for the client to connect and for the
        length handshake; None waits forever.
    :param threaded: if True do the work in a daemon thread and return
        immediately.
    :return: True if sent successfully, else Throw error. None when
        ``threaded`` is True (errors then happen inside the thread).
    """
    notToClose = isinstance(addr, socket.socket)
    if notToClose:
        s = addr
        if FLAG_DEBUG:
            print("address is a connection")
    else:
        if FLAG_DEBUG:
            print("initializing Server at {}".format(addr))
        s = initServer(addr)

    def helper():
        conn = None
        try:
            s.settimeout(timeout)
            if FLAG_DEBUG:
                print("waiting for connection...")
            conn, addr1 = s.accept()
            s.settimeout(None)
            if FLAG_DEBUG:
                print("connection accepted..")
            tosend = serializer.dumps(obj)
            if FLAG_DEBUG:
                print("waiting to confirm sending len")
            Conection(conn).sendLen(len(tosend), timeout=timeout)
            if FLAG_DEBUG:
                print("sending data")
            # send() may transmit only part of a large payload; sendall()
            # keeps going until every byte is out.
            conn.sendall(tosend)
            return True
        finally:
            if conn is not None:
                try:
                    conn.close()  # the accepted connection is always ours
                except socket.error:
                    pass
            if not notToClose:  # do not close if it was not opened in function.
                try:
                    s.close()  # tries to close socket
                except socket.error:
                    pass

    if threaded:
        t = threading.Thread(target=helper)
        t.daemon = True
        t.start()
    else:
        return helper()
def rcvPickle(addr=addr, timeout=None):
    """
    Receive potentially any data using sockets.

    :param addr: connected socket, or (host, port) address to connect to.
        A connection created here is closed afterwards; a socket passed by
        the caller is left open.
    :param timeout: seconds allowed for connecting and for the length
        handshake; the payload transfer itself is not time-limited.
    :return: data, else throws error.
    :raises TransferExeption: if fewer bytes than announced were received.
    """
    notToClose = isinstance(addr, socket.socket)
    if notToClose:
        s = addr
    else:
        if FLAG_DEBUG:
            print("initializing client at {}".format(addr))
        s = initClient(addr, timeout)
    try:
        if FLAG_DEBUG:
            print("creating connection...")
        c = Conection(s)
        length = c.getLen(timeout=timeout)
        if FLAG_DEBUG:
            print("loading data...")
        rcvdata = c.recvall()
        if len(rcvdata) != length:
            raise TransferExeption("Data was transferred incomplete. Expected {} and got {} bytes".format(length, len(rcvdata)))
        if FLAG_DEBUG:
            print("received data of len {}".format(len(rcvdata)))
        # SECURITY: the payload comes from the network. If ``serializer`` is
        # pickle-like, loading it can execute arbitrary code, so only connect
        # to trusted peers.
        return serializer.loads(rcvdata)
    finally:
        if not notToClose:  # do not close if it was not opened in function.
            try:
                s.close()  # tries to close socket
            except socket.error:
                pass
def string_is_socket_address(string):
    """
    Tell whether a value looks like a ``"host:port"`` address.

    :param string: value to test.
    :return: True if it splits into exactly two ``:``-separated parts and
        the second one is an integer, else False.
    """
    try:
        _host, port = string.split(":")
        int(port)
    except (AttributeError, TypeError, ValueError):
        # Not a string, wrong number of parts or non-numeric port. Other
        # exceptions (e.g. KeyboardInterrupt) are deliberately not swallowed.
        return False
    return True
def parseString(string, timeout=3):
    """
    Fetch the object(s) served at one or several ``"host:port"`` addresses.

    :param string: a ``"host:port"`` string, or an iterable of such strings
        (iterables may be nested).
    :param timeout: timeout handed to ``rcvPickle`` for each address.
    :return: the received object for a single address, or a list with one
        result per item for an iterable.
    """
    if not isinstance(string, basestring):
        # Anything that is not a string is treated as a collection of addresses.
        return [parseString(item, timeout=timeout) for item in string]
    host_part, port_part = string.split(":")
    target = (host_part, int(port_part))
    return rcvPickle(target, timeout=timeout)
if __name__ == "__main__":
    # Manual check: try to reach the default address, retrying refused
    # connections for up to 3 seconds.
    initClient(addr, 3)
    # Alternative manual check: print(generateServer(to=63342))