Source code for tawhiri.download

# Copyright 2014 (C) Daniel Richman
#
# This file is part of Tawhiri.
#
# Tawhiri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Tawhiri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Tawhiri.  If not, see <http://www.gnu.org/licenses/>.

"""
Wind :class:`tawhiri.dataset.Dataset` Downloader

Downloaded data arrives in `GRIB <http://en.wikipedia.org/wiki/GRIB>`_
format, although three quarters of the records in the downloaded file are
ignored. The records that are used can also be written to a new grib file
as they are unpacked (which is therefore somewhat smaller, as it is
still compressed and only contains the useful bits).
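
Example usage (a sketch; the directory path and dataset time below are
illustrative)::

    from datetime import datetime
    from tawhiri.download import DatasetDownloader

    downloader = DatasetDownloader("/srv/tawhiri-datasets",
                                   datetime(2014, 1, 1, 0))
    try:
        downloader.open()
        downloader.download()
    finally:
        downloader.close()

The same can be done from the command line via :func:`main`, e.g.,
``python -m tawhiri.download download -d /srv/tawhiri-datasets 2014010100``.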
"""


from __future__ import division

import logging
import logging.handlers
import argparse
import sys
import os
import os.path
import errno
import shutil
import math
import tempfile
from collections import namedtuple
from time import time
from datetime import datetime, timedelta
from socket import inet_ntoa
import gevent.local
from gevent import sleep
from gevent import greenlet
from gevent.timeout import Timeout
from gevent.event import Event
from gevent.pool import Group
from gevent.queue import PriorityQueue
from gevent.dns import resolve_ipv4
from gevent.coros import RLock
import gevent.socket
import ftplib
import itertools
import numpy as np
import pygrib

from .dataset import Dataset


__all__ = ["DatasetDownloader", "DownloadDaemon", "main", "unpack_grib"]


logger = logging.getLogger("tawhiri.downloader")


assert Dataset.element_type == 'float32'
assert Dataset.axes._fields[0:3] == ("hour", "pressure", "variable")


def make_checklist():
    """
    Create a matrix of bools with dimensions ``Dataset.shape[0:3]``

    ... i.e., an element for every GRIB record we need when downloading
    a new dataset
    """
    return np.zeros(Dataset.shape[0:3], dtype=np.bool_)


_grib_name_to_variable = {"Geopotential Height": "height",
                          "U component of wind": "wind_u",
                          "V component of wind": "wind_v"}


def unpack_grib(filename, dataset=None, checklist=None, gribmirror=None,
                assert_hour=None, file_checklist=None, callback=None):
    """
    Unpack the GRIB file at `filename`

    ... into `dataset`

    ... setting the cell corresponding to each GRIB record in `checklist`
    to ``True`` (see :func:`make_checklist`)

    ... copying the GRIB records we care about into `gribmirror`

    ... checking that the `forecastTime` matches `assert_hour`

    ... checking that the GRIB records in this file (that we care about)
    exactly match the set of ``(forecast time, level, variable)`` tuples
    in `file_checklist`

    ... calling `callback` after processing each record, with arguments
    ``(pass, location indices, location names)`` (where location is
    ``(forecast time, level, variable)``)

    `callback` must _not_ edit `dataset`, `checklist` or `gribmirror`, or
    yield to a greenlet that will (hence
    :attr:`DatasetDownloader.unpack_lock`). `callback` is mainly used to
    yield to other greenlets doing IO (i.e., downloading other files)
    while we do the CPU intensive task of unpacking GRIB data.

    The data is unpacked in two passes; the first

    * checks the shape and forecast time of each record,
    * checks the axes of the first record (i.e., the latitudes and
      longitudes each point corresponds to) - this is really slow, so is
      only done once,
    * checks that the contents of the file exactly match `file_checklist`
      (excluding records we don't care about),
    * checks that no elements of `checklist` that we're about to unpack
      are already marked as having been unpacked (i.e., ``True``).

    The second pass copies the data in each record to its correct location
    in `dataset`, writes a copy to `gribmirror` and marks the correct
    place in `checklist` as ``True``.

    :exc:`ValueError` is raised in case of any problems.
    """

    # callback must _not_ edit dataset/checklist/gribmirror
    # or yield to a greenlet that will (see DatasetDownloader.unpack_lock)

    if dataset is not None:
        dataset_array = \
            np.ndarray(shape=Dataset.shape, dtype=np.float32,
                       buffer=dataset.array, offset=0, order='C')
    else:
        dataset_array = None

    if file_checklist is not None:
        file_checklist = file_checklist.copy()

    grib = pygrib.open(filename)
    try:
        # pass one: check the contents of the file
        _check_grib_file(grib, filename, dataset_array, checklist,
                         assert_hour, file_checklist, callback)

        # pass two: unpack
        for record, location, location_name in _grib_records(grib):
            if dataset_array is not None:
                dataset_array[location] = record.values
            if gribmirror is not None:
                gribmirror.write(record.tostring())
            if checklist is not None:
                checklist[location] = True

            logger.debug("unpacked %s %s %s",
                         filename, location_name, location)
            if callback is not None:
                callback(True, location, location_name)

        logger.info("unpacked %s", filename)
    finally:
        grib.close()
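

# A minimal sketch (not part of the module's normal flow) of calling
# unpack_grib() by hand, for example to re-read a single saved GRIB file
# into an existing, writable Dataset. The filename and hour are
# illustrative; the file_checklist construction mirrors
# DownloadWorker._unpack_file below. Left commented out so importing this
# module does not execute it:
#
#     hour = 0
#     file_checklist = set(itertools.product(
#         [hour], Dataset.pressures_pgrb2f, Dataset.axes.variable))
#     unpack_grib("gfs.t00z.pgrb2f00", dataset=dataset,
#                 checklist=make_checklist(), assert_hour=hour,
#                 file_checklist=file_checklist)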


def _check_grib_file(grib, filename, dataset_array, checklist,
                     assert_hour, file_checklist, callback):
    """
    The first pass over the GRIB file, checking its contents

    * checks the shape and forecast time of each record,
    * checks the axes of the first record (i.e., the latitudes and
      longitudes each point corresponds to) - this is really slow, so is
      only done once,
    * checks that the contents of the file exactly match `file_checklist`
      (excluding records we don't care about),
    * checks that no elements of `checklist` that we're about to unpack
      are already marked as having been unpacked (i.e., ``True``).
    """

    checked_axes = False

    for record, location, location_name in _grib_records(grib):
        _check_record(record, location, location_name,
                      checklist, assert_hour, file_checklist)

        if file_checklist is not None:
            file_checklist.remove(location_name)

        # Checking axes (for some reason) is really slow, so do it once as
        # a small sanity check, and hope that if it's OK for one record,
        # they haven't changed things and the other records will be OK
        if not checked_axes:
            _check_axes(record)
            checked_axes = True

        if dataset_array is not None and \
                dataset_array[location].shape != record.values.shape:
            raise ValueError("record values had incorrect shape")

        logger.debug("checked %s %s %s", filename, location_name, location)
        if callback is not None:
            callback(False, location, location_name)

    if file_checklist is not None and file_checklist != set():
        raise ValueError("records missing from file")


def _grib_records(grib):
    """
    Yield ``(record, location, location_name)`` tuples in the file `grib`

    ... where location and location_name are tuples containing indices or
    actual axes names/values corresponding to forecast time, level and
    variable.

    e.g., ``(4, 2, 1)`` ``(12, 950, "wind_u")`` (i.e., 12 hours, 950 mb)

    Records that don't have levels specified as pressure, or are not
    variables that we are interested in, are ignored.
    """

    grib.seek(0)
    for record in grib:
        if record.typeOfLevel != "isobaricInhPa":
            continue
        if record.name not in _grib_name_to_variable:
            continue

        location_name = (record.forecastTime, record.level,
                         _grib_name_to_variable[record.name])
        location = tuple(Dataset.axes[i].index(n)
                         for i, n in enumerate(location_name))

        yield record, location, location_name


def _check_record(record, location, location_name,
                  checklist, assert_hour, file_checklist):
    """
    Check that this record

    ... has not already been unpacked, i.e., ``checklist[location]``
    is not set

    ... is for the correct forecast time (i.e.,
    ``forecastTime == assert_hour``)

    ... is expected for this file (i.e.,
    ``location_name in file_checklist``)
    """

    if checklist is not None and checklist[location]:
        raise ValueError("record already unpacked (from other file): {0}"
                         .format(location_name))

    if assert_hour is not None and record.forecastTime != assert_hour:
        raise ValueError("Incorrect forecastTime (assert_hour)")

    if file_checklist is not None and location_name not in file_checklist:
        raise ValueError("unexpected record: {0}".format(location_name))


def _check_axes(record):
    """
    Check the axes on `record` match what we expect

    i.e., that the latitudes and longitudes are -90 to 90 / 0 to 360
    respectively in 0.5 degree increments.
    """

    # I'm unsure whether this is the correct thing to do.
    # Some GRIB functions (.latitudes, .latLonValues) have the
    # latitudes scanning negatively (90 to -90); but .values and
    # .distinctLatitudes seem to return a grid scanning positively
    # If it works...
    if not np.array_equal(record.distinctLatitudes,
                          Dataset.axes.latitude):
        raise ValueError("unexpected axes on record (latitudes)")

    if not np.array_equal(record.distinctLongitudes,
                          Dataset.axes.longitude):
        raise ValueError("unexpected axes on record (longitudes)")


class FTP(ftplib.FTP):
    """gevent-friendly :class:`ftplib.FTP`"""

    def connect(self, host=None, port=None, timeout=None):
        if host is not None:
            self.host = host
        if port is not None:
            self.port = port
        if timeout is not None:
            self.timeout = timeout

        self.sock = gevent.socket.create_connection(
                (self.host, self.port), self.timeout)
        self.af = self.sock.family
        self.file = self.sock.makefile('rb')
        self.welcome = self.getresp()
        return self.welcome

    def makeport(self):
        raise NotImplementedError

    def ntransfercmd(self, cmd, rest=None):
        assert self.passiveserver

        host, port = self.makepasv()
        conn = gevent.socket.create_connection((host, port), self.timeout)
        try:
            if rest is not None:
                self.sendcmd("REST %s" % rest)
            resp = self.sendcmd(cmd)
            if resp[0] == '2':
                resp = self.getresp()
            if resp[0] != '1':
                raise ftplib.error_reply(resp)
        except:
            conn.close()
            raise

        size = None
        if resp[:3] == '150':
            size = ftplib.parse150(resp)
        return conn, size


class NotFound(Exception):
    """A GRIB file wasn't found"""

class BadFile(Exception):
    """A GRIB file was retrieved, but its contents were bad"""


class DatasetDownloader(object):
    _queue_item_type = namedtuple("queue_item",
                                  ("hour", "sleep_until", "filename",
                                   "expect_pressures", "bad_downloads"))

    def __init__(self, directory, ds_time, timeout=120,
                 first_file_timeout=600,
                 bad_download_retry_limit=3,
                 write_dataset=True, write_gribmirror=True,
                 deadline=None,
                 dataset_host="ftp.ncep.noaa.gov",
                 dataset_path="/pub/data/nccf/com/gfs/prod/gfs.{0}/"):
        # set these ASAP for close() via __del__ if __init__ raises something
        self.success = False
        self._dataset = None
        self._gribmirror = None
        self._tmp_directory = None

        assert ds_time.hour in (0, 6, 12, 18)
        assert ds_time.minute == ds_time.second == ds_time.microsecond == 0

        if not (write_dataset or write_gribmirror):
            raise ValueError("Choose write_dataset or write_gribmirror "
                             "(or both)")

        if deadline is None:
            deadline = max(datetime.now() + timedelta(hours=2),
                           ds_time + timedelta(hours=9, minutes=30))

        self.directory = directory
        self.ds_time = ds_time

        self.timeout = timeout
        self.first_file_timeout = first_file_timeout
        self.write_dataset = write_dataset
        self.write_gribmirror = write_gribmirror
        self.bad_download_retry_limit = bad_download_retry_limit

        self.deadline = deadline
        self.dataset_host = dataset_host
        self.dataset_path = dataset_path

        self.have_first_file = False

        self.files_complete = 0
        self.files_count = 0
        self.completed = Event()

        ds_time_str = self.ds_time.strftime("%Y%m%d%H")
        self.remote_directory = dataset_path.format(ds_time_str)

        self._greenlets = Group()
        self.unpack_lock = RLock()

        # Items in the queue are
        #   (hour, sleep_until, filename, ...)
        # so they sort by hour, and then if a not-found adds a delay to
        # a specific file, files from that hour without the delay
        # are tried first
        self._files = PriorityQueue()

        # areas in self.dataset.array are considered 'undefined' until
        # self.checklist[index[:3]] is True, since unpack_grib may
        # write to them, and then abort via ValueError before updating
        # the checklist if the file turns out later to be bad.
        # the checklist also serves as a sort of final sanity check:
        # we also have "does this file contain all the records we think it
        # should" checklists; see DownloadWorker._unpack_file
        self._checklist = make_checklist()
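
    # An illustrative aside (the values below are made up, using the
    # filename format produced by _add_files): because _queue_item_type
    # is a namedtuple, the PriorityQueue compares items field by field, so
    #     (3, 0,            "gfs.t00z.pgrb2f03", ...)
    # sorts before
    #     (3, 1400000000.0, "gfs.t00z.pgrb2bf03", ...)
    # which sorts before
    #     (6, 0,            "gfs.t00z.pgrb2f06", ...)
    # i.e., forecast hour first, then any not-found back-off time.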

    def open(self):
        logger.info("downloader: opening files for dataset %s",
                    self.ds_time)

        self._tmp_directory = \
            tempfile.mkdtemp(dir=self.directory, prefix="download.")
        os.chmod(self._tmp_directory, 0o775)
        logger.debug("Temporary directory is %s", self._tmp_directory)

        if self.write_dataset:
            self._dataset = \
                Dataset(self.ds_time, directory=self._tmp_directory,
                        new=True)

        if self.write_gribmirror:
            fn = Dataset.filename(self.ds_time,
                                  directory=self._tmp_directory,
                                  suffix=Dataset.SUFFIX_GRIBMIRROR)
            logger.debug("Opening gribmirror (truncate and write) %s %s",
                         self.ds_time, fn)
            self._gribmirror = open(fn, "w+")

    def download(self):
        logger.info("download of %s starting", self.ds_time)

        ttl, addresses = resolve_ipv4(self.dataset_host)
        logger.debug("Resolved to %s IPs", len(addresses))

        addresses = [inet_ntoa(x) for x in addresses]

        total_timeout = self.deadline - datetime.now()
        total_timeout_secs = total_timeout.total_seconds()
        if total_timeout_secs < 0:
            raise ValueError("Deadline already passed")
        else:
            logger.debug("Deadline in %s", total_timeout)

        self._add_files()
        self._run_workers(addresses, total_timeout_secs)

        if not self.completed.is_set():
            raise ValueError("timed out")

        if not self._checklist.all():
            raise ValueError("incomplete: records missing")

        self.success = True
        logger.debug("downloaded %s successfully", self.ds_time)

    def _add_files(self):
        filename_prefix = self.ds_time.strftime("gfs.t%Hz.pgrb2")

        for hour in Dataset.axes.hour:
            hour_str = "{0:02}".format(hour)

            for bit, exp_pr in (("f", Dataset.pressures_pgrb2f),
                                ("bf", Dataset.pressures_pgrb2bf)):
                self._files.put(self._queue_item_type(
                    hour, 0, filename_prefix + bit + hour_str, exp_pr, 0))
                self.files_count += 1

        logger.info("Need to download %s files", self.files_count)

    def _run_workers(self, addresses, total_timeout_secs):
        logger.debug("Spawning %s workers", len(addresses) * 2)

        # don't ask _join_all to raise the first exception it catches
        # if we're already raising something in the except block
        raising = False

        try:
            for worker_id, address in enumerate(addresses * 2):
                w = DownloadWorker(self, worker_id, address)
                w.start()
                w.link()
                self._greenlets.add(w)

            # worker unhandled exceptions are raised in this greenlet
            # via link(). They can appear in completed.wait and
            # greenlets.kill(block=True) only (the only times that this
            # greenlet will yield)
            self.completed.wait(timeout=total_timeout_secs)

        except:
            # includes LinkedCompleted - a worker should not exit cleanly
            # until we .kill them below
            logger.debug("_run_workers catch %s (will reraise)",
                         sys.exc_info()[1])
            raising = True
            raise

        finally:
            # don't leak workers.
            self._join_all(raise_exception=(not raising))

    def _join_all(self, raise_exception=False):
        # we need the loop to run to completion and so have it catch and
        # hold or discard exceptions for later.
        # track the first exception caught and re-raise that
        exc_info = None

        while len(self._greenlets):
            try:
                self._greenlets.kill(block=True)
            except greenlet.LinkedCompleted:
                # now that we've killed workers, these are expected.
                # ignore.
                pass
            except greenlet.LinkedFailed as e:
                if exc_info is None and raise_exception:
                    logger.debug("_join_all catch %s "
                                 "(will reraise)", e)
                    exc_info = sys.exc_info()
                else:
                    logger.debug("_join_all discarding %s "
                                 "(already have exc)", e)

        if exc_info is not None:
            try:
                raise exc_info[0], exc_info[1], exc_info[2]
            finally:
                # avoid circular reference
                del exc_info

    def _file_complete(self):
        self.files_complete += 1
        self.have_first_file = True

        if self.files_complete == self.files_count:
            self.completed.set()

        logger.info("progress %s/%s %s%%",
                    self.files_complete, self.files_count,
                    self.files_complete / self.files_count * 100)

    def close(self, move_files=None):
        if move_files is None:
            move_files = self.success

        if self._dataset is not None or self._gribmirror is not None or \
                self._tmp_directory is not None:
            if move_files:
                logger.info("moving downloaded files")
            else:
                logger.info("deleting failed download files")

        if self._dataset is not None:
            self._dataset.close()
            self._dataset = None

            if move_files:
                self._move_file()
            else:
                self._delete_file()

        if self._gribmirror is not None:
            self._gribmirror.close()
            self._gribmirror = None

            if move_files:
                self._move_file(Dataset.SUFFIX_GRIBMIRROR)
            else:
                self._delete_file(Dataset.SUFFIX_GRIBMIRROR)

        if self._tmp_directory is not None:
            self._remove_download_directory()
            self._tmp_directory = None

    def __del__(self):
        self.close()

    def _remove_download_directory(self):
        l = os.listdir(self._tmp_directory)

        if l:
            logger.warning("cleaning %s unknown file%s in "
                           "temporary directory",
                           len(l), '' if len(l) == 1 else 's')

        logger.debug("removing temporary directory")
        shutil.rmtree(self._tmp_directory)

    def _move_file(self, suffix=''):
        fn1 = Dataset.filename(self.ds_time,
                               directory=self._tmp_directory,
                               suffix=suffix)
        fn2 = Dataset.filename(self.ds_time,
                               directory=self.directory,
                               suffix=suffix)
        logger.debug("renaming %s to %s", fn1, fn2)
        os.rename(fn1, fn2)

    def _delete_file(self, suffix=''):
        fn = Dataset.filename(self.ds_time,
                              directory=self._tmp_directory,
                              suffix=suffix)
        logger.warning("deleting %s", fn)
        os.unlink(fn)


class DownloadWorker(gevent.Greenlet):
    def __init__(self, downloader, worker_id, connect_host):
        gevent.Greenlet.__init__(self)

        self.downloader = downloader
        self.worker_id = worker_id
        self.connect_host = connect_host

        self._connection = None
        self._server_sleep_backoff = 0
        self._server_sleep_time = 0
        self._files = downloader._files

        logger_path = logger.name + ".worker.{0}".format(worker_id)
        self._logger = logging.getLogger(logger_path)

    def _run(self):
        while True:
            # block, with no timeout. If the queue is empty, another
            # worker might put a file back in (after failure)
            queue_item = self._files.get(block=True)

            self._logger.debug("downloading %s", queue_item.filename)

            sleep_for = queue_item.sleep_until - time()
            if sleep_for > 0:
                self._logger.debug("sleeping for %s", sleep_for)
                self._connection_close()  # don't hold connections open
                sleep(sleep_for)

            # by default, sleep zero seconds at the end to yield:
            # if we not-found, ideally we want another server to try
            self._server_sleep_time = 0

            self._run_queue_item(queue_item)

            if self._server_sleep_time > 0:
                self._connection_close()

            sleep(self._server_sleep_time)

    def _run_queue_item(self, queue_item):
        temp_file = os.path.join(self.downloader._tmp_directory,
                                 queue_item.filename)

        try:
            self._logger.debug("begin download")

            timeout = Timeout(self.downloader.timeout)
            timeout.start()
            try:
                self._download_file(temp_file, queue_item)
            finally:
                timeout.cancel()

            self._unpack_file(temp_file, queue_item)

        except NotFound:
            self._handle_notfound(queue_item)

        except Timeout:
            self._handle_timeout(queue_item)

        except BadFile as e:
            abort = self._handle_badfile(queue_item)
            if abort:
                raise  # thereby killing the whole download

        except (gevent.socket.error, ftplib.Error):
            self._handle_ioerror(queue_item)

        except (greenlet.GreenletExit, KeyboardInterrupt, SystemExit):
            raise

        else:
            self._server_sleep_backoff = 0
            # unfortunately gevent doesn't have JoinablePriorityQueues
            self.downloader._file_complete()

        finally:
            try:
                os.unlink(temp_file)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise

    def _download_file(self, temp_file, queue_item):
        if self._connection is None:
            self._logger.debug("connecting to %s", self.connect_host)
            self._connection = FTP(self.connect_host, user='anonymous')

        remote_file = os.path.join(self.downloader.remote_directory,
                                   queue_item.filename)

        with open(temp_file, "w") as f:
            start = time()
            length = 0

            try:
                self._connection.retrbinary('RETR ' + remote_file, f.write)
            except ftplib.Error as e:
                if e[0].startswith("550"):
                    raise NotFound
                else:
                    raise

            length = f.tell()
            end = time()

            duration = end - start
            speed = length / (duration * 1024 * 1024)
            self._logger.debug("download complete, speed %sMB/s", speed)

    def _handle_notfound(self, queue_item):
        if self.downloader.have_first_file:
            sleep_time = self.downloader.timeout
        else:
            sleep_time = self.downloader.first_file_timeout

        self._logger.info("not found: %s; file sleep %s",
                          queue_item.filename, sleep_time)

        sleep_until = time() + sleep_time
        self._files.put(queue_item._replace(sleep_until=sleep_until))

    def _handle_timeout(self, queue_item):
        # skip the small server sleeps (less than the timeout that just
        # failed); also ensures other workers get a go at this file
        backoff_min = int(math.ceil(math.log(self.downloader.timeout, 2)))
        if self._server_sleep_backoff < backoff_min + 1:
            self._server_sleep_backoff = backoff_min + 1

        self._server_sleep_time = 2 ** self._server_sleep_backoff

        self._logger.warning("timeout while downloading %s; server sleep %s",
                             queue_item.filename, self._server_sleep_time)

        self._files.put(queue_item)

    def _handle_badfile(self, queue_item):
        retry_limit = self.downloader.bad_download_retry_limit

        if queue_item.bad_downloads == retry_limit:
            self._logger.exception("retry limit reached")
            return True  # abort download

        else:
            n = queue_item.bad_downloads + 1
            su = time() + self.downloader.timeout
            i = queue_item._replace(bad_downloads=n, sleep_until=su)

            self._logger.warning("bad file (%s, attempt %s), file sleep %s",
                                 queue_item.filename, n,
                                 self.downloader.timeout, exc_info=1)

            self._files.put(i)

    def _handle_ioerror(self, queue_item):
        if self._server_sleep_backoff < 10:
            self._server_sleep_backoff += 1

        self._server_sleep_time = 2 ** self._server_sleep_backoff

        # don't print a stack trace until it's more serious
        if self._server_sleep_backoff >= 5:
            lf = lambda a, b: self._logger.warning(a, b, exc_info=1)
        else:
            lf = self._logger.info

        lf("exception; server sleep %s", self._server_sleep_time)

        self._files.put(queue_item)

    def _connection_close(self):
        try:
            self._connection.close()
        except (greenlet.GreenletExit, KeyboardInterrupt, SystemExit):
            raise
        except:
            pass

        self._connection = None

    def _unpack_file(self, temp_file, queue_item):
        # callback: yields to other greenlets for IO _only_
        # the timeout must be cancelled - we do not want to be interrupted,
        # it could leave downloader._dataset/_checklist in an inconsistent
        # state

        with self.downloader.unpack_lock:
            axes = ([queue_item.hour], queue_item.expect_pressures,
                    Dataset.axes.variable)
            file_checklist = set(itertools.product(*axes))

            try:
                unpack_grib(temp_file,
                            self.downloader._dataset,
                            self.downloader._checklist,
                            self.downloader._gribmirror,
                            file_checklist=file_checklist,
                            assert_hour=queue_item.hour,
                            callback=lambda a, b, c: sleep(0))
            except (greenlet.GreenletExit, KeyboardInterrupt, SystemExit):
                raise
            except:
                try:
                    type, value, traceback = sys.exc_info()
                    value = str(value)
                    raise BadFile(value)
                finally:
                    # avoid circular reference
                    del traceback
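

# A worked example of the back-off behaviour above, assuming the default
# DatasetDownloader timeout of 120 seconds: in _handle_timeout,
# backoff_min = ceil(log2(120)) = 7, so after a Timeout the worker jumps
# straight to _server_sleep_backoff = 8 and sleeps 2 ** 8 = 256 seconds,
# skipping back-offs shorter than the timeout that just failed. Plain IO
# errors in _handle_ioerror instead step through 2, 4, 8, ... seconds,
# capped at 2 ** 10 = 1024 seconds.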


class DownloadDaemon(object):
    def __init__(self, directory, num_datasets=1):
        # TODO - accept the options that DatasetDownloader does
        self.directory = directory
        self.num_datasets = num_datasets

    def clean_directory(self):
        # also returns the latest dataset we have

        # XXX: this won't clean up gribmirror files that don't have their
        # corresponding dataset.
        datasets = Dataset.listdir(self.directory, only_suffices=('', ))
        keep_rows = sorted(datasets, reverse=True)[:self.num_datasets]
        keep_ds_times = [r.ds_time for r in keep_rows]

        kept = []
        removed = []

        for row in Dataset.listdir(self.directory):
            if row.ds_time not in keep_ds_times:
                removed.append(row.filename)
                os.unlink(row.path)
            else:
                kept.append(row.filename)

        logger.info("cleaning: kept %s, removed %s", kept, removed)

        for filename in os.listdir(self.directory):
            if filename.startswith("download."):
                logger.warning("removing old temporary directory %s",
                               filename)
                shutil.rmtree(os.path.join(self.directory, filename))

        if len(keep_ds_times):
            logger.debug("latest downloaded dataset is: %s",
                         keep_ds_times[0])
            return keep_ds_times[0]
        else:
            return None

    def run(self):
        last_downloaded_dataset = self.clean_directory()
        latest_dataset = self._latest_dataset()

        if last_downloaded_dataset is None or \
                last_downloaded_dataset < latest_dataset:
            next_dataset = latest_dataset
        else:
            next_dataset = last_downloaded_dataset + timedelta(hours=6)

        while True:
            # datasets typically start hitting the mirror 3.5 hours after
            # their named time
            expect = next_dataset + timedelta(hours=3, minutes=30)
            wait_for = (expect - datetime.now()).total_seconds()
            if wait_for > 0:
                logger.info("waiting until %s (%s) for dataset %s",
                            expect, wait_for, next_dataset)
                sleep(wait_for)

            logger.info("downloading dataset %s", next_dataset)
            self._download(next_dataset)
            self.clean_directory()

            next_dataset += timedelta(hours=6)

    def _latest_dataset(self):
        latest_dataset = (datetime.now() - timedelta(hours=3, minutes=30)) \
            .replace(minute=0, second=0, microsecond=0)
        hour = latest_dataset.hour - (latest_dataset.hour % 6)
        latest_dataset = latest_dataset.replace(hour=hour)
        logger.info("latest dataset is %s", latest_dataset)
        return latest_dataset

    def _download(self, ds_time):
        d = DatasetDownloader(self.directory, ds_time)
        try:
            d.open()
            d.download()
        except (greenlet.GreenletExit, KeyboardInterrupt, SystemExit):
            raise
        except:
            logger.exception("Failed to download %s", ds_time)
        else:
            logger.info("Download complete %s", ds_time)
        finally:
            d.close()
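

# A worked example of _latest_dataset() (the clock time is illustrative):
# at 2014-01-01 14:00, subtracting 3 hours 30 minutes gives 10:30, which
# truncates to 10:00, and 10 - (10 % 6) = 6, so the latest dataset is
# taken to be 2014-01-01 06:00; the 12:00 run is not expected on the
# mirror until roughly 15:30 (see DownloadDaemon.run).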


def _parse_ds_str(ds_time_str):
    try:
        if len(ds_time_str) != 10:
            raise ValueError
        ds_time = datetime.strptime(ds_time_str, "%Y%m%d%H")
    except ValueError:
        raise argparse.ArgumentTypeError("invalid dataset string")

    if ds_time.hour % 6 != 0:
        raise argparse.ArgumentTypeError("dataset hour must be a multiple "
                                         "of 6")

    return ds_time


_format_email = \
"""%(levelname)s from logger %(name)s (thread %(threadName)s)

Time:       %(asctime)s
Location:   %(pathname)s:%(lineno)d
Module:     %(module)s
Function:   %(funcName)s

%(message)s"""

_format_string = \
    "[%(asctime)s] %(levelname)s %(name)s %(threadName)s: %(message)s"


def main():
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    parent = argparse.ArgumentParser(add_help=False)
    parent.add_argument('-d', '--directory',
                        default=Dataset.DEFAULT_DIRECTORY)
    parent.add_argument('-f', '--log-file')
    parent.add_argument('-e', '--email-exceptions',
                        metavar='USER@DOMAIN.TLD')
    parent.add_argument('-s', '--email-from', default='tawhiri@localhost')
    parent.add_argument('-c', '--email-server', default='localhost')

    group = parent.add_mutually_exclusive_group()
    group.add_argument('-w', '--log-file-verbose', action="store_true")
    group.add_argument('-r', '--log-file-quiet', action="store_true")

    group = parent.add_mutually_exclusive_group()
    group.add_argument("-v", "--verbose", action="store_true")
    group.add_argument("-q", "--quiet", action="store_true")

    parser = argparse.ArgumentParser(description='Dataset Downloader')
    subparsers = parser.add_subparsers(dest='subparser_name')

    parser_daemon = subparsers.add_parser('daemon', parents=[parent],
                                          help='downloader daemon mode')
    parser_daemon.add_argument('-n', '--num-datasets', type=int, default=1)

    parser_download = subparsers.add_parser('download', parents=[parent],
                                            help='download a single dataset')
    parser_download.add_argument('dataset', type=_parse_ds_str)

    # TODO - more options (other options of relevant initialisers)

    args = parser.parse_args()

    fmtr = logging.Formatter(_format_string)

    handler = logging.StreamHandler()  # stderr
    handler.setFormatter(fmtr)
    if args.verbose:
        handler.setLevel(logging.DEBUG)
    elif not args.quiet:
        handler.setLevel(logging.INFO)
    else:
        handler.setLevel(logging.WARNING)
    root_logger.addHandler(handler)

    if args.log_file:
        handler = logging.handlers.WatchedFileHandler(args.log_file)
        handler.setFormatter(fmtr)
        if args.log_file_verbose:
            handler.setLevel(logging.DEBUG)
        elif not args.log_file_quiet:
            handler.setLevel(logging.INFO)
        else:
            handler.setLevel(logging.WARNING)
        root_logger.addHandler(handler)
        logger.info("Opening log file %s", args.log_file)

    if args.email_exceptions:
        emails_to = [args.email_exceptions]
        emails_from = args.email_from
        email_server = args.email_server

        handler = logging.handlers.SMTPHandler(
                email_server, emails_from, emails_to,
                "tawhiri wind downloader")
        handler.setLevel(logging.ERROR)
        handler.setFormatter(logging.Formatter(_format_email))
        root_logger.addHandler(handler)

    try:
        if args.subparser_name == 'download':
            d = DatasetDownloader(args.directory, args.dataset)
            try:
                d.open()
                d.download()
            finally:
                d.close()
        else:
            d = DownloadDaemon(args.directory, args.num_datasets)
            d.run()
    except (greenlet.GreenletExit, KeyboardInterrupt, SystemExit):
        logger.warning("exit via %s", sys.exc_info()[0].__name__)
        raise
    except:
        logger.exception("unhandled exception")
        raise


if __name__ == "__main__":
    main()