Open Table Of Contents

Source code for bridgedb.schedule

# -*- coding: utf-8 ; test-case-name: bridgedb.test.test_schedule -*-
#
# This file is part of BridgeDB, a Tor bridge distribution system.
#
# :authors: Nick Mathewson
#           Isis Lovecruft <isis@torproject.org> 0xa3adb67a2cdb8b35
# :copyright: (c) 2007-2015, The Tor Project, Inc.
#             (c) 2014-2015, Isis Lovecruft
# :license: see LICENSE for licensing information

"""This module implements functions for dividing time into chunks.

.. inheritance-diagram:: UnknownInterval Unscheduled ScheduledInterval
    :parts: 1
"""

import calendar

import math

from datetime import datetime

from zope import interface
from zope.interface import implementer
from zope.interface import Attribute


#: The known time intervals (or *periods*) for dividing time by.
KNOWN_INTERVALS = ["second", "minute", "hour", "day", "week", "month"]


[docs]class UnknownInterval(ValueError): """Raised if an interval isn't one of the :data:`KNOWN_INTERVALS`."""
[docs]def toUnixSeconds(timestruct): """Convert a datetime struct to a Unix timestamp in seconds. :type timestruct: :any:`datetime.datetime` :param timestruct: A ``datetime`` object to convert into a timestamp in Unix Era seconds. :rtype: int """ return calendar.timegm(timestruct)
[docs]def fromUnixSeconds(timestamp): """Convert a Unix timestamp to a datetime struct. :param int timestamp: A timestamp in Unix Era seconds. :rtype: :any:`datetime.datetime` """ return datetime.fromtimestamp(timestamp)
[docs]class ISchedule(interface.Interface): """An ``Interface`` specification for a Schedule.""" intervalPeriod = Attribute( "The type of period which this Schedule's intervals will rotate by.") intervalCount = Attribute( "Number of **intervalPeriod**s before rotation to the next interval") def intervalStart(when=None): """Get the start time of the interval that contains **when**.""" def getInterval(when=None): """Get the interval which includes an arbitrary **when**.""" def nextIntervalStarts(when=None): """Get the start of the interval after the one containing **when**."""
@implementer(ISchedule)
[docs]class Unscheduled(object): """A base ``Schedule`` that has only one period that contains all time. >>> from bridgedb.schedule import fromUnixSeconds >>> from bridgedb.schedule import Unscheduled >>> timestamp = 1427769526 >>> str(fromUnixSeconds(timestamp)) '2015-03-31 02:38:46' >>> sched = Unscheduled() >>> start = sched.intervalStart(timestamp) >>> start -62135596800 >>> str(fromUnixSeconds(start)) '0001-01-01 00:00:00' >>> sched.getInterval(timestamp) '1970-01-01 00:00:00' >>> next = sched.nextIntervalStarts(timestamp) >>> next 253402300799 >>> str(fromUnixSeconds(next)) '9999-12-31 23:59:59' """ def __init__(self, count=None, period=None): """Create a schedule for dividing time into intervals. :param int count: The total number of **period** in one interval. :param str period: One of the periods in :data:`KNOWN_INTERVALS`. """ self.intervalCount = count self.intervalPeriod = period
[docs] def intervalStart(self, when=0): """Get the start time of the interval that contains **when**. :param int when: The time which we're trying to find the corresponding interval for. :rtype: int :returns: The Unix epoch timestamp for the start time of the interval that contains **when**. """ return toUnixSeconds(datetime.min.timetuple())
[docs] def getInterval(self, when=0): """Get the interval that contains the time **when**. .. note: We explicitly ignore the ``when`` parameter in this implementation because if something is Unscheduled then all timestamps should reside within the same period. :param int when: The time which we're trying to find the corresponding interval for. :rtype: str :returns: A timestamp in the form ``YEAR-MONTH[-DAY[-HOUR]]``. It's specificity depends on what type of interval we're using. For example, if using ``"month"``, the return value would be something like ``"2013-12"``. """ return fromUnixSeconds(0).strftime('%04Y-%02m-%02d %02H:%02M:%02S')
[docs] def nextIntervalStarts(self, when=0): """Return the start time of the interval starting _after_ when. :rtype: int :returns: Return the Y10K bug. """ return toUnixSeconds(datetime.max.timetuple())
@implementer(ISchedule)
[docs]class ScheduledInterval(Unscheduled): """An class that splits time into periods, based on seconds, minutes, hours, days, weeks, or months. >>> from bridgedb.schedule import fromUnixSeconds >>> from bridgedb.schedule import ScheduledInterval >>> timestamp = 1427769526 >>> str(fromUnixSeconds(timestamp)) '2015-03-31 02:38:46' >>> sched = ScheduledInterval(5, 'minutes') >>> start = sched.intervalStart(timestamp) >>> start 1427769300 >>> current = sched.getInterval(timestamp) >>> current '2015-03-31 02:35:00' >>> current == str(fromUnixSeconds(start)) True >>> next = sched.nextIntervalStarts(timestamp) >>> next 1427769600 >>> str(fromUnixSeconds(next)) '2015-03-31 02:40:00' >>> later = 1427771057 >>> str(fromUnixSeconds(later)) '2015-03-31 03:04:17' >>> sched.getInterval(later) '2015-03-31 03:00:00' :ivar str intervalPeriod: One of the :data:`KNOWN_INTERVALS`. :ivar int intervalCount: The number of times :attr:`intervalPeriod` should be repeated within an interval. """ def __init__(self, count=None, period=None): """Create a schedule for dividing time into intervals. :type count: :any:`int` or :any:`str` :param count: The total number of **period** in one interval. :param str period: One of the periods in :data:`KNOWN_INTERVALS`. """ super(ScheduledInterval, self).__init__(count, period) self._setIntervalCount(count) self._setIntervalPeriod(period)
[docs] def _setIntervalCount(self, count=None): """Set our :attr:`intervalCount`. .. attention:: This method should be called *before* :meth:`_setIntervalPeriod`, because the latter may change the count, if it decides to change the period (for example, to simplify things by changing weeks into days). :param int count: The number of times the :attr:`intervalPeriod` should be repeated during the interval. Defaults to ``1``. :raises UnknownInterval: if the specified **count** was invalid. """ try: if not count > 0: count = 1 count = int(count) except (TypeError, ValueError): raise UnknownInterval("%s.intervalCount: %r ist not an integer." % (self.__class__.__name__, count)) self.intervalCount = count
[docs] def _setIntervalPeriod(self, period=None): """Set our :attr:`intervalPeriod`. :param str period: One of the :data:`KNOWN_INTERVALS`, or its plural. Defaults to ``'hour'``. :raises UnknownInterval: if the specified **period** is unknown. """ if not period: period = 'hour' try: period = period.lower() # Depluralise the period if necessary, i.e., "months" -> "month". if period.endswith('s'): period = period[:-1] if not period in KNOWN_INTERVALS: raise ValueError except (TypeError, AttributeError, ValueError): raise UnknownInterval("%s doesn't know about the %r interval type." % (self.__class__.__name__, period)) self.intervalPeriod = period if period == 'week': self.intervalPeriod = 'day' self.intervalCount *= 7
[docs] def intervalStart(self, when=0): """Get the start time of the interval that contains **when**. :param int when: The time which we're trying to determine the start of interval that contains it. This should be given in Unix seconds, for example, taken from :func:`calendar.timegm`. :rtype: int :returns: The Unix epoch timestamp for the start time of the interval that contains **when**. """ # Convert `when`s which are floats, i.e. from time.time(), to ints: when = int(math.ceil(when)) if self.intervalPeriod == 'month': # For months, we always start at the beginning of the month. date = fromUnixSeconds(when) months = (date.year * 12) + (date.month - 1) months -= (months % self.intervalCount) month = months % 12 + 1 return toUnixSeconds((months // 12, month, 1, 0, 0, 0)) elif self.intervalPeriod == 'day': # For days, we start at the beginning of a day. when -= when % (86400 * self.intervalCount) return when elif self.intervalPeriod == 'hour': # For hours, we start at the beginning of an hour. when -= when % (3600 * self.intervalCount) return when elif self.intervalPeriod == 'minute': when -= when % (60 * self.intervalCount) return when elif self.intervalPeriod == 'second': when -= when % self.intervalCount return when
[docs] def getInterval(self, when=0): """Get the interval that contains the time **when**. >>> import calendar >>> from bridgedb.schedule import ScheduledInterval >>> sched = ScheduledInterval(1, 'month') >>> when = calendar.timegm((2007, 12, 12, 0, 0, 0)) >>> sched.getInterval(when) '2007-12' >>> then = calendar.timegm((2014, 05, 13, 20, 25, 13)) >>> sched.getInterval(then) '2014-05' :param int when: The time which we're trying to find the corresponding interval for. Given in Unix seconds, for example, taken from :func:`calendar.timegm`. :rtype: str :returns: A timestamp in the form ``YEAR-MONTH[-DAY[-HOUR]]``. It's specificity depends on what type of interval we're using. For example, if using ``"month"``, the return value would be something like ``"2013-12"``. """ date = fromUnixSeconds(self.intervalStart(when)) fstr = "%04Y-%02m" if self.intervalPeriod != 'month': fstr += "-%02d" if self.intervalPeriod != 'day': fstr += " %02H" if self.intervalPeriod != 'hour': fstr += ":%02M" if self.intervalPeriod == 'minute': fstr += ":%02S" return date.strftime(fstr)
[docs] def nextIntervalStarts(self, when=0): """Return the start time of the interval starting _after_ when. :returns: The Unix epoch timestamp for the start time of the interval that contains **when**. """ seconds = self.intervalStart(when) if self.intervalPeriod == 'month': date = fromUnixSeconds(seconds) year = date.year months = date.month + self.intervalCount if months > 12: year = date.year + 1 months = months - 12 return toUnixSeconds((year, months, 1, 0, 0, 0)) elif self.intervalPeriod == 'day': return seconds + (86400 * self.intervalCount) elif self.intervalPeriod == 'hour': return seconds + (3600 * self.intervalCount) elif self.intervalPeriod == 'minute': return seconds + (60 * self.intervalCount) elif self.intervalPeriod == 'second': return seconds + self.intervalCount