Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

# Copyright (c) 2014, Facebook, Inc.  All rights reserved. 

# 

# This source code is licensed under the BSD-style license found in the 

# LICENSE file in the root directory of this source tree. An additional grant 

# of patent rights can be found in the PATENTS file in the same directory. 

# 

"""Module for tasks related to doing work from a queue""" 

from six.moves import queue 

from sparts.counters import counter, samples, SampleType, CallbackCounter 

from sparts.sparts import option 

from sparts.vtask import VTask, ExecuteContext, TryLater 

 

class QueueTask(VTask): 

    """Task that calls `execute` for all work put on its `queue`""" 

    MAX_ITEMS = 0 

    WORKERS = 1 

    max_items = option(type=int, default=lambda cls: cls.MAX_ITEMS, 

                       help='Set a bounded queue length.  This may ' 

                            'cause unexpected deadlocks. [%(default)s]') 

    workers = option(type=int, default=lambda cls: cls.WORKERS, 

                     help='Number of threads to spawn to work on items from ' 

                          'its queue. [%(default)s]') 

 

    execute_duration_ms = samples(windows=[60, 240], 

       types=[SampleType.AVG, SampleType.MAX, SampleType.MIN]) 

    n_trylater = counter() 

    n_completed = counter() 

    n_unhandled = counter() 

 

    def execute(self, item, context): 

        """Implement this in your QueueTask subclasses""" 

        raise NotImplementedError() 

 

    def initTask(self): 

        super(QueueTask, self).initTask() 

        self.queue = queue.Queue(maxsize=self.max_items) 

        self.counters['queue_depth'] = \ 

            CallbackCounter(lambda: self.queue.qsize()) 

        self._shutdown_sentinel = object() 

 

    def stop(self): 

        super(QueueTask, self).stop() 

        self.queue.put(self._shutdown_sentinel) 

 

    def _runloop(self): 

exit        while not self.service._stop: 

            try: 

                item = self.queue.get(timeout=1.0) 

                if item is self._shutdown_sentinel: 

                    self.queue.put(item) 

                    break 

            except queue.Empty: 

                continue 

 

            # Create an ExecuteContext if we didn't have one 

            if isinstance(item, ExecuteContext): 

                context = item 

                item = context.item 

            else: 

                context = ExecuteContext(item=item) 

 

            try: 

                context.start() 

                result = self.execute(item, context) 

                self.n_completed.increment() 

                self.execute_duration_ms.add(context.elapsed * 1000.0) 

                context.set_result(result) 

            except TryLater: 

                self.n_trylater.increment() 

                context.attempt += 1 

                self.queue.put(context) 

            except Exception as ex: 

                self.n_unhandled.increment() 

                self.execute_duration_ms.add(context.elapsed * 1000.0) 

                handled = context.set_exception(ex) 

                if not handled: 

                    raise 

 

            finally: 

                self.queue.task_done()