Source code for pyobs.modules.camera.dummycamera

from __future__ import annotations

import asyncio
import glob
import logging
from datetime import UTC, datetime
from typing import Any, NamedTuple

import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.modeling import models
from astropy.table import Table
from astropy.wcs import WCS
from numpy.typing import NDArray

from pyobs.images import Image
from pyobs.interfaces import (
    BinningCapabilities,
    BinningState,
    CoolingState,
    GainState,
    IBinning,
    ICooling,
    IGain,
    IImageFormat,
    ImageFormatCapabilities,
    ImageFormatState,
    IPointingRaDec,
    ITemperatures,
    IWindow,
    RaDecState,
    SensorReading,
    TemperaturesState,
    WindowCapabilities,
    WindowState,
)
from pyobs.modules.camera.basecamera import BaseCamera
from pyobs.utils.enums import ExposureStatus, ImageFormat, ImageType
from pyobs.utils.time import Time

log = logging.getLogger(__name__)


class CoolingStatus(NamedTuple):
    enabled: bool = True
    set_point: float = -10.0
    power: int = 80
    temperatures: dict[str, float] = {"CCD": 0.0, "Back": 3.14}


class DummyCamera(BaseCamera, IWindow, IBinning, ICooling, IGain, IImageFormat):
    """A dummy camera for testing."""

    __module__ = "pyobs.modules.camera"

    def __init__(
        self,
        readout_time: float = 2,
        image_size: tuple[int, int] | None = None,
        pixel_size: float = 0.015,
        focal_length: float = 5000.0,
        images: str | None = None,
        max_mag: float = 20.0,
        seeing: float = 3.0,
        telescope: str | None = None,
        **kwargs: Any,
    ):
        """Creates a new dummy camera.

        Args:
            readout_time: Readout time in seconds.
            image_size: Size of simulated image in pixels (width, height).
            pixel_size: Square pixel size in mm.
            focal_length: Focal length in mm (for plate scale calculation).
            images: Filename pattern for pre-recorded images to use instead of simulation.
            max_mag: Maximum magnitude of simulated stars.
            seeing: Seeing in arcsec FWHM.
            telescope: Name of telescope module to read pointing from (for star simulation).
        """
        BaseCamera.__init__(self, **kwargs)
        self.add_background_task(self._cooling_thread, True)

        self._readout_time = readout_time

        # image geometry
        self._full_frame: tuple[int, int, int, int] = (
            (0, 0, image_size[0], image_size[1]) if image_size is not None else (0, 0, 512, 512)
        )
        self._window = self._full_frame
        self._binning = (1, 1)
        self._pixel_size = pixel_size
        self._focal_length = focal_length
        self._max_mag = max_mag
        self._seeing = seeing

        # pre-recorded images
        self._sim_images: list[str] | None = (
            sorted(glob.glob(images)) if images and ("*" in images or "?" in images) else ([images] if images else None)
        )

        # telescope state (updated via subscribe_state)
        self._telescope_module = telescope
        self._telescope_pos: SkyCoord = SkyCoord(0.0 * u.deg, 0.0 * u.deg, frame="icrs")

        # camera state
        self._cooling = CoolingStatus()
        self._exposing = True
        self._gain = 10.0
        self._gain_offset = 0.0
        self._image_format = ImageFormat.INT16
        self._image_type = ImageType.OBJECT

        # GAIA catalog cache
        self._catalog: Table | None = None
        self._catalog_coords: SkyCoord | None = None

    def _on_telescope_state(self, state: RaDecState) -> None:
        """Update cached telescope position from IPointingRaDec state."""
        self._telescope_pos = SkyCoord(ra=state.ra * u.deg, dec=state.dec * u.deg, frame="icrs")

