Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

# Copyright (c) 2014, Facebook, Inc.  All rights reserved. 

# 

# This source code is licensed under the BSD-style license found in the 

# LICENSE file in the root directory of this source tree. An additional grant 

# of patent rights can be found in the PATENTS file in the same directory. 

# 

"""Base Task and related helper classes for sparts' task system 

 

Tasks in sparts are a way to organize and delegate some sort of 

background or other synchronized processing.  This module defines 

the most common features. 

""" 

from __future__ import absolute_import 

import logging 

import six 

import threading 

 

from six.moves import xrange 

from sparts.sparts import _SpartsObject 

from sparts.timer import Timer 

 

 

class VTask(_SpartsObject):
    """The base class for all tasks.  Needs to be subclassed to be useful.

    Attributes:
        OPT_PREFIX - Overrides the prefix for any associated options
                     (the class name is used when this is None)
        LOOPLESS - True indicates this task should not spawn any threads
        DEPS - List of `VTask` subclasses that must be initialized first
        workers - Number of Threads that should execute the `_runloop`
    """

    OPT_PREFIX = None
    LOOPLESS = False
    DEPS = []
    workers = 1

    @property
    def name(self):
        """The task's name; by default, the name of its class."""
        return self.__class__.__name__

    def __init__(self, service):
        """Task Constructor.  Requires a `service` VService instance.

        You should not need to override this.  Override initTask instead."""
        self.service = service
        # Per-task logger, namespaced under the owning service's name.
        self.logger = logging.getLogger('%s.%s' % (service.name, self.name))
        # Worker threads; populated by initTask() for non-LOOPLESS tasks.
        self.threads = []

    def initTask(self):
        """Override this to do any task-specific initialization

        Don't forget to call super(...).initTask(), or things may not
        run properly.

        The base implementation creates (but does not start) `workers`
        threads that run `_run`, unless the task is LOOPLESS.  A single
        worker thread is named after the task; with several workers the
        names are suffixed `-1`, `-2`, ...
        """
        if not self.LOOPLESS:
            for i in xrange(self.workers):
                if self.workers == 1:
                    name = self.name
                else:
                    name = '%s-%d' % (self.name, i + 1)
                self.threads.append(
                    threading.Thread(target=self._run, name=name))

    def initTaskThread(self):
        """Override thread-specific initialization for multi-threaded tasks

        Called once at the start of each worker thread, before `_runloop`."""

    def start(self):
        """Called during bootstrap to spin up threads post-creation."""
        if not self.LOOPLESS:
            for thread in self.threads:
                thread.start()

    def stop(self):
        """Custom stopping logic for this task.

        This is called by the main VService thread, after a graceful shutdown
        request has been received."""
        pass

    def join(self):
        """Block, waiting for all child worker threads to finish."""
        if not self.LOOPLESS:
            for thread in self.threads:
                # Join with a short timeout in a loop instead of one blocking
                # join() -- presumably so the calling thread can still react
                # to signals (e.g. KeyboardInterrupt) while waiting.
                while thread.is_alive():
                    thread.join(0.5)

    @property
    def running(self):
        """Returns True if task is still doing work.

        This base implementation returns True if any child threads are alive"""
        return any(thread.is_alive() for thread in self.threads)

    def _run(self):
        """Worker thread entry point: per-thread init, then `_runloop`.

        Any unhandled exception is logged and triggers a service shutdown."""
        try:
            self.initTaskThread()
            self._runloop()
        except Exception:
            # In general, you should not get here.  So, we will shutdown the
            # server.  It is better for your service to *completely* crash in
            # response to an unhandled error, than to continue on in some sort
            # of half-alive zombie state.  Please catch your exceptions.
            # Consider throwing a TryLater if this task is a subclass of
            # QueueTask or PeriodicTask.
            #
            # I hate zombies.
            self.logger.exception("Unhandled exception in %s", self.name)
            self.service.shutdown()
        finally:
            self.logger.debug('Thread %s exited',
                              threading.current_thread().name)

    def _runloop(self):
        """For normal (non-LOOPLESS) tasks, this MUST be implemented"""
        # TODO: May require some janky metaprogramming to make ABC enforce
        # this in a cleaner way.
        raise NotImplementedError()

    @classmethod
    def _loptName(cls, name):
        """Return the long command-line flag for task option `name`.

        e.g. for a class `Foo` with no OPT_PREFIX, `_loptName('my_opt')`
        returns `'--Foo-my-opt'`.  Every underscore, including any in the
        prefix, becomes a dash."""
        return '--' + cls._optName(name).replace('_', '-')

    @classmethod
    def _optName(cls, name):
        """Return the attribute name under which option `name` is stored.

        The result is `<OPT_PREFIX or class name>_<name>`, with dashes in
        `name` converted to underscores, e.g. `'Foo_my_opt'`."""
        parts = [cls.OPT_PREFIX or cls.__name__,
                 name.replace('-', '_')]
        return '_'.join(parts)

    def getTaskOption(self, opt, default=None):
        """Read task option `opt` from the service's options object.

        Returns `default` when the attribute is not present."""
        return getattr(self.service.options,
                       self._optName(opt), default)

    def setTaskOption(self, opt, value):
        """Set task option `opt` to `value` on the service's options object."""
        setattr(self.service.options, self._optName(opt), value)

    @classmethod
    def register(cls):
        """Add this task class to the module-level REGISTERED collection."""
        REGISTERED.register(cls)

 

 

