Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

# Copyright (c) 2014, Facebook, Inc.  All rights reserved. 

# 

# This source code is licensed under the BSD-style license found in the 

# LICENSE file in the root directory of this source tree. An additional grant 

# of patent rights can be found in the PATENTS file in the same directory. 

# 

"""Module for implementing time-series counters.""" 

from __future__ import absolute_import 

 

from collections import deque 

from functools import partial 

from six import next 

from sparts.sparts import _Nameable, _Bindable, ProvidesCounters 

 

import time 

 

 

class SampleType:
    """Names of the aggregations that can be computed over samples.

    Pass a list of these values as the `types` parameter when creating
    a sample series.
    """
    # Number of values seen
    COUNT = 'count'
    # Total of the values seen
    SUM = 'sum'
    # Mean of the values seen
    AVG = 'avg'
    # Long-form spelling; same value as AVG
    AVERAGE = AVG
    # Largest value seen
    MAX = 'max'
    # Smallest value seen
    MIN = 'min'

 

 

class _BaseCounter(_Nameable, _Bindable, ProvidesCounters):
    """Base type for counter-like things.

    Subclasses must provide `_initialize()` (called once from the
    constructor) and `getvalue()`; counters that accept data also
    implement `add()`.
    """
    # Label for the kind of counter; subclasses override this.
    suffix = 'UNDEF'

    def __init__(self, name=None):
        super(_BaseCounter, self).__init__(name)
        # Let the subclass set up its own state.
        self._initialize()

    def _bind(self, obj):
        """Return a fresh counter of the same class and name.

        `obj` is not used here.
        """
        return self.__class__(name=self.name)

    def _initialize(self):
        """Set up the counter's initial state (subclass hook)."""
        raise NotImplementedError()

    def _genCounterCallbacks(self):
        """Yield one (name, callable) pair for this counter.

        The callable is the counter itself; calling it returns
        `getvalue()`.
        """
        yield self.name, self

    def getvalue(self):
        """Return the counter's current value (subclass hook)."""
        raise NotImplementedError()

    def add(self, value):
        """Feed `value` into the counter (subclass hook)."""
        raise NotImplementedError()

    def __call__(self):
        return self.getvalue()

    def __int__(self):
        return int(self.getvalue())

    def __float__(self):
        return float(self.getvalue())

    def __str__(self):
        """Return the value as a string, or '__None__' when it is None."""
        v = self.getvalue()
        if v is None:
            return '__None__'
        return str(v)

 

 

class ValueCounter(_BaseCounter):
    """Base type for counter-like things that have a `._value`"""
    # Starting value used when `_initialize()` is given no explicit value.
    DEFAULT_VALUE = 0.0

    def _initialize(self, value=None):
        """Set the starting value.

        Falls back to `DEFAULT_VALUE` only when `value` is None, so an
        explicit falsy start (such as 0) is honoured rather than being
        replaced by the default.
        """
        self._value = self.DEFAULT_VALUE if value is None else value

    def getvalue(self):
        """Return the stored value."""
        return self._value

 

 

class CallbackCounter(_BaseCounter):
    """A counter whose value is produced by calling `callback()`."""

    def __init__(self, callback, name=None):
        super(CallbackCounter, self).__init__(name=name)
        self._callback = callback

    def _bind(self, obj):
        """Return a fresh counter with the same callback and name.

        The inherited implementation constructs the class with only
        `name`, which fails here because `callback` is a required
        argument; forward it explicitly.  `obj` is not used.
        """
        return self.__class__(self._callback, name=self.name)

    def _initialize(self):
        # No state of its own; the value always comes from the callback.
        pass

    def getvalue(self):
        """Return whatever the callback returns."""
        return self._callback()

 

 

class Sum(ValueCounter):
    """Keeps the total of every value added so far."""
    suffix = SampleType.SUM

    def add(self, value):
        """Add `value` to the total."""
        self._value += value

    def increment(self):
        """Add 1.0 to the total."""
        self.add(1.0)

    def incrementBy(self, value):
        """Add `value` to the total (same as `add`)."""
        self.add(value)

    def reset(self, value=0):
        """Replace the total with `value` (0 by default)."""
        self._value = value

 

# Lower-case alias for the `Sum` class.
counter = Sum

 

class Count(ValueCounter):
    """Keeps the number of times `add()` has been called."""
    suffix = SampleType.COUNT
    # Integer start, unlike the float default of the parent class.
    DEFAULT_VALUE = 0

    def add(self, value):
        """Record one more sample; `value` itself is ignored."""
        self._value += 1

 

class Average(_BaseCounter):
    """Keeps the mean of every value added so far."""
    suffix = SampleType.AVERAGE

    def _initialize(self):
        # Number of samples, and their running total.
        self._count = 0
        self._total = 0.0

    def add(self, value):
        """Fold `value` into the running total and sample count."""
        # TODO: Re-use sibling total/count counters if present
        # not sure how to do this sensibly
        self._count += 1
        self._total += value

    def getvalue(self):
        """Return total / count, or None before any sample is added."""
        return self._total / self._count if self._count else None

 

 

