"""
GObj :class:`OnServer` and :class:`OnClient`
============================================
Utility for check a server/client :term:`gaplic`'s
running as thread or subprocesses and stress the connections.
It uses :class:`ginsfsm.c_srv_sock.GServerSock`
and :class:`ginsfsm.c_sock.GSock`.
Run two gaplics. One is the server, the other the client.
Stress with many connections.
Configuration:
* The server can run as thread o subprocess.
* Limit of client connections to be reached.
.. autoclass:: OnServer
:members:
.. autoclass:: OnClient
:members:
"""
import time
import logging
logging.basicConfig(level=logging.INFO)
from ginsfsm.gobj import GObj
from ginsfsm.gaplic import GAplic
from ginsfsm.c_timer import GTimer
from ginsfsm.c_connex import GConnex
from ginsfsm.c_srv_sock import GServerSock
#===============================================================
# Server
#===============================================================
def ac_clisrv_timeout(self, event):
self.set_timeout(5)
pass
def ac_clisrv_connected(self, event):
if len(self.dl_childs) == 3:
self.start_time = time.clock()
print('connected FROM %s' % str(event.peername))
print("Server's clients: %d" % len(self.dl_childs))
if len(self.dl_childs) == self.config.connections + 2:
end_time = time.clock()
print("Time for %d connections: %f seconds" % (
self.config.connections,
end_time - self.start_time))
def ac_clisrv_disconnected(self, event):
self.destroy_gobj(event.source[-1])
SERVER_FSM = {
'event_list': (
'EV_SET_TIMER: bottom output',
'EV_TIMEOUT: bottom input',
'EV_CONNECTED: bottom input',
'EV_DISCONNECTED: bottom input',
),
'state_list': ('ST_IDLE',),
'machine': {
'ST_IDLE':
(
('EV_TIMEOUT', ac_clisrv_timeout, None),
('EV_CONNECTED', ac_clisrv_connected, None),
('EV_DISCONNECTED', ac_clisrv_disconnected, None),
),
}
}
SERVER_GCONFIG = { # type, default_value, flag, validate_function, desc
'verbose': [int, 0, 0, None, "Increase output verbosity. Values [0,1,2]"],
'connections': [int, 0, 0, None, "Limit of connections to be reached."],
}
[docs]class OnServer(GObj):
""" Server GObj.
.. ginsfsm::
:fsm: SERVER_FSM
:gconfig: SERVER_GCONFIG
*Input-Events:*
* :attr:`'EV_TIMEOUT'`: Timer over.
Start the machine.
* :attr:`'EV_CONNECTED'`: New client.
* :attr:`'EV_DISCONNECTED'`: Client disconnected.
*Output-Events:*
* :attr:`'EV_START_TIMER'`: Start timer.
"""
def __init__(self):
GObj.__init__(self, SERVER_FSM, SERVER_GCONFIG)
self.start_time = 0
def start_up(self):
self.timer = self.create_gobj(
None,
GTimer,
self)
self.server = self.create_gobj(
None,
GServerSock,
self,
host='127.0.0.1',
port=8000,
# only want receive EV_CONNECTED/EV_DISCONNECTED event
rx_data_event_name=None,
transmit_ready_event_name=None,
)
self.set_timeout(5)
def set_timeout(self, seconds):
self.send_event(self.timer, 'EV_SET_TIMER', seconds=seconds)
def clear_timeout(self):
self.send_event(self.timer, 'EV_SET_TIMER', seconds=-1)
#===============================================================
# Client
#===============================================================
def ac_client_timeout(self, event):
if self.connex is None:
self.connex = list(range(self.config.connections))
for i in self.connex:
self.connex[i] = self.create_gobj(
'client-%02d' % i,
GConnex,
self,
destinations=[('127.0.0.1', 8000)],
# only want receive EV_CONNECTED/EV_DISCONNECTED event
rx_data_event_name=None,
transmit_ready_event_name=None,
)
self.set_timeout(10)
def ac_client_connected(self, event):
pass
def ac_client_disconnected(self, event):
pass
CLIENT_FSM = {
'event_list': (
'EV_SET_TIMER: bottom output',
'EV_TIMEOUT: bottom input',
'EV_CONNECTED: bottom input',
'EV_DISCONNECTED: bottom input',
),
'state_list': ('ST_IDLE',),
'machine': {
'ST_IDLE':
(
('EV_TIMEOUT', ac_client_timeout, None),
('EV_CONNECTED', ac_client_connected, None),
('EV_DISCONNECTED', ac_client_disconnected, None),
),
}
}
CLIENT_GCONFIG = { # type, default_value, flag, validate_function, desc
'verbose': [int, 0, 0, None, "Increase output verbosity. Values [0,1,2]"],
'connections': [int, 0, 0, None, "Limit of connections to be reached."],
}
[docs]class OnClient(GObj):
""" Client GObj.
.. ginsfsm::
:fsm: CLIENT_FSM
:gconfig: CLIENT_GCONFIG
*Input-Events:*
* :attr:`'EV_TIMEOUT'`: Timer over.
Start the machine.
* :attr:`'EV_CONNECTED'`: Client connected.
* :attr:`'EV_DISCONNECTED'`: Client disconnected.
*Output-Events:*
* :attr:`'EV_START_TIMER'`: Start timer.
"""
def __init__(self):
GObj.__init__(self, CLIENT_FSM, CLIENT_GCONFIG)
def start_up(self):
self.timer = self.create_gobj(
None,
GTimer,
self,
)
self.connex = None
self.set_timeout(1)
def set_timeout(self, seconds):
self.send_event(self.timer, 'EV_SET_TIMER', seconds=seconds)
def clear_timeout(self):
self.send_event(self.timer, 'EV_SET_TIMER', seconds=-1)
#===============================================================
# Main
#===============================================================
from ginsfsm.gaplic import setup_gaplic_thread, setup_gaplic_process
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"connections",
type=int,
nargs='?', default=400,
help="Limit of connections to be reached."
)
parser.add_argument(
"--run-as-process",
action="store_true",
help="Run the server as subprocess. By default it runs as thread."
)
parser.add_argument(
"-v",
"--verbose",
help="Increase output verbosity",
type=int,
choices=[0, 1, 2],
default=0,
)
args = parser.parse_args()
run_as_process = args.run_as_process
local_conf = {
'GObj.trace_mach': True if args.verbose else False,
'GObj.logger': logging,
}
if run_as_process:
# run server gaplic as child daemon process
ga_srv = GAplic(name='Server', roles='', **local_conf)
ga_srv.create_gobj(
'server',
OnServer,
ga_srv,
connections=args.connections,
verbose=args.verbose,
)
srv_worker = setup_gaplic_process(ga_srv)
srv_worker.start()
# run client gaplic as main process
ga_cli = GAplic(name='Client', roles='', **local_conf)
ga_cli.create_gobj(
'client',
OnClient,
ga_cli,
connections=args.connections,
verbose=args.verbose,
)
try:
ga_cli.start()
except (KeyboardInterrupt, SystemExit):
ga_srv.stop()
srv_worker.join()
print('Program stopped')
else:
# run server gaplic as thread
ga_srv = GAplic(name='Server', roles='', **local_conf)
ga_srv.create_gobj(
'server',
OnServer,
ga_srv,
connections=args.connections,
verbose=args.verbose,
)
srv_worker = setup_gaplic_thread(ga_srv)
srv_worker.start()
# run client gaplic as main process
ga_cli = GAplic(name='Client', roles='', **local_conf)
ga_cli.create_gobj(
'client',
OnClient,
ga_cli,
connections=args.connections,
verbose=args.verbose,
)
try:
ga_cli.start()
except (KeyboardInterrupt, SystemExit):
ga_srv.stop()
srv_worker.join()
print('Program stopped')