Source code for pyobs.modules.telescope.dummytelescope

from __future__ import annotations
import asyncio
import logging
from typing import Any, TYPE_CHECKING
from astropy.coordinates import SkyCoord
import astropy.units as u

from pyobs.events import FilterChangedEvent, OffsetsRaDecEvent
from pyobs.interfaces import (
    IFocuser,
    IFitsHeaderBefore,
    IFilters,
    ITemperatures,
    IOffsetsRaDec,
    IPointingAltAz,
    IPointingRaDec,
)
from pyobs.mixins.fitsnamespace import FitsNamespaceMixin
from pyobs.modules.telescope.basetelescope import BaseTelescope
from pyobs.modules import timeout
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from pyobs.utils.time import Time

if TYPE_CHECKING:
    from pyobs.utils.simulation import SimWorld


log = logging.getLogger(__name__)


class DummyTelescope(
    BaseTelescope,
    IPointingRaDec,
    IPointingAltAz,
    IOffsetsRaDec,
    IFocuser,
    IFilters,
    IFitsHeaderBefore,
    ITemperatures,
    FitsNamespaceMixin,
):
    """A dummy telescope for testing."""

    __module__ = "pyobs.modules.telescope"

    def __init__(self, world: SimWorld | None = None, wait_secs: float = 1.0, **kwargs: Any):
        """Creates a new dummy telescope.

        Args:
            world: Optional SimWorld object.
        """
        BaseTelescope.__init__(self, **kwargs, motion_status_interfaces=["ITelescope", "IFocuser", "IFilters"])
        FitsNamespaceMixin.__init__(self, **kwargs)

        # init world and get telescope
        from pyobs.utils.simulation import SimWorld

        self._world = world if world is not None else self.add_child_object(SimWorld, None)
        self._telescope = self._world.telescope
        self._wait_secs = wait_secs

        # automatically send status updates
        self._telescope.status_callback = self._change_motion_status

        # stuff
        self._lock_focus = asyncio.Lock()
        self._abort_focus = asyncio.Event()

