Source code for pyobs.modules.robotic.pointing

import logging
from typing import Any
from astropy.coordinates import SkyCoord

from pyobs.interfaces import IAcquisition, IPointingRaDec, IPointingSeries
from pyobs.modules import Module
from pyobs.utils import exceptions as exc
from pyobs.interfaces import IAutonomous
from pyobs.utils.grids.filters import GridFilter
from pyobs.utils.grids.grid import Grid
from pyobs.utils.grids.gridnode import GridNode
from pyobs.utils.grids.pipeline import GridPipeline

log = logging.getLogger(__name__)


class PointingSeries(Module, IAutonomous):
    """Module for running pointing series."""

    __module__ = "pyobs.modules.robotic"

    def __init__(
        self,
        grid: list[Grid | GridFilter | dict[str, Any]],
        finish: int = 90,
        acquisition: str | None = None,
        telescope: str = "telescope",
        **kwargs: Any,
    ):
        """Initialize a new pointing series.

        Args:
            grid: Grid to use for pointing series.
            finish: When this number in percent of points have been finished, terminate mastermind.
            acquisition: IAcquisition unit to use.
            telescope: ITelescope unit to use.
        """
        Module.__init__(self, **kwargs)

        # store
        self._grid = grid
        self._finish = 1.0 - finish / 100.0
        self._acquisition = acquisition
        self._telescope = telescope

        # add thread func
        self.add_background_task(self._run_thread, False)

[docs] async def start(self, **kwargs: Any) -> None: """Starts a service.""" pass
[docs] async def stop(self, **kwargs: Any) -> None: """Stops a service.""" pass
[docs] async def is_running(self, **kwargs: Any) -> bool: """Whether a service is running.""" return True
async def _run_thread(self) -> None: """Run a pointing series.""" # check observer if self._observer is None: raise ValueError("No observer given.") try: await self._run_pointing_series() except: log.exception("Error running series.") raise async def _run_pointing_series(self) -> None: # get grid and get count grid = self.get_object(GridPipeline, GridNode, steps=self._grid) count = len([coord for coord in grid]) log.info(f"Found {count} grid points.") # iterate over all grid points grid = self.get_object(GridPipeline, GridNode, steps=self._grid) finished = 0 for coord in grid: if not isinstance(coord, SkyCoord): raise ValueError("Coordinate given is not a SkyCoord.") # log finding grid.log_last() # perform acquisition on given coordinates if not await self._process_point(coord): grid.append_last() continue # got it finished += 1 log.info(f"Finished {finished} of {count} grid points.") # finished log.info("Pointing series finished.") async def _process_point(self, coord: SkyCoord) -> bool: # get acquisition and telescope units acquisition = await self.proxy(self._acquisition, IAcquisition) telescope = await self.proxy(self._telescope, IPointingRaDec) # acquire target and process result try: # move telescope await telescope.move_radec(float(coord.ra.degree), float(coord.dec.degree)) # acquire target acq = await acquisition.acquire_target() # process result await self._process_acquisition(**acq) # if telescope implements IPointingSeries, let it know if isinstance(telescope, IPointingSeries): await telescope.add_pointing_measurement() return True except (ValueError, exc.RemoteError): log.info("Could not acquire position.") return False async def _process_acquisition( self, datetime: str, ra: float, dec: float, alt: float, az: float, off_ra: float | None = None, off_dec: float | None = None, off_alt: float | None = None, off_az: float | None = None, ) -> None: """Process the result of the acquisition. Either ra_off/dec_off or alt_off/az_off must be given. Args: datetime: Date and time of observation. ra: Right ascension without offsets at destination. dec: Declination without offsets at destination. alt: Altitude without offsets at destination. az: Azimuth without offsets at destination. off_ra: Found RA offset. off_dec: Found Dec offset. off_alt: Found Alt offset. off_az: Found Az offset. """ pass __all__ = ["PointingSeries"]