kafka_replayer module — a utility for replaying Kafka records from a topic over a caller-specified timestamp range.
"""Replay Kafka records from a topic by millisecond timestamp range."""

import logging
import time

import kafka
import six


class KafkaReplayer(object):
    """A utility that allows replaying Kafka records by time range."""

    def __init__(self, topic_name, partitions=None, **configs):
        """Create the replayer.

        Args:
            topic_name: The topic to replay
            partitions: Optionally specify the set of partitions (ints) to replay
            configs: The configuration kwargs to pass forward to kafka.KafkaConsumer

        Raises:
            ValueError: If topic_name is empty or None
        """
        if not topic_name:
            raise ValueError('topic_name is required')
        self._topic_name = topic_name
        self._partitions = partitions
        self._configs = self._configs_with_defaults(configs)
        # Emit a progress log line every N processed records during replay().
        self._log_interval = 10000
        self._logger = self._create_logger()

    def _create_logger(self):
        """Return the module logger with a NullHandler attached.

        The NullHandler keeps this library silent unless the host
        application configures logging itself.
        """
        logger = logging.getLogger(__name__)
        null_handler = logging.NullHandler()
        null_handler.setLevel(logging.DEBUG)
        logger.addHandler(null_handler)
        return logger

    def _configs_with_defaults(self, configs):
        """Fill in defaults: no consumer group, 10-second consumer timeout.

        The consumer_timeout_ms default makes iteration raise StopIteration
        (handled in _get_next_if_available) instead of blocking forever when
        no more records arrive.
        """
        configs.setdefault('group_id', None)
        configs.setdefault('consumer_timeout_ms', 10000)
        return configs

    def _get_time_millis(self):
        """Return the current wall-clock time in milliseconds."""
        return int(round(time.time() * 1000))

    def _create_consumer(self):
        """Create a fresh KafkaConsumer from the stored configs."""
        return kafka.KafkaConsumer(**self._configs)

    def _find_seek_points(self, start_time):
        """Locate, per partition, the earliest offset at or after start_time.

        Args:
            start_time: The target timestamp in milliseconds

        Returns:
            dict mapping kafka.TopicPartition to its starting offset
        """
        seek_points = {}
        consumer = self._create_consumer()
        try:
            topic_partitions = self._topic_partitions_for_set(
                self._all_partitions_set(consumer))
            for topic_partition in topic_partitions:
                # We need to compute the offset independently for each
                # partition via binary search.
                consumer.assign([topic_partition])
                consumer.seek_to_end(topic_partition)
                end_offset = consumer.position(topic_partition)
                consumer.seek_to_beginning(topic_partition)
                start_offset = consumer.position(topic_partition)
                target_offset = self._binary_search(
                    consumer, topic_partition, start_offset, end_offset,
                    start_time)
                seek_points[topic_partition] = target_offset
                self._logger.debug('Start offset for {0} is {1}'.format(
                    topic_partition, target_offset))
            self._logger.info('Start offsets: {0}'.format(seek_points))
            return seek_points
        finally:
            consumer.close()

    def _all_partitions_set(self, consumer):
        """Return the set of partition ids to replay.

        Intersects the topic's partitions with the user-supplied subset
        when one was given at construction time.
        """
        all_partitions = consumer.partitions_for_topic(self._topic_name)
        if self._partitions:
            return all_partitions.intersection(self._partitions)
        return all_partitions

    def _topic_partitions_for_set(self, partition_set):
        """Wrap each partition id in a kafka.TopicPartition for this topic."""
        return [kafka.TopicPartition(self._topic_name, p)
                for p in partition_set]

    def _get_next_if_available(self, consumer):
        """Return the next record, or None if the consumer timed out."""
        try:
            return next(consumer)
        except StopIteration:
            self._logger.debug('Got StopIteration, leaving the record as None')
            return None

    def _binary_search(self, consumer, tp, start, end, target_time):
        """Binary-search offsets for the earliest one at/after target_time.

        Probes the record at the midpoint offset and recurses into the
        half that must contain the boundary. Falls back to `start` when a
        probe record cannot be fetched.
        """
        if start == end:
            return start
        insertion_point = start + (end - start) // 2
        consumer.seek(tp, insertion_point)
        record = self._get_next_if_available(consumer)
        if record is None:
            # Probe fetch timed out; conservatively start from the range
            # beginning rather than guessing.
            return start
        ts = record.timestamp
        if insertion_point == start:
            # Two-element range: pick whichever side satisfies the target.
            return start if target_time <= ts else end
        if ts < target_time:
            return self._binary_search(consumer, tp, insertion_point + 1, end,
                                       target_time)
        return self._binary_search(consumer, tp, start, insertion_point,
                                   target_time)

    def replay(self, start_time, end_time):
        """Replay all specified partitions over the specified time range (inclusive).

        Args:
            start_time: The start timestamp in milliseconds
            end_time: The end timestamp in milliseconds

        Yields:
            The next ConsumerRecord found within the given time range

        Raises:
            ValueError: If the specified start or end time is invalid
        """
        if start_time < 0:
            raise ValueError('start_time must be non-negative')
        if end_time < 0:
            raise ValueError('end_time must be non-negative')
        if start_time > self._get_time_millis():
            raise ValueError('start_time must not be in the future')
        if start_time > end_time:
            raise ValueError('end_time must be at least start_time')
        count = 0
        last_timestamp = 0
        seek_points = self._find_seek_points(start_time)
        consumer = self._create_consumer()
        try:
            # Set up the consumer to fetch all desired partitions from
            # their seek points.
            partitions = self._all_partitions_set(consumer)
            consumer.assign(self._topic_partitions_for_set(partitions))
            for tp, offset in six.iteritems(seek_points):
                consumer.seek(tp, offset)
            while partitions:
                record = self._get_next_if_available(consumer)
                if record is None:
                    self._logger.info('No more records available. Terminating.')
                    partitions = set()
                else:
                    last_timestamp = record.timestamp
                    if last_timestamp > end_time:
                        # Since partitions are ordered, if we see a too-new
                        # timestamp, mark the partition complete and pause it
                        # so the consumer stops fetching from it.
                        partitions.discard(record.partition)
                        tp = kafka.TopicPartition(topic=record.topic,
                                                  partition=record.partition)
                        if tp not in consumer.paused():
                            consumer.pause(tp)
                        self._logger.debug('Completed partition {0}'.format(tp))
                    elif (record.partition in partitions and
                          start_time <= last_timestamp <= end_time):
                        # Send the record to the client if it's within the
                        # specified time range.
                        yield record
                        count += 1
                        if count % self._log_interval == 0:
                            self._logger.debug(
                                'Processed {0} offsets, last timestamp: {1}'.format(
                                    count, last_timestamp))
        except Exception:
            # Previously this swallowed the error after logging, which made
            # failures indistinguishable from normal termination for the
            # caller; log with traceback and re-raise instead.
            self._logger.exception('Unexpected exception during replay')
            raise
        finally:
            self._logger.info('Processed {0} offsets, last timestamp: {1}'.format(
                count, last_timestamp))
            consumer.close()
