# Source code for ginsfsm.examples.essential_gobjs.stress_client_echo

"""
GObj :class:`OnClient`
======================

Stress test with many connections and a large amount of data.

To run against :mod:`ginsfsm.examples.stress_server_echo`

The server echoes the data.

It uses :class:`ginsfsm.c_connex.GConnex`.

.. autoclass:: OnClient
    :members:

"""

import datetime

import logging
logging.basicConfig(level=logging.DEBUG)

from ginsfsm.gobj import GObj
from ginsfsm.gaplic import GAplic
from ginsfsm.c_timer import GTimer
from ginsfsm.c_connex import GConnex

#===============================================================
#                   Client
#===============================================================
# Minimal HTTP/1.1 GET request (empty Host header) sent to the echo server
# by every connected client on each round.
query = (
    b"GET / HTTP/1.1\r\n"
    b"Host: \r\n"
    b"\r\n"
)


def ac_client_timeout(self, event):
    """Drive one round of the stress test; runs on every ``EV_TIMEOUT``.

    Each tick performs three steps:

    1. Create any client connections that are still missing, so that
       ``self.connex`` holds ``self.config.connections`` :class:`GConnex`
       children, each tagged with its index and per-client counters.
    2. If every client is currently connected, print how many clients
       received as many messages as they sent in the previous round, then
       send ``query`` to every client whose socket reports ``connected``.
    3. Re-arm the timer with ``self.set_timeout(10)``.

    :param self: the owning gobj (provides ``config``, ``connex``,
        ``n_clients``, ``n_connected_clients``, ``diff``, ``create_gobj``,
        ``send_event`` and ``set_timeout``).
    :param event: the timeout event; it is not inspected.
    """
    total = self.config.connections

    if self.connex is None:
        # One slot per client; every slot is filled by the loop below.
        self.connex = [None] * total

    # The range is empty once all clients exist, so no extra guard is needed.
    for i in range(self.n_clients, total):
        self.n_clients += 1
        client = self.create_gobj(
            'client-%d' % i,
            GConnex,
            self,
            destinations=[(self.config.host, self.config.port)],
            transmit_ready_event_name=None,
        )
        self.connex[i] = client

        # Per-client bookkeeping used by the other action functions.
        client.idx = i
        client.conectado = 0
        client.sended_msgs = 0
        client.received_msgs = 0

    print("conectados: %d" % self.n_connected_clients)
    if self.n_connected_clients == total:
        n_echoes = 0
        elapsed = datetime.timedelta(seconds=0)
        for cli in self.connex:
            if cli.sended_msgs == cli.received_msgs:
                n_echoes += 1
            # NOTE(review): self.diff is a single value on the owner, not a
            # per-client measurement, so the "average" printed below equals
            # that one sample. Kept as-is to preserve the reported figures.
            elapsed += self.diff

        # With zero configured connections (the GCONFIG default) there is
        # nothing to average; avoid dividing a timedelta by zero.
        average = elapsed / total if total else elapsed
        print("Echoes OK: %d of %d, taverage %s" % (n_echoes, total, average))

        # Start a new round: one outstanding message per connected client.
        for cli in self.connex:
            if cli.gsock.connected:
                cli.sended_msgs = 1
                cli.received_msgs = 0
                cli.tx_time = datetime.datetime.now()
                self.send_event(cli, 'EV_SEND_DATA', data=query)

    self.set_timeout(10)


def ac_client_connected(self, event):
    """Count a client as connected when its ``EV_CONNECTED`` arrives.

    The client is the last element of ``event.source``. Its ``conectado``
    flag makes the count idempotent: a repeated event for a client that is
    already flagged does not increment ``self.n_connected_clients`` again.
    """
    client = event.source[-1]
    if not client.conectado:
        self.n_connected_clients += 1
        client.conectado = 1
    print("C: conectados: %d" % self.n_connected_clients)


def ac_client_disconnected(self, event):
    """Stop counting a client as connected when ``EV_DISCONNECTED`` arrives.

    The client is the last element of ``event.source``. The counter is only
    decremented if the client was flagged as connected, so a disconnect for
    a client that never connected leaves ``self.n_connected_clients`` alone.
    """
    client = event.source[-1]
    if client.conectado:
        self.n_connected_clients -= 1
        client.conectado = 0
    print("D: conectados: %d" % self.n_connected_clients)


def ac_client_rx_data(self, event):
    """Record an echoed message and update the response-time statistics.

    The receiving client is looked up in ``self.connex`` through the ``idx``
    of the last element of ``event.source``. Its ``received_msgs`` counter is
    bumped, ``rx_time`` is stamped with the current time, and the elapsed
    time since its ``tx_time`` is stored in ``self.diff`` and folded into
    ``self.min_response_time`` / ``self.max_response_time``.
    """
    sender = event.source[-1]
    client = self.connex[sender.idx]

    client.received_msgs += 1
    client.rx_time = datetime.datetime.now()

    round_trip = client.rx_time - client.tx_time
    if round_trip < self.min_response_time:
        self.min_response_time = round_trip
    if round_trip > self.max_response_time:
        self.max_response_time = round_trip
    self.diff = round_trip


