Source code for xd.docker.client

"""Module containing DockerClient and associated exceptions."""


import urllib.parse
import requests
import requests_unixsocket
import json
import base64
import os
import io
import tarfile
import re
import functools

from typing import Optional, Union, Sequence, Dict, Tuple, List

from xd.docker.container import Container
from xd.docker.image import Image
from xd.docker.parameters import ContainerConfig, HostConfig, ContainerName, \
    Repository, RegistryAuthConfig, VolumeMount, Signal, json_update

import logging
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

requests_unixsocket.monkeypatch()


__all__ = ['DockerClient', 'HTTPError', 'ClientError', 'ServerError']


class HTTPError(Exception):
    def __init__(self, url, code):
        self.url = url
        self.code = code


class ClientError(HTTPError):
    def __init__(self, url, code):
        super(ClientError, self).__init__(url, code)


class ServerError(HTTPError):
    def __init__(self, url, code):
        super(ServerError, self).__init__(url, code)


[docs]class DockerClient(object): """Docker client. A DockerClient instance is used to communicate with Docker daemon (or something else that is speaking Docker Remote API). Arguments: host: URL to Docker daemon socket to connect to. :Example: Connect to docker daemon on localhost TCP socket: >>> docker = DockerClient('tcp://127.0.0.1:2375') Connect to docker daemon on UNIX domain socket: >>> docker = DockerClient('unix:///var/run/docker.sock') """ def __init__(self, host: Optional[str]=None): if host is None: host = os.environ.get('DOCKER_HOST', 'unix:///var/run/docker.sock') if host.startswith('unix://'): host = 'http+unix://' + urllib.parse.quote_plus(host[7:]) elif host.startswith('tcp://'): host = 'http://' + host[6:] else: raise ValueError('Invalid host value: {}'.format(host)) self.base_url = host @staticmethod def _check_http_status_code(url, status_code): if status_code >= 200 and status_code < 300: return elif status_code >= 400 and status_code <= 499: raise ClientError(url, status_code) elif status_code >= 500 and status_code <= 599: raise ServerError(url, status_code) else: raise HTTPError(url, status_code) @staticmethod def _process_response_output(r, output, last_line=False): decoder = json.JSONDecoder() failed = False for line in r.iter_lines(): line = line.decode('utf-8') index = 0 while index < len(line): data, extra_data_index = decoder.raw_decode(line[index:]) index += extra_data_index for t in ('progressDetail', 'stream', 'status', 'error'): if t not in data: continue if t not in output: break print(data[t].rstrip('\n')) if 'error' in data: failed = True if failed: return False else: if last_line: return data else: return True def _get(self, url, params=None, headers=None, stream=False): url = self.base_url + url r = requests.get(url, params=params, headers=headers, stream=stream) self._check_http_status_code(url, r.status_code) return r def _post(self, url, params=None, headers=None, data=None, stream=False): url = self.base_url + url r = requests.post(url, params=params, headers=headers, data=data, stream=stream) self._check_http_status_code(url, r.status_code) return r def _delete(self, url, params=None, stream=False): url = self.base_url + url r = requests.delete(url, params=params, stream=stream) self._check_http_status_code(url, r.status_code) return r
[docs] def version(self) -> Tuple[int, int]: """Get Docker Remote API version. Raises: ServerError: Server error. Returns: Major/minor version number of Docker daemon (Docker Remote API). """ r = self._get('/version') return r.json()
@property @functools.lru_cache(maxsize=1) def api_version(self): version = self.version() return tuple([int(i) for i in version['ApiVersion'].split('.')])
[docs] def ping(self) -> None: """Ping the docker server. Raises: ServerError: Server error. """ self._get('/_ping')
[docs] def containers(self, only_running: bool = True) -> List[Container]: """Get list of containers. By default, only running containers are returned. Keyword arguments: only_running: List only running containers (if True), or all containers (if False). Raises: ClientError: Bad parameter. ServerError: Server error. Returns: List of containers. """ params = {} params['all'] = not only_running r = self._get('/containers/json', params=params) return [Container(self, list_response=c) for c in r.json()]
[docs] def images(self) -> List[Image]: """Get list of images. Images returned does only contain partial information. To obtain detailed information, use `image_inspect` or `Image.inspect` on the `Image` in question. Raises: ServerError: Server error. Returns: List of images. """ r = self._get('/images/json') return [Image(self, list_response=image) for image in r.json()]
def image_inspect_raw(self, name: str) -> Dict: r = self._get('/images/{}/json'.format(name)) return r.json()
[docs] def image_inspect(self, name: str) -> Image: """Get image with low-level information. Get low-level information of a named image. Returns `Image` instance with the information. Arguments: name: name of image. """ return Image(self, inspect_response=self.image_inspect_raw(name))
[docs] def image_build(self, context: str, output=('error', 'stream', 'status'), dockerfile: Optional[str]=None, tag: Optional[Union[Repository, str]]=None, cache: bool=True, pull: Optional[bool]=None, rm: Optional[bool]=None, force_rm: Optional[bool]=None, host_config: Optional[HostConfig]=None, registry_config: Optional[RegistryAuthConfig]=None, buildargs: Optional[Dict[str, str]]=None): """Build an image from a Dockerfile. Build image from a given context or stand-alone Dockerfile. Arguments: context: path to directory containing build context, or path to a stand-alone Dockerfile. output: tuple/list of with type of output information to allow (Default: ('stream', 'status', 'error')). dockerfile: path to dockerfile in build context. tag: repository name and tag to be applied to the resulting image. cache: use the cache when building the image (default: True). pull: attempt to pull the image even if an older image exists locally (default: False). rm: False/True. Remove intermediate containers after a successful build (default: True). force_rm: False/True. Always remove intermediate containers after build (default: False). host_config: HostConfig instance. registry_config: RegistryAuthConfig instance. buildargs: build-time environment variables. """ # Handle convenience argument types if isinstance(tag, str): tag = Repository(tag) # TODO: take from HostConfig: # memory # swap # cpu_shares # cpu_period # cpuset_cpus # Request headers headers = {'content-type': 'application/tar'} if registry_config: registry_config = json.dumps( registry_config.json()).encode('utf-8') headers['X-Registry-Config'] = base64.b64encode(registry_config) # Request body if not os.path.exists(context): raise ValueError('context argument does not exist: %s' % (context)) tar_buf = io.BytesIO() tar = tarfile.TarFile(fileobj=tar_buf, mode='w', dereference=True) if os.path.isfile(context): tar.add(context, 'Dockerfile') else: for f in os.listdir(context): tar.add(os.path.join(context, f), f) tar.close() # Query parameters query_params = {} no_cache = None if cache else True if force_rm: rm = None arg_fields = ( ('dockerfile', 'dockerfile', ((1, 17), None)), ('t', 'tag', None), ('nocache', 'no_cache', None), ('pull', 'pull', ((1, 16), None)), ('rm', 'rm', ((1, 16), None)), ('forcerm', 'force_rm', ((1, 16), None)), ('buildargs', 'buildargs', ((1, 21), None)), ) json_update(query_params, locals(), arg_fields, self.api_version) host_config_fields = ( ('memory', 'memory', ((1, 18), None)), ('memswap', 'memory_swap', ((1, 18), None)), ('cpushares', 'cpu_shares', ((1, 18), None)), ('cpusetcpus', 'cpuset_cpus', ((1, 18), None)), ('cpuperiod', 'cpu_period', ((1, 19), None)), ('cpuquota', 'cpu_quota', ((1, 19), None)), ('shmsize', 'shm_size', ((1, 22), None)), ) if host_config: json_update(query_params, host_config, host_config_fields, self.api_version) r = self._post('/build', headers=headers, data=tar_buf.getvalue(), params=query_params, stream=True) false_or_last_line = self._process_response_output( r, output, last_line=True) if false_or_last_line is False: return None id_match = re.match('Successfully built ([0-9a-f]+)', false_or_last_line['stream']) return id_match.group(1)
[docs] def image_pull(self, name, registry_auth=None, output=('error', 'stream', 'status')): """Pull image. Create an image by pulling it from a registry. Arguments: name: name of the image to pull. output: tuple/list of with type of output information to allow (Default: ('stream', 'status', 'error')). """ params = {'fromImage': name} headers = {'content-type': 'application/json'} if registry_auth: if not isinstance(registry_auth, dict): raise TypeError('registry_auth must be dict: %s' % ( type(registry_auth))) registry_auth = json.dumps(registry_auth).encode('utf-8') headers['X-Registry-Auth'] = base64.b64encode(registry_auth) r = self._post('/images/create', headers=headers, params=params, stream=True) return self._process_response_output(r, output)
[docs] def image_remove(self, name): """Remove an image. Remove the image name from the filesystem. Arguments: name: name of the image to remove. """ r = self._delete('/images/{}'.format(name)) return r.json()
[docs] def image_tag(self, image, tag: Optional[Union[Repository, str]]=None, force: Optional[bool]=None): """Tag an image. Add tag to an existing image. Arguments: image: image to add tag to. tag: repository name and optionally tag. force: force creation of tag. """ # Handle convenience argument types if isinstance(tag, str): tag = Repository(tag) params = {} params['repo'] = tag.name if tag.tag is not None: params['tag'] = tag.tag if isinstance(force, bool): json_update(params, {'force': force}, (('force', 'force', (None, (1, 23))),), self.api_version) self._post('/images/{}/tag'.format(image), params=params)
[docs] def container_create( self, config: ContainerConfig, name: Optional[Union[ContainerName, str]]=None, mounts: Optional[Sequence[VolumeMount]]=None, host_config: Optional[HostConfig]=None, pull: bool=True): """Create a new container. Create a new container based on existing image. Arguments: config: ContainerConfig instance. name: name to assign to container. mounts: mount points in the container (list of strings). host_config: HostConfig instance. pull: Pull image if needed. """ # Handle convenience argument types if isinstance(name, str): name = ContainerName(name) query_params = {} arg_fields = ( ('name', 'name', None), ) json_update(query_params, locals(), arg_fields, self.api_version) # TODO: implementing handling of 'mounts' argument, whatever it might # mean. It is not properly documented... headers = {'content-type': 'application/json'} json_params = {} if isinstance(config, str): config = ContainerConfig(config) if config: json_params.update(config.json(self.api_version)) if host_config: json_params['HostConfig'] = host_config.json(self.api_version) if 'ExposedPorts' in json_params: json_params['ExposedPorts'] = { port: {} for port in json_params['ExposedPorts']} # Pull image if necessary if pull: try: self.image_inspect_raw(config.image) except ClientError: self.image_pull(config.image, output=()) response = self._post('/containers/create', params=query_params, headers=headers, data=json.dumps(json_params)) response_json = response.json() return Container(self, id=response_json['Id'])
[docs] def container_remove(self, container: Union[Container, ContainerName, str], force: Optional[bool]=None, volumes: Optional[bool]=None): """Remove a container. Remove a container and (optionally) the associated volumes. Arguments: container: The container to remove (id or name). force: Kill then remove the container. volumes: Remove the volumes associated to the container. """ # Handle convenience argument types if isinstance(container, str): id_or_name = container elif isinstance(container, ContainerName): id_or_name = container.name else: id_or_name = container.id or container.name query_params = {} if force is not None: query_params['force'] = force if volumes is not None: query_params['v'] = volumes self._delete('/containers/' + id_or_name, params=query_params) return
[docs] def container_start(self, container: Union[Container, ContainerName, str]): """Start a container. Arguments: container: The container to start (id or name). Returns: True if container was started. False if container was already started. """ # Handle convenience argument types if isinstance(container, str): id_or_name = container elif isinstance(container, ContainerName): id_or_name = container.name else: id_or_name = container.id or container.name try: self._post('/containers/{}/start'.format(id_or_name)) except HTTPError as e: if e.code == 304: return False raise e return True
[docs] def container_wait(self, container: Union[Container, ContainerName, str]) -> int: """Block until container stops. Block until container stops, then returns the exit code. Arguments: container: The container to remove (id or name). Returns: Container exit code. """ # Handle convenience argument types if isinstance(container, str): id_or_name = container elif isinstance(container, ContainerName): id_or_name = container.name else: id_or_name = container.id or container.name r = self._post('/containers/{}/wait'.format(id_or_name)) return r.json()['StatusCode']
[docs] def container_stop(self, container: Union[Container, ContainerName, str], timeout: Optional[int]=None): """Stop container. Stop the container, and optionally killing the container after a timeout. Arguments: container: The container to remove (id or name). timeout: Number of seconds to wait before killing the container. Returns: True if container was stopped. False if container was already stopped. """ # Handle convenience argument types if isinstance(container, str): id_or_name = container elif isinstance(container, ContainerName): id_or_name = container.name else: id_or_name = container.id or container.name params = {} if timeout is not None: params['t'] = timeout try: self._post('/containers/{}/stop'.format(id_or_name), params=params) except HTTPError as e: if e.code == 304: return False raise e return True
[docs] def container_restart(self, container: Union[Container, ContainerName, str], timeout: Optional[int]=None): """Restart container. Restart the container, and optionally killing the container after a timeout waiting for the container to stop. Arguments: container: The container to remove (id or name). timeout: Number of seconds to wait before killing the container. """ # Handle convenience argument types if isinstance(container, str): id_or_name = container elif isinstance(container, ContainerName): id_or_name = container.name else: id_or_name = container.id or container.name params = {} if timeout is not None: params['t'] = timeout self._post('/containers/{}/restart'.format(id_or_name), params=params)
[docs] def container_kill(self, container: Union[Container, ContainerName, str], signal: Optional[Signal]=None): """Kill container. Send signal to container, and (maybe) wait for the container to exit. Note: Prior to Docker version 1.8, kill succeeds (without actually doing anything) when run on existing but stopped containers. Docker 1.8 and newer fails out with a ServerError exception. Arguments: container: The container to remove (id or name). signal: Signal to send to container. """ # Handle convenience argument types if isinstance(container, str): id_or_name = container elif isinstance(container, ContainerName): id_or_name = container.name else: id_or_name = container.id or container.name params = {} if signal is not None: params['signal'] = signal self._post('/containers/{}/kill'.format(id_or_name), params=params)