class SkipTask(Exception):
    """Raise from initTask() to opt this task out of execution.

    Meant for tasks that lack configuration essential to their own work
    but not to the program as a whole -- a network-based logging task is
    the canonical example.  The task is dropped instead of failing startup.
    """

 

 

class TryLater(Exception):
    """Raise from an overridden task method to postpone the current work.

    Lets a task temporarily suspend and later resume execution -- handy
    for riding out unexpected error conditions or re-scheduling work.
    """

 

 

class ExecuteContext(object):
    """An abstraction used internally by various tasks to track work

    Encapsulates common metrics for work that can be retried later, hooks for
    signalling completion, etc.

    Attributes:
        attempt - attempt number for this unit of work (defaults to 1)
        item - the unit of work being tracked (opaque to this class)
        deferred - optional object notified via `callback`/`errback`
                   (presumably a twisted Deferred -- confirm with callers)
        future - optional object notified via `set_result`/`set_exception`
                 (presumably a concurrent.futures.Future)
        running - threading.Event that is set once `start()` has run
        timer - Timer measuring how long execution took
    """
    def __init__(self, attempt=1, item=None, deferred=None, future=None):
        self.attempt = attempt
        self.item = item
        self.deferred = deferred
        self.future = future
        self.running = threading.Event()
        self.timer = Timer()

    def start(self):
        """Indicate that execution has started

        Only the first call has any effect; later calls are no-ops, so the
        future is never transitioned to running twice."""
        if not self.running.is_set():
            if self.future is not None:
                self.future.set_running_or_notify_cancel()
            self.timer.start()
            self.running.set()

    def set_result(self, result):
        """Indicate that execution has completed

        Stops the timer and passes `result` to the future and/or deferred."""
        self.timer.stop()
        if self.future is not None:
            self.future.set_result(result)
        if self.deferred is not None:
            self.deferred.callback(result)

    def set_exception(self, exception):
        """Indicate that execution has failed

        Stops the timer and passes `exception` to the future and/or deferred.

        Returns True only when a deferred is attached and the error never
        reached the fallback errback appended here (i.e. presumably an
        earlier errback on the deferred handled it); False otherwise.
        """
        self.timer.stop()
        if self.future is not None:
            self.future.set_exception(exception)

        if self.deferred is None:
            return False

        # Append a last-resort errback; if the error is still unhandled when
        # it runs, it records the error in `unhandled`.
        unhandled = []
        self.deferred.addErrback(self._unhandledErrback, unhandled)
        self.deferred.errback(exception)
        return not unhandled

    @property
    def elapsed(self):
        """Convenience property.  Returns timer duration."""
        return self.timer.elapsed

    @staticmethod
    def _unhandledErrback(error, unhandled):
        """Fallback errback for deferred processing

        Records `error` in the `unhandled` list and returns None."""
        unhandled.append(error)
        return None

 