class Max(ValueCounter):
    """Keeps the largest value added so far (None until the first add)."""
    suffix = SampleType.MAX
    DEFAULT_VALUE = None

    def add(self, value):
        """Keep `value` if it is the first sample or exceeds the current max."""
        current = self._value
        if current is None or value > current:
            self._value = value

 

 

class Min(ValueCounter):
    """Keeps the smallest value added so far (None until the first add)."""
    suffix = SampleType.MIN
    DEFAULT_VALUE = None

    def add(self, value):
        """Keep `value` if it is the first sample or below the current min."""
        current = self._value
        if current is None or value < current:
            self._value = value

    def getvalue(self):
        """Return the stored minimum."""
        return self._value

 

# TODO: Percentiles!! 

 

 

# Lookup for mapping SampleTypes to their respective classes.
# Each class's `suffix` is its SampleType value, so it doubles as the key.
_SampleMethod = {
    counter_cls.suffix: counter_cls
    for counter_cls in (Count, Sum, Average, Max, Min)
}

 

 

class Samples(_Nameable, _Bindable, ProvidesCounters):
    """`samples` are used to generate series of counters dynamically

    This is so you can, say, keep track of the average duration of some
    event for the last minute, hour, day, etc, and export these as separate
    counters.

    One counter is produced per (type, window) pair, keyed as
    `<name>.<type>.<window>`; the `<name>.` prefix is omitted when the
    name is None.
    """
    def __init__(self, types=None, windows=None, name=None):
        """Create a sample series.

        types   -- list of SampleType values; defaults to the average only.
        windows -- list of window lengths, compared against differences of
                   `_now()` values; defaults to [60, 3600].
        name    -- optional name used as the prefix of every counter key.
        """
        super(Samples, self).__init__(name)
        self.types = types or [SampleType.AVERAGE]
        # minutely, hourly.  Kept sorted ascending: getCounters() relies on
        # visiting windows from smallest to largest.
        self.windows = sorted(windows or [60, 3600])
        self.max_window = max(self.windows)
        # (timestamp, value) pairs, oldest first.
        self.samples = deque()
        # True whenever samples changed since the last getCounters() result.
        self.dirty = True
        self._prev_counters = {}
        self._prev_time = None

    def _bind(self, obj):
        """Return a fresh, empty series with the same configuration.

        `obj` is not used here.
        """
        return self.__class__(types=self.types, windows=self.windows,
                              name=self.name)

    def _keyPrefix(self):
        """Return the prefix shared by all counter keys ('' if unnamed)."""
        if self.name is None:
            return ''
        return self.name + '.'

    def _genCounterCallbacks(self):
        """Yield all the child counters."""
        for subcounter in self.iterkeys():
            yield subcounter, partial(self.getCounter, subcounter)

    def _now(self):
        """Defined to allow unittest overriding"""
        return time.time()

    def add(self, value):
        """Record `value` at the current time and drop expired samples."""
        now = self._now()
        self.samples.append((now, value))

        # When adding samples, trim old ones (older than the largest window).
        # The emptiness guard keeps a degenerate window from raising
        # IndexError once everything has been popped.
        while self.samples and now - self.max_window > self.samples[0][0]:
            self.samples.popleft()
        self.dirty = True

        # TODO: Handle "infinite" windows

    def getCounters(self):
        """Return a dict mapping every counter key to its current value.

        The previous result is reused when no sample was added since it was
        computed and the current time falls in the same whole second.
        """
        if self.dirty is False and self._prev_time == int(self._now()):
            return self._prev_counters

        # One accumulator per requested type, shared by all windows: windows
        # are visited smallest-first and samples newest-first, so each
        # accumulator already holds the smaller window's samples when the
        # next window starts.
        ops = [_SampleMethod[sample_type]() for sample_type in self.types]

        now = self._now()
        genwindows = iter(self.windows)
        window = next(genwindows)
        result = {}
        done = False
        prefix = self._keyPrefix()

        def _saveCounterValues(window):
            """Re-usable helper function for setting results and continuing

            Stores the accumulators' current values under `window`, then
            returns (next_window, False), or (None, True) when there are no
            more windows.
            """
            for op in ops:
                result[prefix + op.suffix + '.' + str(window)] = \
                    op.getvalue()
            # Move to the next window
            try:
                return next(genwindows), False
            except StopIteration:
                # We exhausted all our windows
                return None, True

        for ts, value in reversed(self.samples):
            # We exceeded the current window
            while not done and now - window > ts:
                # Save counter values
                window, done = _saveCounterValues(window)

            if done:
                # TODO: "prune" any remaining samples
                break

            for op in ops:
                op.add(value)

        # We exhausted the samples before the windows
        while not done:
            window, done = _saveCounterValues(window)

        self._prev_counters = result
        self._prev_time = int(now)
        self.dirty = False
        return result

    def getCounter(self, name, default=None):
        """Return the value stored under key `name`, or `default`."""
        return self.getCounters().get(name, default)

    def iterkeys(self):
        """Yield every counter key, using the same format as getCounters()."""
        prefix = self._keyPrefix()
        for sample_type in self.types:
            for window in self.windows:
                yield prefix + sample_type + '.' + str(window)
        # TODO: Infinite Windows

        # TODO: Infinite Windows 

 

 

# Lower-case alias for the `Samples` class.
samples = Samples