Classes
class KafkaReplayer
A utility that allows replaying Kafka records by time range.
class KafkaReplayer(object): """A utility that allows replaying Kafka records by time range.""" def __init__(self, topic_name, partitions=None, **configs): """Create the replayer. Args: topic_name: The topic to replay partitions: Optionally specify the set of partitions (ints) to replay configs: The configuration kwargs to pass forward to kafka.KafkaConsumer """ if not topic_name: raise ValueError('topic_name is required') self._topic_name = topic_name self._partitions = partitions self._configs = self._configs_with_defaults(configs) self._log_interval = 10000 self._logger = self._create_logger() def _create_logger(self): self._logger = logging.getLogger(__name__) null_handler = logging.NullHandler() null_handler.setLevel(logging.DEBUG) self._logger.addHandler(null_handler) return self._logger def _configs_with_defaults(self, configs): if 'group_id' not in configs: configs['group_id'] = None if 'consumer_timeout_ms' not in configs: configs['consumer_timeout_ms'] = 10000 return configs def _get_time_millis(self): return int(round(time.time() * 1000)) def _create_consumer(self): return kafka.KafkaConsumer(**(self._configs)) def _find_seek_points(self, start_time): seek_points = {} consumer = self._create_consumer() try: topic_partitions = self._topic_partitions_for_set(self._all_partitions_set(consumer)) for topic_partition in topic_partitions: # We need to compute the offset independently for each partition via binary search consumer.assign([topic_partition]) consumer.seek_to_end(topic_partition) end_offset = consumer.position(topic_partition) consumer.seek_to_beginning(topic_partition) start_offset = consumer.position(topic_partition) target_offset = self._binary_search(consumer, topic_partition, start_offset, end_offset, start_time) seek_points[topic_partition] = target_offset self._logger.debug('Start offset for {0} is {1}'.format( topic_partition, target_offset)) self._logger.info('Start offsets: {0}'.format(seek_points)) return seek_points finally: 
consumer.close() def _all_partitions_set(self, consumer): all_partitions = consumer.partitions_for_topic(self._topic_name) partitions = None if self._partitions: partitions = all_partitions.intersection(self._partitions) else: partitions = all_partitions return partitions def _topic_partitions_for_set(self, partition_set): return [kafka.TopicPartition(self._topic_name, p) for p in partition_set] def _get_next_if_available(self, consumer): record = None try: record = next(consumer) except StopIteration: self._logger.debug('Got StopIteration, leaving the record as None') return record def _binary_search(self, consumer, tp, start, end, target_time): # Overall goal: find the earliest offset that is no earlier than the target time if start == end: return start insertion_point = int(start + ((end - start) / 2)) consumer.seek(tp, insertion_point) record = self._get_next_if_available(consumer) if record: ts = record.timestamp if insertion_point == start: return start if target_time <= ts else end elif ts < target_time: return self._binary_search(consumer, tp, insertion_point + 1, end, target_time) else: return self._binary_search(consumer, tp, start, insertion_point, target_time) return start def replay(self, start_time, end_time): """Replay all specified partitions over the specified time range (inclusive). 
Args: start_time: The start timestamp in milliseconds end_time: The end timestamp in milliseconds Yields: The next ConsumerRecord found within the given time range Raises: ValueError: If the specified start or end time is invalid """ if start_time < 0: raise ValueError('start_time must be non-negative') if end_time < 0: raise ValueError('end_time must be non-negative') if start_time > self._get_time_millis(): raise ValueError('start_time must not be in the future') if start_time > end_time: raise ValueError('end_time must be at least start_time') count = 0 last_timestamp = 0 seek_points = self._find_seek_points(start_time) consumer = self._create_consumer() try: # Set up a the consumer to fetch all desired partitions from their seek points partitions = self._all_partitions_set(consumer) partition_list = self._topic_partitions_for_set(partitions) consumer.assign(partition_list) for tp, offset in six.iteritems(seek_points): consumer.seek(tp, offset) while len(partitions) > 0: record = self._get_next_if_available(consumer) if not record: self._logger.info('No more records available. Terminating.') partitions = set() else: last_timestamp = record.timestamp if last_timestamp > end_time: # Since partitions are ordered, if we see a too-new timestamp, mark the # partition complete. 
partitions.discard(record.partition) tp = kafka.TopicPartition(topic=record.topic, partition=record.partition) if tp not in consumer.paused(): consumer.pause(tp) self._logger.debug('Completed partition {0}'.format(tp)) elif (record.partition in partitions and last_timestamp >= start_time and last_timestamp <= end_time): # Send the record to the client if it's within the specified time range yield record count += 1 if count % self._log_interval == 0: self._logger.debug('Processed {0} offsets, last timestamp: {1}'.format( count, last_timestamp)) except Exception as e: self._logger.error('Unexpected exception: {0}'.format(str(e))) finally: self._logger.info('Processed {0} offsets, last timestamp: {1}'.format( count, last_timestamp)) consumer.close()
Ancestors (in MRO)
- KafkaReplayer
- __builtin__.object
Methods
def __init__(
self, topic_name, partitions=None, **configs)
Create the replayer.
Args: topic_name: The topic to replay partitions: Optionally specify the set of partitions (ints) to replay configs: The configuration kwargs to pass forward to kafka.KafkaConsumer
def __init__(self, topic_name, partitions=None, **configs): """Create the replayer. Args: topic_name: The topic to replay partitions: Optionally specify the set of partitions (ints) to replay configs: The configuration kwargs to pass forward to kafka.KafkaConsumer """ if not topic_name: raise ValueError('topic_name is required') self._topic_name = topic_name self._partitions = partitions self._configs = self._configs_with_defaults(configs) self._log_interval = 10000 self._logger = self._create_logger()
def replay(
self, start_time, end_time)
Replay all specified partitions over the specified time range (inclusive).
Args: start_time: The start timestamp in milliseconds end_time: The end timestamp in milliseconds
Yields: The next ConsumerRecord found within the given time range
Raises: ValueError: If the specified start or end time is invalid
def replay(self, start_time, end_time): """Replay all specified partitions over the specified time range (inclusive). Args: start_time: The start timestamp in milliseconds end_time: The end timestamp in milliseconds Yields: The next ConsumerRecord found within the given time range Raises: ValueError: If the specified start or end time is invalid """ if start_time < 0: raise ValueError('start_time must be non-negative') if end_time < 0: raise ValueError('end_time must be non-negative') if start_time > self._get_time_millis(): raise ValueError('start_time must not be in the future') if start_time > end_time: raise ValueError('end_time must be at least start_time') count = 0 last_timestamp = 0 seek_points = self._find_seek_points(start_time) consumer = self._create_consumer() try: # Set up a the consumer to fetch all desired partitions from their seek points partitions = self._all_partitions_set(consumer) partition_list = self._topic_partitions_for_set(partitions) consumer.assign(partition_list) for tp, offset in six.iteritems(seek_points): consumer.seek(tp, offset) while len(partitions) > 0: record = self._get_next_if_available(consumer) if not record: self._logger.info('No more records available. Terminating.') partitions = set() else: last_timestamp = record.timestamp if last_timestamp > end_time: # Since partitions are ordered, if we see a too-new timestamp, mark the # partition complete. 
partitions.discard(record.partition) tp = kafka.TopicPartition(topic=record.topic, partition=record.partition) if tp not in consumer.paused(): consumer.pause(tp) self._logger.debug('Completed partition {0}'.format(tp)) elif (record.partition in partitions and last_timestamp >= start_time and last_timestamp <= end_time): # Send the record to the client if it's within the specified time range yield record count += 1 if count % self._log_interval == 0: self._logger.debug('Processed {0} offsets, last timestamp: {1}'.format( count, last_timestamp)) except Exception as e: self._logger.error('Unexpected exception: {0}'.format(str(e))) finally: self._logger.info('Processed {0} offsets, last timestamp: {1}'.format( count, last_timestamp)) consumer.close()