Package concurrent_tree_crawler :: Package common :: Package threads :: Module token_bucket
[hide private]
[frames | no frames]

Source Code for Module concurrent_tree_crawler.common.threads.token_bucket

 1  import threading 
 2  import logging 
 3   
 4  from concurrent_tree_crawler.common.threads.sleep import Sleep 
 5   
class TokenBucket:
    """Abstract interface shared by all token buckets.

    Neither operation is implemented here; both raise
    C{NotImplementedError} and are meant to be overridden.
    """

    def get_token(self):
        """Take one token out of the bucket.

        @raise NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def put_tokens(self, no):
        """Add tokens to the bucket.

        @param no: number of tokens to add to the bucket
        @raise NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
18
class InfiniteTokenBucket(TokenBucket):
    """Token bucket that never runs out of tokens.

    Both operations are no-ops: taking a token returns immediately and
    adding tokens has no effect.
    """

    def get_token(self):
        """Return immediately without doing anything."""
        return None

    def put_tokens(self, no):
        """Do nothing.

        @param no: number of tokens to add to the bucket (ignored)
        """
        return None
27
class StandardTokenBucket(TokenBucket):
    """Token bucket with a bounded capacity, guarded by a condition variable.

    The bucket starts empty. L{get_token} blocks the calling thread until
    a token is available; L{put_tokens} adds tokens (never exceeding the
    capacity) and wakes up all threads waiting for one.
    """

    def __init__(self, token_capacity):
        """
        @param token_capacity: maximal number of tokens the bucket can hold
        """
        self.__token_capacity = token_capacity
        self.__cond = threading.Condition()
        self.__tokens_no = 0

    def get_token(self):
        """Remove single token from the bucket, blocking while it is empty."""
        # The "with" block guarantees the lock is released even when the
        # wait is interrupted by an exception; a bare acquire()/release()
        # pair would leave the lock held and block every other thread.
        with self.__cond:
            while not self.__tokens_no > 0:
                self.__cond.wait()
            self.__tokens_no -= 1

    def put_tokens(self, no):
        """Add tokens to the bucket and wake up the waiting threads.

        The number of stored tokens is capped at the bucket's capacity.

        @param no: number of tokens to add to the bucket
        """
        # "with" releases the lock even if the arithmetic below raises
        # (e.g. a non-numeric "no"), so a bad call cannot wedge the bucket.
        with self.__cond:
            self.__tokens_no = min(
                self.__tokens_no + no, self.__token_capacity)
            self.__cond.notify_all()
46
class TokenBucketFiller(threading.Thread):
    """Background thread that periodically refills a token bucket.

    Every C{fill_interval_in_secs} seconds it puts C{fill_amount} tokens
    into the bucket, until L{stop} is called.
    """

    def __init__(self, bucket, fill_interval_in_secs, fill_amount):
        """
        @type bucket: L{TokenBucket}
        @param fill_interval_in_secs: pause between two consecutive refills
        @param fill_amount: number of tokens put into the bucket per refill
        """
        threading.Thread.__init__(self)
        self.__should_stop = False
        self.__sleep = Sleep()
        self.__bucket = bucket
        self.__fill_amount = fill_amount
        self.__fill_interval = fill_interval_in_secs

    def run(self):
        """Refill the bucket and sleep, repeatedly, until asked to stop.

        Any exception raised inside the loop ends the loop and is logged
        as an error instead of propagating.
        """
        try:
            while True:
                if self.__should_stop:
                    break
                self.__bucket.put_tokens(self.__fill_amount)
                self.__sleep.sleep(self.__fill_interval)
        except Exception as ex:
            message = (
                'In TokenBucketFiller, an exception was caught: '
                '"{}"'.format(str(ex))
            )
            logging.error(message)

    ## HACK: in this implementation there is a very rare situation that would
    ## lead to taking up to '__fill_interval' seconds to completely stop
    ## this thread.
    def stop(self):
        """Ask the thread to finish and wait until it has terminated.

        Sets the stop flag, calls C{wake_up()} on the sleeper (presumably
        to cut the current sleep short -- depends on C{Sleep}) and then
        joins the thread.
        """
        self.__should_stop = True
        self.__sleep.wake_up()
        self.join()
78