Source code for pyobs.modules.pointing.acquisition

from __future__ import annotations

import asyncio
import logging
from typing import Any

import astropy.units as u
import numpy as np

import pyobs.utils.exceptions as exc
from pyobs.images.meta import OnSkyDistance
from pyobs.images.meta.exptime import ExpTime
from pyobs.interfaces import (
    AltAzOffsetState,
    AltAzState,
    IAcquisition,
    IOffsetsAltAz,
    IOffsetsRaDec,
    IPointingAltAz,
    IPointingRaDec,
    RaDecOffsetState,
    RaDecState,
)
from pyobs.mixins import CameraSettingsMixin
from pyobs.modules import Module, raises, timeout
from pyobs.utils.enums import ImageType
from pyobs.utils.publisher import CsvPublisher
from pyobs.utils.time import Time

from ...interfaces import (
    ICamera,
    IData,
    IExposureTime,
    IImageType,
    ITelescope,
)
from ._base import BasePointing

log = logging.getLogger(__name__)


class Acquisition(BasePointing, CameraSettingsMixin, IAcquisition):
    """Class for telescope acquisition."""

    __module__ = "pyobs.modules.pointing"

    def __init__(
        self,
        exposure_time: float,
        target_pixel: tuple[float, float] | None = None,
        attempts: int = 5,
        tolerance: float = 1,
        max_offset: float = 120,
        log_file: str | None = None,
        oneshot: bool = False,
        broadcast: bool = False,
        **kwargs: Any,
    ):
        """Create a new acquisition.

        Args:
            exposure_time: Default exposure time.
            target_pixel: (x, y) tuple of pixel that the star should be positioned on. If None, center of image is used.
            attempts: Number of attempts before giving up.
            tolerance: Tolerance in position to reach in arcsec.
            max_offset: Maximum offset to move in arcsec.
            log_file: Name of file to write log to.
            oneshot: For a oneshot the number of attempts is automatically set to 1 and the method finishes whether
                     successful or not.
            broadcast: Whether to broadcast acquisition images.
        """
        BasePointing.__init__(self, **kwargs)

        # store
        self._default_exposure_time = exposure_time
        self._is_running = False
        self._target_pixel = target_pixel
        self._attempts = attempts
        self._tolerance = tolerance * u.arcsec
        self._max_offset = max_offset * u.arcsec
        self._abort_event = asyncio.Event()
        self._oneshot = oneshot
        self._broadcast = broadcast

        # init log file
        self._publisher = CsvPublisher(log_file) if log_file is not None else None

        # init camera settings mixin
        CameraSettingsMixin.__init__(self, **kwargs)

[docs] async def open(self) -> None: """Open module""" await Module.open(self) # check telescope and camera if not await self.has_proxy(self._telescope, ITelescope): log.warning("Telescope does not exist or is not of correct type at the moment.") if not await self.has_proxy(self._camera, ICamera): log.warning("Camera does not exist or is not of correct type at the moment.")
[docs] async def is_running(self, **kwargs: Any) -> bool: """Whether a service is running.""" return self._is_running
[docs] @raises(exc.AbortedError, exc.AcquisitionError) @timeout(120) async def acquire_target(self, **kwargs: Any) -> dict[str, Any]: """Acquire target at given coordinates. If no RA/Dec are given, start from current position. Might not work for some implementations that require coordinates. Returns: A dictionary with entries for datetime, ra, dec, alt, az, and either off_ra, off_dec or off_alt, off_az. Raises: ValueError: If target could not be acquired. """ try: self._is_running = True self._abort_event = asyncio.Event() return await self._acquire(self._default_exposure_time) finally: self._is_running = False
async def _acquire(self, exposure_time: float) -> dict[str, Any]: """Actually acquire target.""" # do camera settings async with self.proxy(self._camera, ICamera) as camera: await self._do_camera_settings(camera) async with self.proxy(self._camera, IImageType) as camera: await camera.set_image_type(ImageType.ACQUISITION) # try given number of attempts for a in range(self._attempts): # abort? if self._abort_event.is_set(): raise exc.AbortedError() # set exposure time and image type and take image async with self.safe_proxy(self._camera, IExposureTime) as camera: if camera: log.info("Exposing image for %.1f seconds...", exposure_time) await camera.set_exposure_time(exposure_time) else: log.info("Exposing image...") async with self.safe_proxy(self._camera, IData) as camera: if camera: filename = await camera.grab_data(broadcast=self._broadcast) else: raise exc.GeneralError("Cannot grab data from camera.") # download image log.info("Downloading image...") if filename is None: log.warning("Did not receive an image.") continue image = await self.vfs.read_image(filename) # get offset log.info("Analysing image...") try: image = await self.run_pipeline(image) except Exception as e: log.warning("Error in pipeline: %s. Skipping image.", e) continue # calculate distance from offset if not image.has_meta(OnSkyDistance): log.warning("No on-sky distance found in meta.") continue # raise exc.ImageError("No on sky distance found in meta.") osd = image.get_meta(OnSkyDistance) if osd is None or np.isnan(osd.distance): log.warning("On-sky distance found in meta is None or NaN.") continue log.info("Found a distance to target of %.2f arcsec.", osd.distance.arcsec) # get distance if osd.distance < self._tolerance: # we're finished! log.info("Target successfully acquired.") return await self._create_log_and_return() # abort? if osd.distance > self._max_offset: # move a maximum of 120"=2' raise exc.ImageError("Calculated offsets too large.") # apply offsets async with self.proxy(self._telescope, ITelescope) as telescope: if await self._apply(image, telescope, self._location): log.info("Finished image.") else: log.warning("Could not apply offsets.") if self._oneshot: # we're finished! log.info("Finishing acquisition after oneshot.") return await self._create_log_and_return() # new exposure time? if image.has_meta(ExpTime): exposure_time = image.get_meta(ExpTime).exptime # could not acquire target raise exc.AcquisitionError("Could not acquire target within given tolerance.") async def _create_log_and_return(self) -> dict[str, Any]: # get current Alt/Az async with self.proxy(self._telescope, IPointingAltAz) as telescope: altaz: AltAzState | None = telescope.get_state(IPointingAltAz) cur_alt, cur_az = (altaz.alt, altaz.az) if altaz is not None else (0.0, 0.0) async with self.proxy(self._telescope, IPointingRaDec) as telescope: radec: RaDecState | None = telescope.get_state(IPointingRaDec) cur_ra, cur_dec = (radec.ra, radec.dec) if radec is not None else (0.0, 0.0) # prepare log entry log_entry = {"datetime": Time.now().isot, "ra": cur_ra, "dec": cur_dec, "alt": cur_alt, "az": cur_az} # Alt/Az or RA/Dec? async with self.safe_proxy(self._telescope, IOffsetsRaDec) as telescope: if telescope: s: RaDecOffsetState | None = telescope.get_state(IOffsetsRaDec) if s is not None: log_entry["off_ra"], log_entry["off_dec"] = s.ra, s.dec async with self.safe_proxy(self._telescope, IOffsetsAltAz) as telescope: if telescope: s2: AltAzOffsetState | None = telescope.get_state(IOffsetsAltAz) if s2 is not None: log_entry["off_alt"], log_entry["off_az"] = s2.alt, s2.az # write log if self._publisher is not None: await self._publisher(**log_entry) # finished return log_entry
[docs] async def abort(self, **kwargs: Any) -> None: """Abort current actions.""" self._abort_event.set()
__all__ = ["Acquisition"]