Source code for rcluster.rcluster

import os
import json
from time import sleep
from inspect import signature
from pprint import PrettyPrinter
from logging import getLogger
from boto3 import session

import rcluster as rcl


class RCluster:
    """RCluster class object

    Designed to organize the information for a boto3 connection to EC2,
    paramiko connections using a consistent SSH key, creation of EC2
    instances using a consistent key, the creation and tracking of manager
    and worker nodes comprising an R PSOCK cluster, and networking those
    manager and worker nodes to access within an RStudio Server session.

    .. automethod:: __repr__
    .. automethod:: __setattr__
    """

    def __init__(self, aws_access_key_id, aws_secret_access_key, region_name,
                 instance_conf, manager_runtime=None, worker_runtime=None,
                 key_path=None, ip_ref='public_ip_address', ver=rcl.__ver__,
                 purge=False):
        """Initialize the RCluster object.

        :param aws_access_key_id: AWS access key provided to
            boto3.session.Session()
        :param aws_secret_access_key: AWS secret access key provided to
            boto3.session.Session()
        :param region_name: The accessibility region provided to
            boto3.session.Session()
        :param instance_conf: Dictionary of keyword arguments forwarded to
            boto3's EC2.ServiceResource.create_instances (e.g. 'ImageId' and
            'InstanceType'). NOTE: this dictionary is modified in place when
            'SecurityGroups' or 'Placement' are missing from it.
        :param manager_runtime: String containing shell runtime command for
            the manager instance
        :param worker_runtime: String containing shell runtime command for
            the worker instance
        :param key_path: The path to the key used to create EC2 instances and
            to connect to them using paramiko clients; when omitted a new key
            pair named `ver` is created and written to disk
        :param ip_ref: Whether to provide the user with the public IP or
            private IP (name of the boto3 instance attribute to read)
        :param ver: Designated to stamp Security Groups, Placement Groups,
            keys, and all instances launched
        :param purge: Flag; when true, resources from a previous run stamped
            with `ver` are removed (see `_ec2Purge`) before anything is
            created
        """
        # Names of the constructor arguments that make up the exportable
        # configuration; 'purge' is a one-off action, not configuration.
        self._kwargs = list(signature(RCluster).parameters.keys())
        self._kwargs.remove('purge')
        self._config = {}
        self._log = getLogger(__name__)
        self.ses = session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name
        )
        self.ec2 = self.ses.resource('ec2')

        if purge:
            _ec2Purge(self.ec2, ver)

        if not key_path:
            # No key supplied: create a key pair named after the version and
            # persist its private material for later SSH connections.
            self.key_name = ver
            key_path = rcl._setData('pem')
            kp = self.ec2.create_key_pair(KeyName=ver)
            with open(key_path, 'w') as out:
                out.write(kp.key_material)
        else:
            # The EC2 key name is taken to be the key file's base name.
            self.key_name = os.path.splitext(os.path.basename(key_path))[0]

        if 'SecurityGroups' not in instance_conf:
            sg = self.ec2.create_security_group(
                GroupName=ver,
                Description='22 and 8787 open, permissive internal traffic.'
            )
            instance_conf['SecurityGroups'] = [ver]
            sleep(1)  # Security group may not "exist" in time for next call
            sg.authorize_ingress(IpProtocol='tcp', FromPort=22, ToPort=22,
                                 CidrIp='0.0.0.0/0')
            sg.authorize_ingress(IpProtocol='tcp', FromPort=8787, ToPort=8787,
                                 CidrIp='0.0.0.0/0')
            sg.authorize_ingress(SourceSecurityGroupName=ver)

        if 'Placement' not in instance_conf:
            self.ec2.create_placement_group(GroupName=ver, Strategy='cluster')
            instance_conf['Placement'] = {'GroupName': ver}

        # Store every configuration argument as an attribute; __setattr__
        # mirrors each of them into self._config. key_path may have been
        # replaced above, so the current local values are used.
        local_values = locals()
        for key in self._kwargs:
            setattr(self, key, local_values[key])

    def __repr__(self):
        """Indicates RCluster and pretty prints the _config dictionary"""
        return 'RCluster class object\n' + PrettyPrinter().pformat(
            self._config)

    def __setattr__(self, key, value):
        """
        Redefined to keep an updated version of the
        :class:`~rcluster.RCluster` configuration options saved. Allows for
        easy exporting, duplication, and modification of configurations. See
        :meth:`~.rcluster.RCluster.fromConfig` and
        :meth:`~.rcluster.RCluster.writeConfig`
        """
        # '_config' is absent while __init__ is still bootstrapping its
        # private attributes, so nothing is mirrored until it exists.
        if '_config' in self.__dict__ and key in self._kwargs:
            self._log.debug('Setting configuration attribute %s', key)
            self._config[key] = value
        super().__setattr__(key, value)

    def writeConfig(self, fn):
        """Write out RCluster configuration data as JSON.

        The configuration includes every constructor argument except
        `purge`, so the AWS credentials are written as well.

        :param fn: The filename to be written, will overwrite previous file
        """
        with open(fn, 'w') as out:
            json.dump(self._config, out, indent=2, sort_keys=True)

    @staticmethod
    def fromConfig(fn, **kwargs):
        """
        Use RCluster JSON configuration to create RCluster object. Prompts the
        user to input mandatory configuration values that are missing (i.e.,
        AWS access credentials).

        :param fn: The filename containing RCluster configuration data
        :param kwargs: Alternate or supplement RCluster configuration; will
            override the content of fn
        :return: A new :class:`~rcluster.RCluster` built from the merged
            configuration
        """
        with open(fn, 'r') as src:
            dic = json.load(src)
        dic.update(kwargs)
        # Any value still None after merging is requested interactively.
        for key in sorted(dic):
            if dic[key] is None:
                dic[key] = input(key + ': ')
        return RCluster(**dic)

    def createInstances(self, n_instances, **kwargs):
        """Create EC2 instances using RCluster's configuration.

        :param n_instances: The number of instances to be created
        :param kwargs: arbitrary arguments to boto3 Session Resource
            ec2.create_instances; will supersede RCluster.instance_conf
            content
        :return: List of boto3 EC2.Instance objects for the new instances,
            looked up again by ID after tagging
        """
        self._log.debug('Creating %d instances.', n_instances)
        conf = self.instance_conf.copy()
        conf.update(kwargs)
        instances = self.ec2.create_instances(
            DryRun=False,
            MinCount=n_instances,
            MaxCount=n_instances,
            KeyName=self.key_name,
            **conf
        )
        # Presumably gives EC2 time to register the new instances before
        # they are polled.
        sleep(5)
        # Wait for every instance, not just the first, before tagging.
        for instance in instances:
            instance.wait_until_running()
        for instance in instances:
            instance.create_tags(
                DryRun=False,
                Tags=[{'Key': 'rcluster', 'Value': self.ver}]
            )
        ids = [instance.id for instance in instances]
        return list(self.ec2.instances.filter(InstanceIds=ids))

    def createCluster(self, n_workers=1, setup_pause=60, **kwargs):
        """Initialize the cluster.

        Launch a manager instance and n_workers worker instances, automating
        the configuration of their shared networking.

        Sets `manager_private`, `access_ip` and `hostfile` on the object;
        the runtime command strings are formatted with the object's
        attributes, so they may reference these values.

        :param n_workers: Number of worker instances to launch (default 1)
        :param setup_pause: Pause time to allow manager and workers to boot
            before attempting configuration steps (default 60)
        :param kwargs: Forwarded to
            :meth:`~.rcluster.RCluster.createInstances`
        :raises Exception: Re-raises any error met while configuring the
            instances, after logging it
        """
        self._log.debug('Creating cluster of %d workers.', n_workers)
        instances = self.createInstances(n_workers + 1, **kwargs)
        manager = instances[0]
        manager.create_tags(
            DryRun=False,
            Tags=[{'Key': self.ver, 'Value': 'master'}]
        )
        workers = instances[1:]
        sleep(setup_pause)
        self.manager_private = manager.private_ip_address
        self.access_ip = getattr(manager, self.ip_ref)
        try:
            # TODO: thread
            self.hostfile = ''
            for worker in workers:
                self._log.debug('Configuring Worker %s', worker.instance_id)
                client = self.connect(worker)
                cpus = rcl.cpuCount(client)
                # One hostfile line per CPU reported for the worker.
                self.hostfile += (worker.private_ip_address + '\n') * cpus
                if self.worker_runtime:
                    rcl.pmkCmd(client,
                               self.worker_runtime.format(**self.__dict__))
            self._log.debug('Configuring manager %s', manager.instance_id)
            client = self.connect(manager)
            # The manager contributes one entry fewer than its CPU count.
            cpus = rcl.cpuCount(client) - 1
            self.hostfile += (manager.private_ip_address + '\n') * cpus
            if self.manager_runtime:
                rcl.pmkCmd(client,
                           self.manager_runtime.format(**self.__dict__))
        except Exception as err:
            self._log.error('Error during instance configuration: %s', err)
            raise

    def connect(self, instance):
        """
        Create SSH connection to boto3.EC2.Instance as paramiko.client.

        :param instance: A boto3.EC2.Instance object
        :return: The result of rcl.pmkConnect for the instance's address
            (selected by `ip_ref`) and the configured key path
        """
        host = getattr(instance, self.ip_ref)
        return rcl.pmkConnect(host, self.key_path)

    def retrieveAccessIp(self):
        """
        Identify the master's access IP address (if a master has been
        defined).

        :return: The `ip_ref` attribute of the first running or pending
            instance tagged `<ver>=master`, or None when there is none
        """
        # 'tag:<key>' matches the key/value pair as a unit, whereas separate
        # 'tag-key' and 'tag-value' filters are matched independently.
        master = list(self.ec2.instances.filter(
            DryRun=False,
            Filters=[
                {'Name': 'tag:' + self.ver, 'Values': ['master']},
                {'Name': 'instance-state-name',
                 'Values': ['running', 'pending']}
            ]))
        if master:
            return getattr(master[0], self.ip_ref)
        self._log.info("No active rcluster found")
        return None

    def terminateInstances(self, ver=None):
        """
        Terminate EC2.Instance objects created by the current configuration
        file.

        :param ver: The version stamp whose instances are terminated;
            defaults to self.ver
        """
        if not ver:
            ver = self.ver
        # Materialise the collection: a boto3 collection object is always
        # truthy, so emptiness can only be tested on a list.
        instances = list(self.ec2.instances.filter(
            DryRun=False,
            Filters=[
                {'Name': 'tag:rcluster', 'Values': [ver]},
                {'Name': 'instance-state-name',
                 'Values': ['running', 'pending']}
            ]))
        if not instances:
            self._log.debug("No instances terminated.")
            return
        for instance in instances:
            instance.terminate()

    def createAmi(self, base=None, setup_fn=None, ver=None, update_image=True,
                  terminate=True, wait=True):
        """
        Create an AMI, returning the AMI ID.

        :param base: boto3.EC2.Instance object or nothing; optional to allow
            for snapshotting.
        :param setup_fn: The shell script used to configure the instance;
            optional to allow for snapshotting.
        :param ver: Name of AMI, defaults to self.ver.
        :param update_image: Flag; whether to change the RCluster's
            instance_conf AMI ID to that of the new image.
        :param terminate: Flag; whether to terminate the instance used to
            build the AMI (useful for debugging).
        :param wait: Flag; whether to block until the new image's state
            reports 'available'.
        :return: The ID of the created image
        """
        if not base:
            self._log.debug('Creating base instance for AMI generation.')
            base = self.createInstances(1, InstanceType='m4.large')[0]
            sleep(20)
        if setup_fn:
            client = self.connect(base)
            sftp_conn = client.open_sftp()
            sftp_conn.put(setup_fn, 'setup.sh')
            self._log.debug('Setup script %s, running configuration.',
                            setup_fn)
            rcl.pmkCmd(client, 'sudo bash setup.sh')
        if not ver:
            ver = self.ver
        self._log.debug('Creating AMI %s', ver)
        image = base.create_image(
            DryRun=False,
            Name=ver,
            Description="RCluster AMI",
            NoReboot=False
        )
        base.wait_until_running()
        if wait:
            # A new Image resource is built for every poll; presumably so
            # that the state is fetched afresh rather than read from cache.
            while 'available' not in self.ec2.Image(image.id).state:
                self._log.debug('Waiting for AMI %s to be available',
                                image.id)
                sleep(20)
        if terminate:
            base.terminate()
        if update_image:
            self.instance_conf['ImageId'] = image.id
        return image.id


