import os
import multiprocessing as mp
import time
import ConfigParser
from .metadata import ArchiveMeta, PlateMeta, PlateHeader, read_conf
from .solve import SolveProcess
from .database import PlateDB
from .image import PlateConverter
class PlateImagePipeline:
    """
    Plate processing pipeline class.

    Orchestrates metadata collection, header construction, plate solving,
    photometric calibration and database/CSV output for photographic plate
    images.  Images can be processed one at a time (``single_image``) or
    in parallel worker processes fed from a queue (``parallel_run``).

    Which steps run is controlled by boolean flags that are normally
    populated from the ``[Pipeline]`` section of the configuration file
    by ``assign_conf``.
    """

    def __init__(self, plate_converter=None):
        self.conf = None
        self.work_dir = ''
        self.write_log_dir = ''
        self.input_queue = None
        self.done_queue = None
        self.plate_converter = plate_converter
        self.plate_epoch = None
        # Step flags; overridden from the [Pipeline] config section
        # by assign_conf().
        self.read_wfpdb = False
        self.read_csv = False
        self.read_fits = False
        self.output_header_file = False
        self.output_header_fits = False
        self.invert_image = False
        self.extract_sources = False
        self.solve_plate = False
        self.output_solution_db = False
        self.output_wcs_file = False
        self.solve_recursive = False
        self.calibrate_photometry = False
        self.output_calibration_db = False
        self.output_sources_db = False
        self.output_sources_csv = False

    def assign_conf(self, conf):
        """
        Parse configuration and set class attributes.

        Parameters
        ----------
        conf : str or ConfigParser
            Path to a configuration file, or an already-parsed
            configuration object.
        """
        if isinstance(conf, str):
            conf = read_conf(conf)

        self.conf = conf

        # Directory settings come from the [Files] section; missing
        # options are silently left at their defaults.
        for attr in ['work_dir', 'write_log_dir']:
            try:
                setattr(self, attr, conf.get('Files', attr))
            except ConfigParser.Error:
                pass

        # Boolean step flags come from the [Pipeline] section.  A value
        # that is present but not a valid boolean is reported; a missing
        # option is silently left at its default (False).
        for attr in ['read_wfpdb', 'read_csv', 'read_fits',
                     'output_header_file', 'output_header_fits',
                     'invert_image', 'extract_sources', 'solve_plate',
                     'output_solution_db', 'output_wcs_file',
                     'solve_recursive', 'calibrate_photometry',
                     'output_calibration_db',
                     'output_sources_db', 'output_sources_csv']:
            try:
                setattr(self, attr, conf.getboolean('Pipeline', attr))
            except ValueError:
                print ('Error in configuration file: not a boolean value '
                       '([{}], {})'.format('Pipeline', attr))
            except ConfigParser.Error:
                pass

    def single_image(self, filename, plate_epoch=None):
        """
        Process single plate image.

        Runs the metadata, header, solving, calibration and output steps
        that have been enabled on this pipeline instance.

        Parameters
        ----------
        filename : str
            Filename of the FITS image to be processed
        plate_epoch : float
            Plate epoch (decimal year); overrides both the solver default
            and the instance-level ``plate_epoch``
        """
        ameta = ArchiveMeta()
        ameta.assign_conf(self.conf)

        if self.read_wfpdb:
            ameta.read_wfpdb()

        if self.read_csv:
            ameta.read_csv()

        fn = os.path.basename(filename)
        pmeta = ameta.get_platemeta(filename=fn)
        pmeta.compute_values()

        # Build a FITS header from the plate metadata.
        h = PlateHeader()
        h.assign_conf(pmeta.conf)
        h.assign_platemeta(pmeta)
        h.update_from_platemeta()
        h.assign_values()
        h.update_comments()
        h.rewrite()
        h.reorder()

        if self.output_header_file:
            fn_header = os.path.splitext(fn)[0] + '.hdr'
            h.output_to_file(fn_header)

        proc = SolveProcess(fn)
        proc.assign_conf(pmeta.conf)
        proc.assign_header(h)

        # Argument epoch takes precedence over the instance-level epoch.
        if self.plate_epoch is not None:
            proc.plate_epoch = self.plate_epoch

        if plate_epoch is not None:
            proc.plate_epoch = plate_epoch

        proc.setup()

        if self.invert_image:
            proc.invert_plate()

        if self.extract_sources:
            proc.extract_sources()

            if self.solve_plate:
                proc.solve_plate()

                if self.output_solution_db:
                    proc.output_solution_db()

                if self.output_wcs_file:
                    proc.output_wcs_header()

                if proc.solution is not None:
                    proc.log.write('Updating FITS header with the WCS',
                                   level=3, event=47)
                    h.insert_wcs(proc.solution['wcs'])

                    if self.output_header_file:
                        proc.log.write('Writing FITS header to a file',
                                       level=3, event=48)
                        # fn_header was assigned above under the same flag.
                        h.output_to_file(fn_header)

                    if self.output_header_fits:
                        proc.log.write('Writing FITS header to the FITS file',
                                       level=3, event=49)
                        h.output_to_fits(fn)

                if self.solve_recursive:
                    proc.solve_recursive()

            proc.process_source_coordinates()

            if self.calibrate_photometry:
                proc.calibrate_photometry()

                if self.output_calibration_db:
                    proc.output_cterm_db()
                    proc.output_color_db()
                    proc.output_calibration_db()
                    proc.output_rmse_db()

            if self.output_sources_db:
                proc.output_sources_db()

            if self.output_sources_csv:
                proc.output_sources_csv()

        proc.finish()

    def worker(self):
        """
        Take a filename from the queue and process the file.

        Loops until a 'DONE' sentinel is received or a ``pyplate.stop``
        file appears in the work directory.  Finished filenames are put
        on the done-queue for the parent process to checkpoint.
        """
        if self.input_queue is None:
            return

        while True:
            # A pyplate.stop file in the work directory aborts the worker.
            if os.path.exists(os.path.join(self.work_dir, 'pyplate.stop')):
                break

            fn = self.input_queue.get()

            if fn == 'DONE':
                break

            if self.plate_converter:
                # Conversion-only mode: turn a TIFF scan into FITS.
                plateconv = PlateConverter()
                plateconv.assign_conf(self.conf)
                plateconv.tiff2fits(fn)
            else:
                self.single_image(fn)

            self.done_queue.put(fn)

    def _checkpoint(self, queue_list):
        """
        Drain the done-queue and update progress files on disk.

        Appends finished filenames to ``pyplate.done`` and rewrites
        ``pyplate.queue`` with the filenames still pending.  File I/O
        errors are ignored (progress files are best-effort).

        Parameters
        ----------
        queue_list : list
            Filenames not yet confirmed finished

        Returns
        -------
        list
            Updated list of pending filenames
        """
        # 'STOP' sentinel lets us drain whatever is currently queued.
        self.done_queue.put('STOP')
        done_list = [fn for fn in iter(self.done_queue.get, 'STOP')]
        queue_list = [fn for fn in queue_list if fn not in done_list]

        try:
            with open(os.path.join(self.work_dir, 'pyplate.done'),
                      'ab') as f:
                for fn in done_list:
                    f.write('{}\n'.format(fn))
        except IOError:
            pass

        try:
            with open(os.path.join(self.work_dir, 'pyplate.queue'),
                      'wb') as f:
                for fn in queue_list:
                    f.write('{}\n'.format(fn))
        except IOError:
            pass

        return queue_list

    def parallel_run(self, filenames, processes=1):
        """
        Run plate image processes in parallel.

        Parameters
        ----------
        filenames : list
            List of filenames to process
        processes : int
            Number of parallel processes
        """
        self.input_queue = mp.Queue()
        self.done_queue = mp.Queue()
        jobs = []
        queue_list = []

        # Resume from a previous run if a queue file exists on disk.
        try:
            with open(os.path.join(self.work_dir, 'pyplate.queue'),
                      'rb') as f:
                queue_list = [fn.strip() for fn in f.readlines()]
        except IOError:
            pass

        if not queue_list:
            queue_list = filenames

        for fn in queue_list:
            self.input_queue.put(fn)

        for i in range(processes):
            # One 'DONE' sentinel per worker terminates it cleanly.
            self.input_queue.put('DONE')
            job = mp.Process(target=self.worker)
            job.start()
            jobs.append(job)
            # Wait 10 seconds before starting another process
            time.sleep(10)

        # Checkpoint progress every 10 seconds until the queue empties
        # or a pyplate.stop file appears in the work directory.
        while True:
            time.sleep(10)
            queue_list = self._checkpoint(queue_list)

            if os.path.exists(os.path.join(self.work_dir, 'pyplate.stop')):
                break

            if not queue_list:
                break

        for job in jobs:
            job.join()

        # Final checkpoint after all workers have exited.
        self._checkpoint(queue_list)
