import base64
import copy
import cStringIO
import httplib
import json
import paramiko
import re
import socket
import time
from django.conf import settings
from . import AbstractSchedulerClient
from .states import JobState
# Parse container names like "myapp_v2.web.1" into app/version/type/number.
# NOTE(review): the separator before c_num is a bare '.' (matches any char);
# kept as-is to preserve existing matching behavior.
MATCH = re.compile(
    r'(?P<app>[a-z0-9-]+)_?(?P<version>v[0-9]+)?\.?(?P<c_type>[a-z-_]+)?.(?P<c_num>[0-9]+)')
# number of attempts for retried fleet API calls
RETRIES = 3
class UHTTPConnection(httplib.HTTPConnection):
    """Subclass of Python library HTTPConnection that uses a Unix domain socket.

    The fleet API is served over a local Unix socket rather than TCP, so
    ``connect`` is overridden to dial ``path`` instead of a host/port pair.
    """

    def __init__(self, path):
        # 'localhost' is a placeholder host; only self.path is actually used
        httplib.HTTPConnection.__init__(self, 'localhost')
        self.path = path

    def connect(self):
        """Open a stream socket to the Unix domain path given at construction."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(self.path)
        self.sock = sock
class FleetHTTPClient(AbstractSchedulerClient):
    """Scheduler client that drives fleet through its Unix-socket HTTP API."""

    def __init__(self, target, auth, options, pkey):
        super(FleetHTTPClient, self).__init__(target, auth, options, pkey)
        # single global connection to the fleet API socket at self.target
        self.conn = UHTTPConnection(self.target)
# connection helpers
def _request_unit(self, method, name, body=None):
headers = {'Content-Type': 'application/json'}
self.conn.request(method, '/v1-alpha/units/{name}.service'.format(**locals()),
headers=headers, body=json.dumps(body))
return self.conn.getresponse()
def _get_unit(self, name):
for attempt in xrange(RETRIES):
try:
resp = self._request_unit('GET', name)
data = resp.read()
if not 200 <= resp.status <= 299:
errmsg = "Failed to retrieve unit: {} {} - {}".format(
resp.status, resp.reason, data)
raise RuntimeError(errmsg)
return data
except:
if attempt >= (RETRIES - 1):
raise
def _put_unit(self, name, body):
for attempt in xrange(RETRIES):
try:
resp = self._request_unit('PUT', name, body)
data = resp.read()
if not 200 <= resp.status <= 299:
errmsg = "Failed to create unit: {} {} - {}".format(
resp.status, resp.reason, data)
raise RuntimeError(errmsg)
return data
except:
if attempt >= (RETRIES - 1):
raise
def _delete_unit(self, name):
headers = {'Content-Type': 'application/json'}
self.conn.request('DELETE', '/v1-alpha/units/{name}.service'.format(**locals()),
headers=headers)
resp = self.conn.getresponse()
data = resp.read()
if resp.status not in (404, 204):
errmsg = "Failed to delete unit: {} {} - {}".format(
resp.status, resp.reason, data)
raise RuntimeError(errmsg)
return data
def _get_state(self, name=None):
headers = {'Content-Type': 'application/json'}
url = '/v1-alpha/state'
if name:
url += '?unitName={name}.service'.format(**locals())
self.conn.request('GET', url, headers=headers)
resp = self.conn.getresponse()
data = resp.read()
if resp.status not in (200,):
errmsg = "Failed to retrieve state: {} {} - {}".format(
resp.status, resp.reason, data)
raise RuntimeError(errmsg)
return json.loads(data)
def _get_machines(self):
headers = {'Content-Type': 'application/json'}
url = '/v1-alpha/machines'
self.conn.request('GET', url, headers=headers)
resp = self.conn.getresponse()
data = resp.read()
if resp.status not in (200,):
errmsg = "Failed to retrieve machines: {} {} - {}".format(
resp.status, resp.reason, data)
raise RuntimeError(errmsg)
return json.loads(data)
# container api
[docs] def create(self, name, image, command='', template=None, **kwargs):
"""Create a container."""
self._create_container(name, image, command,
template or copy.deepcopy(CONTAINER_TEMPLATE), **kwargs)
    def _create_container(self, name, image, command, unit, **kwargs):
        """Fill in the *unit* option templates for *name* and POST it to fleet.

        *unit* is a list of {section, name, value} dicts whose ``value``
        strings are str.format templates; it is mutated in place.
        """
        # capture locals() first: the template substitution below relies on
        # name/image/command being present as format keys in this dict
        l = locals().copy()
        l.update(re.match(MATCH, name).groupdict())
        # prepare memory limit for the container type
        mem = kwargs.get('memory', {}).get(l['c_type'], None)
        if mem:
            l.update({'memory': '-m {}'.format(mem.lower())})
        else:
            l.update({'memory': ''})
        # prepare cpu shares for the container type
        cpu = kwargs.get('cpu', {}).get(l['c_type'], None)
        if cpu:
            l.update({'cpu': '-c {}'.format(cpu)})
        else:
            l.update({'cpu': ''})
        # set unit hostname
        l.update({'hostname': self._get_hostname(name)})
        # should a special entrypoint be used
        entrypoint = kwargs.get('entrypoint')
        if entrypoint:
            l.update({'entrypoint': '{}'.format(entrypoint)})
        # encode command as utf-8 (Python 2: unicode -> bytes)
        if isinstance(l.get('command'), basestring):
            l['command'] = l['command'].encode('utf-8')
        # construct unit from template (mutates the caller-supplied list)
        for f in unit:
            f['value'] = f['value'].format(**l)
        # prepare tags only if one was provided
        tags = kwargs.get('tags', {})
        if tags:
            tagset = ' '.join(['"{}={}"'.format(k, v) for k, v in tags.viewitems()])
            unit.append({"section": "X-Fleet", "name": "MachineMetadata",
                         "value": tagset})
        # post unit to fleet in the "loaded" (not yet started) state
        self._put_unit(name, {"desiredState": "loaded", "options": unit})
def _get_hostname(self, application_name):
hostname = settings.UNIT_HOSTNAME
if hostname == "default":
return ''
elif hostname == "application":
# replace underscore with dots, since underscore is not valid in DNS hostnames
dns_name = application_name.replace("_", ".")
return '-h ' + dns_name
elif hostname == "server":
return '-h %H'
else:
raise RuntimeError('Unsupported hostname: ' + hostname)
[docs] def start(self, name):
"""Start a container."""
self._put_unit(name, {'desiredState': 'launched'})
self._wait_for_container_running(name)
def _wait_for_container_state(self, name):
# wait for container to get scheduled
for _ in xrange(30):
states = self._get_state(name)
if states and len(states.get('states', [])) == 1:
return states.get('states')[0]
time.sleep(1)
else:
raise RuntimeError('container timeout while retrieving state')
def _wait_for_container_running(self, name):
# we bump to 20 minutes here to match the timeout on the router and in the app unit files
try:
self._wait_for_job_state(name, JobState.up)
except RuntimeError:
raise RuntimeError('container failed to start')
def _wait_for_job_state(self, name, state):
# we bump to 20 minutes here to match the timeout on the router and in the app unit files
for _ in xrange(1200):
if self.state(name) == state:
return
time.sleep(1)
else:
raise RuntimeError('timeout waiting for job state: {}'.format(state))
def _wait_for_destroy(self, name):
for _ in xrange(30):
if not self._get_state(name):
break
time.sleep(1)
else:
raise RuntimeError('timeout on container destroy')
[docs] def stop(self, name):
"""Stop a container."""
self._put_unit(name, {"desiredState": "loaded"})
self._wait_for_job_state(name, JobState.created)
[docs] def destroy(self, name):
"""Destroy a container."""
# call all destroy functions, ignoring any errors
try:
self._destroy_container(name)
except:
pass
self._wait_for_destroy(name)
def _destroy_container(self, name):
for attempt in xrange(RETRIES):
try:
self._delete_unit(name)
break
except:
if attempt == (RETRIES - 1): # account for 0 indexing
raise
    def run(self, name, image, entrypoint, command):  # noqa
        """Run a one-off command.

        Schedules the container via fleet, SSHes to the machine it landed on
        as the "core" user, polls docker until the container finishes, and
        returns (exit_code, logs). The unit is always destroyed afterwards.
        """
        self._create_container(name, image, command, copy.deepcopy(RUN_TEMPLATE),
                               entrypoint=entrypoint)
        # launch the container
        self._put_unit(name, {'desiredState': 'launched'})
        # wait for the container to get scheduled
        state = self._wait_for_container_state(name)
        try:
            machineID = state.get('machineID')
            # find the machine
            machines = self._get_machines()
            if not machines:
                raise RuntimeError('no available hosts to run command')
            # find the machine's primaryIP
            primaryIP = None
            for m in machines.get('machines', []):
                if m['id'] == machineID:
                    primaryIP = m['primaryIP']
            if not primaryIP:
                raise RuntimeError('could not find host')
            # prepare ssh key (self.pkey is base64-encoded key material)
            file_obj = cStringIO.StringIO(base64.b64decode(self.pkey))
            pkey = paramiko.RSAKey(file_obj=file_obj)
            # grab output via docker logs over SSH
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(primaryIP, username="core", pkey=pkey)
            # share a transport so each command opens a cheap session channel
            tran = ssh.get_transport()

            def _do_ssh(cmd):
                # run one remote command; returns (exit_status, stdout text)
                with tran.open_session() as chan:
                    chan.exec_command(cmd)
                    while not chan.exit_status_ready():
                        time.sleep(1)
                    out = chan.makefile()
                    output = out.read()
                    rc = chan.recv_exit_status()
                    return rc, output

            # wait for container to launch
            # we loop indefinitely here, as we have no idea how long the
            # docker pull will take
            while True:
                rc, _ = _do_ssh('docker inspect {name}'.format(**locals()))
                if rc == 0:
                    break
                time.sleep(1)
            else:
                # NOTE(review): unreachable -- a "while True" loop never runs
                # its else clause (it only exits via the break above)
                raise RuntimeError('failed to create container')
            # wait for container to start (StartedAt stays year 0001 until then)
            for _ in xrange(2):
                _rc, _output = _do_ssh('docker inspect {name}'.format(**locals()))
                if _rc != 0:
                    raise RuntimeError('failed to inspect container')
                _container = json.loads(_output)
                started_at = _container[0]["State"]["StartedAt"]
                if not started_at.startswith('0001'):
                    break
                time.sleep(1)
            else:
                raise RuntimeError('container failed to start')
            # wait for container to complete (FinishedAt leaves year 0001),
            # polling for up to 20 minutes
            for _ in xrange(1200):
                _rc, _output = _do_ssh('docker inspect {name}'.format(**locals()))
                if _rc != 0:
                    raise RuntimeError('failed to inspect container')
                _container = json.loads(_output)
                finished_at = _container[0]["State"]["FinishedAt"]
                if not finished_at.startswith('0001'):
                    break
                time.sleep(1)
            else:
                raise RuntimeError('container timed out')
            # gather container output
            _rc, output = _do_ssh('docker logs {name}'.format(**locals()))
            if _rc != 0:
                raise RuntimeError('could not attach to container')
            # determine container exit code
            _rc, _output = _do_ssh('docker inspect {name}'.format(**locals()))
            if _rc != 0:
                raise RuntimeError('could not determine exit code')
            container = json.loads(_output)
            rc = container[0]["State"]["ExitCode"]
        finally:
            # cleanup: always remove the one-off unit, even on failure
            self._destroy_container(name)
            self._wait_for_destroy(name)
        # return rc and output
        return rc, output
[docs] def state(self, name):
"""Display the given job's running state."""
systemdActiveStateMap = {
'active': 'up',
'reloading': 'down',
'inactive': 'created',
'failed': 'crashed',
'activating': 'down',
'deactivating': 'down',
}
try:
# NOTE (bacongobbler): this call to ._get_unit() acts as a pre-emptive check to
# determine if the job no longer exists (will raise a RuntimeError on 404)
self._get_unit(name)
state = self._wait_for_container_state(name)
activeState = state['systemdActiveState']
# FIXME (bacongobbler): when fleet loads a job, sometimes it'll automatically start and
# stop the container, which in our case will return as 'failed', even though
# the container is perfectly fine.
if activeState == 'failed' and state['systemdLoadState'] == 'loaded':
return JobState.created
return getattr(JobState, systemdActiveStateMap[activeState])
except KeyError:
# failed retrieving a proper response from the fleet API
return JobState.error
except RuntimeError:
# failed to retrieve a response from the fleet API,
# which means it does not exist
return JobState.destroyed
# export the concrete client under the name the scheduler framework imports
SchedulerClient = FleetHTTPClient

