# coding: utf-8
from __future__ import absolute_import, division, print_function
import re
import threading
import time
from collections import Mapping
from functools import partial
import requests
from logbook import Logger
from ratelimiter import RateLimiter
from represent import ReprHelper, ReprHelperMixin
from .operators import _stringify_predicate_value
logger = Logger('spacetrack')
type_re = re.compile('(\w+)')
enum_re = re.compile("""
enum\(
'(\w+)' # First value
(?:, # Subsequent values optional
'(\w+)' # Capture string
)*
\)
""", re.VERBOSE)
[docs]class AuthenticationError(Exception):
"""Space-Track authentication error."""
[docs]class Predicate(ReprHelperMixin, object):
"""Hold Space-Track predicate information.
The current goal of this class is to print the repr for the user.
"""
def __init__(self, name, type_, nullable=False, default=None, values=None):
self.name = name
self.type_ = type_
self.nullable = nullable
self.default = default
# Values can be set e.g. for enum predicates
self.values = values
def _repr_helper_(self, r):
r.keyword_from_attr('name')
r.keyword_from_attr('type_')
r.keyword_from_attr('nullable')
r.keyword_from_attr('default')
if self.values is not None:
r.keyword_from_attr('values')
[docs]class SpaceTrackClient(object):
"""SpaceTrack client class.
Parameters:
identity: Space-Track username.
password: Space-Track password.
For more information, refer to the `Space-Track documentation`_.
.. _`Space-Track documentation`: https://www.space-track.org/documentation
#api-requestClasses
"""
base_url = 'https://www.space-track.org/'
# This dictionary is a mapping of request classes to request controllers.
# Each class is accessible as a method on this class.
request_classes = {
'tle': 'basicspacedata',
'tle_latest': 'basicspacedata',
'tle_publish': 'basicspacedata',
'omm': 'basicspacedata',
'boxscore': 'basicspacedata',
'satcat': 'basicspacedata',
'launch_site': 'basicspacedata',
'satcat_change': 'basicspacedata',
'satcat_debut': 'basicspacedata',
'decay': 'basicspacedata',
'tip': 'basicspacedata',
'announcement': 'basicspacedata',
'cdm': 'expandedspacedata',
'organization': 'expandedspacedata',
'file': 'fileshare',
'download': 'fileshare',
}
# These predicates are available for every request class.
rest_predicates = {
Predicate('predicates', 'str'),
Predicate('metadata', 'enum', values=('true', 'false')),
Predicate('limit', 'str'),
Predicate('orderby', 'str'),
Predicate('distinct', 'enum', values=('true', 'false')),
Predicate(
'format', 'enum',
values=('json', 'xml', 'html', 'csv', 'tle', '3le', 'kvn', 'stream')),
Predicate('emptyresult', 'enum', values=('show',)),
Predicate('favorites', 'str'),
}
def __init__(self, identity, password):
self.session = self._create_session()
self.identity = identity
self.password = password
# If set, this will be called when we sleep for the rate limit.
self.callback = None
self._authenticated = False
self._predicates = dict()
# "Space-track throttles API use in order to maintain consistent
# performance for all users. To avoid error messages, please limit your
# query frequency to less than 20 requests per minute."
self._ratelimiter = RateLimiter(
max_calls=19, period=60, callback=self._ratelimit_callback)
def _ratelimit_callback(self, until):
duration = int(round(until - time.time()))
logger.info('Rate limit reached. Sleeping for {:d} seconds.', duration)
if self.callback is not None:
self.callback(until)
@staticmethod
def _create_session():
"""Create session for accessing the web.
This method is overridden in
:class:`spacetrac.aio.AsyncSpaceTrackClient` to use :mod:`aiohttp`
instead of :mod:`requests`.
"""
return requests.Session()
[docs] def authenticate(self):
"""Authenticate with Space-Track.
Raises:
spacetrack.base.AuthenticationError: Incorrect login details.
"""
if not self._authenticated:
login_url = self.base_url + 'ajaxauth/login'
data = {'identity': self.identity, 'password': self.password}
resp = self.session.post(login_url, data=data)
_raise_for_status(resp)
# If login failed, we get a JSON response with {'Login': 'Failed'}
resp_data = resp.json()
if isinstance(resp_data, Mapping):
if resp_data.get('Login', None) == 'Failed':
raise AuthenticationError()
self._authenticated = True
[docs] def generic_request(self, class_, iter_lines=False, iter_content=False,
**kwargs):
"""Generic Space-Track query.
The request class methods use this method internally; the following
two lines are equivalent:
.. code-block:: python
spacetrack.tle_publish(*args, **kwargs)
spacetrack.generic_request('tle_publish', *args, **kwargs)
Parameters:
class_: Space-Track request class name
iter_lines: Yield result line by line
iter_content: Yield result in 100 KiB chunks.
**kwargs: These keywords must match the predicate fields on
Space-Track. You may check valid keywords with the following
snippet:
.. code-block:: python
spacetrack = SpaceTrackClient(...)
spacetrack.tle.get_predicates()
# or
spacetrack.get_predicates('tle')
See :func:`~spacetrack.operators._stringify_predicate_value` for
which Python objects are converted appropriately.
Yields:
Lines—stripped of newline characters—if ``iter_lines=True``
Yields:
100 KiB chunks if ``iter_content=True``
Returns:
Parsed JSON object, unless ``format`` keyword argument is passed.
.. warning::
Passing ``format='json'`` will return the JSON **unparsed**. Do
not set ``format`` if you want the parsed JSON object returned!
"""
if iter_lines and iter_content:
raise ValueError('iter_lines and iter_content cannot both be True')
if class_ not in self.request_classes:
raise ValueError("Unknown request class '{}'".format(class_))
# Decode unicode unless class == download, including conversion of
# CRLF newlines to LF.
decode = (class_ != 'download')
if not decode and iter_lines:
error = (
'iter_lines disabled for binary data, since CRLF newlines '
'split over chunk boundaries would yield extra blank lines. '
'Use iter_content=True instead.')
raise ValueError(error)
self.authenticate()
controller = self.request_classes[class_]
url = ('{0}{1}/query/class/{2}'
.format(self.base_url, controller, class_))
# Validate keyword argument names by querying valid predicates from
# Space-Track
predicates = self.get_predicates(class_)
predicate_fields = {p.name for p in predicates}
valid_fields = predicate_fields | {p.name for p in self.rest_predicates}
for key, value in kwargs.items():
if key not in valid_fields:
raise TypeError(
"'{class_}' got an unexpected argument '{key}'"
.format(class_=class_, key=key))
value = _stringify_predicate_value(value)
url += '/{key}/{value}'.format(key=key, value=value)
logger.debug(url)
resp = self._ratelimited_get(url, stream=iter_lines or iter_content)
_raise_for_status(resp)
if resp.encoding is None:
resp.encoding = 'UTF-8'
if iter_lines:
return _iter_lines_generator(resp, decode_unicode=decode)
elif iter_content:
return _iter_content_generator(resp, decode_unicode=decode)
else:
# If format is specified, return that format unparsed. Otherwise,
# parse the default JSON response.
if 'format' in kwargs:
if decode:
data = resp.text
# Replace CRLF newlines with LF, Python will handle platform
# specific newlines if written to file.
data = data.replace('\r\n', '\n')
else:
data = resp.content
return data
else:
return resp.json()
def _ratelimited_get(self, *args, **kwargs):
"""Perform get request, handling rate limiting."""
with self._ratelimiter:
resp = self.session.get(*args, **kwargs)
# It's possible that Space-Track will return HTTP status 500 with a
# query rate limit violation. This can happen if a script is cancelled
# before it has finished sleeping to satisfy the rate limit and it is
# started again.
#
# Let's catch this specific instance and retry once if it happens.
if resp.status_code == 500:
# Let's only retry if the error page tells us it's a rate limit
# violation.in
if 'violated your query rate limit' in resp.text:
# Mimic the RateLimiter callback behaviour.
until = time.time() + self._ratelimiter.period
t = threading.Thread(target=self._ratelimit_callback, args=(until,))
t.daemon = True
t.start()
time.sleep(self._ratelimiter.period)
# Now retry
with self._ratelimiter:
resp = self.session.get(*args, **kwargs)
return resp
def __getattr__(self, attr):
if attr not in self.request_classes:
raise AttributeError(
"'{name}' object has no attribute '{attr}'"
.format(name=self.__class__.__name__, attr=attr))
function = partial(self.generic_request, attr)
function.get_predicates = partial(self.get_predicates, attr)
return function
def _download_predicate_data(self, class_):
"""Get raw predicate information for given request class, and cache for
subsequent calls.
"""
self.authenticate()
controller = self.request_classes[class_]
url = ('{0}{1}/modeldef/class/{2}'
.format(self.base_url, controller, class_))
logger.debug(url)
resp = self._ratelimited_get(url)
_raise_for_status(resp)
return resp.json()['data']
[docs] def get_predicates(self, class_):
"""Get full predicate information for given request class, and cache
for subsequent calls.
"""
if class_ not in self._predicates:
predicates_data = self._download_predicate_data(class_)
predicate_objects = self._parse_predicates_data(predicates_data)
self._predicates[class_] = predicate_objects
return self._predicates[class_]
def _parse_predicates_data(self, predicates_data):
predicate_objects = []
for field in predicates_data:
full_type = field['Type']
type_match = type_re.match(full_type)
if not type_match:
raise ValueError(
"Couldn't parse field type '{}'".format(full_type))
type_name = type_match.group(1)
field_name = field['Field'].lower()
nullable = (field['Null'] == 'YES')
default = field['Default']
types = {
# Strings
'char': 'str',
'varchar': 'str',
'longtext': 'str',
# varbinary only used for 'file' request class, for the
# 'file_link' predicate.
'varbinary': 'str',
# Integers
'bigint': 'int',
'int': 'int',
'tinyint': 'int',
'smallint': 'int',
'mediumint': 'int',
# Floats
'decimal': 'float',
'float': 'float',
'double': 'float',
# Date/Times
'date': 'date',
'timestamp': 'datetime',
'datetime': 'datetime',
# Enum
'enum': 'enum',
# Bytes
'longblob': 'bytes',
}
if type_name not in types:
raise ValueError("Unknown predicate type '{}'."
.format(type_name))
predicate = Predicate(
name=field_name,
type_=types[type_name],
nullable=nullable,
default=default)
if type_name == 'enum':
enum_match = enum_re.match(full_type)
if not enum_match:
raise ValueError(
"Couldn't parse enum type '{}'".format(full_type))
predicate.values = enum_match.groups()
predicate_objects.append(predicate)
return predicate_objects
def __repr__(self):
r = ReprHelper(self)
r.parantheses = ('<', '>')
r.keyword_from_attr('identity')
return str(r)
def _iter_content_generator(response, decode_unicode):
"""Generator used to yield 100 KiB chunks for a given response."""
for chunk in response.iter_content(100 * 1024, decode_unicode=decode_unicode):
if decode_unicode:
# Replace CRLF newlines with LF, Python will handle
# platform specific newlines if written to file.
chunk = chunk.replace('\r\n', '\n')
# Chunk could be ['...\r', '\n...'], stril trailing \r
chunk = chunk.rstrip('\r')
yield chunk
def _iter_lines_generator(response, decode_unicode):
"""Iterates over the response data, one line at a time. When
stream=True is set on the request, this avoids reading the
content at once into memory for large responses.
The function is taken from :meth:`requests.models.Response.iter_lines`, but
modified to use our :func:`~spacetrack.base._iter_content_generator`. This
is because Space-Track uses CRLF newlines, so :meth:`str.splitlines` can
cause us to yield blank lines if one chunk ends with CR and the next one
starts with LF.
.. note:: This method is not reentrant safe.
"""
pending = None
for chunk in _iter_content_generator(response, decode_unicode=decode_unicode):
if pending is not None:
chunk = pending + chunk
lines = chunk.splitlines()
if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
pending = lines.pop()
else:
pending = None
for line in lines:
yield line
if pending is not None:
yield pending
def _raise_for_status(response):
"""Raises stored :class:`HTTPError`, if one occurred.
This is the :meth:`requests.models.Response.raise_for_status` method,
modified to add the response from Space-Track, if given.
"""
http_error_msg = ''
if 400 <= response.status_code < 500:
http_error_msg = '%s Client Error: %s for url: %s' % (
response.status_code, response.reason, response.url)
elif 500 <= response.status_code < 600:
http_error_msg = '%s Server Error: %s for url: %s' % (
response.status_code, response.reason, response.url)
if http_error_msg:
spacetrack_error_msg = None
try:
json = response.json()
if isinstance(json, Mapping):
spacetrack_error_msg = json['error']
except (ValueError, KeyError):
pass
if not spacetrack_error_msg:
spacetrack_error_msg = response.text
if spacetrack_error_msg:
http_error_msg += '\nSpace-Track response:\n' + spacetrack_error_msg
raise requests.HTTPError(http_error_msg, response=response)