# State machine description handed to GObj.__init__ by OnClient.
# It has a single state, ST_IDLE; every handled event runs its action
# function and stays in ST_IDLE.
CLIENT_FSM = {
    # Events known to this gobj, each declared as '<name>: <flags>'.
    'event_list': (
        'EV_SET_TIMER: bottom output',
        'EV_TIMEOUT: bottom input',
        'EV_CONNECTED: bottom input',
        'EV_DISCONNECTED: bottom input',
        'EV_RX_DATA:bottom input',
    ),
    'state_list': ('ST_IDLE',),
    # Per state: (event name, action function, next state).
    'machine': {
        'ST_IDLE': (
            ('EV_TIMEOUT', ac_client_timeout, 'ST_IDLE'),
            ('EV_CONNECTED', ac_client_connected, 'ST_IDLE'),
            ('EV_DISCONNECTED', ac_client_disconnected, 'ST_IDLE'),
            ('EV_RX_DATA', ac_client_rx_data, 'ST_IDLE'),
        ),
    },
}


# Configuration schema handed to GObj.__init__ by OnClient.
# Each entry is: [type, default_value, flag, validate_function, desc].
CLIENT_GCONFIG = {
    'verbose': [int, 0, 0, None, "Increase output verbosity. Values [0,1,2]"],
    'connections': [int, 0, 0, None, "Limit of connections to be reached."],
    # The description used to read "Port.", a copy-paste slip from the
    # 'port' entry below.
    'host': [str, '127.0.0.1', 0, None, "Host."],
    'port': [int, 8000, 0, None, "Port."],
}


class OnClient(GObj):
    """Client GObj.

    .. ginsfsm::
       :fsm: CLIENT_FSM
       :gconfig: CLIENT_GCONFIG

    *Input-Events:*
        * :attr:`'EV_TIMEOUT'`: Timer over.
          Start the machine.

        * :attr:`'EV_CONNECTED'`: New client.

        * :attr:`'EV_DISCONNECTED'`: Client disconnected.

        * :attr:`'EV_RX_DATA'`: Receiving data.

    *Output-Events:*
        * :attr:`'EV_SET_TIMER'`: Start timer.
    """

    def __init__(self):
        GObj.__init__(self, CLIENT_FSM, CLIENT_GCONFIG)
        # Number of client gobjs created so far / currently connected.
        self.n_clients = 0
        self.n_connected_clients = 0

    def start_up(self):
        """Create the timer child, reset the statistics and arm the timer."""
        self.timer = self.create_gobj(
            None,
            GTimer,
            self
        )
        # Filled lazily by ac_client_timeout on the first timeout.
        self.connex = None
        self.sended_msgs = 0
        self.received_msgs = 0
        # Start min high and max at zero so the first measurement sets both.
        self.min_response_time = datetime.timedelta(seconds=100)
        self.max_response_time = datetime.timedelta(seconds=0)
        self.diff = datetime.timedelta(seconds=0)
        self.set_timeout(1)

    def set_timeout(self, seconds):
        """Send ``EV_SET_TIMER`` to the timer child with the given seconds."""
        self.send_event(self.timer, 'EV_SET_TIMER', seconds=seconds)

    def clear_timeout(self):
        """Send ``EV_SET_TIMER`` with ``seconds=-1``.

        NOTE(review): presumably -1 cancels the pending timer; confirm
        against GTimer.
        """
        self.send_event(self.timer, 'EV_SET_TIMER', seconds=-1)


#===============================================================
#                   Main
#===============================================================
if __name__ == "__main__":
    # Command-line entry point: parse the target host/port and the number of
    # connections, build the application with a single OnClient gobj and run
    # it until interrupted.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "host",
        nargs='?',
        default='127.0.0.1',
        help="Host."
    )
    parser.add_argument(
        "port",
        type=int,
        nargs='?',
        default=8000,
        help="Port."
    )
    parser.add_argument(
        "connections",
        type=int,
        nargs='?',
        default=100,
        help="Limit of connections to be reached."
    )
    # Still accepted so existing command lines keep working; this script
    # does not read the value.
    parser.add_argument(
        "--run-as-process",
        action="store_true",
        help="Run the server as subprocess. By default it runs as thread."
    )
    parser.add_argument(
        "-v", "--verbose",
        help="Increase output verbosity",
        type=int,
        choices=[0, 1, 2],
        default=0,
    )
    args = parser.parse_args()

    local_conf = {
        # Any non-zero verbosity turns machine tracing on.
        'GObj.trace_mach': bool(args.verbose),
        'GObj.logger': logging,
    }
    ga_cli = GAplic(name='client', roles='', **local_conf)
    ga_cli.create_gobj(
        'client',
        OnClient,
        ga_cli,
        host=args.host,
        port=args.port,
        connections=args.connections,
    )

    try:
        ga_cli.start()
    except (KeyboardInterrupt, SystemExit):
        # Close every socket the application opened before exiting.
        from ginsfsm.c_sock import close_all_sockets
        close_all_sockets(ga_cli._socket_map)
        print('Program stopped')