1 import Queue
2 import time
3 from concurrent_tree_crawler.crawler_thread import CrawlerThread
4
7
8
9
10
class CrawlersManager:
    """Starts and stops different crawler threads."""

    def __init__(self, tree, navigators):
        """
        @param tree: L{TreeAccessor} object to be used by all of the threads
        @type tree: L{TreeAccessor}
        @param navigators: navigators to be used by the threads. Each thread
            obtains a single navigator. Number of threads created is the same
            as the number of navigators.
        @type navigators: list of L{NavigatorTreeWrapper}s
        """
        self.__tree = tree
        self.__navigators = navigators
        # Created lazily in start(); threads report their termination status
        # on this queue (see wait_until_finish()).
        self.__status_queue = None
        self.__threads = None

    def start(self):
        """
        Create and start one L{CrawlerThread} per navigator.

        The threads are started as daemons so that a hung crawler cannot
        prevent interpreter exit.
        """
        assert len(self.__navigators) > 0, "No navigators available"
        self.__status_queue = Queue.Queue()
        self.__threads = []
        for navigator in self.__navigators:
            crawler = CrawlerThread(navigator, self.__tree, self.__status_queue)
            self.__threads.append(crawler)
        for t in self.__threads:
            # `daemon` attribute instead of the deprecated setDaemon().
            t.daemon = True
            t.start()

    def wait_until_finish(self, timeout=None):
        """
        Wait until all threads finished their jobs and then get rid of them. If
        C{timeout} seconds pass before the threads are finished, they are
        stopped.

        @param timeout: if the value is not C{None}, the method
            blocks at most for C{timeout} number of seconds, otherwise
            the method blocks until all threads are finished.
        @return: C{False} iff the wait ended because of the timeout
        @raise CrawlersManagerException: if a thread reported an exception
        """
        try:
            # One status entry per thread is expected on the queue: None on
            # success, or — presumably — a sys.exc_info()-style tuple on
            # failure (only ex_info[1], the exception value, is used here;
            # confirm against CrawlerThread).
            for _ in xrange(len(self.__threads)):
                wait_start = time.time()
                ex_info = self.__status_queue.get(timeout=timeout)
                wait_end = time.time()
                if ex_info is not None:
                    # NOTE(review): on this path the remaining threads are
                    # neither stopped nor joined — the finally clause only
                    # drops the references; they keep running as daemons.
                    raise CrawlersManagerException(ex_info[1])
                if timeout is not None:
                    # Shrink the remaining budget by the time already spent
                    # waiting, so the total wait never exceeds `timeout`.
                    timeout = timeout - (wait_end - wait_start)
                    if timeout <= 0:
                        raise Queue.Empty
            return True
        except Queue.Empty:
            # Timed out: ask each thread to stop, then wait for it to obey.
            for t in self.__threads:
                t.stop_activity()
            for t in self.__threads:
                t.join()
            return False
        finally:
            # Forget the (now finished or abandoned) threads either way.
            del self.__threads[:]
76