qam_proxy

The QAMProxy sends method-calls to a QAMServer-instance and receives the results. It is possible to register a callback-method on the proxy. This method is called when a result for a specific method-call is available.

The QAMProxy consists of the following classes:

qam.qam_proxy.QAMProxy :

This class manages the start of:

  • ConsumerThread
  • PublisherThread
  • CallbackObserverThread

The PublisherThread runs in an infinite loop and continuously sends messages to the QAMServer. Messages waiting to be sent are stored in a method_queue.

The ConsumerThread continuously receives messages from the QAMServer and signals that a message has arrived. If a callback is registered, the ConsumerThread signals the CallbackObserverThread; otherwise it signals the QAMProxy.

The CallbackObserverThread continuously checks whether it is time to execute a callback-method. A callback-method is executed if one is registered and the ConsumerThread signals that a result has arrived.

qam.qam_proxy.ConsumerThread :
The ConsumerThread continuously receives messages from the QAMServer and signals that a message has arrived. To do so, it has to keep track of every remote-method-call sent to the QAMServer. The ConsumerThread keeps a list of all QAMServer calls that haven’t received a result yet, so that a result can be related to its caller. This list consists of “slots”, each identified by an id. A slot is reserved while the caller is still waiting for a message, and it is freed once the result has arrived.
qam.qam_proxy.PublisherThread :
The PublisherThread runs in an infinite loop and continuously sends messages to the QAMServer. It keeps a queue (method_queue) so that no method call that has not been sent yet is lost. If the queue is empty, the PublisherThread waits until a new message arrives in the queue.
qam.qam_proxy.CallbackObserverThread :
The CallbackObserverThread continuously checks whether it is time to execute a callback-method. If a callback-method is registered and a result has arrived in the ConsumerThread, the CallbackObserverThread is informed. It then checks whether an error occurred on the QAMServer. If no error occurred, the callback-method registered for this request is called with the result from the QAMServer as parameter. If an error occurred and an error-method is registered, that method is called with the exception as parameter; otherwise a log-message is written to the logfiles. The callback-methods are called by the ExecuteCallbackThread, because otherwise the CallbackObserverThread would block.
qam.qam_proxy.ExecuteCallbackThread :
The ExecuteCallbackThread receives a method and parameters from the CallbackObserverThread. The job of the ExecuteCallbackThread is to execute this method.
qam.qam_proxy.Callback :
This class implements the callback-mechanism.
qam.qam_proxy._Method :
This class implements the remote-method-call.
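
The interplay of the sending and receiving threads can be modelled with the standard library alone. The following is a minimal sketch, not QAM's implementation: the AMQP transport is replaced by an in-process function, and the names fake_server, publisher, consumer, pending and STOP are hypothetical.

```python
import queue
import threading

method_queue = queue.Queue()   # requests waiting to be sent (cf. method_queue)
result_queue = queue.Queue()   # stands in for the AMQP reply queue
pending = {}                   # slot id -> result, filled by the consumer
pending_lock = threading.Lock()
STOP = object()                # sentinel used to end both loops


def fake_server(message):
    """Hypothetical stand-in for the QAMServer: it only knows how to add."""
    a, b = message["params"]
    return {"id": message["id"], "result": a + b, "exception": False}


def publisher():
    # Queue.get() blocks while the queue is empty, which mirrors the
    # described behaviour of waiting until a new message arrives.
    while True:
        message = method_queue.get()
        if message is STOP:
            result_queue.put(STOP)
            break
        result_queue.put(fake_server(message))


def consumer():
    # Every reply is stored under its slot id so the caller can find it.
    while True:
        reply = result_queue.get()
        if reply is STOP:
            break
        with pending_lock:
            pending[reply["id"]] = reply["result"]


threads = [threading.Thread(target=publisher),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()

method_queue.put({"id": 0, "method_name": "add", "params": [2, 3]})
method_queue.put({"id": 1, "method_name": "add", "params": [10, 5]})
method_queue.put(STOP)

for t in threads:
    t.join()

print(pending)
```

The sentinel is a choice of this sketch; the classes documented here use a kill flag instead.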

The QAMProxy consists of the following Exceptions:

qam.qam_proxy.QAMException :
This is the class for internal exceptions. An internal exception is anything raised by the QAMServer.
qam.qam_proxy.QAMMethodNotFoundException :
This exception is created if the method called by the QAMProxy isn’t registered on the QAMServer.

Usage

Calling a method without callback

An example-usage of the QAMProxy. The code shows how to call a method that is registered on QAMServer.

>>> from qam.qam_proxy import QAMProxy, QAMMethodNotFoundException, QAMException
>>> qam_proxy = QAMProxy(hostname="localhost",
...                    port=5672,
...                    username='guest',
...                    password='guest',
...                    vhost='/',
...                    server_id='qamserver',
...                    client_id='qamproxy')
...
>>> result = qam_proxy.add(2,3) # name of method registered on QAMServer
>>> qam_proxy.close()

Calling a method with callback

An example-usage of the QAMProxy with callback functions.

>>> from qam.qam_proxy import QAMProxy, QAMMethodNotFoundException, QAMException
>>> qam_proxy = QAMProxy(hostname="localhost",
...                    port=5672,
...                    username='guest',
...                    password='guest',
...                    vhost='/',
...                    server_id='qamserver',
...                    client_id='qamproxy')
...
>>> # define the callback functions
>>> def success(arg):
...    print(arg)
>>> def error(arg):
...    # called if an error occurs on the QAMServer
...    print(arg)
>>> uid = qam_proxy.callback(success, error).adder_function(2,4)
>>> while True:
...    state = qam_proxy.get_callback_state(uid)
...    if state == 2 :
...        # execution of callback finished
...        break
...
>>> qam_proxy.close()
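
The loop above polls without pausing, which keeps one CPU core busy. A gentler variant sleeps between polls and gives up after a deadline. The helper below is a sketch: wait_for_callback and FakeProxy are hypothetical names, and only get_callback_state and the state value 2 come from this page.

```python
import time

FINISHED = 2  # state value documented for get_callback_state


def wait_for_callback(proxy, uid, poll_interval=0.05, max_wait=5.0):
    """Poll proxy.get_callback_state(uid) until it reports FINISHED.

    Returns True if the callback finished within max_wait seconds,
    otherwise False. This helper is not part of QAM.
    """
    deadline = time.time() + max_wait
    while time.time() < deadline:
        if proxy.get_callback_state(uid) == FINISHED:
            return True
        time.sleep(poll_interval)  # yield the CPU instead of spinning
    return False


class FakeProxy(object):
    """Stand-in used only to exercise the helper without a broker."""

    def __init__(self, polls_until_finished):
        self.remaining = polls_until_finished

    def get_callback_state(self, uid):
        self.remaining -= 1
        return FINISHED if self.remaining <= 0 else 1


print(wait_for_callback(FakeProxy(3), "some-uid", poll_interval=0.01))
```

With a real proxy the call would be wait_for_callback(qam_proxy, uid), followed by qam_proxy.close().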

Calling a registered Class

An example-usage of the QAMProxy for calling a registered class-instance on QAMServer.

>>> from qam.qam_proxy import QAMProxy, QAMMethodNotFoundException, QAMException
>>> qam_proxy = QAMProxy(hostname="localhost",
...                    port=5672,
...                    username='guest',
...                    password='guest',
...                    vhost='/',
...                    server_id='qamserver',
...                    client_id='qamproxy')
...
>>> result = qam_proxy.testclass.adder_function(2,4)
>>> qam_proxy.close()
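
This page does not explain how an expression such as qam_proxy.testclass.adder_function(2,4) is turned into a request. One common Python technique for this is to collect attribute accesses with __getattr__ and forward the call with __call__. The sketch below illustrates that technique under this assumption; RemoteName and fake_send are hypothetical and do not reflect QAM's internals.

```python
class RemoteName(object):
    """Collects attribute accesses into a dotted name and forwards the call."""

    def __init__(self, send, name=""):
        self._send = send    # callable(method_name, params) -> result
        self._name = name

    def __getattr__(self, attr):
        # Only called for attributes that are not found the normal way.
        dotted = attr if not self._name else self._name + "." + attr
        return RemoteName(self._send, dotted)

    def __call__(self, *params):
        return self._send(self._name, list(params))


def fake_send(method_name, params):
    """Hypothetical transport: echoes the request instead of using AMQP."""
    return {"method_name": method_name, "params": params}


proxy = RemoteName(fake_send)
print(proxy.add(2, 3))
print(proxy.testclass.adder_function(2, 4))
```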

Classes

class qam.qam_proxy.Callback(parent, id, client_id)

The class Callback is callable. It is used to realize the callback-mechanism.

Parameters:
  • parent – instance of QAMProxy
  • id – id of slot in the result-list
  • client_id – used for routing-key generation
set_timeout(timeout)

This method sets a timeout for an asynchronous remote method call. If the timeout expires before the callback function is called, a QAMTimeoutException is passed to the registered error function.

Parameters: timeout (integer) – the timeout in seconds
class qam.qam_proxy.CallbackObserverThread(parent)

The CallbackObserverThread continuously checks whether a result has arrived for a registered callback-function. Once the result has arrived, the corresponding callback-function is executed by the ExecuteCallbackThread.

Parameters: parent – see parent

Class-variables of CallbackObserverThread:

parent : qam.qam_proxy.QAMProxy
the QAMProxy is necessary to reach the methods of qam.qam_proxy.ConsumerThread
callback_uid_dict : dictionary
Dictionary which monitors the callbacks, that are not executed yet.
Structure of the callback_uid_dict:
  • “id”: string (id in the result_list)
  • “state”: integer
callback_dickt_lock : threading.Lock
A lock for locking the critical sections.
kill : bool
The kill signal tells the CallbackObserverThread to stop calling callback-methods.
change_callback_item_state(uid, state)

This method changes the state of a specific callback-item. Possible values for the state are:

  • 0 = waiting for processing
  • 1 = processing
  • 2 = finished
Parameters:
  • uid (string) – uid of the desired item
  • state (integer) – state to be set

This is a critical section, so it must be guarded by the lock.

check_timeout(uid, callback_info)

This method checks whether a timeout has occurred and, if so, calls the error method with a QAMTimeoutException.

Parameters:
  • uid (integer) – uid of the callback info in callback_uid_dict
  • callback_info – the corresponding info from the callback_uid_dict
exit()

The exit method terminates the CallbackObserverThread.

get_callback_item(uid)

This function returns an item from the callback_uid_dict, identified by the given uid.

Parameters: uid (string) – uid of the desired item
Return type: dictionary, {“id”: string, “state”: integer}

This is a critical section, so it must be guarded by the lock.

init_waiting_callback_item(id, timeout)

This method initializes a new callback-item with a unique uid. The given id is stored and the state is set to WAITING. The new item is stored in the callback_uid_dict.

Parameters: id (integer) – id of the item in the result_list
remove_callback_item(uid)

This method removes an item from the callback_uid_dict. This happens once a callback has been executed.

Parameters: uid (string) – uid of the desired item

This is a critical section, so it must be guarded by the lock.
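
The bookkeeping described for callback items (create, change state, read, remove, each under a lock) can be sketched as follows. CallbackRegistry and its method names are hypothetical; only the state values 0, 1 and 2 and the item layout are taken from this page.

```python
import threading
import uuid

WAITING, PROCESSING, FINISHED = 0, 1, 2


class CallbackRegistry(object):
    """Hypothetical model of callback_uid_dict guarded by a lock."""

    def __init__(self):
        self._items = {}
        self._lock = threading.Lock()

    def init_waiting_item(self, slot_id):
        uid = str(uuid.uuid4())          # unique uid for the new item
        with self._lock:
            self._items[uid] = {"id": slot_id, "state": WAITING}
        return uid

    def change_state(self, uid, state):
        if state not in (WAITING, PROCESSING, FINISHED):
            raise ValueError("unknown state: %r" % (state,))
        with self._lock:
            self._items[uid]["state"] = state

    def get_item(self, uid):
        with self._lock:
            # Return a copy so callers cannot mutate shared state.
            return dict(self._items[uid])

    def remove_item(self, uid):
        with self._lock:
            del self._items[uid]


registry = CallbackRegistry()
uid = registry.init_waiting_item(7)
registry.change_state(uid, PROCESSING)
print(registry.get_item(uid))
```

Using the lock as a context manager releases it even if the guarded statement raises, which avoids leaving the dictionary locked after an error.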

run()
The run method checks for every registered callback whether the result has already arrived. If the result has arrived:
  • in case of success, the registered callback-function is called
  • in case of an error:
    • if an error-function is registered, this function is called with the exception as parameter
    • if no error-function is registered, a log-message is written to the logfile

To execute the callback-function, a new ExecuteCallbackThread is started. This thread finally executes the method.
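
The decision logic described for run() can be sketched as a single function that picks the handler and runs it in its own thread, so the observing loop does not block. This is an illustration only: dispatch is a hypothetical name, and the assumption that the exception object travels in the "result" field is not confirmed by this page.

```python
import logging
import threading

log = logging.getLogger("qam_sketch")


def dispatch(reply, callback_method, exception_method=None):
    """Start a worker thread for the handler that matches the reply.

    reply is assumed to be a dict with the keys "result" and "exception";
    on error, "result" is assumed to carry the exception object.
    Returns the started thread, or None if the error was only logged.
    """
    if not reply["exception"]:
        target = callback_method
    elif exception_method is not None:
        target = exception_method
    else:
        log.error("remote call failed: %r", reply["result"])
        return None
    worker = threading.Thread(target=target, args=(reply["result"],))
    worker.start()   # the caller is free to continue immediately
    return worker


seen = []
t = dispatch({"result": 6, "exception": False}, seen.append)
t.join()
t = dispatch({"result": ValueError("boom"), "exception": True},
             seen.append, lambda exc: seen.append(("error", str(exc))))
t.join()
print(seen)
```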

class qam.qam_proxy.ConsumerThread(amqp_credentials, server_id, client_id, serializer)

The ConsumerThread receives messages from the QAMServer. When a message arrives, the result is stored in the result_list, identified by the id. The corresponding entry in the result_list is then set to the state DONE.

Structure of the received message:

  • “id”: integer
  • “result”: result of method-call
  • “exception”: bool
  • “internal_exception”: integer
Parameters:
  • amqp_credentials – everything needed to create an amqp-connection is stored here.
  • server_id – see server_id
  • client_id – see client_id

Class-variables of ConsumerThread:

free_index_queue : list
Free slots (ids) from the result_list are stored in the free_index_queue.
result_list : list
The result_list stores the result and a state for a remote-method-call identified by an id. Before the result is received, the slot for a remote-method-call in the result_list has the state WAITING. After the result is received, the slot gets the state DONE.
server_id : string
server_id received from parent QAMProxy. Used for amqp-connection to the server.
client_id : string
client_id received from parent QAMProxy. Used for defining routing-key and amqp-queue-name
kill : bool
The kill signal tells the ConsumerThread to stop receiving messages.
amqpconn : carrot.connection.AMQPConnection
AMQPConnection for the ConsumerThread
consumer : carrot.messaging.Consumer
AMQPConsumer for the ConsumerThread. The AMQPConsumer receives the messages sent by the QAMServer
exit()

The exit method terminates the ConsumerThread and closes the amqp-consumer and the amqp-connection of the ConsumerThread.

free_result_item(id)

This function frees a specific result_list “slot”.

Parameters: id (integer) – id of the slot to free
get_free_result_list_id(callback_method=None, exception_method=None)

This method assigns a free slot to a new request. The new request is appended to result_list with the state WAITING.

Parameters:
  • callback_method (function or None) – name of the method to call when result arrives
  • exception_method (function or None) – name of the method to call if an exception occurred on the QAMServer
Return type: id of the assigned slot

get_result(id)

This is a high-level function for getting the result of the remote method call. It checks the result for exceptions and raises any that occurred.

Exceptions:
  • QAMMethodNotFoundException: raised when the method is not found on the server
  • Exception: raised when a user-defined exception is raised on the server
Parameters: id (integer) – id for the desired result
Return type: result for a specific id
get_result_item(id)

This function returns the result from the result_list, for a specific id.

Parameters: id (integer) – id for the desired result_list “slot”
Return type: result for a specific id
is_result_received(id)

This function checks, if a desired result from the QAMServer is already received.

Parameters: id (integer) – id for checking the state in the result_list.
Return type: True if the message is already received, else False
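
The slot handling described for result_list and free_index_queue can be modelled in a few lines. The sketch below is hypothetical (SlotTable and its method names are not QAM's) and assumes that freed ids are reused before the list grows; it is single-threaded, so a real implementation would also need a lock.

```python
WAITING, DONE = "WAITING", "DONE"


class SlotTable(object):
    """Hypothetical model of result_list plus free_index_queue."""

    def __init__(self):
        self.result_list = []
        self.free_index_queue = []

    def get_free_id(self, callback_method=None, exception_method=None):
        entry = {"state": WAITING, "result": None,
                 "callback": callback_method, "error": exception_method}
        if self.free_index_queue:
            slot_id = self.free_index_queue.pop(0)   # reuse a freed slot
            self.result_list[slot_id] = entry
        else:
            slot_id = len(self.result_list)          # grow the list
            self.result_list.append(entry)
        return slot_id

    def store_result(self, slot_id, result):
        self.result_list[slot_id]["result"] = result
        self.result_list[slot_id]["state"] = DONE

    def is_result_received(self, slot_id):
        return self.result_list[slot_id]["state"] == DONE

    def free(self, slot_id):
        self.result_list[slot_id] = None
        self.free_index_queue.append(slot_id)


table = SlotTable()
first = table.get_free_id()
second = table.get_free_id()
table.store_result(first, 5)
table.free(first)
third = table.get_free_id()
print(first, second, third)
```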
run()

Messages are received from the QAMServer and stored in the result_list. The state in the result_list for the corresponding result is set to DONE.

class qam.qam_proxy.ExecuteCallbackThread(parent, method, params, uid)

The ExecuteCallbackThread executes a registered callback-function with the results received from the QAMServer as parameter.

Parameters:
  • parent – see parent
  • method – see method
  • params – see params
  • uid – see uid

Class-variables of ExecuteCallbackThread:

parent : qam.qam_proxy.QAMProxy
The QAMProxy is needed to reach the methods of qam.qam_proxy.CallbackObserverThread.
method : function
The method that should be called by the ExecuteCallbackThread.
params : string
result received from QAMServer.
uid : string
The uid which identifies the actual callback.
run()

The run method executes the given callback-function.

class qam.qam_proxy.PublisherThread(amqp_credentials, server_id, client_id, serializer)

The PublisherThread takes messages from the method_queue and sends them to the QAMServer. The method_queue is filled by qam.qam_proxy.QAMProxy.__request and by qam.qam_proxy.Callback.__request.

Parameters:
  • amqp_credentials – see amqp_credentials
  • server_id – see server_id
  • client_id – see client_id

Class-variables of PublisherThread:

amqp_credentials : qam.amqp_credentials.AMQPCredentials
Everything needed to create an amqp-connection is stored here.
server_id : string
server_id received from parent QAMProxy. Used for amqp-connection to the server.
client_id : string
client_id received from parent QAMProxy. Used for defining routing-key and amqp-queue-name
kill : bool
The kill signal tells the PublisherThread to stop sending messages.
method_queue : list
All requests for a method-call on the QAMServer are stored in the method_queue.
publisher : carrot.messaging.Publisher
The publisher sends messages to the QAMServer.
add_message_to_send(message)

A new message is appended to the method_queue. All messages that are stored in the method_queue are sent to the QAMServer.

Parameters: message (dictionary) – the message which should be appended.
The structure of the message is as follows:
  • “id”: integer
  • “method_name”: string
  • “routing_key”: string
  • “params”: list
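
A caller that builds such a message by hand may want to check it against the documented structure before it is queued. The helper below is a sketch and not part of QAM: validate_message and REQUIRED_FIELDS are hypothetical names, and the routing key shown is a placeholder because the real key format is not given here.

```python
REQUIRED_FIELDS = {
    "id": int,
    "method_name": str,
    "routing_key": str,
    "params": list,
}


def validate_message(message):
    """Raise ValueError or TypeError unless message has the documented fields."""
    missing = sorted(set(REQUIRED_FIELDS) - set(message))
    if missing:
        raise ValueError("missing fields: %s" % ", ".join(missing))
    for field, expected in REQUIRED_FIELDS.items():
        if not isinstance(message[field], expected):
            raise TypeError("%s must be of type %s" % (field, expected.__name__))
    return message


message = validate_message({
    "id": 0,
    "method_name": "add",
    "routing_key": "example.key",   # placeholder, real format not shown here
    "params": [2, 3],
})
print(message["method_name"])
```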
exit()

The exit method terminates the PublisherThread and closes the publisher and the amqp-connection

run()

The run method sends messages which are stored in the method_queue to the QAMServer.

exception qam.qam_proxy.QAMException

Internal QAMServer Exception.

exception qam.qam_proxy.QAMMethodNotFoundException

Internal QAMServer Exception, which is generated if the method called by the QAMProxy doesn’t exist on the QAMServer.

class qam.qam_proxy.QAMProxy(server_id, hostname='localhost', port=5672, username='guest', password='guest', vhost='/', client_id=None, serializer='pickle', ssl=False)

The role of the QAMProxy is to send messages to a specific QAMServer. Instead of waiting for the result, it is also possible to register a callback-method which is called when a result from the QAMServer has arrived. The QAMProxy manages the start of the ConsumerThread, the PublisherThread and the CallbackObserverThread.

Parameters:
  • server_id – id of the QAMServer, for reaching the right queue
  • hostname – hostname where amqp-server is running
  • port – port for amqp-server
  • username – username for amqp-server
  • password – password for amqp-server
  • vhost – virtual host of amqp-server
  • client_id – see client_id

Class-variables of QAMProxy:

amqp_credentials : qam.amqp_credentials.AMQPCredentials
Everything needed to create an amqp-connection is stored here.
consumer_thread : qam.qam_proxy.ConsumerThread
Thread for receiving messages from QAMServer.
publisher_thread : qam.qam_proxy.PublisherThread
Thread for sending messages to a QAMServer
callback_observer_thread : qam.qam_proxy.CallbackObserverThread
Thread for calling the callback-functions when a result is available
callback(callback_method, exception_method=None)

The callback-function registers a callback-method and, if one is given, an exception-method.

Parameters:
  • callback_method (function) – the function that should be called
  • exception_method (function or None) – the function that should be called if an exception occurred on the QAMServer
Return type: callable qam.qam_proxy.Callback-instance

close()

The PublisherThread, the ConsumerThread and the CallbackObserverThread are stopped (kill-signal set to True).

get_callback_state(uid)

Returns the state of the callback method registered for a specific uid.

Return type: state of the callback:
  • 0 = waiting for processing
  • 1 = processing
  • 2 = finished
get_num_waiting_callbacks()

This function returns the number of callbacks which are waiting for a result from the QAMServer.

Return type: number of callbacks not executed yet
set_timeout(timeout)

The set_timeout-function sets a timeout in seconds and returns a callable class on which the remote function can be called. If a timeout occurs, a QAMTimeoutException is raised.

Parameters: timeout (integer) – timeout value in seconds
Return type: callable qam.qam_proxy.SynchronTimeout-instance
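
How a synchronous call can give up after a timeout is not shown on this page. One general way to build such a wait is threading.Event, whose wait() returns False when the timeout elapses first. The sketch below illustrates that technique only; PendingResult and TimeoutStandIn are hypothetical names, the latter standing in for QAMTimeoutException so the example runs without QAM.

```python
import threading


class TimeoutStandIn(Exception):
    """Stand-in for QAMTimeoutException so the sketch runs without QAM."""


class PendingResult(object):
    """Holds one result and lets a caller wait for it with a timeout."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._event.set()

    def wait(self, timeout):
        # Event.wait returns False if the timeout elapsed before set().
        if not self._event.wait(timeout):
            raise TimeoutStandIn("no result within %s seconds" % timeout)
        return self._value


fast = PendingResult()
threading.Timer(0.01, fast.set, args=(5,)).start()   # result arrives in time
print(fast.wait(timeout=1.0))

slow = PendingResult()                                # result never arrives
try:
    slow.wait(timeout=0.05)
except TimeoutStandIn as exc:
    print("timed out: %s" % exc)
```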
exception qam.qam_proxy.QAMTimeoutException

Internal QAMServer Exception, which is generated if the method called by the QAMProxy produces a timeout.

class qam.qam_proxy.SynchronTimeout(parent, client_id, timeout)

The class SynchronTimeout is callable. It is used to realize the synchronous Timeout-mechanism.

Parameters:
  • parent – instance of QAMProxy
  • client_id – used for routing-key generation
  • timeout – timeout in seconds
