Source code for bytestag.dht.publishing

'''DHT publisher

This module includes classes that scan tables for values to publish
or replicate.
'''
# This file is part of Bytestag.
# Copyright © 2012 Christopher Foo <chris.foo@gmail.com>.
# Licensed under GNU GPLv3. See COPYING.txt for details.
from bytestag.dht.network import DHTNetwork
from bytestag.events import (EventReactorMixin, EventScheduler, EventID,
    asynchronous)
from bytestag.queue import BigDiskQueue
import logging
import threading
import time

# Declares the markup used by this module's docstrings: reStructuredText,
# written in English.
__docformat__ = 'restructuredtext en'
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)


class Replicator(EventReactorMixin):
    '''Periodically re-stores non-original values found in the table.

    A periodic timer wakes a background loop which walks the key-value
    table and queues a ``store_value`` call on the task slot for each
    record that is not original and whose lifetime has not yet passed.
    After every pass the table is asked to clean itself, if it can.
    '''

    def __init__(self, event_reactor, dht_network, kvp_table, fn_task_slot):
        '''
        :type event_reactor: :class:`.EventReactor`
        :type dht_network: :class:`.DHTNetwork`
        :type kvp_table: :class:`.KVPTable`
        :param fn_task_slot: A slot that represents uploads.
        :type fn_task_slot: :class:`FnTaskSlot`
        '''

        EventReactorMixin.__init__(self, event_reactor)

        self._kvp_table = kvp_table
        self._dht_network = dht_network
        self._fn_task_slot = fn_task_slot
        self._event_scheduler = EventScheduler(event_reactor)
        self._timer_id = EventID(self, 'Replicate')
        # Set by the timer handler; waited on by the replication loop.
        self._thread_event = threading.Event()

        self._event_reactor.register_handler(self._timer_id, self._timer_cb)
        self._event_scheduler.add_periodic(
            DHTNetwork.TIME_REPLICATE, self._timer_id)

        self._loop()

    def _timer_cb(self, event_id):
        '''Wake the replication loop when the periodic timer event arrives.'''

        self._thread_event.set()

    def _should_replicate(self, kvp_record):
        '''Return whether a record qualifies for replication.

        Original records are skipped, as are records whose
        ``timestamp + time_to_live`` already lies before the current time.
        '''

        if kvp_record.is_original:
            return False

        expires_at = kvp_record.timestamp + kvp_record.time_to_live

        return not expires_at < time.time()

    @asynchronous(name='Replicate Values')
    def _loop(self):
        '''Wait for a wake-up, replicate qualifying values, clean, repeat.'''

        wake_up = self._thread_event

        while True:
            wake_up.wait()
            _logger.debug('Replicating values')
            # Clear before scanning so a wake-up that arrives during the
            # scan triggers another pass.
            wake_up.clear()

            for kvpid in self._kvp_table.keys():
                if not self._should_replicate(self._kvp_table.record(kvpid)):
                    continue

                _logger.debug('Replicating value %s', kvpid)
                self._fn_task_slot.add(self._dht_network.store_value,
                    kvpid.key, kvpid.index)

            _logger.debug('Value replication finished')
            self._clean_table()

    def _clean_table(self):
        '''Call ``clean()`` on the table, or else on each of its sub-tables.

        A table that offers ``clean`` is cleaned directly. Otherwise, if it
        exposes ``tables``, each member that offers ``clean`` is cleaned.
        '''

        kvp_table = self._kvp_table

        if hasattr(kvp_table, 'clean'):
            kvp_table.clean()
            return

        if not hasattr(kvp_table, 'tables'):
            return

        for sub_table in kvp_table.tables:
            if hasattr(sub_table, 'clean'):
                sub_table.clean()
