from uuid import UUID
from logging import getLogger
from contextlib import closing
from requests import post
from requests.exceptions import HTTPError, ConnectionError
from requests_toolbelt.multipart.encoder import MultipartEncoder
from apscheduler.schedulers.background import BackgroundScheduler
class MeteorExporter(object):
"""
Manages the communication part of MeteorPi's export mechanism, acquiring :class:`meteorpi_fdb.FileExportTask` and
:class:`meteorpi_fdb.EventExportTask` instances from the database and sending them on to the appropriate receiver.
This class in effect defines the communication protocol used by this process.
The scheduler defined by default also handles back-off under failure conditions. If an export fails, the count of
failures (since the server was started) for that export configuration is incremented and all export tasks for that
configuration are pushed a configurable distance into the future. If the failure count reaches a configurable
limit the configuration is disabled. Any successful export resets the failure count.
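A minimal usage sketch, assuming an open :class:`meteorpi_fdb.MeteorDatabase` instance ``db`` is already
available (the variable name is illustrative only)::

    exporter = MeteorExporter(db=db)
    exporter.scheduler.start()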
:ivar scheduler:
An instance of :class:`apscheduler.schedulers.background.BackgroundScheduler` used to schedule the regular
marking of entities for export and to trigger the actual export of those entities.
"""
def __init__(self, db, mark_interval_seconds=300, max_failures_before_disable=4, defer_on_failure_seconds=1800,
scheduler=None):
"""
Build a new MeteorExporter. The export process does not run by default; you must call the appropriate methods on
this object to actually start exporting. A scheduler is created to handle automated, regular exports, but it is
not started; you must explicitly call its start method if you want regular exports to function.
:param MeteorDatabase db:
The database to read from, for both entities under replication and the export configurations.
:param int mark_interval_seconds:
The number of seconds after finishing a mark / export round before the next one is triggered. Defaults
to 300 for a five minute break. Note that at the end of an export round the process is re-run immediately,
only finishing when there is nothing left to do; this means that if a lot of data is generated during an
export run we won't wait another five minutes before processing the new data.
:param int defer_on_failure_seconds:
The number of seconds by which any tasks pending for a given config are deferred into the future when a
task created by that config fails (including the failed task itself). Any task with a timestamp earlier
than now + defer_on_failure_seconds will have its timestamp set to now + defer_on_failure_seconds.
:param scheduler:
The scheduler to use; defaults to a BackgroundScheduler with a non-daemon thread if not specified. Use a
blocking scheduler for test purposes (see the sketch after this parameter list).
:param int max_failures_before_disable:
The number of times an export configuration can have its exports fail before it is disabled.
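A sketch of wiring in an explicit scheduler for tests; ``db`` is assumed to be an open
:class:`meteorpi_fdb.MeteorDatabase`::

    from apscheduler.schedulers.blocking import BlockingScheduler
    exporter = MeteorExporter(db=db, scheduler=BlockingScheduler(), mark_interval_seconds=10)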
"""
self.db = db
if scheduler is None:
scheduler = BackgroundScheduler(daemon=False)
self.scheduler = scheduler
job_id = "meteorpi_export"
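# Maps config_id to the number of failures seen since the last successful export for that configuration.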
failure_counts = {}
logger = getLogger("meteorpi.db.export")
def scheduled_export():
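# Pause our own interval job while a round runs so that rounds never overlap; resumed in the finally block.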
self.scheduler.pause_job(job_id=job_id)
job_count = -1
try:
while job_count != 0:
# Mark any new entities for export
for export_config in db.get_export_configurations():
if export_config.enabled:
db.mark_entities_to_export(export_config)
logger.info("Marked entities to export for config id {0}".format(export_config.config_id))
job_count = 0
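# Drain the export queue; handle_next_export() returns None once there is nothing left to process.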
while True:
state = self.handle_next_export()
if state is None:
# No jobs were processed on this tick, break out to the next control layer
break
elif state.state in ("failed", "confused"):
config_id = state.config_id
job_count += 1
# Handle failure
failure_count = failure_counts.pop(config_id, 0) + 1
logger.info(
"Failure for {0}, failure count is now {1}".format(config_id, failure_count))
if failure_count >= max_failures_before_disable:
# Disable this config
export_config = db.get_export_configuration(config_id=config_id)
export_config.enabled = False
db.create_or_update_export_configuration(export_config)
# Doesn't add the failure count back in, as we want to be able to run should the user
# re-enable this export configuration
else:
# Defer entries created by this config
failure_counts[config_id] = failure_count
db.defer_export_tasks(config_id=config_id, seconds=defer_on_failure_seconds)
else:
config_id = state.config_id
job_count += 1
# Reset failure count for this config, as we just exported something with it
failure_counts.pop(config_id, None)
finally:
self.scheduler.resume_job(job_id=job_id)
self.scheduler.add_job(id=job_id, func=scheduled_export, trigger="interval", seconds=mark_interval_seconds)
def handle_next_export(self):
"""
Retrieve and fully evaluate the next export task, including resolution of any sub-tasks requested by the
importing client, such as requests for binary data or camera status. Intermediate 'continue' responses from
the importer are handled internally; sub-tasks are re-run until the job either completes or fails.
:return:
An instance of ExportStateCache whose 'state' field contains the state of the export after running as many
sub-tasks as required until completion or failure. If there were no jobs to run this returns None. The
possible final states are:
:complete:
A job was processed and completed. The job has been marked as complete in the database.
:failed:
A job was processed but an error occurred during processing.
:confused:
A job was processed, but the importer returned a response which we couldn't recognise.
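A sketch of draining the export queue by hand, without the scheduler; ``exporter`` is assumed to be a
previously constructed MeteorExporter::

    while True:
        result = exporter.handle_next_export()
        if result is None:
            break
        print("{0}: {1}".format(result.config_id, result.state))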
"""
state = None
while True:
state = self._handle_next_export_subtask(export_state=state)
if state is None:
return None
elif state.export_task is None:
return state
def _handle_next_export_subtask(self, export_state=None):
"""
Process the next export sub-task, if there is one.
:param ExportStateCache export_state:
If provided, this is used instead of the database queue, in effect directing the exporter to process the
previous export again. This avoids having to query the database when we already know what needs to be done,
and it maintains a cache of the entity so we don't have to re-acquire it on multiple exports.
:return:
A :class:`meteorpi_fdb.exporter.MeteorExporter.ExportStateCache` representing the state of the export, or
None if there was nothing to do.
"""
# Use a cached state, or generate a new one if required
if export_state is None or export_state.export_task is None:
export = self.db.get_next_entity_to_export()
if export is not None:
export_state = self.ExportStateCache(export_task=export)
else:
return None
try:
auth = (export_state.export_task.target_user,
export_state.export_task.target_password)
target_url = export_state.export_task.target_url
if export_state.use_cache:
response = post(url=target_url,
json={'type': 'cached_entity',
'cached_entity_id': export_state.entity_id},
auth=auth)
else:
response = post(url=target_url,
json=export_state.entity_dict,
auth=auth)
response.raise_for_status()
json = response.json()
state = json['state']
if state == 'complete':
return export_state.fully_processed()
elif state == 'need_status':
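# The importer needs the camera status referenced by this entity; send it and record partial progress.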
status_id = UUID(hex=json['status_id'])
camera_status = self.db.get_camera_status_by_id(status_id)
if camera_status is None:
return export_state.failed()
post(url=target_url,
json={'type': 'status',
'status': camera_status.as_dict()},
auth=auth)
return export_state.partially_processed()
elif state == 'need_file_data':
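# The importer needs the file's binary data; stream it as a multipart POST so large files are not read into memory.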
file_id = UUID(hex=json['file_id'])
file_record = self.db.get_file(file_id=file_id)
if file_record is None:
return export_state.failed()
with open(self.db.file_path_for_id(file_id), 'rb') as file_content:
multi = MultipartEncoder(fields={'file': ('file', file_content, file_record.mime_type)})
post(url="{0}/data/{1}/{2}".format(target_url, file_id.hex, file_record.md5),
data=multi,
headers={'Content-Type': multi.content_type},
auth=auth)
return export_state.partially_processed()
elif state == 'continue':
return export_state.partially_processed()
elif state == 'continue-nocache':
return export_state.partially_processed(use_cache=False)
else:
return export_state.confused()
except (HTTPError, ConnectionError):
return export_state.failed()
class ExportStateCache(object):
"""
Used as a continuation when processing a multi-stage export. If export_task is set to None after a sub-task
completes, the task has finished: whether it failed or succeeded, there is nothing left to do.
"""
def __init__(self, export_task, state="not_started"):
if export_task is None:
raise ValueError("Export task cannot be None; it must be specified on creation")
self.state = state
self.export_task = export_task
self.config_id = export_task.config_id
self.use_cache = False
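# Becomes True once the receiving server has cached the entity, so later requests can send just the cached entity ID.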
self.entity_dict = export_task.as_dict()
self.entity_id = export_task.get_entity_id().hex
def fully_processed(self):
self.state = "complete"
self.export_task.set_status(0)
self.export_task = None
self.entity_dict = None
return self
def partially_processed(self, use_cache=True):
self.state = "partial"
self.use_cache = use_cache
return self
def failed(self):
self.state = "failed"
self.export_task = None
self.entity_dict = None
return self
def confused(self):
self.state = "confused"
self.export_task = None
self.entity_dict = None
return self
class EventExportTask(object):
"""
Represents a single active Event export, providing methods to get the underlying :class:`meteorpi_model.Event`,
the :class:`meteorpi_model.ExportConfiguration` and to update the completion state in the database.
"""
def __init__(self, db, config_id, config_internal_id, event_id, event_internal_id, timestamp, status, target_url,
target_user, target_password):
self.db = db
self.config_id = config_id
self.config_internal_id = config_internal_id
self.event_id = event_id
self.event_internal_id = event_internal_id
self.timestamp = timestamp
self.status = status
self.target_url = target_url
self.target_user = target_user
self.target_password = target_password
def get_event(self):
return self.db.get_event(self.event_id)
def get_export_config(self):
return self.db.get_export_configuration(self.config_id)
def as_dict(self):
return {
'type': 'event',
'event': self.get_event().as_dict()
}
def get_entity_id(self):
return self.event_id
def set_status(self, status):
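# Persist the export state for this event / configuration pair; the exporter passes 0 once the export completes.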
with closing(self.db.con.trans()) as transaction:
with closing(transaction.cursor()) as cur:
cur.execute('UPDATE t_eventExport x '
'SET x.exportState = (?) '
'WHERE x.eventID = (?) AND x.exportConfig = (?)',
(status, self.event_internal_id, self.config_internal_id))
transaction.commit()
class FileExportTask(object):
"""
Represents a single active FileRecord export, providing methods to get the underlying
:class:`meteorpi_model.FileRecord`, the :class:`meteorpi_model.ExportConfiguration` and to update the completion
state in the database.
"""
def __init__(self, db, config_id, config_internal_id, file_id, file_internal_id, timestamp, status, target_url,
target_user, target_password):
self.db = db
self.config_id = config_id
self.config_internal_id = config_internal_id
self.file_id = file_id
self.file_internal_id = file_internal_id
self.timestamp = timestamp
self.status = status
self.target_url = target_url
self.target_user = target_user
self.target_password = target_password
def get_file(self):
return self.db.get_file(self.file_id)
def get_export_config(self):
return self.db.get_export_configuration(self.config_id)
def get_entity_id(self):
return self.file_id
def as_dict(self):
return {
'type': 'file',
'file': self.get_file().as_dict()
}
def set_status(self, status):
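# Persist the export state for this file / configuration pair; 0 indicates a completed export.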
with closing(self.db.con.trans()) as transaction:
with closing(transaction.cursor()) as cur:
cur.execute('UPDATE t_fileExport x '
'SET x.exportState = (?) '
'WHERE x.fileID = (?) AND x.exportConfig = (?)',
(status, self.file_internal_id, self.config_internal_id))
transaction.commit()