class Tasks(object):
    """Collection class for dealing with service tasks.

    Tasks can be accessed by name as attributes, or via the get/require
    methods.

    The collection has two phases.  Before `create()` it holds registered
    task *classes*; after `create()` it holds task *instances*, and the
    accessors (`tasks`, iteration, `len`, indexing, `get`) switch over
    accordingly.
    """
    def __init__(self, tasks=None):
        self.logger = logging.getLogger('sparts.tasks')
        self._registered = []
        self._registered_names = {}
        self._created = []
        self._created_names = {}
        self._did_create = False

        self.register_all(tasks or [])

    def register(self, task_class):
        """Register task_class with the collection

        Dependencies listed in `task_class.DEPS` are registered first, so
        they precede it in registration order.  Re-registering a class with
        an already-known name is a no-op."""
        assert not self._did_create
        name = task_class.__name__
        if name not in self._registered_names:
            # Recursively register dependencies
            for dep in task_class.DEPS:
                self.register(dep)

            self._registered.append(task_class)
            self._registered_names[name] = task_class

    def register_all(self, tasks):
        """Register multiple `tasks` classes with the collection"""
        assert not self._did_create
        for task in tasks:
            self.register(task)

    def unregister(self, task_class):
        """Unregister `task_class` from the collection"""
        assert not self._did_create
        self._registered.remove(task_class)
        del self._registered_names[task_class.__name__]

    def create(self, *args, **kwargs):
        """Create all registered tasks.

        Each registered class is instantiated with `*args, **kwargs`, in
        registration order.

        TODO: Handle SkipTask?
        """
        assert not self._did_create
        for task_cls in self._registered:
            task = task_cls(*args, **kwargs)
            self._created.append(task)
            self._created_names[task_cls.__name__] = task

        self._did_create = True

    def remove(self, task):
        """Remove created `task` from the collection"""
        assert self._did_create
        self._created.remove(task)
        # `create()` indexes instances by their class name, so use the same
        # key here; `task.name` may be overridden by a subclass.
        del self._created_names[type(task).__name__]

    def init(self):
        """Initialize all created tasks.  Remove ones that throw SkipTask.

        Raises:
            Exception - if any task's initTask() raised something other than
                SkipTask.  Each such error is logged, and skipped tasks are
                removed before this is raised.
        """
        assert self._did_create
        exceptions = []
        skipped = []

        for t in self:
            try:
                t.initTask()
            except SkipTask as e:
                # Keep track of SkipTasks so we can remove it from this
                # task collection
                self.logger.info("Skipping %s (%s)", t.name, e)
                skipped.append(t)
            except Exception as e:
                # Log and track unhandled exceptions during init, so we can
                # fail later.
                self.logger.exception("Error creating task, %s", t.name)
                exceptions.append(e)

        # Remove any tasks that should be skipped
        for t in skipped:
            self.remove(t)

        # Reraise a new exception, if any exceptions were thrown in init
        if exceptions:
            raise Exception("Unable to start service (%d task start errors)" %
                            len(exceptions))

    def start(self):
        """Start all the tasks, creating worker threads, etc"""
        assert self._did_create
        for t in self.tasks:
            t.start()

    def get(self, task):
        """Returns the `task` or its class, if creation hasn't happened yet.

        `task` may be a task name or a VTask subclass.  Returns None when
        no matching task is known."""
        if isinstance(task, six.string_types):
            name = task
        else:
            assert issubclass(task, VTask)
            name = task.__name__

        if self._did_create:
            return self._created_names.get(name)
        else:
            return self._registered_names.get(name)

    def require(self, task):
        """Return the `task` instance or class, raising KeyError if not found."""
        result = self.get(task)
        if result is None:
            raise KeyError('%s not in tasks (%s|%s)' %
                           (task, self.task_classes, self.tasks))

        return result

    @property
    def task_classes(self):
        """Accessor for accessing a copy of registered task classes"""
        return self._registered[:]

    @property
    def tasks(self):
        """Accessor for accessing a registered or instantiated task classes

        Return value varies based on whether `create()` has been called.
        A copy is returned, so callers may mutate it freely."""
        if self._did_create:
            return self._created[:]
        else:
            return self.task_classes

    def __getattr__(self, name):
        """Helper for accessing tasks using their name as an attribute.

        Only reached for names not found by normal attribute lookup."""
        # Special-method probes (e.g. `__setstate__`/`__getstate__` from
        # copy/pickle) must fail with AttributeError.  Forwarding them to
        # require() raises KeyError, or recurses forever on an instance whose
        # __init__ has not run yet.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return self.require(name)

    def __iter__(self):
        """Iterates on created or registered tasks, as appropriate."""
        return iter(self.tasks)

    def __len__(self):
        """Returns the number of created or registered tasks, as appropriate"""
        return len(self.tasks)

    def __getitem__(self, index):
        """Returns the created or registered task at the specified `index`"""
        return self.tasks[index]

 

 

# Module-wide `Tasks` collection of globally registered task classes;
# `VTask.register()` adds classes to it.
REGISTERED = Tasks(tasks=None)