Source code for pyobs.modules.camera.dummycamera

import asyncio
import glob
import logging
from datetime import datetime
from typing import Tuple, NamedTuple, Dict, Any, Optional, TYPE_CHECKING

from pyobs.interfaces import IWindow, IBinning, ICooling, IGain
from pyobs.modules.camera.basecamera import BaseCamera
from pyobs.images import Image
from pyobs.utils.enums import ExposureStatus
from pyobs.utils import exceptions as exc

if TYPE_CHECKING:
    from pyobs.utils.simulation import SimWorld


log = logging.getLogger(__name__)


class CoolingStatus(NamedTuple):
    enabled: bool = True
    set_point: float = -10.0
    power: float = 80.0
    temperatures: Dict[str, float] = {"CCD": 0.0, "Back": 3.14}


class DummyCamera(BaseCamera, IWindow, IBinning, ICooling, IGain):
    """A dummy camera for testing."""

    __module__ = "pyobs.modules.camera"

    def __init__(
        self,
        readout_time: float = 2,
        sim: Optional[Dict[str, Any]] = None,
        world: Optional["SimWorld"] = None,
        **kwargs: Any,
    ):
        """Creates a new dummy cammera.

        Args:
            readout_time: Readout time in seconds.
            sim: Dictionary with config for image simulator.
        """
        BaseCamera.__init__(self, **kwargs)

        # add thread func
        self.add_background_task(self._cooling_thread, True)

        # store
        self._readout_time = readout_time
        self._sim = sim if sim is not None else {}
        if "images" not in self._sim:
            self._sim["images"] = None

        # simulated world
        from pyobs.utils.simulation import SimCamera, SimWorld

        self._world = self.get_object(world, SimWorld) if world is not None else self.add_child_object(SimWorld)
        self._camera: SimCamera = self._world.camera

        # init camera
        self._cooling = CoolingStatus()
        self._exposing = True
        self._gain = 10.0

        # simulator
        self._sim_images = sorted(glob.glob(self._sim["images"])) if self._sim["images"] else None

    async def _cooling_thread(self) -> None:
        while True:
            # adjust temperature
            temps = self._cooling.temperatures
            temps["CCD"] -= (self._cooling.temperatures["CCD"] - self._cooling.set_point) * 0.05

            # cooling power
            power = (60.0 - self._cooling.temperatures["CCD"]) / 70.0 * 100.0

            # create new object
            self._cooling = CoolingStatus(
                enabled=self._cooling.enabled, set_point=self._cooling.set_point, power=power, temperatures=temps
            )

            # sleep for 1 second
            await asyncio.sleep(1)

