# Source code for gossip.communication.server

# Copyright 2016 Anselm Binninger, Thomas Maier, Ralph Schaumann
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import multiprocessing
import socket

from gossip.communication.client_receiver import GossipClientReceiver

__author__ = 'Anselm Binninger, Thomas Maier, Ralph Schaumann'


class GossipServer(multiprocessing.Process):
    """Process that accepts incoming TCP connections and starts one receiver per connection."""

    def __init__(self, server_label, client_receiver_label, bind_address, tcp_port,
                 to_controller_queue, connection_pool):
        """
        The Gossip server waits for new connections established by other clients. It also
        instantiates new receivers for incoming connections.

        :param server_label: A label to derive the concrete functionality of this gossip server
        :param client_receiver_label: This label is used for newly instantiated receivers
        :param bind_address: IPv4 address which is used to listen for new connections
        :param tcp_port: TCP port which is used to listen for new connections
        :param to_controller_queue: Newly instantiated receivers need to know a queue to communicate
                                    with the controller
        :param connection_pool: New connections will be added to the appropriate connection pool
        """
        multiprocessing.Process.__init__(self)
        self.server_label = server_label
        self.client_receiver_label = client_receiver_label
        self.bind_address = bind_address
        self.tcp_port = tcp_port
        self.to_controller_queue = to_controller_queue
        self.connection_pool = connection_pool

    def run(self):
        """
        Run method of the server process. It listens on ``bind_address:tcp_port``, and for every
        accepted connection registers the client socket in the connection pool, instantiates a
        receiver for it, and starts that receiver.

        The accept loop has no regular exit; it ends only when an exception is raised. An
        ``OSError`` is logged and ends the method; any other exception propagates. In every case
        the listening socket is closed before the method is left.
        """
        try:
            logging.info('%s started (%s:%d) - PID: %s',
                         self.server_label, self.bind_address, self.tcp_port, self.pid)
            # The context manager guarantees the listening socket is closed on every exit path.
            # A close() call placed after the endless loop would never be reached.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
                server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                server_socket.bind((self.bind_address, self.tcp_port))
                server_socket.listen(5)
                while True:
                    client_socket, address = server_socket.accept()
                    tcp_address, tcp_port = address
                    # Connections are keyed in the pool by their "<ip>:<port>" peer address.
                    connection_identifier = '%s:%d' % (tcp_address, tcp_port)
                    self.connection_pool.add_connection(connection_identifier, client_socket)
                    logging.info("%s | Added new connection to connection pool", self.server_label)
                    client_receiver = GossipClientReceiver(self.client_receiver_label, client_socket,
                                                           tcp_address, tcp_port,
                                                           self.to_controller_queue,
                                                           self.connection_pool)
                    client_receiver.start()
        except OSError as os_error:
            logging.error('%s crashed (%s:%d) - PID: %s - %s',
                          self.server_label, self.bind_address, self.tcp_port, self.pid, os_error)