Source code for auromat.mapping.miracle

# Copyright European Space Agency, 2013

from __future__ import division, print_function

from collections import namedtuple
import datetime
import os
import fnmatch

import numpy as np
import numpy.ma as ma
from numpy.core.umath_tests import matrix_multiply

from astropy.coordinates.angles import Angle
import astropy.units as u

from auromat.coordinates.geodesic import wgs84A, wgs84B
from auromat.mapping.mapping import BaseMapping, BoundingBox, GenericMapping,\
    BaseMappingProvider, MappingCollection, \
    inflatedEarthIntersection, DefaultRGBMixin, sanitize_data
from auromat.coordinates.transform import Y, Z, rotation_matrix, geodetic2EcefZero,\
    ecef2Geodetic, spherical_to_cartesian, latLonToJ2000
import auromat.utils
from auromat.util.decorators import lazy_property, inherit_docs
from auromat.util.image import loadImage

fileDateTimeFormat = '%y%m%d_%H%M%S'

# NOTE: xc, yc, k are relative to a 512x512 image
# NOTE: xc is vertical, yc is horizontal!
CalibrationData = namedtuple('CalibrationData', ['station', 'validFrom', 'validTo',
                                                 'lat', 'lon', 'xc', 'yc', 
                                                 'k', 'rotation',
                                                 'boundingBoxSimple'])

