import asyncio
import logging
import math
from datetime import datetime
from typing import List, Tuple, Any, Dict, Optional
import numpy as np
import zwoasi as asi # type: ignore
from pyobs.interfaces import ICamera, IWindow, IBinning, ICooling, IImageFormat, IAbortable, ITemperatures, IGain
from pyobs.modules.camera.basecamera import BaseCamera
from pyobs.utils.enums import ImageFormat, ExposureStatus
from pyobs.images import Image
from pyobs.utils.parallel import event_wait
from pyobs.utils import exceptions as exc
log = logging.getLogger(__name__)
# map of image formats
FORMATS = {
ImageFormat.INT8: asi.ASI_IMG_RAW8,
ImageFormat.INT16: asi.ASI_IMG_RAW16,
ImageFormat.RGB24: asi.ASI_IMG_RGB24,
}
class AsiCamera(BaseCamera, ICamera, IWindow, IBinning, IImageFormat, IAbortable, IGain, ITemperatures):
"""A pyobs module for ASI cameras."""
__module__ = "pyobs_asi"
def __init__(self, camera: str, sdk: str = "/usr/local/lib/libASICamera2.so", **kwargs: Any):
"""Initializes a new AsiCamera.
Args:
camera: Name of camera to use.
sdk: Path to .so file from ASI SDK.
"""
BaseCamera.__init__(self, **kwargs)
# variables
self._camera_name = camera
self._sdk_path = sdk
self._camera: Optional[asi.Camera] = None
self._camera_info: Dict[str, Any] = {}
# window and binning and mode
self._window = (0, 0, 0, 0)
self._binning = 1
self._image_format = ImageFormat.INT16
self._gain: float = 1.0
async def open(self) -> None:
"""Open module."""
await BaseCamera.open(self)
# init driver
asi.init(self._sdk_path)
# get number of cameras
num_cameras = asi.get_num_cameras()
if num_cameras == 0:
raise ValueError("No cameras found")
# get ID of camera
# index() raises ValueError, if camera could not be found
cameras_found = asi.list_cameras()
camera_id = cameras_found.index(self._camera_name)
# open driver
self._camera = asi.Camera(camera_id)
self._camera_info = self._camera.get_camera_property()
log.info("Camera info:")
for key, val in self._camera_info.items():
log.info(" - %s: %s", key, val)
# Set some sensible defaults. They will need adjusting depending upon
# the sensitivity, lens and lighting conditions used.
self._camera.disable_dark_subtract()
self._camera.set_control_value(asi.ASI_WB_B, 99)
self._camera.set_control_value(asi.ASI_WB_R, 75)
self._camera.set_control_value(asi.ASI_GAMMA, 50)
self._camera.set_control_value(asi.ASI_BRIGHTNESS, 50)
self._camera.set_control_value(asi.ASI_FLIP, 0)
self._camera.set_image_type(asi.ASI_IMG_RAW16)
# enabling image mode
self._camera.stop_video_capture()
self._camera.stop_exposure()
# get initial window and binning
self._binning = self._camera.get_bin()
self._window = self._camera.get_roi()
async def get_full_frame(self, **kwargs: Any) -> Tuple[int, int, int, int]:
"""Returns full size of CCD.
Returns:
Tuple with left, top, width, and height set.
"""
return 0, 0, self._camera_info["MaxWidth"], self._camera_info["MaxHeight"]
async def get_window(self, **kwargs: Any) -> Tuple[int, int, int, int]:
"""Returns the camera window.
Returns:
Tuple with left, top, width, and height set.
"""
return self._window
async def get_binning(self, **kwargs: Any) -> Tuple[int, int]:
"""Returns the camera binning.
Returns:
Tuple with x and y.
"""
return self._binning, self._binning
async def set_window(self, left: int, top: int, width: int, height: int, **kwargs: Any) -> None:
"""Set the camera window.
Args:
left: X offset of window.
top: Y offset of window.
width: Width of window.
height: Height of window.
Raises:
ValueError: If binning could not be set.
"""
self._window = (left, top, width, height)
log.info("Setting window to %dx%d at %d,%d...", width, height, left, top)
async def set_binning(self, x: int, y: int, **kwargs: Any) -> None:
"""Set the camera binning.
Args:
x: X binning.
y: Y binning.
Raises:
ValueError: If binning could not be set.
"""
self._binning = x
log.info("Setting binning to %dx%d...", x, x)
async def list_binnings(self, **kwargs: Any) -> List[Tuple[int, int]]:
"""List available binnings.
Returns:
List of available binnings as (x, y) tuples.
"""
if "SupportedBins" in self._camera_info:
# create list of tuples
return [(b, b) for b in self._camera_info["SupportedBins"]]
else:
return []
async def _expose(self, exposure_time: float, open_shutter: bool, abort_event: asyncio.Event) -> Image:
"""Actually do the exposure, should be implemented by derived classes.
Args:
exposure_time: The requested exposure time in s.
open_shutter: Whether or not to open the shutter.
abort_event: Event that gets triggered when exposure should be aborted.
Returns:
The actual image.
Raises:
GrabImageError: If exposure was not successful.
"""
# no camera?
if self._camera is None:
raise ValueError("No camera initialised.")
# get image format
image_format = FORMATS[self._image_format]
# set window, divide width/height by binning
width = int(math.floor(self._window[2]) / self._binning)
height = int(math.floor(self._window[3]) / self._binning)
log.info(
"Set window to %dx%d (binned %dx%d with %dx%d) at %d,%d.",
self._window[2],
self._window[3],
width,
height,
self._binning,
self._binning,
self._window[0],
self._window[1],
)
self._camera.set_roi(int(self._window[0]), int(self._window[1]), width, height, self._binning, image_format)
# set status and exposure time in ms
self._camera.set_control_value(asi.ASI_EXPOSURE, int(exposure_time * 1e6))
# set gain
self._camera.set_control_value(asi.ASI_GAIN, int(self._gain))
log.info(
"Starting exposure with %s shutter for %s seconds and %s gain...", "open"
if open_shutter else "closed", exposure_time, self._gain
)
# do exposure
self._camera.start_exposure()
await asyncio.sleep(0.01)
# wait for image
while self._camera.get_exposure_status() == asi.ASI_EXP_WORKING:
# aborted?
if abort_event.is_set():
await self._change_exposure_status(ExposureStatus.IDLE)
raise InterruptedError("Aborted exposure.")
# sleep a little
await event_wait(abort_event, 0.01)
# success?
status = self._camera.get_exposure_status()
if status != asi.ASI_EXP_SUCCESS:
raise exc.GrabImageError("Could not capture image: %s" % status)
# get data
log.info("Exposure finished, reading out...")
await self._change_exposure_status(ExposureStatus.READOUT)
buffer = self._camera.get_data_after_exposure()
whbi = self._camera.get_roi_format()
# decide on image format
shape = [whbi[1], whbi[0]]
if image_format == asi.ASI_IMG_RAW8:
data = np.frombuffer(buffer, dtype=np.uint8)
elif image_format == asi.ASI_IMG_RAW16:
data = np.frombuffer(buffer, dtype=np.uint16)
elif image_format == asi.ASI_IMG_RGB24:
shape.append(3)
data = np.frombuffer(buffer, dtype=np.uint8)
else:
raise exc.GrabImageError("Unknown image format.")
# reshape
data = data.reshape(shape)
# special treatment for RGB images
if image_format == asi.ASI_IMG_RGB24:
# convert BGR to RGB
data = data[:, :, ::-1]
# now we need to separate the R, G, and B images
# this is easiest done by shifting the RGB axis from last to first position
# i.e. we go from RGBRGBRGBRGBRGB to RRRRRGGGGGBBBBB
data = np.moveaxis(data, 2, 0)
# get date obs
date_obs = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")
# create FITS image and set header
image = Image(data)
image.header["DATE-OBS"] = (date_obs, "Date and time of start of exposure")
image.header["EXPTIME"] = (exposure_time, "Exposure time [s]")
# instrument and detector
image.header["INSTRUME"] = (self._camera_name, "Name of instrument")
# binning
image.header["XBINNING"] = image.header["DET-BIN1"] = (self._binning, "Binning factor used on X axis")
image.header["YBINNING"] = image.header["DET-BIN2"] = (self._binning, "Binning factor used on Y axis")
# window
image.header["XORGSUBF"] = (self._window[0], "Subframe origin on X axis")
image.header["YORGSUBF"] = (self._window[1], "Subframe origin on Y axis")
# statistics
image.header["DATAMIN"] = (float(np.min(data)), "Minimum data value")
image.header["DATAMAX"] = (float(np.max(data)), "Maximum data value")
image.header["DATAMEAN"] = (float(np.mean(data)), "Mean data value")
# pixels
image.header["DET-PIXL"] = (self._camera_info["PixelSize"] / 1000.0, "Size of detector pixels (square) [mm]")
image.header["DET-GAIN"] = (self._camera_info["ElecPerADU"] * self._gain, "Detector gain [e-/ADU]")
# Bayer pattern?
if image_format in [asi.ASI_IMG_RAW8, asi.ASI_IMG_RAW16]:
image.header["BAYERPAT"] = image.header["COLORTYP"] = ("GBRG", "Bayer pattern for colors")
# temperature
temperature = self._get_temperature()
image.header["DET-TEMP"] = (temperature, "CCD temperature [C]")
# biassec/trimsec
self.set_biassec_trimsec(image.header, *self._window)
# return FITS image
log.info("Readout finished.")
return image
async def _abort_exposure(self) -> None:
"""Abort the running exposure. Should be implemented by derived class.
Raises:
ValueError: If an error occured.
"""
pass
async def set_image_format(self, fmt: ImageFormat, **kwargs: Any) -> None:
"""Set the camera image format.
Args:
fmt: New image format.
Raises:
ValueError: If format could not be set.
"""
if fmt not in FORMATS:
raise ValueError("Unsupported image format.")
self._image_format = fmt
async def get_image_format(self, **kwargs: Any) -> ImageFormat:
"""Returns the camera image format.
Returns:
Current image format.
"""
return self._image_format
async def list_image_formats(self, **kwargs: Any) -> List[str]:
"""List available image formats.
Returns:
List of available image formats.
"""
return [f.value for f in FORMATS.keys()]
async def get_temperatures(self, **kwargs: Any) -> Dict[str, float]:
temperature = self._get_temperature()
return {"CCD": temperature}
def _get_temperature(self) -> float:
"""
Gets the temperature from the camera.
Reading is divided by 10, since ASI_TEMPERATURE returns temp * 10
"""
return self._camera.get_control_value(asi.ASI_TEMPERATURE)[0] / 10
async def set_gain(self, gain: float, **kwargs: Any) -> None:
"""Set the camera gain.
Args:
gain: New camera gain.
Raises:
ValueError: If gain could not be set.
"""
self._gain = gain
async def get_gain(self, **kwargs: Any) -> float:
"""Returns the camera gain.
Returns:
Current gain.
"""
return self._gain
[docs]
class AsiCoolCamera(AsiCamera, ICooling):
"""A pyobs module for ASI cameras with cooling."""
def __init__(self, setpoint: int = -20, **kwargs: Any):
"""Initializes a new AsiCoolCamera.
Args:
setpoint: Cooling temperature setpoint.
"""
AsiCamera.__init__(self, **kwargs)
# variables
self._temp_setpoint = setpoint
[docs]
async def open(self) -> None:
"""Open module."""
await AsiCamera.open(self)
# no cooling support?
if not self._camera_info["IsCoolerCam"]:
raise ValueError("Camera has no support for cooling.")
# activate cooling
await self.set_cooling(True, self._temp_setpoint)
[docs]
async def get_cooling(self, **kwargs: Any) -> Tuple[bool, float, float]:
"""Returns the current status for the cooling.
Returns:
Tuple containing:
Enabled (bool): Whether the cooling is enabled
SetPoint (float): Setpoint for the cooling in celsius.
Power (float): Current cooling power in percent or None.
"""
# no camera?
if self._camera is None:
raise ValueError("No camera initialised.")
# return
enabled = self._camera.get_control_value(asi.ASI_COOLER_ON)[0]
temp = self._camera.get_control_value(asi.ASI_TARGET_TEMP)[0]
power = self._camera.get_control_value(asi.ASI_COOLER_POWER_PERC)[0]
return enabled, temp, power
[docs]
async def get_temperatures(self, **kwargs: Any) -> Dict[str, float]:
"""Returns all temperatures measured by this module.
Returns:
Dict containing temperatures.
"""
# no camera?
if self._camera is None:
raise ValueError("No camera initialised.")
# return
return {"CCD": self._camera.get_control_value(asi.ASI_TEMPERATURE)[0] / 10.0}
[docs]
async def set_cooling(self, enabled: bool, setpoint: float, **kwargs: Any) -> None:
"""Enables/disables cooling and sets setpoint.
Args:
enabled: Enable or disable cooling.
setpoint: Setpoint in celsius for the cooling.
Raises:
ValueError: If cooling could not be set.
"""
# no camera?
if self._camera is None:
raise ValueError("No camera initialised.")
# log
if enabled:
log.info("Enabling cooling with a setpoint of %.2f°C...", setpoint)
self._camera.set_control_value(asi.ASI_TARGET_TEMP, int(setpoint))
self._camera.set_control_value(asi.ASI_COOLER_ON, 1)
else:
log.info("Disabling cooling...")
self._camera.set_control_value(asi.ASI_COOLER_ON, 0)
__all__ = ["AsiCamera", "AsiCoolCamera"]