def run_pipeline(filenames, fn_conf):
    """
    Run metadata processing and plate solving pipeline.

    Unconditionally runs every step (metadata, database output, header
    construction, solving, source output) for each input file.

    Parameters
    ----------
    filenames : list
        List of FITS files to be processed
    fn_conf : str
        Full path to the configuration file
    """
    ameta = ArchiveMeta()
    ameta.assign_conf(fn_conf)
    ameta.read_wfpdb()
    ameta.read_csv()

    for fn in filenames:
        fn = os.path.basename(fn)
        pmeta = ameta.get_platemeta(filename=fn)
        pmeta['archive_id'] = 0
        pmeta.compute_values()

        # Store plate and scan metadata in the database.
        platedb = PlateDB()
        platedb.open_connection(host=pmeta.output_db_host,
                                user=pmeta.output_db_user,
                                dbname=pmeta.output_db_name,
                                passwd=pmeta.output_db_passwd)
        platedb.write_plate(pmeta)
        platedb.write_scan(pmeta)
        platedb.close_connection()

        # Build a FITS header from the metadata and write it to a file.
        h = PlateHeader()
        h.assign_conf(pmeta.conf)
        h.assign_platemeta(pmeta)
        h.update_from_platemeta()
        h.assign_values()
        h.update_comments()
        h.rewrite()
        h.reorder()
        fn_header = os.path.splitext(fn)[0] + '.hdr'
        h.output_to_file(fn_header)

        # Solve the plate and export the extracted sources.
        proc = SolveProcess(fn)
        proc.assign_conf(pmeta.conf)
        proc.assign_header(h)
        proc.setup()
        proc.invert_plate()
        proc.extract_sources()
        proc.solve_plate()
        proc.output_wcs_header()
        proc.solve_recursive()
        proc.output_sources_db()
        proc.output_sources_csv()
        proc.finish()