Source code for bytestag.queue_test
from bytestag.queue import BigDiskQueue
import random
import unittest
[docs]class TestBigDiskQueue(unittest.TestCase):
[docs] def test_queue(self):
'''It should add n and return n items'''
q = BigDiskQueue(memory_size=10)
n = 100
for i in range(n):
q.put(i)
count = 0
for i in range(n):
item = q.get(timeout=1) # @UnusedVariable
count += 1
self.assertEqual(count, n)
[docs] def test_queue_random(self):
'''It should add n and return n items randomly'''
self.maxDiff = None
q = BigDiskQueue(memory_size=10)
n = 100
num_puts = 0
num_gets = 0
q.put(num_puts)
num_puts += 1
rand = random.Random()
rand.seed(0)
l = []
while num_gets < n:
if num_puts < n and rand.randint(0, 1):
q.put(num_puts)
num_puts += 1
if num_puts > num_gets and rand.random() < 0.3:
l.append(q.get(timeout=1))
num_gets += 1
self.assertEqual(n, len(l))
self.assertEqual(list(range(0, n)), list(sorted(l)))