Source code for scheduler.chaos
import random
from .mock import MockSchedulerClient, jobs
from .states import JobState
CREATE_ERROR_RATE = 0
DESTROY_ERROR_RATE = 0
START_ERROR_RATE = 0
STOP_ERROR_RATE = 0
[docs]class ChaosSchedulerClient(MockSchedulerClient):
[docs] def create(self, name, image, command, **kwargs):
"""Create a new container."""
if random.random() < CREATE_ERROR_RATE:
jobs.setdefault(name, {})['state'] = JobState.error
else:
super(ChaosSchedulerClient, self).create(name, image, command, **kwargs)
[docs] def destroy(self, name):
"""Destroy a container."""
if random.random() < DESTROY_ERROR_RATE:
jobs.setdefault(name, {})['state'] = JobState.error
else:
super(ChaosSchedulerClient, self).destroy(name)
[docs] def run(self, name, image, entrypoint, command):
"""Run a one-off command."""
if random.random() < CREATE_ERROR_RATE:
raise RuntimeError('exit code 1')
else:
super(ChaosSchedulerClient, self).run(name, image, entrypoint, command)
[docs] def start(self, name):
"""Start a container."""
if random.random() < START_ERROR_RATE:
jobs.setdefault(name, {})['state'] = JobState.crashed
else:
super(ChaosSchedulerClient, self).start(name)
[docs] def stop(self, name):
"""Stop a container."""
if random.random() < STOP_ERROR_RATE:
jobs.setdefault(name, {})['state'] = JobState.crashed
else:
super(ChaosSchedulerClient, self).stop(name)
SchedulerClient = ChaosSchedulerClient