import asyncio
import datetime
import logging
import warnings
from abc import ABCMeta, abstractmethod
from typing import Tuple, Optional, Dict, Any, NamedTuple, List
import numpy as np
from astropy.io import fits
from numpy.typing import NDArray
from pyobs.mixins.fitsheader import ImageFitsHeaderMixin
from pyobs.utils.enums import ImageType, ExposureStatus
from pyobs.images import Image
from pyobs.modules import Module
from pyobs.events import NewImageEvent, ExposureStatusChangedEvent
from pyobs.interfaces import ICamera, IExposureTime, IImageType
from pyobs.modules import timeout
from pyobs.utils import exceptions as exc
log = logging.getLogger(__name__)
class CameraException(Exception):
pass
class ExposureInfo(NamedTuple):
"""Info about a running exposure."""
start: datetime.datetime
exposure_time: float
async def calc_expose_timeout(camera: IExposureTime, *args: Any, **kwargs: Any) -> float:
"""Calculates timeout for expose()."""
return await camera.get_exposure_time() + 30
class BaseCamera(Module, ImageFitsHeaderMixin, ICamera, IExposureTime, IImageType, metaclass=ABCMeta):
"""Base class for all camera modules."""
__module__ = "pyobs.modules.camera"
def __init__(
self,
fits_headers: Optional[Dict[str, Any]] = None,
centre: Optional[Tuple[float, float]] = None,
rotation: float = 0.0,
flip: bool = False,
filenames: str = "/cache/pyobs-{DAY-OBS|date:}-{FRAMENUM|string:04d}-{IMAGETYP|type}00.fits.gz",
fits_namespaces: Optional[List[str]] = None,
**kwargs: Any,
):
"""Creates a new BaseCamera.
Args:
fits_headers: Additional FITS headers.
centre: (x, y) tuple of camera centre.
rotation: Rotation east of north.
flip: Whether or not to flip the image along its first axis.
filenames: Template for file naming.
fits_namespaces: List of namespaces for FITS headers that this camera should request
"""
Module.__init__(self, **kwargs)
ImageFitsHeaderMixin.__init__(
self,
fits_namespaces=fits_namespaces,
fits_headers=fits_headers,
centre=centre,
rotation=rotation,
filenames=filenames,
)
# check
if self.comm is None:
log.warning("No comm module given, will not be able to signal new images!")
# store
self._flip = flip
self._exposure_time: float = 0.0
self._image_type = ImageType.OBJECT
# init camera
self._exposure: Optional[ExposureInfo] = None
self._camera_status = ExposureStatus.IDLE
# multi-threading
self.expose_abort = asyncio.Event()
# register exception
exc.register_exception(exc.GrabImageError, 3, timespan=600, callback=self._default_remote_error_callback)
[docs] async def open(self) -> None:
"""Open module."""
await Module.open(self)
# subscribe to events
if self.comm:
await self.comm.register_event(NewImageEvent)
await self.comm.register_event(ExposureStatusChangedEvent)
[docs] async def set_exposure_time(self, exposure_time: float, **kwargs: Any) -> None:
"""Set the exposure time in seconds.
Args:
exposure_time: Exposure time in seconds.
Raises:
ValueError: If exposure time could not be set.
"""
log.info("Setting exposure time to %.5fs...", exposure_time)
self._exposure_time = exposure_time
[docs] async def get_exposure_time(self, **kwargs: Any) -> float:
"""Returns the exposure time in seconds.
Returns:
Exposure time in seconds.
"""
return self._exposure_time
[docs] async def set_image_type(self, image_type: ImageType, **kwargs: Any) -> None:
"""Set the image type.
Args:
image_type: New image type.
"""
log.info("Setting image type to %s...", image_type)
self._image_type = image_type
[docs] async def get_image_type(self, **kwargs: Any) -> ImageType:
"""Returns the current image type.
Returns:
Current image type.
"""
return self._image_type
async def _change_exposure_status(self, status: ExposureStatus) -> None:
"""Change exposure status and send event,
Args:
status: New exposure status.
"""
# send event, if it changed
if self._camera_status != status:
await self.comm.send_event(ExposureStatusChangedEvent(last=self._camera_status, current=status))
# set it
self._camera_status = status
[docs] async def get_exposure_status(self, **kwargs: Any) -> ExposureStatus:
"""Returns the current status of the camera, which is one of 'idle', 'exposing', or 'readout'.
Returns:
Current status of camera.
"""
return self._camera_status
[docs] async def get_exposure_time_left(self, **kwargs: Any) -> float:
"""Returns the remaining exposure time on the current exposure in seconds.
Returns:
Remaining exposure time in seconds.
"""
# if we're not exposing, there is nothing left
if self._exposure is None:
return 0.0
# calculate difference between start of exposure and now, and return in ms
duration = datetime.timedelta(seconds=self._exposure.exposure_time)
diff = self._exposure.start + duration - datetime.datetime.utcnow()
return diff.total_seconds()
[docs] async def get_exposure_progress(self, **kwargs: Any) -> float:
"""Returns the progress of the current exposure in percent.
Returns:
Progress of the current exposure in percent.
"""
# if we're not exposing, there is no progress
if self._exposure is None:
return 0.0
# calculate difference between start of exposure and now
diff = datetime.datetime.utcnow() - self._exposure[0]
# zero exposure time?
if self._exposure.exposure_time == 0.0 or self._camera_status == ExposureStatus.READOUT:
return 100.0
else:
# return max of 100
percentage = diff.total_seconds() / self._exposure[1] * 100.0
return min(percentage, 100.0)
@abstractmethod
async def _expose(self, exposure_time: float, open_shutter: bool, abort_event: asyncio.Event) -> Image:
"""Actually do the exposure, should be implemented by derived classes.
Args:
exposure_time: The requested exposure time in seconds.
open_shutter: Whether or not to open the shutter.
abort_event: Event that gets triggered when exposure should be aborted.
Returns:
The actual image.
Raises:
GrabImageError: If exposure was not successful.
"""
...
async def __expose(self, exposure_time: float, image_type: ImageType, broadcast: bool) -> Tuple[Image, str]:
"""Wrapper for a single exposure.
Args:
exposure_time: The requested exposure time in seconds.
image_type: Type of image.
broadcast: Whether or not the new image should be broadcasted.
Returns:
Tuple of the image itself and its filename.
Raises:
GrabImageError: If there was a problem grabbing the image.
"""
# request fits headers
header_futures_before = await self.request_fits_headers(before=True)
# open the shutter?
open_shutter = image_type not in [ImageType.BIAS, ImageType.DARK]
# reset abort event
self.expose_abort.clear()
# do the exposure
self._exposure = ExposureInfo(start=datetime.datetime.utcnow(), exposure_time=exposure_time)
try:
image = await self._expose(exposure_time, open_shutter, abort_event=self.expose_abort)
if image is None or image.data is None:
raise exc.GrabImageError("Could not take image.")
except exc.PyObsError:
# exposure was not successful (aborted?), so reset everything and re-raise
self._exposure = None
raise
except Exception as e:
# exposure was not successful (aborted?), so reset everything and wrap exception
self._exposure = None
raise exc.GrabImageError(str(e))
# request fits headers again
header_futures_after = await self.request_fits_headers(before=False)
# flip it?
if self._flip:
# do we have three dimensions in array? need this for deciding which axis to flip
is_3d = len(image.data.shape) == 3
# flip image and make contiguous again
flipped: NDArray[Any] = np.flip(image.data, axis=1 if is_3d else 0)
image.data = np.ascontiguousarray(flipped)
# add HDU name
image.header["EXTNAME"] = "SCI"
# add image type
image.header["IMAGETYP"] = image_type.value
# add fits headers and format filename
await self.add_requested_fits_headers(image, header_futures_before)
await self.add_requested_fits_headers(image, header_futures_after)
await self.add_fits_headers(image)
filename = self.format_filename(image)
# don't want to save?
if filename is None:
self._exposure = None
raise exc.GrabImageError("No filename given.")
# upload file
try:
log.info("Uploading image to file server...")
await self.vfs.write_image(filename, image)
except FileNotFoundError:
raise ValueError("Could not upload image.")
# broadcast image path
if broadcast and self.comm:
log.info("Broadcasting image ID...")
await self.comm.send_event(NewImageEvent(filename, image_type))
# return image and unique
self._exposure = None
log.info("Finished image %s.", filename)
return image, filename
[docs] @timeout(calc_expose_timeout)
async def grab_image(self, broadcast: bool = True, **kwargs: Any) -> str:
"""Grabs an image ans returns reference.
Args:
broadcast: Broadcast existence of image.
Returns:
Name of image that was taken.
Raises:
GrabImageError: If there was a problem grabbing the image.
"""
# are we exposing?
if self._camera_status != ExposureStatus.IDLE:
raise CameraException("Cannot start new exposure because camera is not idle.")
await self._change_exposure_status(ExposureStatus.EXPOSING)
# expose
try:
image, filename = await self.__expose(self._exposure_time, self._image_type, broadcast)
finally:
await self._change_exposure_status(ExposureStatus.IDLE)
# check
if image is None:
raise exc.GrabImageError("Could not take image.")
else:
if filename is None:
raise ValueError("Image has not been saved, so cannot be retrieved by filename.")
# return filename
return filename
async def _abort_exposure(self) -> None:
"""Abort the running exposure. Should be implemented by derived class.
Raises:
ValueError: If an error occured.
"""
pass
[docs] async def abort(self, **kwargs: Any) -> None:
"""Aborts the current exposure and sequence.
Returns:
Success or not.
"""
# set abort event
log.info("Aborting current image and sequence...")
self.expose_abort.set()
# do camera-specific abort
await self._abort_exposure()
# wait until state is not EXPOSING anymore
while await self.get_exposure_status() == ExposureStatus.EXPOSING:
await asyncio.sleep(0.1)
[docs] @staticmethod
def set_biassec_trimsec(hdr: fits.Header, left: int, top: int, width: int, height: int) -> None:
"""Calculates and sets the BIASSEC and TRIMSEC areas.
Args:
hdr: FITS header (in/out)
left: left edge of data area
top: top edge of data area
width: width of data area
height: height of data area
"""
# get image area in unbinned coordinates
img_left = hdr["XORGSUBF"]
img_top = hdr["YORGSUBF"]
img_width = hdr["NAXIS1"] * hdr["XBINNING"]
img_height = hdr["NAXIS2"] * hdr["YBINNING"]
# get intersection
is_left = max(left, img_left)
is_right = min(left + width, img_left + img_width)
is_top = max(top, img_top)
is_bottom = min(top + height, img_top + img_height)
# for simplicity we allow prescan/overscan only in one dimension
if (left < is_left or left + width > is_right) and (top < is_top or top + height > is_bottom):
log.warning("BIASSEC/TRIMSEC can only be calculated with a prescan/overscan on one axis only.")
return
# comments
c1 = "Bias overscan area [x1:x2,y1:y2] (binned)"
c2 = "Image area [x1:x2,y1:y2] (binned)"
# rectangle empty?
if is_right <= is_left or is_bottom <= is_top:
# easy case, all is BIASSEC, no TRIMSEC at all
hdr["BIASSEC"] = ("[1:%d,1:%d]" % (hdr["NAXIS1"], hdr["NAXIS2"]), c1)
return
# we got a TRIMSEC, calculate its binned and windowd coordinates
is_left_binned = np.floor((is_left - hdr["XORGSUBF"]) / hdr["XBINNING"]) + 1
is_right_binned = np.ceil((is_right - hdr["XORGSUBF"]) / hdr["XBINNING"])
is_top_binned = np.floor((is_top - hdr["YORGSUBF"]) / hdr["YBINNING"]) + 1
is_bottom_binned = np.ceil((is_bottom - hdr["YORGSUBF"]) / hdr["YBINNING"])
# set it
hdr["TRIMSEC"] = ("[%d:%d,%d:%d]" % (is_left_binned, is_right_binned, is_top_binned, is_bottom_binned), c2)
hdr["DATASEC"] = ("[%d:%d,%d:%d]" % (is_left_binned, is_right_binned, is_top_binned, is_bottom_binned), c2)
# now get BIASSEC -- whatever we do, we only take the last (!) one
# which axis?
if img_left + img_width > left + width:
left_binned = np.floor((is_right - hdr["XORGSUBF"]) / hdr["XBINNING"]) + 1
hdr["BIASSEC"] = ("[%d:%d,1:%d]" % (left_binned, hdr["NAXIS1"], hdr["NAXIS2"]), c1)
elif img_left < left:
right_binned = np.ceil((is_left - hdr["XORGSUBF"]) / hdr["XBINNING"])
hdr["BIASSEC"] = ("[1:%d,1:%d]" % (right_binned, hdr["NAXIS2"]), c1)
elif img_top + img_height > top + height:
top_binned = np.floor((is_bottom - hdr["YORGSUBF"]) / hdr["YBINNING"]) + 1
hdr["BIASSEC"] = ("[1:%d,%d:%d]" % (hdr["NAXIS1"], top_binned, hdr["NAXIS2"]), c1)
elif img_top < top:
bottom_binned = np.ceil((is_top - hdr["YORGSUBF"]) / hdr["YBINNING"])
hdr["BIASSEC"] = ("[1:%d,1:%d]" % (hdr["NAXIS1"], bottom_binned), c1)
[docs] async def list_binnings(self, **kwargs: Any) -> List[Tuple[int, int]]:
"""List available binnings.
Returns:
List of available binnings as (x, y) tuples.
"""
warnings.warn(
"The default implementation for list_binnings() in BaseCamera will be removed in future versions",
DeprecationWarning,
)
return [(1, 1), (2, 2), (3, 3)]
__all__ = ["BaseCamera", "CameraException"]