predictionio Package Documentation

PredictionIO Python SDK

The PredictionIO Python SDK provides easy-to-use functions for integrating Python applications with PredictionIO REST API services.

The SDK comprises two clients:

  1. EventClient, for importing data into the PredictionIO platform.
  2. EngineClient, for querying a PredictionIO Engine Instance: submitting queries and extracting prediction results.

The SDK also provides a FileExporter for writing events to a JSON file in the same way as EventClient. The JSON file can then be used by “pio import” for batch data import.

Please read the PredictionIO Event API documentation for an explanation of how the SDK can be used to import events.
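
For orientation, a minimal sketch of creating both clients (the access key placeholder and default URLs are illustrative):

>>> from predictionio import EventClient, EngineClient
>>> event_client = EventClient(access_key=<YOUR_ACCESS_KEY>)   # import data
>>> engine_client = EngineClient()                             # query predictions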

predictionio.EventClient Class

class predictionio.EventClient(access_key, url='http://localhost:7070', threads=1, qsize=0, timeout=5, channel=None)

Client for importing data into PredictionIO Event Server.

Notice that app_id has been deprecated as of 0.8.2. Please use access_key instead.

Parameters:
  • access_key – the access key for your application.
  • url – the URL of the PredictionIO Event Server.
  • threads – number of threads to handle PredictionIO API requests. Must be >= 1.
  • qsize – the max size of the request queue (optional). The asynchronous request becomes blocking once this size has been reached, until the queued requests are handled. Default value is 0, which means infinite queue size.
  • timeout – timeout for HTTP connection attempts and requests in seconds (optional). Default value is 5.
  • channel – channel name (optional)

Note

The “threads” parameter specifies the number of connection threads to the PredictionIO server. The minimum is 1. The client object will spawn the specified number of threads; each of them establishes a connection with the PredictionIO server and handles requests concurrently.

Note

If you ONLY use blocking request methods, setting “threads” to 1 is enough (a higher number will not improve anything, since every request blocks). However, if you want to take full advantage of the asynchronous request methods, specify a larger number of “threads” to improve concurrent request handling (setting “threads” to 1 will still work). The optimal setting depends on your system and application requirements.
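
For example, a client tuned for heavy asynchronous use might be constructed as follows (the thread count and queue size are illustrative values, not recommendations):

>>> from predictionio import EventClient
>>> client = EventClient(
...     access_key=<YOUR_ACCESS_KEY>,
...     url="http://localhost:7070",
...     threads=5,  # five connection threads for concurrent asynchronous requests
...     qsize=500)  # block once more than 500 requests are queued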

acreate_event(event, entity_type, entity_id, target_entity_type=None, target_entity_id=None, properties=None, event_time=None)

Asynchronously create an event.

Parameters:
  • event – event name. type str.
  • entity_type – entity type. It is the namespace of the entityId, analogous to the table name of a relational database. The entityId must be unique within the same entityType. type str.
  • entity_id – entity id. entity_type-entity_id becomes the unique identifier of the entity. For example, you may have an entityType named user and different entity IDs, say 1 and 2; in this case, user-1 and user-2 uniquely identify these entities. type str.
  • target_entity_type – target entity type. type str.
  • target_entity_id – target entity id. type str.
  • properties – a custom dict associated with an event. type dict.
  • event_time – the time of the event. type datetime, must contain timezone info.
Returns:

AsyncRequest object. You can call the get_response() method on this object to get the final results or status of this asynchronous request.
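
For example, a sketch of recording a hypothetical “rate” event with a timezone-aware event_time (the ids, property values, and the use of pytz are illustrative):

>>> from datetime import datetime
>>> import pytz
>>> request = client.acreate_event(
...     event="rate",
...     entity_type="user",
...     entity_id="u1",
...     target_entity_type="item",
...     target_entity_id="i1",
...     properties={"rating": 4},
...     event_time=datetime(2015, 6, 1, 12, 0, 0, tzinfo=pytz.utc))
>>> result = request.get_response()  # blocks until the server responds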

create_event(event, entity_type, entity_id, target_entity_type=None, target_entity_id=None, properties=None, event_time=None)

Synchronously (blocking) create an event.

aget_event(event_id)

Asynchronously get an event from the Event Server.

Parameters: event_id – the event id returned by the Event Server when the event was created.
Returns: AsyncRequest object.
get_event(event_id)

Synchronously get an event from the Event Server.

adelete_event(event_id)

Asynchronously delete an event from the Event Server.

Parameters: event_id – the event id returned by the Event Server when the event was created.
Returns: AsyncRequest object.
delete_event(event_id)

Synchronously delete an event from the Event Server.
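
A round trip by event id might look like the following sketch; it assumes the Event Server’s create response is a dict containing an "eventId" field, which you should verify against your server version:

>>> resp = client.create_event(event="$set", entity_type="user", entity_id="u1")
>>> event_id = resp["eventId"]  # assumption: the create response carries an eventId field
>>> event = client.get_event(event_id)
>>> client.delete_event(event_id)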

aset_user(uid, properties={}, event_time=None)

Set properties of a user.

Wrapper of acreate_event function, setting event to “$set” and entity_type to “user”.
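
For example, the following two calls are equivalent (the id and properties are illustrative):

>>> client.aset_user("u1", properties={"age": 20})
>>> # ...is equivalent to:
>>> client.acreate_event(event="$set", entity_type="user",
...                      entity_id="u1", properties={"age": 20})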

set_user(uid, properties={}, event_time=None)

Set properties of a user.

aunset_user(uid, properties, event_time=None)

Unset properties of a user.

Wrapper of acreate_event function, setting event to “$unset” and entity_type to “user”.

unset_user(uid, properties, event_time=None)

Unset properties of a user.

adelete_user(uid, event_time=None)

Delete a user.

Wrapper of acreate_event function, setting event to “$delete” and entity_type to “user”.

delete_user(uid, event_time=None)

Delete a user.

aset_item(iid, properties={}, event_time=None)

Set properties of an item.

Wrapper of acreate_event function, setting event to “$set” and entity_type to “item”.

set_item(iid, properties={}, event_time=None)

Set properties of an item.

aunset_item(iid, properties={}, event_time=None)

Unset properties of an item.

Wrapper of acreate_event function, setting event to “$unset” and entity_type to “item”.

unset_item(iid, properties={}, event_time=None)

Unset properties of an item.

adelete_item(iid, event_time=None)

Delete an item.

Wrapper of acreate_event function, setting event to “$delete” and entity_type to “item”.

delete_item(iid, event_time=None)

Delete an item.

arecord_user_action_on_item(action, uid, iid, properties={}, event_time=None)

Create a user-to-item action.

Wrapper of acreate_event function, setting entity_type to “user” and target_entity_type to “item”.

record_user_action_on_item(action, uid, iid, properties={}, event_time=None)

Create a user-to-item action.
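
For example, to record that a user rated an item (the action name, ids, and property are illustrative):

>>> client.record_user_action_on_item("rate", "u1", "i1",
...                                   properties={"rating": 5})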

predictionio.EngineClient Class

class predictionio.EngineClient(url='http://localhost:8000', threads=1, qsize=0, timeout=5)

Client for extracting prediction results from a PredictionIO Engine Instance.

Parameters:
  • url – the URL of the PredictionIO Engine Instance.
  • threads – number of threads to handle PredictionIO API requests. Must be >= 1.
  • qsize – the max size of the request queue (optional). The asynchronous request becomes blocking once this size has been reached, until the queued requests are handled. Default value is 0, which means infinite queue size.
  • timeout – timeout for HTTP connection attempts and requests in seconds (optional). Default value is 5.

asend_query(data)

Asynchronously send a request to the engine instance with data as the query.

Parameters: data – the query. It is converted to a JSON object using the json.dumps method. type dict.
Returns: AsyncRequest object. You can call the get_response() method on this object to get the final results or status of this asynchronous request.
send_query(data)

Synchronously send a request.

Parameters: data – the query. It is converted to a JSON object using the json.dumps method. type dict.
Returns: the prediction.
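
A minimal synchronous query, assuming an engine whose query format takes a user id and a result count (the exact fields depend on your engine):

>>> from predictionio import EngineClient
>>> engine_client = EngineClient(url="http://localhost:8000")
>>> prediction = engine_client.send_query({"uid": "1", "n": 3})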

predictionio.AsyncRequest Class

class predictionio.AsyncRequest(method, path, **params)

AsyncRequest object

set_response(response)

Store the response.

NOTE: This method must be called only once.

get_response()

Get the response. Blocking.

Returns: self.rfunc’s return type.

predictionio.FileExporter Class

New in version 0.9.2.

class predictionio.FileExporter(file_name)

File exporter that writes events to a JSON file for batch import.

Parameters: file_name – the destination file name.

create_event(event, entity_type, entity_id, target_entity_type=None, target_entity_id=None, properties=None, event_time=None)

Create an event and write to the file.

(please refer to EventClient’s create_event())

close()

Close the FileExporter.

Call this method when you have finished writing all events to the JSON file.
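
A typical export session might look like this sketch (the file name and event are illustrative):

>>> from predictionio import FileExporter
>>> exporter = FileExporter(file_name="events.json")
>>> exporter.create_event(event="$set", entity_type="user", entity_id="u1",
...                       properties={"age": 20})
>>> exporter.close()  # flush and close once all events are written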

predictionio SDK Usage Notes

Asynchronous Requests

In addition to the normal blocking (synchronous) request methods, this SDK also provides non-blocking (asynchronous) request methods. All methods prefixed with ‘a’ are asynchronous (e.g., aset_user(), aset_item()). Asynchronous requests are handled by separate threads in the background, so you can generate multiple requests at the same time without waiting for any of them to finish. These methods return immediately without waiting for results, allowing your code to proceed with other work. The idea is to break a normal blocking request (such as send_query()) into two steps:

  1. generate the request (e.g., calling asend_query());
  2. get the request’s response by calling get_response().

This allows you to do other work between these two steps.

Note

In some cases, for performance or application-specific reasons, you may not care whether the request succeeds; you can then simply skip step 2.

Note

If you do care about the request status or need the returned data, call get_response() at a later time on the AsyncRequest object returned in step 1.

For example, the following code first generates an asynchronous request to retrieve recommendations, then gets the result at a later time:

>>> # Generate an asynchronous request; an AsyncRequest object is returned
>>> engine_client = EngineClient()
>>> request = engine_client.asend_query(data={"uid": "1", "n": 3})
>>> # <...you can do other things here...>
>>> try:
...     result = request.get_response()  # check the request status and get the return data
... except Exception:
...     pass  # <log the error>

Batch Import Data with EventClient

When you import a large amount of data at once, you can use the asynchronous request methods to generate many requests up front and then check their status at a later time, minimizing total run time.

For example, to import 100,000 user records:

>>> # generate 100,000 asynchronous requests; they are queued and handled in the background
>>> event_client = EventClient(access_key=<YOUR_ACCESS_KEY>)
>>> for i in range(100000):
...     event_client.aset_user(user_record[i].uid)
>>>
>>> # <...you can do other things here...>
>>>
>>> # calling close() will block until all queued requests are processed
>>> event_client.close()

Alternatively, you can use blocking requests to import a large amount of data, but this has significantly lower performance:

>>> for i in range(100000):
...     try:
...         event_client.set_user(user_record[i].uid)
...     except Exception:
...         pass  # <log the error>

Batch Import Data with FileExporter and “pio import”

New in version 0.9.2.

You can use FileExporter to create events and write them to a JSON file that can be used by “pio import”. Please see Importing Data in Batch for more details.

Note that this method is much faster than batch import with EventClient.