# systemd unit template for long-running app containers; the {placeholders}
# are filled in by FleetHTTPClient._create_container via str.format
CONTAINER_TEMPLATE = [
    {"section": "Unit", "name": "Description", "value": "{name}"},
    {"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "IMAGE=$(etcdctl get /deis/registry/host 2>&1):$(etcdctl get /deis/registry/port 2>&1)/{image}; docker pull $IMAGE"'''},  # noqa
    {"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "docker inspect {name} >/dev/null 2>&1 && docker rm -f {name} || true"'''},  # noqa
    {"section": "Service", "name": "ExecStart", "value": '''/bin/sh -c "IMAGE=$(etcdctl get /deis/registry/host 2>&1):$(etcdctl get /deis/registry/port 2>&1)/{image}; docker run --name {name} --rm {memory} {cpu} {hostname} -P $IMAGE {command}"'''},  # noqa
    {"section": "Service", "name": "ExecStop", "value": '''/usr/bin/docker stop {name}'''},
    {"section": "Service", "name": "TimeoutStartSec", "value": "20m"},
    {"section": "Service", "name": "TimeoutStopSec", "value": "10"},
    {"section": "Service", "name": "RestartSec", "value": "5"},
    {"section": "Service", "name": "Restart", "value": "on-failure"},
]

# systemd unit template for one-off admin commands (see FleetHTTPClient.run);
# output is attached to stdout/stderr instead of publishing ports
RUN_TEMPLATE = [
    {"section": "Unit", "name": "Description", "value": "{name} admin command"},
    {"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "IMAGE=$(etcdctl get /deis/registry/host 2>&1):$(etcdctl get /deis/registry/port 2>&1)/{image}; docker pull $IMAGE"'''},  # noqa
    {"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "docker inspect {name} >/dev/null 2>&1 && docker rm -f {name} || true"'''},  # noqa
    {"section": "Service", "name": "ExecStart", "value": '''/bin/sh -c "IMAGE=$(etcdctl get /deis/registry/host 2>&1):$(etcdctl get /deis/registry/port 2>&1)/{image}; docker run --name {name} --entrypoint={entrypoint} -a stdout -a stderr $IMAGE {command}"'''},  # noqa
    {"section": "Service", "name": "TimeoutStartSec", "value": "20m"},
]