@inherit_docs
[docs]class MIRACLEMappingProvider(BaseMappingProvider): def __init__(self, imageFolder, altitude=110, simple=False, maxTimeOffset=5): """ :param imageFolder: path to folder containing images and cal.txt file :param altitude: the altitude in km onto which the image was mapped (e.g. 110) Note that for simple=True, this is always 110km :param bool simple: whether to use simple mapping (constant lat-lon grid) or intersection-based mapping :param maxTimeOffset: in seconds """ self.imageFolder = imageFolder self.altitude = altitude self.simple = simple self.imageFileExtension = 'jpg' self.maxTimeOffset = maxTimeOffset fileNames = os.listdir(imageFolder) imageFileNames = sorted(fnmatch.filter(fileNames, '*.' + self.imageFileExtension)) imageStations = [f[:3] for f in imageFileNames] self.imageDates = [datetime.datetime.strptime(f[3:16], fileDateTimeFormat) for f in imageFileNames] self.images = dict() # station -> [(filename,date)] for station in set(imageStations): self.images[station] = [(f,d) for (f,s,d) in zip(imageFileNames, imageStations, self.imageDates) if s == station] def __len__(self): return len(self.imageDates) @property def range(self): dates = sorted(self.imageDates) return dates[0], dates[-1]
[docs] def contains(self, date): for images in self.images.values(): dates = [d for (_,d) in images] # search for image at given time idx = auromat.utils.findNearest(dates, date) offset = abs(dates[idx]-date).total_seconds() if offset <= self.maxTimeOffset: return True return False
[docs] def get(self, date): mappings = [] # for each station find the image closest to date+-maxTimeOffset for images in self.images.values(): dates = [d for (_,d) in images] # search for image at given time idx = auromat.utils.findNearest(dates, date) offset = abs(dates[idx]-date).total_seconds() if offset <= self.maxTimeOffset: imagePath = os.path.join(self.imageFolder, images[idx][0]) mappings.append(getMapping(imagePath, self.altitude, self.simple)) ident = 'MIRACLE.' + date.strftime('%Y.%m.%d.%H.%M.%S') mappingColl = MappingCollection(mappings, identifier=ident, mayOverlap=True) return mappingColl
[docs] def getById(self, identifier): raise NotImplementedError
[docs] def getSequence(self, dateBegin=None, dateEnd=None): raise NotImplementedError
@sanitize_data @inherit_docs class MIRACLEMapping(DefaultRGBMixin, BaseMapping): """ A mapping defined using an image and calibration data from FMI MIRACLE. """ def __init__(self, calData, imagePath, photoTime, alti, simple=False): """ :param CalibrationData calData: :param photoTime: datetime object :param alti: the altitude in km onto which the image was mapped (e.g. 110) Note that for simple=True, this is always 110km :param bool simple: whether to use simple mapping (constant lat-lon grid) or intersection-based mapping """ # TODO SOD ASI images include wavelength in filename, add to identifier? identifier = calData.station + '.' + photoTime.strftime('%Y.%m.%d.%H.%M.%S') xASI,yASI,zASI = geodetic2EcefZero(np.deg2rad(calData.lat), np.deg2rad(calData.lon)) self.cameraPosGEO = [xASI, yASI, zASI] cameraPosGCRS = latLonToJ2000(calData.lat, calData.lon, 0, photoTime) alti = 110 if simple or alti is None else alti BaseMapping.__init__(self, alti, cameraPosGCRS, photoTime, identifier) self._calData = calData self._simple = simple self._imagePath = imagePath self._img = None self._img_unmasked = None @property def img(self): if self._img is None: self._img = ma.masked_array(self._img_unmasked) return self._img @property def img_unmasked(self): if self._img_unmasked is None: rgb = loadImage(self._imagePath) if rgb.shape[0] != rgb.shape[1]: # contains caption below image, cut it off rgb = rgb[:rgb.shape[1],:] assert rgb.shape == (rgb.shape[1],rgb.shape[1],3) self._img_unmasked = rgb return self._img_unmasked @property def lats(self): lats, _ = self._latsLonsCorner return lats @property def lons(self): _, lons = self._latsLonsCorner return lons @property def latsCenter(self): lats, _ = self._latsLonsCenter return lats @property def lonsCenter(self): _, lons = self._latsLonsCenter return lons @lazy_property def _latsLonsCorner(self): lats, lons = self._calculateLatsLons(center=False) lats = ma.masked_invalid(lats, copy=False) lons = ma.masked_invalid(lons, copy=False) return lats, lons @lazy_property def _latsLonsCenter(self): lats, lons = self._calculateLatsLons(center=True) lats = ma.masked_invalid(lats, copy=False) lons = ma.masked_invalid(lons, copy=False) return lats, lons def _calculateLatsLons(self, center=True): if self._simple: bb = self._calData.boundingBoxSimple deltaLat = (bb.latNorth - bb.latSouth)/self.img.shape[0] deltaLon = (bb.lonEast - bb.lonWest)/self.img.shape[1] if center: latSpace = np.linspace(bb.latNorth-deltaLat/2, bb.latSouth+deltaLat/2, self.img.shape[0]) lonSpace = np.linspace(bb.lonWest+deltaLon/2, bb.lonEast-deltaLon/2, self.img.shape[0]) else: latSpace = np.linspace(bb.latNorth, bb.latSouth, self.img.shape[0]+1) lonSpace = np.linspace(bb.lonWest, bb.lonEast, self.img.shape[0]+1) #lats, lons = np.meshgrid(latSpace, lonSpace, indexing='ij') # 'indexing' not supported in np 1.6 lats, lons = np.dstack(np.meshgrid(latSpace, lonSpace)).T else: intersectionInflated = self.intersectionInflatedCenter if center else self.intersectionInflatedCorner xyz = intersectionInflated.reshape(-1,3) lats, lons = ecef2Geodetic(xyz[:,0], xyz[:,1], xyz[:,2], wgs84A, wgs84B) np.rad2deg(lats, lats) np.rad2deg(lons, lons) lats = lats.reshape(intersectionInflated.shape[0], intersectionInflated.shape[1]) lons = lons.reshape(intersectionInflated.shape[0], intersectionInflated.shape[1]) return lats, lons @lazy_property def cameraToPixelCornerDirection(self): """ Direction vector for each pixel corner. """ vecs = self._calculateCameraToPixelDirection(self.elevationCorner, self.azimuthCorner) return vecs @lazy_property def cameraToPixelCenterDirection(self): """ Direction vector for each pixel center. """ _, el = self.azElCenter # don't use self.elevation here, conflicts with sanitization decorator vecs = self._calculateCameraToPixelDirection(el, self.azimuthCenter) return vecs def _calculateCameraToPixelDirection(self, el, az): el = np.deg2rad(el) az = np.deg2rad(-(az-180)) x,y,z = spherical_to_cartesian(1,el,az) vecs = np.dstack((x,y,z)) # simple spherical latitude rotation works here because # the latitude is the geodetic latitude which is the # angle between the normal and the equatorial plane matLat = rotation_matrix(np.deg2rad(90 - self._calData.lat), Y)[:3,:3] matLon = rotation_matrix(np.deg2rad(-self._calData.lon), Z)[:3,:3] mat = np.dot(matLon, matLat) # rotate latitude first, then longitude vecs = vecs.reshape(el.shape[0]*el.shape[1], 3) vecsRot = matrix_multiply(mat, vecs[...,np.newaxis]).reshape(el.shape[0], el.shape[1], 3) return vecsRot @lazy_property def intersectionInflatedCorner(self): """Returns the point of intersection with the inflated earth for each pixel corner.""" intersectionInflated = inflatedEarthIntersection(self.cameraToPixelCornerDirection.reshape(-1,3), self.cameraPosGEO, self.altitude) return intersectionInflated.reshape(self.cameraToPixelCornerDirection.shape) @lazy_property def intersectionInflatedCenter(self): """Returns the point of intersection with the inflated earth for each pixel center.""" intersectionInflated = inflatedEarthIntersection(self.cameraToPixelCenterDirection.reshape(-1,3), self.cameraPosGEO, self.altitude) return intersectionInflated.reshape(self.cameraToPixelCenterDirection.shape) @property def elevation(self): _, el = self.azElCenter return el @property def elevationCorner(self): _, el = self.azElCorner return el @property def azimuthCenter(self): az, _ = self.azElCenter return az @property def azimuthCorner(self): az, _ = self.azElCorner return az @lazy_property def azElCorner(self): """ Azimuth and elevation for each pixel corner. """ az, el = self.calculateAzEl(center=False) az = ma.masked_invalid(az, copy=False) el = ma.masked_invalid(el, copy=False) return az, el @lazy_property def azElCenter(self): """ Azimuth and elevation for each pixel center. """ az, el = self.calculateAzEl(center=True) az = ma.masked_invalid(az, copy=False) el = ma.masked_invalid(el, copy=False) return az, el def calculateAzEl(self, center=True): w = self.img_unmasked.shape[0] xc = self._calData.xc yc = self._calData.yc k = self._calData.k refW = 512 if w != refW: xc *= w/refW yc *= w/refW k *= w/refW w_ = w if center else w+1 center_ = np.array([xc,yc]) ind = np.indices((w_, w_)) # TODO verify that the (0,0) is really the upper left corner of the upper left pixel if center: ind += 0.5 ind = np.dstack((ind[0], ind[1])).reshape((w_)*(w_), 2) vecs = ind - center_ northVec = [-1,0] # points to top of image northVecs = np.repeat([northVec], len(vecs), axis=0) az = auromat.utils.signedAngleBetween(vecs, northVecs).reshape(w_,w_) az -= self._calData.rotation az = Angle(az * u.rad).wrap_at(360 * u.deg).degree distCenter = np.sqrt((vecs*vecs).sum(axis=1)).reshape(w_,w_) z = distCenter/k np.rad2deg(z, z) elev = 90 - z return az, elev def createResampled(self, lats, lons, latsCenter, lonsCenter, elevation, img): mapping = GenericMapping(lats, lons, latsCenter, lonsCenter, elevation, self.altitude, img, self.cameraPosGCRS, self.photoTime, self.identifier) return mapping
[docs]def getMapping(imagePath, alti=110, simple=False): filename = os.path.basename(imagePath) # e.g. KEV120304_172100_____4000.jpg station = filename[:3] date = datetime.datetime.strptime(filename[3:16], fileDateTimeFormat) calPath = os.path.join(os.path.dirname(imagePath), 'cal.txt') calData = getCalibrationData(calPath, station, date) mapping = MIRACLEMapping(calData, imagePath, date, alti, simple=simple) mapping = mapping.maskedByElevation(0.1) # .1 to account for rounding errors return mapping
def getCalibrationData(path, station, date): calDataEntries = np.loadtxt(path, dtype={'names': ('station', 'lat', 'lon', 'from', 'to', 'xc', 'yc', 'k', 'rotation', 'lat+', 'lat-', 'lon-', 'lon+', 'i1', 'i2', 'i3'), 'formats': ('S3', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'f8', 'b1', 'b1', 'b1')}) for entry in calDataEntries: if entry['station'] != station: continue fromDateY = int(entry['from']) fromDateM = int((entry['from']-fromDateY)*12 + 1) toDateY = int(entry['to']) toDateM = int((entry['to']-toDateY)*12 + 1) fromDate = datetime.datetime(fromDateY, fromDateM, 1) toDate = datetime.datetime(toDateY, toDateM+1, 1) # easier than using end of month if not fromDate <= date <= toDate: continue lat = entry['lat'] lon = entry['lon'] bbSimple = BoundingBox(latSouth=lat+entry['lat-'], lonWest=lon+entry['lon-'], latNorth=lat+entry['lat+'], lonEast=lon+entry['lon+']) calData = CalibrationData(station=entry['station'], validFrom=fromDate, validTo=toDate, lat=lat, lon=lon, xc=entry['xc'], yc=entry['yc'], k=entry['k'], rotation=entry['rotation'], boundingBoxSimple=bbSimple) return calData raise ValueError('No MIRACLE calibration data found for ' + station + ' station') # TODO MIRACLEMapping is missing in sphinx docs __all__ = map(lambda f: f.__name__, [MIRACLEMappingProvider, MIRACLEMapping, getMapping])