"""Check :doc:`README` or :class:`django_fabfile.utils.Config` docstring
for setup instructions."""
from contextlib import contextmanager, nested
import logging
import os
import re
from datetime import timedelta, datetime
from itertools import groupby
from json import dumps
from boto.exception import EC2ResponseError
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from dateutil.tz import tzutc
from fabric.api import env, local, put, settings, sudo, task, run
from fabric.contrib.files import append
from django_fabfile.instances import (attach_snapshot, create_temp_inst,
get_avail_dev, get_vol_dev, mount_volume)
from django_fabfile.utils import (
StateNotChangedError, add_tags, config, config_temp_ssh,
get_descr_attr, get_inst_by_id, get_region_conn, get_snap_device,
get_snap_time, get_snap_vol, timestamp, wait_for, wait_for_sudo)
USERNAME = config.get('DEFAULT', 'USERNAME')
env.update({'user': USERNAME, 'disable_known_hosts': True})
logger = logging.getLogger(__name__)
DEFAULT_TAG_NAME = config.get('DEFAULT', 'TAG_NAME')
DEFAULT_TAG_VALUE = config.get('DEFAULT', 'TAG_VALUE')
DESCRIPTION_TAG = 'Description'
SNAP_STATUSES = ['pending', 'completed'] # All but "error".
VOL_STATUSES = ['creating', 'available', 'in-use']
DETACH_TIME = config.getint('DEFAULT', 'MINUTES_FOR_DETACH') * 60
SNAP_TIME = config.getint('DEFAULT', 'MINUTES_FOR_SNAP') * 60
REPLICATION_SPEED = config.getfloat('DEFAULT', 'REPLICATION_SPEED')
class ReplicationCollisionError(Exception):
pass
def create_snapshot(vol, description='', tags=None, synchronously=True,
consistent=False):
"""Return new snapshot for the volume.
vol
volume to snapshot;
synchronously
wait for successful completion;
description
        description for snapshot. Will be compiled from instance
        parameters by default;
tags
tags to be added to snapshot. Will be cloned from volume and from
instance by default.
    consistent
        if True, the script will try to freeze the FS mountpoint and
        create the snapshot while it's frozen, with all buffers flushed
        to disk.
"""
if vol.attach_data:
inst = get_inst_by_id(vol.region.name, vol.attach_data.instance_id)
else:
inst = None
if not description and inst:
description = dumps({
'Volume': vol.id,
'Region': vol.region.name,
'Device': vol.attach_data.device,
'Instance': inst.id,
'Type': inst.instance_type,
'Arch': inst.architecture,
'Root_dev_name': inst.root_device_name,
'Time': timestamp(),
})
def freeze_volume():
key_filename = config.get(inst.region.name, 'KEY_FILENAME')
try:
_user = config.get('SYNC', 'USERNAME')
        except Exception:
_user = USERNAME
with settings(host_string=inst.public_dns_name,
key_filename=key_filename, user=_user):
run('sync', shell=False)
run('for i in {1..20}; do sync; sleep 1; done &')
def initiate_snapshot():
if consistent:
if inst.state == 'running':
try:
freeze_volume()
                except Exception:
                    logger.info('FS NOT FROZEN! '
                                'Do you have access to this server?')
snapshot = vol.create_snapshot(description)
if tags:
add_tags(snapshot, tags)
else:
add_tags(snapshot, vol.tags)
if inst:
add_tags(snapshot, inst.tags)
logger.info('{0} started from {1} in {0.region}'.format(snapshot, vol))
return snapshot
if synchronously:
        while True:  # Retry until success, deleting failed snapshots.
snapshot = initiate_snapshot()
try:
wait_for(snapshot, '100%', limit=SNAP_TIME)
assert snapshot.status == 'completed', (
'completed with wrong status {0}'.format(snapshot.status))
except (StateNotChangedError, AssertionError) as err:
logger.error(str(err) + ' - deleting')
snapshot.delete()
else:
break
else:
snapshot = initiate_snapshot()
return snapshot
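# A minimal usage sketch for create_snapshot (region name and volume ID
# below are hypothetical):
#
#     conn = get_region_conn('us-east-1')
#     vol = conn.get_all_volumes(['vol-12345678'])[0]
#     snap = create_snapshot(vol, synchronously=True)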
@task
def backup_instance(region_name, instance_id=None, instance=None,
synchronously=False, consistent=False):
"""
Return list of created snapshots for specified instance.
region_name
instance location;
instance, instance_id
either `instance_id` or `instance` argument should be specified;
synchronously
wait for successful completion. False by default.
consistent
if True, then FS mountpoint will be frozen before snapshotting.
False by default.
"""
assert bool(instance_id) ^ bool(instance), ('Either instance_id or '
'instance should be specified')
conn = get_region_conn(region_name)
if instance_id:
instance = get_inst_by_id(conn.region.name, instance_id)
snapshots = []
for dev in instance.block_device_mapping:
vol_id = instance.block_device_mapping[dev].volume_id
vol = conn.get_all_volumes([vol_id])[0]
snapshots.append(create_snapshot(vol, synchronously=synchronously,
consistent=consistent))
return snapshots
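# Usage sketch, assuming the task is available on your fabfile path
# (the instance ID is hypothetical):
#
#     fab backup_instance:us-east-1,instance_id=i-12345678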
@task
def backup_instances_by_tag(
region_name=None, tag_name=DEFAULT_TAG_NAME,
tag_value=DEFAULT_TAG_VALUE, synchronously=False, consistent=False):
"""Creates backup for all instances with given tag in region.
region_name
will be applied across all regions by default;
tag_name, tag_value
will be fetched from config by default;
    synchronously
        wait for successful completion of each snapshot. False by
        default;
consistent
if True, then FS mountpoint will be frozen before snapshotting.
    .. note:: when the ``create_ami`` task compiles an AMI from several
       snapshots, it requires their start_time values to differ by at
       most 10 minutes. Snapshot completion may take much longer, so
       only asynchronously generated snapshots can be assembled
       reliably."""
if region_name:
regions = [get_region_conn(region_name).region]
else:
regions = get_region_conn().get_all_regions()
for reg in regions:
conn = get_region_conn(reg.name)
filters = {'resource-type': 'instance', 'key': tag_name,
'tag-value': tag_value}
for tag in conn.get_all_tags(filters=filters):
backup_instance(reg.name, instance_id=tag.res_id,
synchronously=synchronously, consistent=consistent)
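# Usage sketch: back up all instances carrying the given tag in one
# region, or in every region when region_name is omitted (the tag name
# and value below are hypothetical):
#
#     fab backup_instances_by_tag:us-east-1,Environment,production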
def _trim_snapshots(region, dry_run=False):
"""Delete snapshots back in time in logarithmic manner.
dry_run
just print snapshot to be deleted.
Modified version of the `boto.ec2.connection.trim_snapshots
<http://pypi.python.org/pypi/boto/2.0>_`. Licensed under MIT license
by Mitch Garnaat, 2011."""
hourly_backups = config.getint('purge_backups', 'HOURLY_BACKUPS')
daily_backups = config.getint('purge_backups', 'DAILY_BACKUPS')
weekly_backups = config.getint('purge_backups', 'WEEKLY_BACKUPS')
monthly_backups = config.getint('purge_backups', 'MONTHLY_BACKUPS')
quarterly_backups = config.getint('purge_backups', 'QUARTERLY_BACKUPS')
yearly_backups = config.getint('purge_backups', 'YEARLY_BACKUPS')
# work with UTC time, which is what the snapshot start time is reported in
now = datetime.utcnow()
last_hour = datetime(now.year, now.month, now.day, now.hour)
last_midnight = datetime(now.year, now.month, now.day)
last_sunday = datetime(now.year, now.month,
now.day) - timedelta(days=(now.weekday() + 1) % 7)
    last_month = now - relativedelta(months=1)
    last_year = now - relativedelta(years=1)
    other_years = now - relativedelta(years=2)
start_of_month = datetime(now.year, now.month, 1)
target_backup_times = []
# there are no snapshots older than 1/1/2000
oldest_snapshot_date = datetime(2000, 1, 1)
for hour in range(0, hourly_backups):
target_backup_times.append(last_hour - timedelta(hours=hour))
for day in range(0, daily_backups):
target_backup_times.append(last_midnight - timedelta(days=day))
for week in range(0, weekly_backups):
target_backup_times.append(last_sunday - timedelta(weeks=week))
for month in range(0, monthly_backups):
target_backup_times.append(last_month - relativedelta(months=month))
    for quart in range(0, quarterly_backups):
        # Quarters are 3 months apart.
        target_backup_times.append(last_year - relativedelta(months=3 * quart))
for year in range(0, yearly_backups):
target_backup_times.append(other_years - relativedelta(years=year))
one_day = timedelta(days=1)
while start_of_month > oldest_snapshot_date:
# append the start of the month to the list of snapshot dates to save:
target_backup_times.append(start_of_month)
        # There's no timedelta setting for one month, so instead
        # decrement the day by one to reach the final day of the
        # previous month...
start_of_month -= one_day
# ... and then go to the first day of that previous month:
start_of_month = datetime(start_of_month.year,
start_of_month.month, 1)
    # Remove duplicate dates, preserving order.
    temp = []
    for t in target_backup_times:
        if t not in temp:
            temp.append(t)
    target_backup_times = temp
target_backup_times.reverse() # make the oldest date first
    # Get all the snapshots, sort them by date and time, and organize
    # them into one array for each volume:
conn = get_region_conn(region.name)
all_snapshots = conn.get_all_snapshots(owner='self')
    all_snapshots.sort(key=lambda snap: snap.start_time)  # Oldest first.
snaps_for_each_volume = {}
for snap in all_snapshots:
# the snapshot name and the volume name are the same.
# The snapshot name is set from the volume
# name at the time the snapshot is taken
volume_name = get_snap_vol(snap)
if volume_name:
# only examine snapshots that have a volume name
snaps_for_volume = snaps_for_each_volume.get(volume_name)
if not snaps_for_volume:
snaps_for_volume = []
snaps_for_each_volume[volume_name] = snaps_for_volume
snaps_for_volume.append(snap)
# Do a running comparison of snapshot dates to desired time periods,
# keeping the oldest snapshot in each
# time period and deleting the rest:
for volume_name in snaps_for_each_volume:
snaps = snaps_for_each_volume[volume_name]
snaps = snaps[:-1]
# never delete the newest snapshot, so remove it from consideration
time_period_num = 0
snap_found_for_this_time_period = False
for snap in snaps:
check_this_snap = True
            while (check_this_snap and
                   time_period_num < len(target_backup_times)):
                if get_snap_time(snap) < target_backup_times[time_period_num]:
                    # The snap date is before the cutoff date. Figure
                    # out if it's the first snap in this date range and
                    # act accordingly (since both the date ranges and
                    # the snapshots are sorted chronologically, we know
                    # this snapshot isn't in an earlier date range):
if snap_found_for_this_time_period:
if not snap.tags.get('preserve_snapshot'):
if dry_run:
logger.info('Dry-trimmed {0} {1} from {2}'
.format(snap, snap.description,
snap.start_time))
else:
# as long as the snapshot wasn't marked with
# the 'preserve_snapshot' tag, delete it:
try:
conn.delete_snapshot(snap.id)
except EC2ResponseError as err:
logger.exception(str(err))
else:
logger.info('Trimmed {0} {1} from {2}'
.format(snap, snap.description,
snap.start_time))
# go on and look at the next snapshot,
# leaving the time period alone
else:
# this was the first snapshot found for this time
# period. Leave it alone and look at the next snapshot:
snap_found_for_this_time_period = True
check_this_snap = False
else:
# the snap is after the cutoff date.
# Check it against the next cutoff date
time_period_num += 1
snap_found_for_this_time_period = False
@task
def delete_broken_snapshots():
"""Delete snapshots with status 'error'."""
for region in get_region_conn().get_all_regions():
conn = get_region_conn(region.name)
filters = {'status': 'error'}
snaps = conn.get_all_snapshots(owner='self', filters=filters)
for snp in snaps:
logger.info('Deleting broken {0}'.format(snp))
snp.delete()
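# Usage sketch: sweep every region for snapshots stuck in the 'error'
# state:
#
#     fab delete_broken_snapshots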
@task
def trim_snapshots(region_name=None, dry_run=False):
"""Delete old snapshots logarithmically back in time.
region_name
by default process all regions;
dry_run
boolean, only print info about old snapshots to be deleted."""
delete_broken_snapshots()
if region_name:
regions = [get_region_conn(region_name).region]
else:
regions = get_region_conn().get_all_regions()
for reg in regions:
logger.info('Processing {0}'.format(reg))
_trim_snapshots(reg, dry_run=dry_run)
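# Usage sketch: preview the snapshots that would be purged before
# deleting anything (Fabric passes dry_run as a truthy string):
#
#     fab trim_snapshots:dry_run=True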
@task
def rsync_mountpoints(src_inst, src_vol, src_mnt, dst_inst, dst_vol, dst_mnt,
encr=False):
"""Run `rsync` against mountpoints, copy disk label.
:param src_inst: source instance;
:param src_vol: source volume with label that will be copied to
dst_vol;
:param src_mnt: root or directory hierarchy to replicate;
:param dst_inst: destination instance;
:param dst_vol: destination volume, that will be marked with label
from src_vol;
    :param dst_mnt: destination point where the source hierarchy will
        be placed;
:param encr: True if volume is encrypted;
:type encr: bool."""
src_key_filename = config.get(src_inst.region.name, 'KEY_FILENAME')
dst_key_filename = config.get(dst_inst.region.name, 'KEY_FILENAME')
with config_temp_ssh(dst_inst.connection) as key_file:
with settings(host_string=dst_inst.public_dns_name,
key_filename=dst_key_filename):
wait_for_sudo('cp /root/.ssh/authorized_keys '
'/root/.ssh/authorized_keys.bak')
            pub_key = local('ssh-keygen -y -f {0}'.format(key_file),
                            capture=True)
append('/root/.ssh/authorized_keys', pub_key, use_sudo=True)
if encr:
sudo('screen -d -m sh -c "nc -l 60000 | gzip -dfc | '
'sudo dd of={0} bs=16M"'
                     .format(get_vol_dev(dst_vol)), pty=False)  # Dirty magic.
dst_ip = sudo(
'curl http://169.254.169.254/latest/meta-data/public-ipv4')
with settings(host_string=src_inst.public_dns_name,
key_filename=src_key_filename):
put(key_file, '.ssh/', mirror_local_mode=True)
dst_key_filename = os.path.split(key_file)[1]
if encr:
sudo('(dd if={0} bs=16M | gzip -cf --fast | nc -v {1} 60000)'
.format(get_vol_dev(src_vol), dst_ip))
else:
cmd = (
'rsync -e "ssh -i .ssh/{key_file} -o '
'StrictHostKeyChecking=no" -cahHAX --delete --inplace '
'--exclude /root/.bash_history '
'--exclude /home/*/.bash_history '
'--exclude /etc/ssh/moduli --exclude /etc/ssh/ssh_host_* '
'--exclude /etc/udev/rules.d/*persistent-net.rules '
'--exclude /var/lib/ec2/* --exclude=/mnt/* '
'--exclude=/proc/* --exclude=/tmp/* '
'{src_mnt}/ root@{rhost}:{dst_mnt}')
wait_for_sudo(cmd.format(
rhost=dst_inst.public_dns_name, dst_mnt=dst_mnt,
key_file=dst_key_filename, src_mnt=src_mnt))
label = sudo('e2label {0}'.format(get_vol_dev(src_vol)))
with settings(host_string=dst_inst.public_dns_name,
key_filename=dst_key_filename):
if not encr:
sudo('e2label {0} {1}'.format(get_vol_dev(dst_vol), label))
wait_for_sudo('mv /root/.ssh/authorized_keys.bak '
'/root/.ssh/authorized_keys')
run('sync', shell=False)
run('for i in {1..20}; do sync; sleep 1; done &')
def update_snap(src_vol, src_mnt, dst_vol, dst_mnt, encr, delete_old=False):
"""Update destination region from `src_vol`.
Create new snapshot with same description and tags. Delete previous
    snapshot (if any) of the same volume in the destination region if
    ``delete_old`` is True."""
src_inst = get_inst_by_id(src_vol.region.name,
src_vol.attach_data.instance_id)
dst_inst = get_inst_by_id(dst_vol.region.name,
dst_vol.attach_data.instance_id)
rsync_mountpoints(src_inst, src_vol, src_mnt, dst_inst, dst_vol, dst_mnt,
encr)
src_snap = src_vol.connection.get_all_snapshots([src_vol.snapshot_id])[0]
create_snapshot(dst_vol, description=src_snap.description,
tags=src_snap.tags, synchronously=False)
if delete_old and dst_vol.snapshot_id:
old_snap = dst_vol.connection.get_all_snapshots(
[dst_vol.snapshot_id])[0]
logger.info('Deleting previous {0} in {1}'.format(old_snap,
dst_vol.region))
old_snap.delete()
@contextmanager
def create_tmp_volume(region, size):
"""Format new filesystem."""
with create_temp_inst(region) as inst:
earmarking_tag = config.get(region.name, 'TAG_NAME')
        vol = get_region_conn(region.name).create_volume(size,
                                                         inst.placement)
        try:
            vol.add_tag(earmarking_tag, 'temporary')
            vol.attach(inst.id, get_avail_dev(inst))
            yield vol, mount_volume(vol, mkfs=True)
finally:
vol.detach(force=True)
wait_for(vol, 'available', limit=DETACH_TIME)
vol.delete()
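# A minimal sketch of the context manager above: the volume is created,
# tagged, attached and mounted, then cleaned up on exit (region and
# size are hypothetical):
#
#     region = get_region_conn('us-east-1').region
#     with create_tmp_volume(region, 8) as (vol, mountpoint):
#         pass  # Work with the freshly formatted filesystem here.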
def get_relevant_snapshots(
        conn, tag_name=DEFAULT_TAG_NAME, tag_value=DEFAULT_TAG_VALUE,
        native_only=True, filters=None):
    """Return snapshots with proper description."""
    # Copy to avoid mutating a shared default dict across calls.
    filters = dict(filters or {'status': SNAP_STATUSES})
    if tag_name and tag_value:
        filters.update({'tag:{0}'.format(tag_name): tag_value})
snaps = conn.get_all_snapshots(owner='self', filters=filters)
is_described = lambda snap: get_snap_vol(snap) and get_snap_time(snap)
snaps = [snp for snp in snaps if is_described(snp)]
if native_only:
is_native = lambda snp, reg: get_descr_attr(snp, 'Region') == reg.name
snaps = [snp for snp in snaps if is_native(snp, conn.region)]
return snaps
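# A sketch of fetching described snapshots from a region, including
# replicas that originated elsewhere (region name is hypothetical):
#
#     conn = get_region_conn('us-east-1')
#     snaps = get_relevant_snapshots(conn, native_only=False)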
def get_replicas(descriptions, dst_conn):
    snaps = dst_conn.get_all_snapshots(
        owner='self', filters={'status': SNAP_STATUSES,
                               'description': descriptions})
    # Temporary volumes used by in-process replication.
    vols = dst_conn.get_all_volumes(filters={
        'tag:{0}'.format(DESCRIPTION_TAG): descriptions,
        'status': VOL_STATUSES})
    return snaps, vols
def get_oldest_replica(
src_conn, dst_conn, amount=1, native_only=True,
tag_name=DEFAULT_TAG_NAME, tag_value=DEFAULT_TAG_VALUE):
"""Return list with (one by default) not yet replicated snapshots."""
snaps = get_relevant_snapshots(src_conn, tag_name, tag_value, native_only)
if not snaps:
return []
    # Keep only the latest snapshot for every volume.
latest_snaps = []
snaps = sorted(snaps, key=get_snap_vol)
for vol_id, vol_snaps in groupby(snaps, key=get_snap_vol):
latest_snaps.append(sorted(vol_snaps, key=get_snap_time)[-1])
    # Look for the latest replicas in the dst region for every new snapshot.
latest_descriptions = [snp.description for snp in latest_snaps]
dst_snaps, dst_vols = get_replicas(latest_descriptions, dst_conn)
snap_desc = [snp.description for snp in dst_snaps]
vol_desc = [vol.tags[DESCRIPTION_TAG] for vol in dst_vols]
    # Look for snaps without replicas.
snaps_to_replicate = [snp for snp in latest_snaps if
snp.description not in set(snap_desc + vol_desc)]
return sorted(snaps_to_replicate, key=get_snap_time)[:amount]
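# A sketch of listing snapshots that still await replication between
# two regions (region names are hypothetical):
#
#     src_conn = get_region_conn('us-east-1')
#     dst_conn = get_region_conn('us-west-1')
#     pending = get_oldest_replica(src_conn, dst_conn, amount=5)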
@task
def rsync_snapshot(src_region_name, snapshot_id, dst_region_name,
src_inst=None, dst_inst=None, force=False):
"""Duplicate the snapshot into dst_region.
src_region_name, dst_region_name
Amazon region names. Allowed to be contracted, e.g.
`ap-southeast-1` will be recognized in `ap-south` or even
`ap-s`;
snapshot_id
snapshot to duplicate;
src_inst, dst_inst
will be used instead of creating new for temporary;
force
rsync snapshot even if newer version exist.
You'll need to open port 60000 for encrypted instances replication."""
src_conn = get_region_conn(src_region_name)
src_snap = src_conn.get_all_snapshots([snapshot_id])[0]
dst_conn = get_region_conn(dst_region_name)
_src_device = get_snap_device(src_snap)
_src_dev = re.match(r'^/dev/sda$', _src_device) # check for encryption
if _src_dev:
encr = True
logger.info('Found traces of encryption')
else:
encr = None
info = 'Going to transmit {snap.volume_size} GiB {snap} {snap.description}'
if src_snap.tags.get('Name'):
info += ' of {name}'
info += ' from {snap.region} to {dst}'
logger.info(info.format(snap=src_snap, dst=dst_conn.region,
name=src_snap.tags.get('Name')))
src_vol = get_snap_vol(src_snap)
dst_snaps = get_relevant_snapshots(dst_conn, native_only=False)
vol_snaps = [snp for snp in dst_snaps if get_snap_vol(snp) == src_vol]
def sync_mountpoints(src_snap, src_vol, src_mnt, dst_vol, dst_mnt):
# Marking temporary volume with snapshot's description.
dst_vol.add_tag(DESCRIPTION_TAG, src_snap.description)
snaps, vols = get_replicas(src_snap.description, dst_vol.connection)
if not force and snaps:
raise ReplicationCollisionError(
'Stepping over {snap} - it\'s already replicated as {snaps} '
'in {snaps[0].region}'.format(snap=src_snap, snaps=snaps))
if not force and len(vols) > 1:
timeout = src_snap.volume_size / REPLICATION_SPEED
get_vol_time = lambda vol: parse(vol.create_time)
def not_outdated(vol, now):
age = now - get_vol_time(vol)
return age.days * 24 * 60 * 60 + age.seconds < timeout
now = datetime.utcnow().replace(tzinfo=tzutc())
actual_vols = [vol for vol in vols if not_outdated(vol, now)]
            hung_vols = set(vols) - set(actual_vols)
if len(actual_vols) > 1:
oldest = sorted(actual_vols, key=get_vol_time)[0]
if dst_vol.id != oldest.id:
raise ReplicationCollisionError(
'Stepping over {snap} - it\'s already replicating to '
'{vol} in {vol.region}'.format(snap=src_snap,
vol=oldest))
            if hung_vols:
                logger.warn(
                    'Replication to temporary {vols} created while '
                    'transmitting {snap} to {reg} qualified as hung. '
                    'Starting new replication process.'.format(
                        snap=src_snap, vols=hung_vols, reg=dst_vol.region))
update_snap(src_vol, src_mnt, dst_vol, dst_mnt, encr)
if vol_snaps:
dst_snap = sorted(vol_snaps, key=get_snap_time)[-1]
with nested(
attach_snapshot(src_snap, inst=src_inst, encr=encr),
attach_snapshot(dst_snap, inst=dst_inst, encr=encr)) as (
(src_vol, src_mnt), (dst_vol, dst_mnt)):
sync_mountpoints(src_snap, src_vol, src_mnt, dst_vol, dst_mnt)
else:
with nested(
attach_snapshot(src_snap, inst=src_inst, encr=encr),
create_tmp_volume(dst_conn.region, src_snap.volume_size)) as (
(src_vol, src_mnt), (dst_vol, dst_mnt)):
sync_mountpoints(src_snap, src_vol, src_mnt, dst_vol, dst_mnt)
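# Usage sketch (the snapshot ID is hypothetical; region names may be
# contracted as described in the docstring):
#
#     fab rsync_snapshot:us-east-1,snap-12345678,us-west-1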
@task
def rsync_region(
src_region_name, dst_region_name, tag_name=DEFAULT_TAG_NAME,
tag_value=DEFAULT_TAG_VALUE, native_only=True):
"""Duplicates latest snapshots with given tag into dst_region.
src_region_name, dst_region_name
every latest volume snapshot from src_region will be rsynced
to the dst_region;
tag_name, tag_value
snapshots will be filtered by tag. Tag will be fetched from
config by default;
    native_only
        sync only snapshots created in src_region_name. True by
        default."""
src_conn = get_region_conn(src_region_name)
dst_conn = get_region_conn(dst_region_name)
snaps = get_relevant_snapshots(src_conn, tag_name, tag_value, native_only)
if not snaps:
return
with nested(create_temp_inst(src_conn.region),
create_temp_inst(dst_conn.region)) as (src_inst, dst_inst):
snaps = sorted(snaps, key=get_snap_vol) # Prepare for grouping.
for vol, vol_snaps in groupby(snaps, get_snap_vol):
latest_snap = sorted(vol_snaps, key=get_snap_time)[-1]
for inst in src_inst, dst_inst:
logger.debug('Rebooting {0} in {0.region} '
'to refresh attachments'.format(inst))
inst.reboot()
args = (src_region_name, latest_snap.id, dst_region_name, src_inst,
dst_inst)
try:
rsync_snapshot(*args)
            except Exception:
logger.exception('rsync of {1} from {0} to {2} failed'.format(
*args))
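# Usage sketch: replicate the latest tagged snapshots between two
# regions (region names are hypothetical):
#
#     fab rsync_region:us-east-1,us-west-1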
@task
def rsync_all_regions(primary_backup_region, secondary_backup_region):
"""
    Replicate snapshots across all regions.
    Sync snapshots from all non-primary regions into the primary
    region, then replicate snapshots from the primary region into the
    secondary backup region.
:param primary_backup_region: AWS region name that keeps all
snapshots clones;
:type primary_backup_region: str
:param secondary_backup_region: AWS region name that keeps clones of
snapshots from `primary_backup_region`.
:type secondary_backup_region: str
"""
pri_name = get_region_conn(primary_backup_region).region.name
all_regs = get_region_conn().get_all_regions()
for reg in (reg for reg in all_regs if reg.name != pri_name):
rsync_region(reg.name, primary_backup_region)
rsync_region(primary_backup_region, secondary_backup_region)
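# Usage sketch: gather snapshot clones from every region into the
# primary backup region, then mirror them into the secondary one
# (region names are hypothetical):
#
#     fab rsync_all_regions:us-east-1,us-west-2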