[docs] async def get_full_frame(self, **kwargs: Any) -> Tuple[int, int, int, int]: """Returns full size of CCD. Returns: Tuple with left, top, width, and height set. """ return self._camera.full_frame
def _get_image(self, exp_time: float, open_shutter: bool) -> Image: """Actually get (i.e. simulate) the image.""" # random image or pre-defined? if self._sim_images: filename = self._sim_images.pop(0) self._sim_images.append(filename) return Image.from_file(filename) else: image = self._camera.get_image(exp_time, open_shutter) return image async def _expose(self, exposure_time: float, open_shutter: bool, abort_event: asyncio.Event) -> Image: """Actually do the exposure, should be implemented by derived classes. Args: exposure_time: The requested exposure time in seconds. open_shutter: Whether or not to open the shutter. abort_event: Event that gets triggered when exposure should be aborted. Returns: The actual image. Raises: GrabImageError: If exposure was not successful. """ # start exposure log.info("Starting exposure with {0:s} shutter...".format("open" if open_shutter else "closed")) date_obs = datetime.utcnow() self._exposing = True # request image loop = asyncio.get_running_loop() hdu_future = loop.run_in_executor(None, self._get_image, exposure_time, open_shutter) # wait a little steps = 10 for i in range(steps): if abort_event.is_set() or not self._exposing: self._exposing = False await self._change_exposure_status(ExposureStatus.IDLE) raise InterruptedError("Exposure was aborted.") await asyncio.sleep(exposure_time / steps) self._exposing = False # readout await self._change_exposure_status(ExposureStatus.READOUT) await asyncio.sleep(self._readout_time) # get image image = await hdu_future # add headers image.header["EXPTIME"] = exposure_time image.header["DATE-OBS"] = date_obs.strftime("%Y-%m-%dT%H:%M:%S.%f") image.header["XBINNING"] = image.header["DET-BIN1"] = (self._camera.binning[0], "Binning factor used on X axis") image.header["YBINNING"] = image.header["DET-BIN2"] = (self._camera.binning[1], "Binning factor used on Y axis") image.header["XORGSUBF"] = (self._camera.window[0], "Subframe origin on X axis") image.header["YORGSUBF"] = (self._camera.window[1], "Subframe origin on Y axis") # biassec/trimsec self.set_biassec_trimsec(image.header, *self._camera.full_frame) # finished log.info("Exposure finished.") return image async def _abort_exposure(self) -> None: """Abort the running exposure. Should be implemented by derived class. Returns: Success or not. """ self._exposing = False
[docs] async def get_window(self, **kwargs: Any) -> Tuple[int, int, int, int]: """Returns the camera window. Returns: Tuple with left, top, width, and height set. """ return self._camera.window
[docs] async def set_window(self, left: int, top: int, width: int, height: int, **kwargs: Any) -> None: """Set the camera window. Args: left: X offset of window. top: Y offset of window. width: Width of window. height: Height of window. Raises: ValueError: If binning could not be set. """ log.info("Set window to %dx%d at %d,%d.", width, height, top, left) self._camera.window = (left, top, width, height)
[docs] async def get_binning(self, **kwargs: Any) -> Tuple[int, int]: """Returns the camera binning. Returns: Tuple with x and y. """ return self._camera.binning
[docs] async def set_binning(self, x: int, y: int, **kwargs: Any) -> None: """Set the camera binning. Args: x: X binning. y: Y binning. Raises: ValueError: If binning could not be set. """ log.info("Set binning to %dx%d.", x, y) self._camera.binning = (x, y)
[docs] async def set_cooling(self, enabled: bool, setpoint: float, **kwargs: Any) -> None: """Enables/disables cooling and sets setpoint. Args: enabled: Enable or disable cooling. setpoint: Setpoint in celsius for the cooling. Raises: ValueError: If cooling could not be set. """ # log if enabled: log.info("Enabling cooling with a setpoint of %.2f°C.", setpoint) else: log.info("Disabling cooling.") # set self._cooling = CoolingStatus( enabled=enabled, set_point=setpoint, power=self._cooling.power, temperatures=self._cooling.temperatures )
[docs] async def get_cooling(self, **kwargs: Any) -> Tuple[bool, float, float]: """Returns the current status for the cooling. Returns: (tuple): Tuple containing: Enabled: Whether the cooling is enabled SetPoint: Setpoint for the cooling in celsius. Power: Current cooling power in percent or None. """ return self._cooling.enabled, self._cooling.set_point, self._cooling.power
[docs] async def get_temperatures(self, **kwargs: Any) -> Dict[str, float]: """Returns all temperatures measured by this module. Returns: Dict containing temperatures. """ return self._cooling.temperatures
async def _set_config_readout_time(self, readout_time: float) -> None: """Set readout time.""" self._readout_time = readout_time async def _get_config_readout_time(self) -> float: """Returns readout time.""" return self._readout_time
[docs] async def set_gain(self, gain: float, **kwargs: Any) -> None: """Set the camera gain. Args: gain: New camera gain. Raises: ValueError: If gain could not be set. """ log.info("Setting gain to %.2f...", gain) self._gain = gain
[docs] async def get_gain(self, **kwargs: Any) -> float: """Returns the camera binning. Returns: Current gain. """ return self._gain
__all__ = ["DummyCamera"]