1 import threading
2 import logging
3
4 from concurrent_tree_crawler.common.threads.sleep import Sleep
5
7 """Interface of a token bucket"""
8
10 """Remove single token from the bucket"""
11 raise NotImplementedError
12
14 """
15 @param no: number of tokens to add to the bucket
16 """
17 raise NotImplementedError
18
20 """Token bucket with an infinite supply of tokens"""
21
24
27
30 self.__token_capacity = token_capacity
31 self.__cond = threading.Condition()
32 self.__tokens_no = 0
33
35 self.__cond.acquire()
36 while not self.__tokens_no>0:
37 self.__cond.wait()
38 self.__tokens_no = self.__tokens_no-1
39 self.__cond.release()
40
42 self.__cond.acquire()
43 self.__tokens_no = min(self.__tokens_no+no, self.__token_capacity)
44 self.__cond.notifyAll()
45 self.__cond.release()
46
48 """This thread fills the token bucket every C{fill_interval_in_secs} seconds
49 with C{fill_amount} number of tokens."""
50
51 - def __init__(self, bucket, fill_interval_in_secs, fill_amount):
52 """
53 @type bucket: L{TokenBucket}
54 """
55 threading.Thread.__init__(self)
56 self.__bucket = bucket
57 self.__fill_interval = fill_interval_in_secs
58 self.__fill_amount = fill_amount
59 self.__should_stop = False
60 self.__sleep = Sleep()
61
63 try:
64 while not self.__should_stop:
65 self.__bucket.put_tokens(self.__fill_amount)
66 self.__sleep.sleep(self.__fill_interval)
67 except Exception as ex:
68 logging.error('In TokenBucketFiller, an exception was caught: '
69 '"{}"'.format(str(ex)))
70
71
72
73
75 self.__should_stop = True
76 self.__sleep.wake_up()
77 self.join()
78