"""
:class:`hal.Sensor` device adapters shipped with MicroscoPy:
- :class:`Andor` EMCCD cameras
- PCO :class:`Pixelfly` CCD cameras
- MCCDAQ USB2600 analog input
"""
import numpy as np
import time
import sys
from math import copysign
import matplotlib.pyplot as plt
#from .. import hal
from microscopy import hal
[docs]class Dummy_Camera(hal.Sensor):
"""Dummy scan for debug purpose."""
def __init__(self):
self.snap_count = 0
self.current_position = 0
def snap(self, **kwargs):
self.paused = False #
self.snap_count += 1
self.status = True
#print "snap()+ # "+str(self.snap_count)
return self.snap_count
def update_display(self):
pass
# Andor EMCCD adapters
# Available only if the andor driver can be imported
try:
from drivers.andor import AndorError
[docs] class Andor(hal.Sensor):
"""Device adapter for the Andor EMCCD :py:class:`drivers.andor.Andor`.
The camera is operated in Kinetic mode.
The exposure property allows changing the exposure on-the-fly (i.e.,
without pausing the scan first).
"""
def __init__(self, cam, n=1, display=None, thermistor=None):
"""
:param cam: an AndorUI object
:type cam: :py:class:`drivers.andor.Andor`
:param n: number of images to take (default 1)
:type n: int
:param display: the active camera live display
:type display: :py:class:`drivers.andor.LiveDisplay`
:param thermistor: a Thermistor object to keep track of ambient temperature.
"""
self.cam = cam
self.n = n
self.display = display
self.thermistor = thermistor
self._sleep = []
self.snap_info_description = "(exposure, )"
self._queued_exposure = None
@property
def exposure(self):
"""Query or set the exposure time (in ms).
If the camera is acquiring, the exposure change will be applied once the current snap is finished.
"""
return self.cam.exposure
@exposure.setter
def exposure(self, exp_ms):
try:
self.cam.exposure = exp_ms
except AndorError as error:
if error.string is "DRV_ACQUIRING":
self._queued_exposure = exp_ms
else:
raise error
[docs] def snap(self, new=True):
"""Return the latest data, either from a new acquisition sequence (new=True, default),
or from the last one (new=False).
The data is also available """
if new:
self.paused = False
self.cam.Acquire.start()
#self.cam.Acquire.wait() # this blocks the main thread also! Use sleep + polling instead
time.sleep(0.9*self.cam.exposure / 1000.0) # sleep for one exposure time (which is in ms)
self._sleep.append(0)
while self.cam.Acquire.running:
self._sleep[-1] += 1
time.sleep(0.01)
self.status = True
#get data and insert a reference here and in andorui as well.
self.last_snap = self.cam.Acquire.GetAcquiredData()
self.cam.Acquire.last_snap = self.last_snap
# Apply exposure change
if self._queued_exposure is not None:
self.cam.exposure = self._queued_exposure
self._queued_exposure = None
return self.last_snap
def snap_info(self):
return (self.cam.exposure, )
def update_display(self):
if self.display is not None:
self.display.update()
else:
pass
def initialise_acquisition(self):
self.saved_mode = self.cam.Acquire
self.cam.Acquire.stop()
self.cam.Acquire.Kinetic(self.n, 0)
def terminate_acquisition(self):
self.cam.Acquire = self.saved_mode
self.cam.Acquire()
[docs] def dark(self):
"""Return a dark image (acquired with shutter closed)."""
self.cam.Shutter.Close()
self.dark_image = self.snap()
self.cam.Shutter.Open()
return self.dark_image
[docs] def intensity(self, center, width, new=True):
# Not used, currently the interferometer relies on roi_1D
"""Return the sum of pixel values in a ROI centered at *center*, of with *width*.
*center* and *width* must be tuples. Works with 1D and 2D data.
If new=True, snap a new image, otherwise use the last acquired image."""
imdata = self.snap(new=new)
self.imdata = imdata
imdata = imdata.sum(0) # add all images in Kinematic sequence.
if imdata.ndim == 2:
subarea = imdata[center[0]-width[0]:center[0]+width[0],
center[1]-width[1]:center[1]+width[1]]
else:
subarea = imdata[center[0]-width[0]:center[0]+width[0]]
return subarea.sum()
[docs] def roi_1D(self, center, width, new=True):
"""Return a region of interest from a 1D spectrum.
:param int center: center of the ROI (in pixel).
:param int width: half-width of the ROI (in pixels).
:param bool new: if True (default), acquire a new spectrum.
Otherwise use the last acquired spectrum.
:return: 1D array
.. todo::
This looks a bit trivial.
Should we move all of this code to :class:`interferometer.RayleighIntensity`?
"""
imdata = self.snap(new=new)
imdata = imdata.sum(0) # add all images in Kinematic sequence.
assert imdata.ndim == 1, "intensity_with_max() works only on 1D data"
return imdata[center-width:center+width]
[docs] class AndorExposures(Andor):
"""An extension of the Andor camera class where for each scan position several
images are taken with different exposures."""
def __init__(self, cam, exposures, n=1, display=None, thermistor=None):
super(AndorExposures, self).__init__(cam, n=n, display=display, thermistor=thermistor)
self.exposures = exposures
def snap(self, **kwargs):
self.paused = False
data = []
for exp in self.exposures:
self.cam.exposure = exp
self.cam.Acquire.start()
self.cam.Acquire.wait()
data.append(self.cam.Acquire.GetAcquiredData())
self.status = True
return np.array(data)
except ImportError:
pass
[docs]class VoltIn(hal.Sensor):
"""Analog input adapter."""
def __init__(self, ai, scaling=-1, wait=0):
"""
:param ai: analog input channel (any callable object that returns a float when called)
:param float scaling: a multiplicative factor applied to the measured voltage
:type float wait: delay (settling time) before taking measurement
"""
self.ai = ai
self.wait = wait
self.scaling = scaling
def snap(self, **kwargs):
self.paused = False
self.status = True
time.sleep(self.wait)
return self.scaling * self.ai()
def roi_1D(self, **kwargs):
return self.snap()
# Adapter for PCO Pixelfly CCD cameras.
# Will only be available if the pxlfly driver can be imported
try:
from pxlfly import CameraBuffer
[docs] class Pixelfly(hal.Sensor):
"""Sensor adapter for PCO Pixelfly CCD cameras"""
def __init__(self, camera):
""":param cam: a :class:`pxlflygui.PixelflyGUI` instance"""
self.cam = camera
status = True
def snap(self, new=True):
with CameraBuffer(self.cam):
return self.cam.getframe().copy()
except ImportError:
pass