"""
GObj :class:`OnGConnex`
=======================
Utility for check :class:`ginsfsm.c_connex.GConnex`.
Send a query to two url's alternatively.
How? send the query each 10 seconds, but set a timeout_inactivity of 5.
In each disconnection the next connection will switch between the
two destinations.
.. autoclass:: OnGConnex
:members:
"""
import logging
logging.basicConfig(level=logging.DEBUG)
from ginsfsm.gobj import GObj
from ginsfsm.gaplic import GAplic
from ginsfsm.c_timer import GTimer
from ginsfsm.c_connex import GConnex
from ginsfsm.utils import hexdump
QUERY = b"GET / HTTP/1.1\r\n" + \
b"Host: www\r\n" + \
b"\r\n"
def ac_rx_data(self, event):
data = event.kw['data']
if self.config.verbose:
print('Receiving %d bytes' % len(data))
print(hexdump('<=', data))
def ac_timeout(self, event):
self.send_event(self.connex, 'EV_SEND_DATA', data=QUERY)
self.set_timeout(10)
ONGCONNEX_FSM = {
'event_list': (
'EV_SET_TIMER: bottom output',
'EV_TIMEOUT:bottom input',
'EV_RX_DATA:bottom input',
),
'state_list': ('ST_IDLE',),
'machine': {
'ST_IDLE':
(
('EV_TIMEOUT', ac_timeout, 'ST_IDLE'),
('EV_RX_DATA', ac_rx_data, 'ST_IDLE'),
),
}
}
ONGCONNEX_GCONFIG = { # type, default_value, flag, validate_function, desc
'verbose': [int, 0, 0, None, "Increase output verbosity. Values [0,1,2]"],
'url1': [str, 'www.google.com', 0, None, "Url one to connect."],
'url2': [str, 'www.twitter.com', 0, None, "Url two to connect."],
}
[docs]class OnGConnex(GObj):
""" OnGConnex GObj.
.. ginsfsm::
:fsm: ONGCONNEX_FSM
:gconfig: ONGCONNEX_GCONFIG
*Input-Events:*
* :attr:`'EV_TIMEOUT'`: Timer over.
Start the machine.
* :attr:`'EV_RX_DATA'`: Receiving data.
Receiving data from urls.
*Output-Events:*
* :attr:`'EV_START_TIMER'`: Start timer.
"""
def __init__(self):
GObj.__init__(self, ONGCONNEX_FSM, ONGCONNEX_GCONFIG)
[docs] def start_up(self):
""" Initialization zone."""
self.timer = self.create_gobj(
None,
GTimer,
self,
)
self.connex = self.create_gobj(
'connex',
GConnex,
self,
destinations=[
(self.config.url1, 80),
(self.config.url2, 80),
],
timeout_inactivity=5,
# only want receive EV_RX_DATA event
connected_event_name=None,
disconnected_event_name=None,
transmit_ready_event_name=None,
)
self.set_timeout(5)
def set_timeout(self, seconds):
self.send_event(self.timer, 'EV_SET_TIMER', seconds=seconds)
def clear_timeout(self):
self.send_event(self.timer, 'EV_SET_TIMER', seconds=-1)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"url1",
nargs='?', default='www.google.es',
help="First url to connect."
)
parser.add_argument(
"url2",
nargs='?', default='www.twitter.es',
help="Second url to connect."
)
parser.add_argument(
"-v",
"--verbose",
help="Increase output verbosity",
type=int,
choices=[0, 1, 2],
default=1,
)
args = parser.parse_args()
local_conf = {
'GObj.trace_mach': True if args.verbose else False,
'GObj.logger': logging,
}
ga = GAplic(name='', roles='', **local_conf)
ga.create_gobj(
'ongconnex',
OnGConnex,
ga,
verbose=args.verbose,
url1=args.url1,
url2=args.url2,
)
try:
ga.start()
except (KeyboardInterrupt, SystemExit):
print('Program stopped')