Source code for bytestag.queue
'''Specialized queues'''
# This file is part of Bytestag.
# Copyright © 2012 Christopher Foo <chris.foo@gmail.com>.
# Licensed under GNU GPLv3. See COPYING.txt for details.
from bytestag.events import asynchronous
import atexit
import contextlib
import os.path
import pickle
import queue
import sqlite3
import tempfile
import threading
# Declares the markup used by this module's docstrings (reStructuredText,
# English) for documentation tools that read ``__docformat__``.
__docformat__ = 'restructuredtext en'
class BigDiskQueue(object):
    '''A queue that spools onto disk when needed.

    The core functionality is similar to :class:`queue.Queue`. Up to
    ``memory_size`` items are held in an in-memory :class:`queue.Queue`;
    once that is full, further items are pickled into a SQLite database
    stored in a temporary directory. A background loop moves spooled items
    back into memory as space becomes available.

    .. note:: Ordering is only approximately FIFO. :meth:`put` always tries
       the in-memory queue first, so a new item can overtake older items
       that are still spooled on disk.
    '''

    def __init__(self, memory_size=100):
        '''Create the queue and start the restore loop.

        :param memory_size: Maximum number of items kept in memory before
            new items are spooled to disk.
        '''
        self._queue = queue.Queue(memory_size)
        # Signals the restore loop that there may be work to do.
        self._event = threading.Event()
        # True only once the database file and table are ready for use.
        self._tables_created = False
        # Serialises the lazy, one-time creation of the database.
        self._create_lock = threading.Lock()
        # NOTE(review): ``asynchronous`` presumably runs ``_loop`` on its own
        # thread so this call returns immediately -- confirm against
        # ``bytestag.events``.
        self._loop()

    @contextlib.contextmanager
    def _connection(self):
        '''Yield a SQLite connection wrapped in a transaction.

        The transaction is committed when the ``with`` body finishes and
        rolled back if it raises. The connection is always closed
        afterwards; ``with con`` by itself only manages the transaction
        and would leave the connection (and its file handles) open.
        '''
        con = sqlite3.connect(self._path, isolation_level='DEFERRED',
            detect_types=sqlite3.PARSE_DECLTYPES)

        try:
            con.row_factory = sqlite3.Row
            con.execute('PRAGMA synchronous=OFF')
            con.execute('PRAGMA journal_mode=WAL')

            with con:
                yield con
        finally:
            con.close()

    def _create_tables(self):
        '''Create the temporary database and its table exactly once.

        Safe to call repeatedly; calls after the first successful one
        return without doing anything.
        '''
        with self._create_lock:
            if self._tables_created:
                return

            self._temp_dir = tempfile.TemporaryDirectory(suffix='-queue')
            self._path = os.path.join(self._temp_dir.name, 'queue.db')

            # FIXME: tempdir was reported as not being cleaned, perhaps a
            # problem with threads. Connections used to be left open, which
            # may have contributed -- re-verify now that they are closed.
            atexit.register(self._temp_dir.cleanup)

            with self._connection() as con:
                con.execute('CREATE TABLE IF NOT EXISTS queue '
                    '(id INTEGER PRIMARY KEY, pickle BLOB NOT NULL)')

            # Publish the flag last so that other threads never observe
            # "created" before ``_path`` and the table actually exist.
            self._tables_created = True

    def put(self, item, block=None, timeout=None):
        '''Put an item on the queue.

        This function is nonblocking. Parameters are provided for
        compatibility with :class:`queue.Queue`.

        :param item: The object to enqueue. It must be picklable if it ends
            up being spooled to disk.
        :param block: Ignored.
        :param timeout: Ignored.
        '''
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            if not self._tables_created:
                self._create_tables()

            self._put_database(item)
            # Wake the restore loop: the consumer may have drained the
            # in-memory queue while this item was being written, in which
            # case no later ``get`` would signal the loop for it.
            self._event.set()

    def put_nowait(self, item):
        '''Put an item on the queue.'''
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        '''Get an item from the queue.

        :param block: Passed to :meth:`queue.Queue.get`.
        :param timeout: Passed to :meth:`queue.Queue.get`.
        :raises queue.Empty: As raised by :meth:`queue.Queue.get` when no
            item is available in memory.
        :return: The next item from the in-memory queue.
        '''
        item = self._queue.get(block, timeout)

        if self._tables_created:
            # A slot was just freed; let the loop refill it from disk.
            self._event.set()

        return item

    def get_nowait(self):
        '''Get an item from the queue without blocking.'''
        return self.get(False)

    def _put_database(self, item):
        '''Pickle ``item`` and append it to the on-disk table.'''
        data = pickle.dumps(item)

        with self._connection() as con:
            con.execute('INSERT INTO queue (pickle) VALUES (?)', (data,))

    def _restore_one(self):
        '''Move the oldest spooled row into the in-memory queue.

        :return: ``True`` if a row was moved; ``False`` if the table is
            empty or the in-memory queue is full.
        '''
        with self._connection() as con:
            row = con.execute(
                'SELECT id, pickle FROM queue ORDER BY id LIMIT 1'
            ).fetchone()

            if row is None:
                return False

            # The blob was written by ``_put_database`` in this process, so
            # unpickling it does not involve external data.
            item = pickle.loads(row['pickle'])

            try:
                self._queue.put_nowait(item)
            except queue.Full:
                # Leave the row on disk; a later ``get`` signals a retry.
                return False

            # Delete only after the item is safely back in memory.
            con.execute('DELETE FROM queue WHERE id = ?', (row['id'],))

        return True

    @asynchronous(name='BigDiskQueue loop')
    def _loop(self):
        '''Refill the in-memory queue from disk whenever signalled.'''
        while True:
            self._event.wait()
            self._event.clear()

            # Several ``set`` calls collapse into a single wakeup, so keep
            # restoring until memory is full or nothing is left on disk.
            while self._restore_one():
                pass