def _ec2Purge(ec2_res, ver):
    """
    Utility to clear an AWS account of previous RCluster settings (useful for
    development). Removes resources associated with a provided version:

    * Terminates instances with the tag key 'rcluster' and value `ver`
    * Deregisters AMI named `ver`
    * Deletes key-pair named `ver`
    * Deletes placement group named `ver`
    * Deletes security group named `ver`

    :param ec2_res: A boto3.EC2.ServiceResource
    :param ver: The "version" to delete
    """
    log = getLogger(__name__)
    log.info('Purging %s configurations', ver)

    def _named(filter_name):
        # Keyword arguments selecting resources whose `filter_name` is `ver`.
        return {
            'DryRun': False,
            'Filters': [{'Name': filter_name, 'Values': [ver]}],
        }

    # 'tag:rcluster' matches the key/value pair as a unit; separate
    # 'tag-key' / 'tag-value' filters are matched independently of each
    # other. The result is materialised so it can be walked twice.
    instances = list(ec2_res.instances.filter(
        DryRun=False,
        Filters=[
            {'Name': 'tag:rcluster', 'Values': [ver]},
            {'Name': 'instance-state-name', 'Values': ['running', 'pending']}
        ]))
    for instance in instances:
        instance.terminate()
    # Placement and security groups cannot be deleted while instances still
    # reference them, so wait for termination to complete first.
    for instance in instances:
        instance.wait_until_terminated()

    for image in ec2_res.images.filter(**_named('name')):
        image.deregister()
    for key_pair in ec2_res.key_pairs.filter(**_named('key-name')):
        key_pair.delete()
    for placement_group in ec2_res.placement_groups.filter(
            **_named('group-name')):
        placement_group.delete()
    for security_group in ec2_res.security_groups.filter(
            **_named('group-name')):
        security_group.delete()