Source code for scheduler.mock

import json

from . import AbstractSchedulerClient
from .states import JobState, TransitionError


# HACK: MockSchedulerClient is not persistent across requests
jobs = {}


[docs]class MockSchedulerClient(AbstractSchedulerClient):
[docs] def create(self, name, image, command, **kwargs): """Create a new container.""" jobs.setdefault(name, {})['state'] = JobState.created
[docs] def destroy(self, name): """Destroy a container.""" jobs.setdefault(name, {})['state'] = JobState.destroyed
[docs] def run(self, name, image, entrypoint, command): """Run a one-off command.""" # dump input into a json object for testing purposes return 0, json.dumps({ 'name': name, 'image': image, 'entrypoint': entrypoint, 'command': command, })
[docs] def start(self, name): """Start a container.""" if self.state(name) not in [JobState.created, JobState.up, JobState.down, JobState.crashed, JobState.error]: raise TransitionError(self.state(name), JobState.up, 'the container must be stopped or up to start') jobs.setdefault(name, {})['state'] = JobState.up
[docs] def state(self, name): """Display the given job's running state.""" return jobs.get(name, {}).get('state', JobState.initialized)
[docs] def stop(self, name): """Stop a container.""" job = jobs.get(name, {}) if job.get('state') not in [JobState.up, JobState.crashed, JobState.error]: raise TransitionError(job.get('state'), JobState.up, 'the container must be up to stop') job['state'] = JobState.down
SchedulerClient = MockSchedulerClient