"""
Copyright (c) 2011, 2012, Regents of the University of California
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.
"""
"""
@author Stephen Dawson-Haggerty <stevedh@eecs.berkeley.edu>
"""
import os
import urlparse
from twisted.internet import reactor, task, defer, threads
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.python import log
import logging
import util
import core
import disklog
import sjson as json
from contrib import client
from smap.formatters import get_formatter
try:
from smap.ssl import SslClientContextFactory
except ImportError:
log.err("Failed to import ssl... only HTTP delivery will be available")
# this is the largest number of records we will store.
BUFSIZE_LIMIT = 100000
# the most records to pack into a single log entry/message to the server
REPORT_RECORD_LIMIT = 10000
def reporting_copy(obj):
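    """Return a copy of ``obj`` that is deep enough for report
    buffering: dicts and lists are copied recursively, while leaf
    values (readings, uuids, metadata strings) are shared with the
    original object."""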
if isinstance(obj, dict):
rv = dict(obj)
for k in rv.keys():
rv[k] = reporting_copy(rv[k])
return rv
elif isinstance(obj, list):
rv = list(obj)
for i in xrange(0, len(rv)):
rv[i] = reporting_copy(rv[i])
return rv
else:
return obj
"""Iterate over all the colletions and timeseries contained in a
report object.
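
A sketch of a typical call; ``collections`` and ``timeseries`` are
hypothetical lists used here only to show which callback receives
what (objects with a 'Contents' key go to ``col_cb``, everything
else goes to ``ts_cb``)::

    reporting_map(rpt,
                  lambda path, col: collections.append(path),
                  lambda path, ts: timeseries.append(path))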
"""
def reporting_map(rpt, col_cb, ts_cb):
q = ['/']
while len(q) > 0:
cur_path = util.norm_path(q.pop(0))
cur = rpt.get(cur_path, None)
if not cur: continue
if 'Contents' in cur:
for pc in cur['Contents']:
q.append(cur_path + '/' + pc)
col_cb(cur_path, cur)
else:
ts_cb(cur_path, cur)
del rpt[cur_path]
for p, v in rpt.iteritems():
if 'Contents' in v:
col_cb(p, v)
else:
ts_cb(p, v)
def is_https_url(url):
return urlparse.urlparse(url).scheme == 'https'
class DataBuffer:
"""Buffer outgoing data.
This buffer class allows you to add readings which are stored into
a circular buffer. It supports split-phase adding and delivery by
returning a "truncation specification" along with the data so that
if you later wish to shorten the buffer to remove previously read
data, you can do that. Note that since this is a circular buffer,
that may mean removing less data than you read back (some may have
been overwritten).
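
    A sketch of the intended add/read/truncate cycle; ``sensor_uuid``,
    ``ts``, and ``value`` are placeholders, and the payload shape
    mirrors what :py:meth:`add` expects::

        buf = DataBuffer('/tmp/outgoing')
        buf.add('/sensor0', {'uuid': sensor_uuid,
                             'Readings': [[ts, value]]})
        head = buf.read()     # oldest undelivered log entry
        # ... deliver ``head`` to the consumer ...
        buf.truncate()        # drop the entry we just read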
"""
def __init__(self, datadir):
"""
        :param string datadir: directory used to back the on-disk log
"""
log.msg("Create databuffer " + datadir)
self.datadir = datadir
self.data = disklog.DiskLog(datadir)
self.head_metric = self.metric(self.data.tail())
# if we do a read, we have to add a new log record so we don't
# keep appending to the same one
self.clear = False
def __str__(self):
return "DataBuffer len: %i" % len(self.data)
def __len__(self):
return len(self.data)
def sync(self):
self.data.sync()
def metric(self, val):
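        """Estimate how many readings ``val`` represents: the length of
        its 'Readings' list if it has one, the summed lengths of its
        elements if it is some other iterable, and 1 otherwise."""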
if hasattr(val, '__iter__'):
if 'Readings' in val:
val_metric = len(val['Readings'])
else: val_metric = sum(map(len, val))
else:
val_metric = 1
return val_metric
def add(self, key, val):
"""Enqueue a new object for delivery with a subscription
:param string key: The key for the data stream
        :param dict val: The new value for the object. Copied.
"""
tail = self.data.tail()
# length of the head
val_metric = self.metric(val)
if tail == None or self.clear:
self.data.append({key: reporting_copy(val)})
self.head_metric = val_metric
self.clear = False
elif 'Contents' in val and len(val['Contents']) == 0:
# okay to skip b/c it doesn't apply to anything
pass
elif key in tail and len(val) == 2 and \
'Readings' in val and 'uuid' in val:
if not 'Readings' in tail[key]:
tail[key]['Readings'] = []
if self.head_metric < REPORT_RECORD_LIMIT:
tail[key]['Readings'].extend(val['Readings'])
self.data.update_tail(tail)
self.head_metric += val_metric
else:
self.data.append({key: reporting_copy(val)})
self.head_metric = val_metric
# really this might just want to merge updates...
elif key in tail and val == tail[key]:
pass
elif not key in tail and self.head_metric < REPORT_RECORD_LIMIT:
tail[key] = reporting_copy(val)
self.data.update_tail(tail)
self.head_metric += val_metric
else:
self.data.append({key: reporting_copy(val)})
self.head_metric = val_metric
def truncate(self):
"""Truncate a set of readings based on the sequence number
stored from a previous read"""
self.data.pop()
def read(self):
"""Read n points (per stream) back as a flat list. AlsoFi
return a truncation specification so we can remove this data
later if desired
:rvalue: readings, tspec. ``readings`` is the reading object
you can send to a consumer. ``tspec`` can be passed to
        ``truncate()`` and will remove exactly the data you just read
-- note this is necessary because the log is a circular buffer
and may have wrapped while you were processing the readings.
"""
if len(self.data) > 0:
self.clear = True
return self.data.head()
else:
raise core.SmapException("No Pending Data!")
class ReportInstance(dict):
"""Represent the stored state pending for one report destination
"""
def __init__(self, datadir, *args):
dict.__init__(self, *args)
if not 'MinPeriod' in self:
self['MinPeriod'] = 0
if not 'MaxPeriod' in self:
self['MaxPeriod'] = 2 ** 31 - 1
self['DataDir'] = datadir
self['PendingData'] = DataBuffer(datadir)
self['ReportDeliveryIdx'] = 0
self['LastAttempt'] = 0
self['LastSuccess'] = 0
self['Busy'] = False
def deliverable(self):
"""Check if attempt should be called
:rvalue boolean: True if a report should be sent
"""
now = util.now()
if self.get('Paused', False): return False
return (now - self['LastSuccess'] > self['MaxPeriod']) or \
(len(self['PendingData']) > 0 and \
(now - self['LastSuccess']) > self['MinPeriod'])
def _log_response(self, resp, url):
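        # collect the response body asynchronously and log it once it
        # has been fully received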
def response_printer(resp):
log.msg("Reply from %s: '%s'" % (url,resp.strip()))
done = defer.Deferred()
resp.deliverBody(util.BufferProtocol(done))
done.addCallback(response_printer)
return done
def _success(self, resp):
self['Busy'] = False
if resp.code in [200, 201, 204]:
# on success record the time and remove the data
self['LastSuccess'] = util.now()
self['PendingData'].truncate()
if len(self['PendingData']) > 0:
# this causes a new deferred to get added to
# the chain, so we continue immediately at the
# next log position if this one worked.
return self.attempt()
else:
return resp
else:
# but most HTTP codes indicate a failure
log.msg("Report delivery to %s returned %i" % (
self['ReportDeliveryLocation'][self['ReportDeliveryIdx']],
resp.code))
response = self._log_response(resp,
self['ReportDeliveryLocation'][self['ReportDeliveryIdx']])
self['ReportDeliveryIdx'] = ((self['ReportDeliveryIdx'] + 1) %
len(self['ReportDeliveryLocation']))
return response
def _failure(self, fail):
self['Busy'] = False
try:
log.msg("Report delivery to %s failed: %s" %
(self['ReportDeliveryLocation'][self['ReportDeliveryIdx']],
str(fail.value)))
self['ReportDeliveryIdx'] = ((self['ReportDeliveryIdx'] + 1) %
len(self['ReportDeliveryLocation']))
except:
log.err()
return fail
def attempt(self):
"""Try to make a delivery
:rvalue: a :py:class:`Deferred` of the attempt
"""
if ('Busy' in self and self['Busy']) or \
('Paused' in self and self['Paused']):
return
try:
data = self['PendingData'].read()
if data == None:
self['PendingData'].truncate()
return
except:
log.err()
return
self['LastAttempt'] = util.now()
log.msg("publishing to %s: %i %s" %
(self['ReportDeliveryLocation'][self['ReportDeliveryIdx']],
len(data),
str([len(x['Readings'])
for x in data.itervalues()
if 'Readings' in x])),
logLevel=logging.DEBUG)
# set up an agent to push the data to the consumer
dest_url = self['ReportDeliveryLocation'][self['ReportDeliveryIdx']]
if is_https_url(dest_url):
agent = Agent(reactor, SslClientContextFactory(self))
else:
agent = Agent(reactor)
try:
formatter = get_formatter(self.get('Format', 'json'))
headers = {'Content-type' : [formatter.content_type]}
if formatter.content_encoding:
headers['Content-encoding'] = [formatter.content_encoding]
d = agent.request('POST',
dest_url,
Headers(headers),
formatter(data))
except:
log.err()
return
self['Busy'] = True
d.addCallback(self._success)
d.addErrback(self._failure)
return d
class Reporting:
    def __init__(self, inst, autoflush=1.0, reportfile=None, max_size=BUFSIZE_LIMIT):
"""Create a new report manager, responsible for delivering
data to subscribers. Buffers data on disk until it can be
delivered, storing up to *max_size* points per timeseries.
:param inst: the :py:class:`~smap.core.SmapInstance` we'll be
delivering reports for.
:param float autoflush: how often to call flush, which attempts
to deliver all data.
:param string reportfile: backing store for reporting instances
and data which hasn't been delivered yet.
:param int max_size: the maximum number of points we will buffer for
any stream.
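
        A sketch of typical use; ``inst`` is an existing
        :py:class:`~smap.core.SmapInstance` and ``rpt`` is a report
        dict with the keys described in :py:class:`ReportInstance`::

            reports = Reporting(inst, autoflush=1.0,
                                reportfile='/var/run/smap/reports')
            reports.add_report(rpt)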
"""
self.inst = inst
self.subscribers = []
self.reportfile = reportfile
self.max_size = max_size
self.autoflush = autoflush
if self.reportfile:
self.load_reports()
if autoflush != None:
# add a blank callback to the deferred to kill off any errors
def flush():
d = self._flush()
if d: d.addBoth(lambda _ : None)
self.t = task.LoopingCall(flush)
reactor.callLater(autoflush, self.t.start, autoflush)
# add a shutdown handler so we save the final reports after exiting
if reportfile:
reactor.addSystemEventTrigger('after', 'shutdown',
self.save_reports)
def get_report(self, id):
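        """Look up a registered report by its 'uuid'."""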
return util.find(lambda item: item['uuid'] == id, self.subscribers)
def add_report(self, rpt):
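        """Register a new report destination described by the dict
        ``rpt`` and queue the current contents of its 'ReportResource'
        so the consumer starts from a full copy of the data."""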
dir = os.path.join(self.reportfile + '-reports',
str(rpt['uuid']))
rpt['ReportDeliveryLocation'] = map(str, rpt['ReportDeliveryLocation'])
report_instance = ReportInstance(dir, rpt)
log.msg("Creating report -- dest is %s" % str(rpt['ReportDeliveryLocation']))
self._update_subscriptions(report_instance)
self.subscribers.append(report_instance)
# publish the full data set when we add a subscription so we
# can compress from here
for k, v in self.inst.lookup(report_instance['ReportResource']).iteritems():
self.publish(k, v)
def del_report(self, id):
rpt = self.get_report(id)
if rpt:
log.msg("removing report " + str(id))
del self.subscribers[self.subscribers.index(rpt)]
return rpt
def update_report(self, rpt):
"""Copy in fields from a new reporting object
"""
cur = self.get_report(rpt['uuid'])
log.msg("updating report %s: dest is now %s" % (
str(rpt['uuid']),
str(rpt['ReportDeliveryLocation'])))
if cur:
cur.update(rpt)
self._update_subscriptions(cur)
return True
return False
def _update_subscriptions(self, sub):
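        """Recompute the set of paths ('Topics') that ``sub`` should
        receive, based on its 'ReportResource'."""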
result = self.inst.lookup(sub['ReportResource'])
        # a dict result without a 'uuid' key maps paths to objects; a
        # single timeseries is also a dict but carries its own 'uuid'
        if isinstance(result, dict) and not 'uuid' in result:
sub['Topics'] = set(result.iterkeys())
elif isinstance(result, dict) and 'uuid' in result:
sub['Topics'] = set([getattr(result, 'path')])
else:
sub['Topics'] = set()
def update_subscriptions(self):
"""Should be called whenever the set of resources changes so we can
update the list of uuids for each subscriber."""
map(self._update_subscriptions, self.subscribers)
def publish(self, path, val, prepend=False):
"""Publish a new reading to the stream identified by a path.
Not thread safe.
"""
path = util.norm_path(path)
for sub in self.subscribers:
if path in sub['Topics']:
sub['PendingData'].add(path, val)
def load_reports(self):
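        """Restore subscriber state previously written by
        :py:meth:`save_reports`, resetting transient flags."""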
self.subscribers = util.pickle_load(self.reportfile)
if self.subscribers == None:
self.subscribers = []
for s in self.subscribers:
s['Busy'] = False
s['Paused'] = False
if not 'Format' in s:
s['Format'] = 'json'
def save_reports(self, *args):
"""Save reports while holding the filesystem lock.
"""
util.pickle_dump(self.reportfile, self.subscribers)
for s in self.subscribers:
s['PendingData'].sync()
if len(args) == 1:
return args[0]
else:
return
    def _flush(self, force=False):
"""Send out json-packed report objects to registered listeners.
:param boolean force: if True, ignore ``MinPeriod``/``MaxPeriod``
and force the reporting metadata to disk
        :rtype: a :py:class:`twisted.internet.defer.DeferredList`
          instance which will fire when delivery to all subscribers has
          finished, or errback when any of them fails
"""
deferList, deleteList = [], []
for sub in self.subscribers:
now = util.now()
if sub.get('ExpireTime', now) < now:
# remove expired reports
deleteList.append(sub['uuid'])
# either we've gone too long without trying and so need to
# deliver a report or else we have new data and have
# waited for at least MinPeriod since the last report.
elif force or sub.deliverable():
d = defer.maybeDeferred(sub.attempt)
# we don't need an errBack for this case since we want
# to propagate the error and don't need to do any
# cleanup
deferList.append(d)
map(self.del_report, deleteList)
d = defer.DeferredList(deferList, fireOnOneErrback=True, consumeErrors=True)
if force: d.addBoth(self.save_reports)
return d
    def flush(self):
"""Causes delivery to be attempted for all non-busy reports
now. Threaded version; blocks until :py:meth:`_flush` finishes.
"""
        return threads.blockingCallFromThread(reactor, self._flush)
def pause(self):
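        """Stop making delivery attempts for all subscribers."""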
for s in self.subscribers:
s['Paused'] = True
def unpause(self):
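        """Resume delivery for all subscribers and immediately attempt
        to flush any pending data."""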
for s in self.subscribers:
s['Paused'] = False
return self._flush()