[docs] async def open(self) -> None: """Open module.""" await BaseTelescope.open(self) # subscribe to events if self._comm: await self.comm.register_event(FilterChangedEvent) await self.comm.register_event(OffsetsRaDecEvent) # init status await self._change_motion_status(MotionStatus.IDLE)
async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None: """Actually starts tracking on given coordinates. Must be implemented by derived classes. Args: ra: RA in deg to track. dec: Dec in deg to track. abort_event: Event that gets triggered when movement should be aborted. Raises: MoveError: If telescope cannot be moved. """ # start slewing await self.__move(ra, dec, abort_event) async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None: """Actually moves to given coordinates. Must be implemented by derived classes. Args: alt: Alt in deg to move to. az: Az in deg to move to. abort_event: Event that gets triggered when movement should be aborted. Raises: MoveError: If telescope cannot be moved. """ # alt/az coordinates to ra/dec coords = SkyCoord( alt=alt * u.degree, az=az * u.degree, obstime=Time.now(), location=self._location, frame="altaz" ) icrs = coords.icrs # start slewing await self.__move(icrs.ra.degree, icrs.dec.degree, abort_event) async def __move(self, ra: float, dec: float, abort_event: asyncio.Event) -> None: """Simulate move. Args: ra: RA in deg to track. dec: Dec in deg to track. abort_event: Event that gets triggered when movement should be aborted. """ # simulate slew self._telescope.move_ra_dec(SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="icrs")) # wait for it while self._telescope.status == MotionStatus.SLEWING and not abort_event.is_set(): await asyncio.sleep(self._wait_secs)
[docs] async def get_focus(self, **kwargs: Any) -> float: """Return current focus. Returns: Current focus. """ return self._telescope.focus
[docs] @timeout(60) async def set_focus(self, focus: float, **kwargs: Any) -> None: """Sets new focus. Args: focus: New focus value. Raises: MoveError: If telescope cannot be moved. """ # check if focus < 0 or focus > 100: raise ValueError("Invalid focus value.") # acquire lock async with LockWithAbort(self._lock_focus, self._abort_focus): log.info("Setting focus to %.2f..." % focus) await self._change_motion_status(MotionStatus.SLEWING, interface="IFocuser") ifoc = self._telescope.focus * 1.0 dfoc = (focus - ifoc) / 300.0 for i in range(300): # abort? if self._abort_focus.is_set(): raise InterruptedError("Setting focus was interrupted.") # move focus and sleep a little self._telescope.focus = ifoc + i * dfoc await asyncio.sleep(0.01) await self._change_motion_status(MotionStatus.POSITIONED, interface="IFocuser") self._telescope.focus = focus
[docs] async def list_filters(self, **kwargs: Any) -> list[str]: """List available filters. Returns: List of available filters. """ return self._telescope.filters
[docs] async def get_filter(self, **kwargs: Any) -> str: """Get currently set filter. Returns: Name of currently set filter. """ return self._telescope.filter_name
[docs] async def set_filter(self, filter_name: str, **kwargs: Any) -> None: """Set the current filter. Args: filter_name: Name of filter to set. Raises: MoveError: If filter wheel cannot be moved. """ # valid filter? if filter_name not in self._telescope.filters: raise ValueError("Invalid filter name.") # log and send event if filter_name != self._telescope.filter_name: # set it logging.info("Setting filter to %s", filter_name) await self._change_motion_status(MotionStatus.SLEWING, interface="IFilters") await asyncio.sleep(3) await self._change_motion_status(MotionStatus.POSITIONED, interface="IFilters") self._telescope.filter_name = filter_name # send event await self.comm.send_event(FilterChangedEvent(filter_name)) logging.info("New filter set.")
[docs] @timeout(60) async def init(self, **kwargs: Any) -> None: """Initialize telescope. Raises: InitError: If device could not be initialized. """ # INIT, wait a little, then IDLE log.info("Initializing telescope...") await self._change_motion_status(MotionStatus.INITIALIZING) await asyncio.sleep(5) await self._change_motion_status(MotionStatus.IDLE) log.info("Telescope initialized.")
[docs] @timeout(60) async def park(self, **kwargs: Any) -> None: """Park telescope. Raises: ParkError: If telescope could not be parked. """ # PARK, wait a little, then PARKED log.info("Parking telescope...") await self._change_motion_status(MotionStatus.PARKING) await asyncio.sleep(5) await self._change_motion_status(MotionStatus.PARKED) log.info("Telescope parked.")
[docs] async def set_offsets_radec(self, dra: float, ddec: float, **kwargs: Any) -> None: """Move an RA/Dec offset. Args: dra: RA offset in degrees. ddec: Dec offset in degrees. Raises: MoveError: If telescope cannot be moved. """ log.info("Moving offset dra=%.5f, ddec=%.5f", dra, ddec) await self.comm.send_event(OffsetsRaDecEvent(ra=dra, dec=ddec)) self._telescope.set_offsets(dra, ddec)
[docs] async def get_offsets_radec(self, **kwargs: Any) -> tuple[float, float]: """Get RA/Dec offset. Returns: Tuple with RA and Dec offsets. """ return self._telescope.offsets
[docs] async def get_radec(self, **kwargs: Any) -> tuple[float, float]: """Returns current RA and Dec. Returns: Tuple of current RA and Dec in degrees. """ return float(self._telescope.position.ra.degree), float(self._telescope.position.dec.degree)
[docs] async def get_altaz(self, **kwargs: Any) -> tuple[float, float]: """Returns current Alt and Az. Returns: Tuple of current Alt and Az in degrees. """ if self._observer is not None: alt_az = self.observer.altaz(Time.now(), self._telescope.position) return float(alt_az.alt.degree), float(alt_az.az.degree) else: raise ValueError("No observer given.")
[docs] async def get_fits_header_before( self, namespaces: list[str] | None = None, **kwargs: Any ) -> dict[str, tuple[Any, str]]: """Returns FITS header for the current status of this module. Args: namespaces: If given, only return FITS headers for the given namespaces. Returns: Dictionary containing FITS headers. """ # fetch from BaseTelescope hdr = await BaseTelescope.get_fits_header_before(self) # focus hdr["TEL-FOCU"] = (self._telescope.focus, "Focus position [mm]") # finished return self._filter_fits_namespace(hdr, namespaces=namespaces, **kwargs)
[docs] async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None: """Stop the motion. Args: device: Name of device to stop, or None for all. """ pass
[docs] async def get_focus_offset(self, **kwargs: Any) -> float: """Return current focus offset. Returns: Current focus offset. """ return 0
[docs] async def get_temperatures(self, **kwargs: Any) -> dict[str, float]: """Returns all temperatures measured by this module. Returns: Dict containing temperatures. """ return {"M1": 10.0, "M2": 12.0}
[docs] async def set_focus_offset(self, offset: float, **kwargs: Any) -> None: log.error("Not implemented")
[docs] async def is_ready(self, **kwargs: Any) -> bool: log.error("Not implemented") return True
__all__ = ["DummyTelescope"]