6. Showcase: create a virtual cluster from scratch

In this section we will create a virtual cluster, from scratch.

Requirements:

  • A synnefo deployment with functional Astakos,

    Pithos+, Plankton and Cyclades services.

  • A kamaki setup, configured with a default cloud (see how to do this with

    kamaki as a shell command , or a python library.

  • An image stored at file ./my_image.diskdump that can run on a predefined

    hardware flavor, identifiable by the flavor id 42 (see how to create an image with the synnefo image creator ).

This is the pseudocode:

  1. Get credentials and service endpoints, with kamaki config and the

    Astakos identity and account services

  2. Upload the image file to the Pithos+ object-store service

  3. Register the image file to the Plankton image service

  4. Create a number of virtual servers to the Cyclades compute service

6.1. Credentials and endpoints

We assume that the kamaki configuration file contains at least one cloud configuration, and this configuration is also set as the default cloud for kamaki. A cloud configuration is basically a name for the cloud, an authentication URL and an authentication TOKEN: the credentials we are looking for!

This is the plan:

  1. Get the credentials from the kamaki configuration
  2. Initialize an AstakosClient and test the credentials
  3. Get the endpoints for all services
from sys import stderr
from kamaki.cli.config import Config, CONFIG_PATH
from kamaki.clients.astakos import AstakosClient, ClientError

#  Initialize Config with default values.
cnf = Config()

#  1. Get the credentials
#  Get default cloud name
try:
    cloud_name = cnf.get('global', 'default_cloud')
except KeyError:
    stderr.write('No default cloud set in file %s\n' % CONFIG_PATH)
    raise

#  Get cloud authentication URL and TOKEN
try:
    AUTH_URL = cnf.get_cloud(cloud_name, 'url')
except KeyError:
    stderr.write('No authentication URL in cloud %s\n' % cloud_name)
    raise
try:
    AUTH_TOKEN = cnf.get_cloud(cloud_name, 'token')
except KeyError:
    stderr.write('No token in cloud %s\n' % cloud_name)
    raise

#  2. Test the credentials
#  Test authentication credentials
try:
    auth = AstakosClient(AUTH_URL, AUTH_TOKEN)
    auth.authenticate()
except ClientError:
    stderr.write('Athentication failed with url %s and token %s\n' % (
        AUTH_URL, AUTH_TOKEN))
    raise

#  3. Get the endpoints
#  Identity, Account --> astakos
#  Compute --> cyclades
#  Object-store --> pithos
#  Image --> plankton
try:
    endpoints = dict(
        astakos=AUTH_URL,
        cyclades=auth.get_service_endpoints('compute')['publicURL'],
        pithos=auth.get_service_endpoints('object-store')['publicURL'],
        plankton=auth.get_service_endpoints('image')['publicURL']
        )
    user_id = auth.user_info()['id']
except ClientError:
    stderr.write(
        'Failed to get user id and endpoints from the identity server\n')
    raise

6.2. Upload the image

We assume there is an image file at the current local directory, at ./my_image.diskdump and we need to upload it to a Pithos+ container. We also assume the contains does not currently exist. We will name it images.

This is the plan:

  1. Initialize a Pithos+ client
  2. Create the container images
  3. Upload the local file to the container
from os.path import abspath
from kamaki.clients.pithos import PithosClient

CONTAINER = 'images'
IMAGE_FILE = 'my_image.diskdump'

#  1. Initialize Pithos+ client and set account to current user
try:
    pithos = PithosClient(endpoints['pithos'], AUTH_TOKEN)
except ClientError:
    stderr.write('Failed to initialize a Pithos+ client\n')
    raise
pithos.account = user_id

#  2. Create the container "images" and let pithos client work with that
try:
    pithos.create_container('images')
except ClientError:
    stderr.write('Failed to create container "image"\n')
    raise
pithos.container = CONTAINER

#  3. Upload
with open(abspath(IMAGE_FILE)) as f:
    try:
        pithos.upload_object(IMAGE_FILE, f)
    except ClientError:
        stderr.write('Failed to upload file %s to container %s\n' % (
            IMAGE_FILE, CONTAINER))
        raise

6.3. Register the image

Now the image is located at pithos://<user_id>/images/my_image.diskdump and we want to register it to the Plankton image service.

from kamaki.clients.image import ImageClient

IMAGE_NAME = 'My image'
IMAGE_LOCATION = (user_id, CONTAINER, IMAGE_FILE)

#  3.1 Initialize ImageClient
try:
    plankton = ImageClient(endpoints['plankton'], AUTH_TOKEN)
except ClientError:
    stderr.write('Failed to initialize the Image client client\n')
    raise

#  3.2 Register the image
try:
    image = plankton.image_register(IMAGE_NAME, IMAGE_LOCATION)
except ClientError:
    stderr.write('Failed to register image %s\n' % IMAGE_NAME)
    raise

6.4. Create the virtual cluster

In order to build a virtual cluster, we need some information:

  • an image id. We can get them from image[‘id’] (the id of the image we

    have just created)

  • a hardware flavor. Assume we have picked the flavor with id 42

  • a set of names for our virtual servers. We will name them cluster1,

    cluster2, etc.

Here is the plan:

  1. Initialize a Cyclades/Compute client

  2. Create a number of virtual servers. Their name should be prefixed as

    “cluster”

#  4.  Create  virtual  cluster
from kamaki.clients.cyclades import CycladesClient

FLAVOR_ID = 42
IMAGE_ID = image['id']
CLUSTER_SIZE = 2
CLUSTER_PREFIX = 'cluster'

#  4.1 Initialize a cyclades client
try:
    cyclades = CycladesClient(endpoints['cyclades'], AUTH_TOKEN)
except ClientError:
    stderr.write('Failed to initialize cyclades client\n')
    raise

#  4.2 Create 2 servers prefixed as "cluster"
servers = []
for i in range(1, CLUSTER_SIZE + 1):
    server_name = '%s%s' % (CLUSTER_PREFIX, i)
    try:
        servers.append(
            cyclades.create_server(server_name, FLAVOR_ID, IMAGE_ID))
    except ClientError:
        stderr.write('Failed while creating server %s\n' % server_name)
        raise

6.5. Some improvements

6.5.1. Progress Bars

Uploading an image might take a while. You can wait patiently, or you can use a progress generator. Even better, combine a generator with the progress bar package that comes with kamaki. The upload_object method accepts two generators as parameters: one for calculating local file hashes and another for uploading

from progress.bar import Bar

def hash_gen(n):
    bar = Bar('Calculating hashes...')
    for i in bar.iter(range(int(n))):
        yield
    yield

def upload_gen(n):
    bar = Bar('Uploading...')
    for i in bar.iter(range(int(n))):
        yield
    yield

...
pithos.upload_object(
    IMAGE_FILE, f, hash_cb=hash_gen, upload_cb=upload_gen)

We can create a method to produce progress bar generators, and use it in other methods as well:

try:
    from progress.bar import Bar

    def create_pb(msg):
        def generator(n):
            bar=Bar(msg)
            for i in bar.iter(range(int(n))):
                yield
            yield
        return generator
except ImportError:
    stderr.write('Suggestion: install python-progress\n')
    def create_pb(msg):
        return None

...
pithos.upload_object(
    IMAGE_FILE, f,
    hash_cb=create_pb('Calculating hashes...'),
    upload_cb=create_pb('Uploading...'))

6.5.2. Wait for servers to built

When a create_server method is finished successfully, a server is being built. Usually, it takes a while for a server to built. Fortunately, there is a wait method in the kamaki cyclades client. It can use a progress bar too!

#  4.2 Create 2 servers prefixed as "cluster"
...

# 4.3 Wait for servers to built
for server in servers:
    cyclades.wait_server(server['id'])

6.5.3. Asynchronous server creation

In case of a large virtual cluster, it might be faster to spawn the servers with asynchronous requests. Kamaki clients offer an automated mechanism for asynchronous requests.

#  4.2 Create 2 servers prefixed as "cluster"
create_params = [dict(
    name='%s%s' % (CLUSTER_PREFIX, i),
    flavor_id=FLAVOR_ID,
    image_id=IMAGE_ID) for i in range(1, CLUSTER_SIZE + 1)]
try:
    servers = cyclades.async_run(cyclades.create_server, create_params)
except ClientError:
    stderr.write('Failed while creating servers\n')
    raise

6.5.4. Clean up virtual cluster

We need to clean up Cyclades from servers left from previous cluster creations. This clean up will destroy all servers prefixed with “cluster”. It will run before the cluster creation:

#  4.2 Clean up virtual cluster
to_delete = [server for server in cyclades.list_servers(detail=True) if (
    server['name'].startswith(CLUSTER_PREFIX))]
for server in to_delete:
    cyclades.delete_server(server['id'])
for server in to_delete:
    cyclades.wait_server(
        server['id'], server['status'],
        wait_cb=create_pb('Deleting %s...' % server['name']))

#  4.3 Create 2 servers prefixed as "cluster"
...

6.5.5. Inject ssh keys

When a server is created, the returned value contains a filed “adminPass”. This field can be used to manually log into the server.

An easier way is to inject the ssh keys of the users who are going to use the virtual servers.

Assuming that we have collected the keys in a file named rsa.pub, we can inject them into each server, with the personality argument

SSH_KEYS = 'rsa.pub'

...

#  4.3 Create 2 servers prefixed as "cluster"
personality = []
if SSH_KEYS:
    with open(abspath(SSH_KEYS)) as f:
        personality.append(dict(
            contents=b64encode(f.read()),
            path='/root/.ssh/authorized_keys',
            owner='root', group='root', mode='0600')
        personality.append(dict(
            contents=b64encode('StrictHostKeyChecking no'),
            path='/root/.ssh/config',
            owner='root', group='root', mode='0600'))

create_params = [dict(
    name='%s%s' % (CLUSTER_PREFIX, i),
    flavor_id=FLAVOR_ID,
    image_id=IMAGE_ID,
    personality=personality) for i in range(1, CLUSTER_SIZE + 1)]
...

6.5.6. Save server passwords in a file

A last touch: define a local file to store the created server information, including the superuser password.

#  4.4 Store passwords in file
SERVER_INFO = 'servers.txt'
with open(abspath(SERVER_INFO), 'w+') as f:
    from json import dump
    dump(servers, f, intend=2)

#  4.5 Wait for 2 servers to built
...

6.5.7. Errors and logs

Developers may use the kamaki tools for error handling and logging, or implement their own methods.

To demonstrate, we will modify the container creation code to warn users if the container already exists. We need a stream logger for the warning and a knowledge of the expected return values for the create_container method.

First, let’s get the logger.

from kamaki.cli.logger import add_stream_logger, get_logger

add_stream_logger(__name__)
log = get_logger(__name__)

The create_container method makes an HTTP request to the pithos server. It considers the request succesfull if the status code of the response is 201 (created) or 202 (accepted). These status codes mean that the container has been created or that it was already there anyway, respectively.

We will force create_container to raise an error in case of a 202 response. This can be done by instructing create_container to accept only 201 as a successful status.

try:
    pithos.create_container(CONTAINER, success=(201, ))
except ClientError as ce:
    if ce.status in (202, ):
        log.warning('Container %s already exists' % CONTAINER')
    else:
        log.debug('Failed to create container %s' % CONTAINER)
        raise
log.info('Container %s is ready' % CONTAINER)

6.6. create a cluster from scratch

We are ready to create a module that uses kamaki to create a cluster from scratch. We revised the code by grouping functionality in methods and using logging more. We also added some command line interaction candy.

#!/usr/bin/env python

from sys import argv
from os.path import abspath
from base64 import b64encode
from kamaki.clients import ClientError
from kamaki.cli.logger import get_logger, add_file_logger
from logging import DEBUG

#  Define loggers
log = get_logger(__name__)
add_file_logger('kamaki.clients', DEBUG, '%s.log' % __name__)
add_file_logger(__name__, DEBUG, '%s.log' % __name__)

#  Create progress bar generator
try:
    from progress.bar import Bar

    def create_pb(msg):
        def generator(n):
            bar=Bar(msg)
            for i in bar.iter(range(int(n))):
                yield
            yield
        return generator
except ImportError:
    log.warning('Suggestion: install python-progress')
    def create_pb(msg):
        return None


#  kamaki.config
#  Identity,Account / Astakos

def init_astakos():
    from kamaki.clients.astakos import AstakosClient
    from kamaki.cli.config import Config, CONFIG_PATH

    print(' Get the credentials')
    cnf = Config()

    #  Get default cloud name
    try:
        cloud_name = cnf.get('global', 'default_cloud')
    except KeyError:
        log.debug('No default cloud set in file %' % CONFIG_PATH)
        raise

    try:
        AUTH_URL = cnf.get_cloud(cloud_name, 'url')
    except KeyError:
        log.debug('No authentication URL in cloud %s' % cloud_name)
        raise
    try:
        AUTH_TOKEN = cnf.get_cloud(cloud_name, 'token')
    except KeyError:
        log.debug('No token in cloud %s' % cloud_name)
        raise

    print(' Test the credentials')
    try:
        auth = AstakosClient(AUTH_URL, AUTH_TOKEN)
        auth.authenticate()
    except ClientError:
        log.debug('Athentication failed with url %s and token %s' % (
            AUTH_URL, AUTH_TOKEN))
        raise

    return auth, AUTH_TOKEN


def endpoints_and_user_id(auth):
    print(' Get the endpoints')
    try:
        endpoints = dict(
            astakos=auth.get_service_endpoints('identity')['publicURL'],
            cyclades=auth.get_service_endpoints('compute')['publicURL'],
            pithos=auth.get_service_endpoints('object-store')['publicURL'],
            plankton=auth.get_service_endpoints('image')['publicURL']
            )
        user_id = auth.user_info()['id']
    except ClientError:
        print('Failed to get endpoints & user_id from identity server')
        raise
    return endpoints, user_id


#  Object-store / Pithos+

def init_pithos(endpoint, token, user_id):
    from kamaki.clients.pithos import PithosClient

    print(' Initialize Pithos+ client and set account to user uuid')
    try:
        return PithosClient(endpoint, token, user_id)
    except ClientError:
        log.debug('Failed to initialize a Pithos+ client')
        raise


def upload_image(pithos, container, image_path):

    print(' Create the container "images" and use it')
    try:
        pithos.create_container(container, success=(201, ))
    except ClientError as ce:
        if ce.status in (202, ):
            log.warning('Container %s already exists' % container)
        else:
            log.debug('Failed to create container %s' % container)
            raise
    pithos.container = container

    print(' Upload to "images"')
    with open(abspath(image_path)) as f:
        try:
            pithos.upload_object(
                image_path, f,
                hash_cb=create_pb('  Calculating hashes...'),
                upload_cb=create_pb('  Uploading...'))
        except ClientError:
            log.debug('Failed to upload file %s to container %s' % (
                image_path, container))
            raise


#  Image / Plankton

def init_plankton(endpoint, token):
    from kamaki.clients.image import ImageClient

    print(' Initialize ImageClient')
    try:
        return ImageClient(endpoint, token)
    except ClientError:
        log.debug('Failed to initialize the Image client')
        raise


def register_image(plankton, name, user_id, container, path):

    image_location = (user_id, container, path)
    print(' Register the image')
    try:
         return plankton.register(name, image_location)
    except ClientError:
        log.debug('Failed to register image %s' % name)
        raise


#  Compute / Cyclades

def init_cyclades(endpoint, token):
    from kamaki.clients.cyclades import CycladesClient

    print(' Initialize a cyclades client')
    try:
        return CycladesClient(endpoint, token)
    except ClientError:
        log.debug('Failed to initialize cyclades client')
        raise


class Cluster(object):

    def __init__(self, cyclades, prefix, flavor_id, image_id, size):
        self.client = cyclades
        self.prefix, self.size = prefix, int(size)
        self.flavor_id, self.image_id = flavor_id, image_id

    def list(self):
        return [s for s in self.client.list_servers(detail=True) if (
            s['name'].startswith(self.prefix))]

    def clean_up(self):
        to_delete = self.list()
        print('  There are %s servers to clean up' % len(to_delete))
        for server in to_delete:
            self.client.delete_server(server['id'])
        for server in to_delete:
            self.client.wait_server(
                server['id'], server['status'],
                wait_cb=create_pb(' Deleting %s...' % server['name']))

    def _personality(self, ssh_keys_path='', pub_keys_path=''):
        personality = []
        if ssh_keys_path:
            with open(abspath(ssh_keys_path)) as f:
                personality.append(dict(
                    contents=b64encode(f.read()),
                    path='/root/.ssh/id_rsa',
                    owner='root', group='root', mode='0600'))
        if pub_keys_path:
            with open(abspath(pub_keys_path)) as f:
                personality.append(dict(
                    contents=b64encode(f.read()),
                    path='/root/.ssh/authorized_keys',
                    owner='root', group='root', mode='0600'))
        if ssh_keys_path or pub_keys_path:
                personality.append(dict(
                    contents=b64encode('StrictHostKeyChecking no'),
                    path='/root/.ssh/config',
                    owner='root', group='root', mode='0600'))
        return personality

    def create(self, ssh_k_path='', pub_k_path='', server_log_path=''):
        print('\n Create %s servers prefixed as %s' % (
            self.size, self.prefix))
        servers = []
        for i in range(1, self.size + 1):
            try:
                server_name = '%s%s' % (self.prefix, i)
                servers.append(self.client.create_server(
                    server_name, self.flavor_id, self.image_id,
                    personality=self._personality(ssh_k_path, pub_k_path)))
            except ClientError:
                log.debug('Failed while creating server %s' % server_name)
                raise

        if server_log_path:
            print(' Store passwords in file %s' % server_log_path)
            with open(abspath(server_log_path), 'w+') as f:
                from json import dump
                dump(servers, f, indent=2)

        print(' Wait for %s servers to built' % self.size)
        for server in servers:
            new_status = self.client.wait_server(
                server['id'],
                wait_cb=create_pb(' Creating %s...' % server['name']))
            print(' Status for server %s is %s' % (
                server['name'], new_status or 'not changed yet'))
        return servers


def main(opts):

    print('1.  Credentials  and  Endpoints')
    auth, token = init_astakos()
    endpoints, user_id = endpoints_and_user_id(auth)

    print('2.  Upload  the  image  file')
    pithos = init_pithos(endpoints['pithos'], token, user_id)

    upload_image(pithos, opts.container, opts.imagefile)

    print('3.  Register  the  image')
    plankton = init_plankton(endpoints['plankton'], token)

    image = register_image(
        plankton, 'my image', user_id, opts.container, opts.imagefile)

    print('4.  Create  virtual  cluster')
    cluster = Cluster(
        cyclades = init_cyclades(endpoints['cyclades'], token),
        prefix=opts.prefix,
        flavor_id=opts.flavorid,
        image_id=image['id'],
        size=opts.clustersize)
    if opts.delete_stale:
        cluster.clean_up()
    servers = cluster.create(
        opts.sshkeypath, opts.pubkeypath, opts.serverlogpath)

    #  Group servers
    cluster_servers = cluster.list()

    active = [s for s in cluster_servers if s['status'] == 'ACTIVE']
    print('%s cluster servers are ACTIVE' % len(active))

    attached = [s for s in cluster_servers if s['attachments']]
    print('%s cluster servers are attached to networks' % len(attached))

    build = [s for s in cluster_servers if s['status'] == 'BUILD']
    print('%s cluster servers are being built' % len(build))

    error = [s for s in cluster_servers if s['status'] in ('ERROR')]
    print('%s cluster servers failed (ERROR satus)' % len(error))


if __name__ == '__main__':

    #  Add some interaction candy
    from optparse import OptionParser

    kw = {}
    kw['usage'] = '%prog [options]'
    kw['description'] = '%prog deploys a compute cluster on Synnefo w. kamaki'

    parser = OptionParser(**kw)
    parser.disable_interspersed_args()
    parser.add_option('--prefix',
                      action='store', type='string', dest='prefix',
                      help='The prefix to use for naming cluster nodes',
                      default='cluster')
    parser.add_option('--clustersize',
                      action='store', type='string', dest='clustersize',
                      help='Number of virtual cluster nodes to create ',
                      default=2)
    parser.add_option('--flavor-id',
                      action='store', type='int', dest='flavorid',
                      metavar='FLAVOR ID',
                      help='Choose flavor id for the virtual hardware '
                           'of cluster nodes',
                      default=42)
    parser.add_option('--image-file',
                      action='store', type='string', dest='imagefile',
                      metavar='IMAGE FILE PATH',
                      help='The image file to upload and register ',
                      default='my_image.diskdump')
    parser.add_option('--delete-stale',
                      action='store_true', dest='delete_stale',
                      help='Delete stale servers from previous runs, whose '
                           'name starts with the specified prefix, see '
                           '--prefix',
                      default=False)
    parser.add_option('--container',
                      action='store', type='string', dest='container',
                      metavar='PITHOS+ CONTAINER',
                      help='The Pithos+ container to store image file',
                      default='images')
    parser.add_option('--ssh-key-path',
                      action='store', type='string', dest='sshkeypath',
                      metavar='PATH OF SSH KEYS',
                      help='The ssh keys to inject to server (e.g., id_rsa) ',
                      default='')
    parser.add_option('--pub-key-path',
                      action='store', type='string', dest='pubkeypath',
                      metavar='PATH OF PUBLIC KEYS',
                      help='The public keys to inject to server',
                      default='')
    parser.add_option('--server-log-path',
                      action='store', type='string', dest='serverlogpath',
                      metavar='FILE TO LOG THE VIRTUAL SERVERS',
                      help='Where to store information on created servers '
                           'including superuser passwords',
                      default='')

    opts, args = parser.parse_args(argv[1:])

    main(opts)