class Publisher(EventReactorMixin):
    '''Publishes values typically created by the client.

    Two background loops cooperate:

    * the scan loop walks the table whenever the table reports a change or
      the periodic timer fires, and schedules original values that are due
      within the next check interval;
    * the publish loop takes scheduled values off a queue and hands a
      ``store_value`` call for each to the task slot.
    '''

    # Seconds between periodic rescans of the table.
    REPUBLISH_CHECK_INTERVAL = 3600

    def __init__(self, event_reactor, dht_network, kvp_table, fn_task_slot):
        '''
        :type event_reactor: :class:`.EventReactor`
        :type dht_network: :class:`.DHTNetwork`
        :type kvp_table: :class:`.KVPTable`
        :param fn_task_slot: A slot that represents uploads.
        :type fn_task_slot: :class:`FnTaskSlot`
        '''

        EventReactorMixin.__init__(self, event_reactor)
        self._dht_network = dht_network
        self._kvp_table = kvp_table
        self._event_scheduler = EventScheduler(event_reactor)
        self._timer_id = EventID(self, 'Publish timer')
        self._schedule_id = EventID(self, 'Publish schedule')
        # Keys that already have a publish event scheduled.
        # NOTE(review): nothing in this class removes keys from this set,
        # so a key can be scheduled only once per process lifetime. Confirm
        # whether it should be released once the value has been stored.
        self._scheduled_kvpids = set()
        self._schedule_lock = threading.Lock()
        self._scan_event = threading.Event()
        self._publish_queue = BigDiskQueue()
        self._fn_task_slot = fn_task_slot

        self._event_reactor.register_handler(self._schedule_id,
            self._publish_cb)
        self._event_reactor.register_handler(self._timer_id, self._timer_cb)
        self._kvp_table.value_changed_observer.register(self._table_change_cb)
        # Schedule the timer's event ID, not the callback: the handler above
        # is registered under ``_timer_id``, so that is the event that must
        # be emitted for ``_timer_cb`` to run.
        self._event_scheduler.add_periodic(Publisher.REPUBLISH_CHECK_INTERVAL,
            self._timer_id)

        self._scan_loop()
        self._publish_loop()

    @asynchronous(name='Publish loop')
    def _publish_loop(self):
        '''Take queued keys one at a time and submit a store for each.'''

        while True:
            kvpid = self._publish_queue.get()

            _logger.debug('Publishing %s', kvpid)
            self._fn_task_slot.add(self._dht_network.store_value,
                kvpid.key, kvpid.index)

    def _schedule_for_publish(self, abs_time, kvpid):
        '''Schedule a publish event for a key unless one was already made.

        :param abs_time: Absolute time passed to the scheduler's
            ``add_absolute``.
        :param kvpid: The key to publish; delivered to the publish handler
            as the event argument.
        '''

        with self._schedule_lock:
            if kvpid in self._scheduled_kvpids:
                return

            self._scheduled_kvpids.add(kvpid)

        self._event_scheduler.add_absolute(abs_time, self._schedule_id, kvpid)

    def _publish_cb(self, event_id, kvpid):
        '''Handle a publish schedule event by queueing the key.'''

        self._publish_queue.put(kvpid)

    def _table_change_cb(self, *args):
        '''Wake the scan loop when the table reports a changed value.'''

        self._scan_event.set()

    def _timer_cb(self, event_id):
        '''Wake the scan loop when the periodic timer event arrives.'''

        self._scan_event.set()

    @asynchronous(name='Publish scan loop')
    def _scan_loop(self):
        '''Wait for a wake-up, then schedule original values that are due.

        A record is scheduled when its republish time falls before the
        current time plus one check interval. A ``last_update`` of ``0`` is
        treated as due immediately (presumably meaning the value has never
        been published -- confirm against the table implementation).
        '''

        next_interval = Publisher.REPUBLISH_CHECK_INTERVAL

        while True:
            self._scan_event.wait()
            _logger.debug('Scanning database for publishing')
            # Clear before scanning so a change that arrives during the
            # scan triggers another pass.
            self._scan_event.clear()

            current_time = time.time()

            for kvpid in self._kvp_table.keys():
                kvp_record = self._kvp_table.record(kvpid)

                if not kvp_record.is_original:
                    continue

                if kvp_record.last_update == 0:
                    republish_time = current_time
                else:
                    republish_time = \
                        kvp_record.last_update + DHTNetwork.TIME_REPUBLISH

                if republish_time - next_interval < current_time:
                    self._schedule_for_publish(republish_time, kvpid)