Source code for gossip.communication.client_sender

# Copyright 2016 Anselm Binninger, Thomas Maier, Ralph Schaumann
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import multiprocessing
import socket

from gossip.util.exceptions import GossipQueueException, GossipIdentifierNotFound
from gossip.util.queue_item_types import QUEUE_ITEM_TYPE_SEND_MESSAGE, QUEUE_ITEM_TYPE_ESTABLISH_CONNECTION
from gossip.communication.client_receiver import GossipClientReceiver

__author__ = 'Anselm Binninger, Thomas Maier, Ralph Schaumann'


class GossipSender(multiprocessing.Process):
    """
    Process that executes the commands a controller places on a queue.

    Two kinds of commands are handled: sending a message over a connection
    that is looked up in the connection pool, and establishing a new outgoing
    TCP connection, which is then added to the pool and handed to a newly
    started client receiver.
    """

    def __init__(self, sender_label, from_controller_queue, to_controller_queue, connection_pool):
        """
        Constructor.

        :param sender_label: A label to derive the concrete functionality of this client sender (used as a
            prefix in every log line)
        :param from_controller_queue: The client sender gets new commands via this queue from the responsible
            controller
        :param to_controller_queue: This instance forwards the controller queue to new receiver instances
        :param connection_pool: The connection pool which contains all connections/sockets
        """
        super().__init__()
        self.sender_label = sender_label
        self.from_controller_queue = from_controller_queue
        self.to_controller_queue = to_controller_queue
        self.connection_pool = connection_pool

    def run(self):
        """
        Main loop of the sender process.

        Blocks on the controller queue and dispatches every queue item by its ``'type'`` entry: either a message
        is sent to the connection registered under ``'identifier'``, or a new connection to ``'identifier'`` is
        established.

        :raises GossipQueueException: If a queue item carries an unsupported type
        """
        logging.info('%s started - PID: %s', self.sender_label, self.pid)
        while True:
            queue_item = self.from_controller_queue.get()
            queue_item_type = queue_item['type']
            identifier = queue_item['identifier']

            if queue_item_type == QUEUE_ITEM_TYPE_SEND_MESSAGE:
                self._send_message(identifier, queue_item['message'])
            elif queue_item_type == QUEUE_ITEM_TYPE_ESTABLISH_CONNECTION:
                self._establish_connection(identifier)
            else:
                # If this happens, someone did a horrible mistake in the code: The queue item type is not supported!
                raise GossipQueueException('%s: Queue item cannot be identified! This should never happen!'
                                           % self.sender_label)

    def _send_message(self, identifier, message):
        """
        Send a message over the pooled connection registered under the given identifier.

        :param identifier: Key of the target connection in the connection pool
        :param message: Message object offering ``get_values()`` and ``encode()``; a falsy value is skipped
        """
        # Check the message before touching it: dereferencing a missing message just for the log line would
        # raise and take the whole sender process down.
        if not message:
            logging.error('%s | Empty message for client %s, nothing to send', self.sender_label, identifier)
            return

        values = message.get_values()
        logging.info("%s | Redirecting message (code %d) to corresponding client", self.sender_label,
                     values['code'])

        # Fetch connection from connection pool
        try:
            connection = self.connection_pool.get_connection(identifier)
        except GossipIdentifierNotFound:
            connection = None
        if not connection:
            logging.error('%s | No connection found in connection pool, giving up', self.sender_label)
            return

        encoded = message.encode()
        try:
            # sendall() keeps writing until the whole buffer is handed over; a plain send() may transmit only
            # a part of it and would silently truncate the message.
            connection.sendall(encoded)
        except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
            # BrokenPipeError is what writing to an already closed peer raises, so it is treated like the
            # other disconnect errors instead of terminating the process.
            self.connection_pool.remove_connection(identifier)
            logging.error('%s | During sending a message peer disconnected', self.sender_label)
        else:
            logging.debug('%s | Sent message (%s) to client %s | Sent message: %s', self.sender_label,
                          values['code'], identifier, values)

    def _establish_connection(self, identifier):
        """
        Open a TCP connection to the given peer, add it to the pool and start a receiver for it.

        :param identifier: Address of the peer in the form ``host:port``
        """
        logging.info("%s Establishing new connection to %s", self.sender_label, identifier)

        # Parse the address before creating the socket so that a malformed identifier cannot leak a socket.
        server_host, server_port = identifier.split(':')
        server_port = int(server_port)

        connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            connection.connect((server_host, server_port))
        except OSError:
            # OSError covers a refused connection as well as timeouts, unreachable hosts and name resolution
            # failures. The unused socket is closed so that its file descriptor is not leaked.
            connection.close()
            logging.error('%s | Cannot establish connection to %s', self.sender_label, identifier)
            return

        self.connection_pool.add_connection(identifier, connection, server_identifier=identifier)
        logging.info("%s | Added new connection to connection pool", self.sender_label)

        # Create client receiver for new connection
        # TODO Client receiver label should not be hardcoded here!
        client_receiver = GossipClientReceiver('P2PClientReceiver', connection, server_host, server_port,
                                               self.to_controller_queue, self.connection_pool)
        client_receiver.start()