[docs] async def open(self) -> None: """Opens camera.""" # publish capabilities before super().open() await self.comm.set_capabilities( IWindow, WindowCapabilities( full_frame_x=self._full_frame[0], full_frame_y=self._full_frame[1], full_frame_width=self._full_frame[2], full_frame_height=self._full_frame[3], ), ) await self.comm.set_capabilities( IBinning, BinningCapabilities(binnings=[BinningState(x=i[0], y=i[1]) for i in [(1, 1), (2, 2), (3, 3)]]), ) await self.comm.set_capabilities( IImageFormat, ImageFormatCapabilities(image_formats=[ImageFormat.INT8, ImageFormat.INT16]) ) await BaseCamera.open(self) # subscribe to telescope pointing if given if self._telescope_module: await self.comm.subscribe_state(self._telescope_module, IPointingRaDec, self._on_telescope_state) # publish initial states await self.comm.set_state( ICooling, CoolingState(setpoint=self._cooling.set_point, power=self._cooling.power, enabled=self._cooling.enabled), ) await self.comm.set_state(IGain, GainState(gain=self._gain, offset=self._gain_offset)) await self.comm.set_state(IWindow, WindowState(*self._full_frame)) await self.comm.set_state(IBinning, BinningState(*self._binning)) await self.comm.set_state(IImageFormat, ImageFormatState(image_format=self._image_format))
async def _cooling_thread(self) -> None: while True: temps = dict(self._cooling.temperatures) temps["CCD"] -= (temps["CCD"] - self._cooling.set_point) * 0.05 power = (60.0 - temps["CCD"]) / 70.0 * 100.0 self._cooling = CoolingStatus( enabled=self._cooling.enabled, set_point=self._cooling.set_point, power=int(power), temperatures=temps, ) await self.comm.set_state( ICooling, CoolingState(setpoint=self._cooling.set_point, power=int(power), enabled=self._cooling.enabled), ) await self.comm.set_state( ITemperatures, TemperaturesState(readings=[SensorReading(name=name, value=value) for name, value in temps.items()]), ) await asyncio.sleep(1) def _sun_alt(self) -> float: """Returns current solar altitude in degrees, or -18 if no observer.""" if self._observer is not None: return float(self.observer.sun_altaz(Time.now()).alt.degree) return -18.0 def _simulate_image(self, exp_time: float, open_shutter: bool) -> NDArray[Any]: from photutils.datasets import make_model_image, make_noise_image shape = (int(self._window[3]), int(self._window[2])) data = make_noise_image(shape, distribution="gaussian", mean=10, stddev=1.0) if exp_time > 0: data += make_noise_image(shape, distribution="gaussian", mean=exp_time / 1e4, stddev=exp_time / 1e5) if open_shutter: sun_alt = self._sun_alt() flat_counts = 30000 / np.exp(-1.28 * (4.209 + sun_alt)) * exp_time data += make_noise_image(shape, distribution="gaussian", mean=flat_counts, stddev=flat_counts / 10.0) sources = self._get_sources_table(exp_time) sources = sources[ (sources["x_mean"] > 0) & (sources["x_mean"] < shape[1]) & (sources["y_mean"] > 0) & (sources["y_mean"] < shape[0]) ] model = models.Moffat2D() data += make_model_image(shape, model, sources, model_shape=(15, 15)) data[data > 65535] = 65535 return data.astype(np.uint16) def _create_header(self, exp_time: float, time: Time, data: NDArray[Any]) -> fits.Header: hdr = fits.Header() hdr["NAXIS1"] = data.shape[1] hdr["NAXIS2"] = data.shape[0] hdr["DATE-OBS"] = (time.isot, "Date and time of start of exposure") hdr["EXPTIME"] = (exp_time, "Exposure time [s]") hdr["XBINNING"] = hdr["DET-BIN1"] = (int(self._binning[0]), "Binning factor used on X axis") hdr["YBINNING"] = hdr["DET-BIN2"] = (int(self._binning[1]), "Binning factor used on Y axis") hdr["XORGSUBF"] = (int(self._window[0]), "Subframe origin on X axis") hdr["YORGSUBF"] = (int(self._window[1]), "Subframe origin on Y axis") hdr["TEL-FOCL"] = (self._focal_length, "Focal length [mm]") hdr["DET-PIXL"] = (self._pixel_size, "Size of detector pixels (square) [mm]") hdr["DATAMIN"] = float(np.min(data)) hdr["DATAMAX"] = float(np.max(data)) hdr["DATAMEAN"] = float(np.mean(data)) return hdr def _get_catalog(self, fov: float) -> Table: if self._catalog_coords is None or self._catalog_coords.separation(self._telescope_pos) > 10.0 * u.arcmin: from astroquery.utils.tap import TapPlus ra, dec = self._telescope_pos.ra.degree, self._telescope_pos.dec.degree tap = TapPlus(url="https://gea.esac.esa.int/tap-server/tap") query = f""" SELECT TOP 1000 DISTANCE(POINT('ICRS', ra, dec), POINT('ICRS', {ra}, {dec})) as dist, ra, dec, phot_g_mean_flux, phot_g_mean_mag FROM gaiadr2.gaia_source WHERE 1 = CONTAINS(POINT('ICRS', ra, dec), CIRCLE('ICRS', {ra}, {dec}, {fov * 1.5})) AND phot_g_mean_mag < {self._max_mag} ORDER BY phot_g_mean_mag ASC """ job = tap.launch_job(query) self._catalog = job.get_results() self._catalog_coords = self._telescope_pos return self._catalog # type: ignore[return-value] def _get_sources_table(self, exp_time: float) -> Table: tmp = 360.0 / (2.0 * np.pi) * self._pixel_size / self._focal_length cdelt1, cdelt2 = tmp * self._binning[0], tmp * self._binning[1] fov = np.max(cdelt2 * np.array(self._full_frame[2:])) cat = self._get_catalog(fov) w = WCS(naxis=2) w.wcs.crpix = [self._window[3] / 2.0, self._window[2] / 2.0] w.wcs.cdelt = np.array([-cdelt1, cdelt2]) w.wcs.crval = [self._telescope_pos.ra.degree, self._telescope_pos.dec.degree] w.wcs.ctype = ["RA---TAN", "DEC--TAN"] fwhm = self._seeing / 3600.0 / cdelt1 / 2.3548 cat["x"], cat["y"] = w.wcs_world2pix(cat["ra"], cat["dec"], 0) sources = cat["x", "y", "phot_g_mean_flux", "phot_g_mean_mag"] sources.rename_columns(["x", "y", "phot_g_mean_flux"], ["x_mean", "y_mean", "flux"]) sources.add_column([fwhm] * len(sources), name="x_stddev") sources.add_column([fwhm] * len(sources), name="y_stddev") sources["flux"] *= exp_time return sources def _get_image(self, exp_time: float, open_shutter: bool) -> Image: now = Time.now() if self._sim_images: filename = self._sim_images.pop(0) self._sim_images.append(filename) return Image.from_file(filename) data = self._simulate_image(exp_time, open_shutter) hdr = self._create_header(exp_time, now, data) return Image(data, header=hdr) async def _expose(self, exposure_time: float, open_shutter: bool, abort_event: asyncio.Event) -> Image: log.info("Starting exposure with %s shutter...", "open" if open_shutter else "closed") date_obs = datetime.now(UTC) self._exposing = True loop = asyncio.get_running_loop() hdu_future = loop.run_in_executor(None, self._get_image, exposure_time, open_shutter) steps = 10 for _ in range(steps): if abort_event.is_set() or not self._exposing: self._exposing = False await self._change_exposure_status(ExposureStatus.IDLE) raise InterruptedError("Exposure was aborted.") await asyncio.sleep(exposure_time / steps) self._exposing = False await self._change_exposure_status(ExposureStatus.READOUT) await asyncio.sleep(self._readout_time) image = await hdu_future image.header["EXPTIME"] = exposure_time image.header["DATE-OBS"] = date_obs.strftime("%Y-%m-%dT%H:%M:%S.%f") image.header["XBINNING"] = image.header["DET-BIN1"] = (self._binning[0], "Binning factor used on X axis") image.header["YBINNING"] = image.header["DET-BIN2"] = (self._binning[1], "Binning factor used on Y axis") image.header["XORGSUBF"] = (self._window[0], "Subframe origin on X axis") image.header["YORGSUBF"] = (self._window[1], "Subframe origin on Y axis") self.set_biassec_trimsec(image.header, *self._full_frame) log.info("Exposure finished.") return image async def _abort_exposure(self) -> None: self._exposing = False
[docs] async def set_window(self, left: int, top: int, width: int, height: int, **kwargs: Any) -> None: log.info("Set window to %dx%d at %d,%d.", width, height, top, left) self._window = (left, top, width, height) await self.comm.set_state(IWindow, WindowState(*self._window))
[docs] async def set_binning(self, x: int, y: int, **kwargs: Any) -> None: log.info("Set binning to %dx%d.", x, y) self._binning = (x, y) await self.comm.set_state(IBinning, BinningState(*self._binning))
[docs] async def set_cooling(self, enabled: bool, setpoint: float, **kwargs: Any) -> None: if enabled: log.info("Enabling cooling with a setpoint of %.2f°C.", setpoint) else: log.info("Disabling cooling.") self._cooling = CoolingStatus( enabled=enabled, set_point=setpoint, power=self._cooling.power, temperatures=self._cooling.temperatures, ) await self.comm.set_state( ICooling, CoolingState(setpoint=self._cooling.set_point, power=self._cooling.power, enabled=self._cooling.enabled), )
[docs] async def set_gain(self, gain: float, **kwargs: Any) -> None: log.info("Setting gain to %.2f...", gain) self._gain = gain await self.comm.set_state(IGain, GainState(gain=self._gain, offset=self._gain_offset))
[docs] async def set_offset(self, offset: float, **kwargs: Any) -> None: self._gain_offset = offset await self.comm.set_state(IGain, GainState(gain=self._gain, offset=self._gain_offset))
[docs] async def set_image_format(self, fmt: ImageFormat, **kwargs: Any) -> None: self._image_format = fmt await self.comm.set_state(IImageFormat, ImageFormatState(image_format=self._image_format))
async def _set_config_readout_time(self, readout_time: float) -> None: self._readout_time = readout_time async def _get_config_readout_time(self) -> float: return self._readout_time __all__ = ["DummyCamera"]