from __future__ import annotations
import asyncio
import logging
from typing import Any, TYPE_CHECKING
from astropy.coordinates import SkyCoord
import astropy.units as u
from pyobs.events import FilterChangedEvent, OffsetsRaDecEvent
from pyobs.interfaces import (
IFocuser,
IFitsHeaderBefore,
IFilters,
ITemperatures,
IOffsetsRaDec,
IPointingAltAz,
IPointingRaDec,
)
from pyobs.mixins.fitsnamespace import FitsNamespaceMixin
from pyobs.modules.telescope.basetelescope import BaseTelescope
from pyobs.modules import timeout
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from pyobs.utils.time import Time
if TYPE_CHECKING:
from pyobs.utils.simulation import SimWorld
log = logging.getLogger(__name__)
class DummyTelescope(
BaseTelescope,
IPointingRaDec,
IPointingAltAz,
IOffsetsRaDec,
IFocuser,
IFilters,
IFitsHeaderBefore,
ITemperatures,
FitsNamespaceMixin,
):
"""A dummy telescope for testing."""
__module__ = "pyobs.modules.telescope"
def __init__(self, world: SimWorld | None = None, wait_secs: float = 1.0, **kwargs: Any):
"""Creates a new dummy telescope.
Args:
world: Optional SimWorld object.
"""
BaseTelescope.__init__(self, **kwargs, motion_status_interfaces=["ITelescope", "IFocuser", "IFilters"])
FitsNamespaceMixin.__init__(self, **kwargs)
# init world and get telescope
from pyobs.utils.simulation import SimWorld
self._world = world if world is not None else self.add_child_object(SimWorld, None)
self._telescope = self._world.telescope
self._wait_secs = wait_secs
# automatically send status updates
self._telescope.status_callback = self._change_motion_status
# stuff
self._lock_focus = asyncio.Lock()
self._abort_focus = asyncio.Event()
[docs]
async def open(self) -> None:
"""Open module."""
await BaseTelescope.open(self)
# subscribe to events
if self._comm:
await self.comm.register_event(FilterChangedEvent)
await self.comm.register_event(OffsetsRaDecEvent)
# init status
await self._change_motion_status(MotionStatus.IDLE)
async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None:
"""Actually starts tracking on given coordinates. Must be implemented by derived classes.
Args:
ra: RA in deg to track.
dec: Dec in deg to track.
abort_event: Event that gets triggered when movement should be aborted.
Raises:
MoveError: If telescope cannot be moved.
"""
# start slewing
await self.__move(ra, dec, abort_event)
async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None:
"""Actually moves to given coordinates. Must be implemented by derived classes.
Args:
alt: Alt in deg to move to.
az: Az in deg to move to.
abort_event: Event that gets triggered when movement should be aborted.
Raises:
MoveError: If telescope cannot be moved.
"""
# alt/az coordinates to ra/dec
coords = SkyCoord(
alt=alt * u.degree, az=az * u.degree, obstime=Time.now(), location=self._location, frame="altaz"
)
icrs = coords.icrs
# start slewing
await self.__move(icrs.ra.degree, icrs.dec.degree, abort_event)
async def __move(self, ra: float, dec: float, abort_event: asyncio.Event) -> None:
"""Simulate move.
Args:
ra: RA in deg to track.
dec: Dec in deg to track.
abort_event: Event that gets triggered when movement should be aborted.
"""
# simulate slew
self._telescope.move_ra_dec(SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="icrs"))
# wait for it
while self._telescope.status == MotionStatus.SLEWING and not abort_event.is_set():
await asyncio.sleep(self._wait_secs)
[docs]
async def get_focus(self, **kwargs: Any) -> float:
"""Return current focus.
Returns:
Current focus.
"""
return self._telescope.focus
[docs]
@timeout(60)
async def set_focus(self, focus: float, **kwargs: Any) -> None:
"""Sets new focus.
Args:
focus: New focus value.
Raises:
MoveError: If telescope cannot be moved.
"""
# check
if focus < 0 or focus > 100:
raise ValueError("Invalid focus value.")
# acquire lock
async with LockWithAbort(self._lock_focus, self._abort_focus):
log.info("Setting focus to %.2f..." % focus)
await self._change_motion_status(MotionStatus.SLEWING, interface="IFocuser")
ifoc = self._telescope.focus * 1.0
dfoc = (focus - ifoc) / 300.0
for i in range(300):
# abort?
if self._abort_focus.is_set():
raise InterruptedError("Setting focus was interrupted.")
# move focus and sleep a little
self._telescope.focus = ifoc + i * dfoc
await asyncio.sleep(0.01)
await self._change_motion_status(MotionStatus.POSITIONED, interface="IFocuser")
self._telescope.focus = focus
[docs]
async def list_filters(self, **kwargs: Any) -> list[str]:
"""List available filters.
Returns:
List of available filters.
"""
return self._telescope.filters
[docs]
async def get_filter(self, **kwargs: Any) -> str:
"""Get currently set filter.
Returns:
Name of currently set filter.
"""
return self._telescope.filter_name
[docs]
async def set_filter(self, filter_name: str, **kwargs: Any) -> None:
"""Set the current filter.
Args:
filter_name: Name of filter to set.
Raises:
MoveError: If filter wheel cannot be moved.
"""
# valid filter?
if filter_name not in self._telescope.filters:
raise ValueError("Invalid filter name.")
# log and send event
if filter_name != self._telescope.filter_name:
# set it
logging.info("Setting filter to %s", filter_name)
await self._change_motion_status(MotionStatus.SLEWING, interface="IFilters")
await asyncio.sleep(3)
await self._change_motion_status(MotionStatus.POSITIONED, interface="IFilters")
self._telescope.filter_name = filter_name
# send event
await self.comm.send_event(FilterChangedEvent(filter_name))
logging.info("New filter set.")
[docs]
@timeout(60)
async def init(self, **kwargs: Any) -> None:
"""Initialize telescope.
Raises:
InitError: If device could not be initialized.
"""
# INIT, wait a little, then IDLE
log.info("Initializing telescope...")
await self._change_motion_status(MotionStatus.INITIALIZING)
await asyncio.sleep(5)
await self._change_motion_status(MotionStatus.IDLE)
log.info("Telescope initialized.")
[docs]
@timeout(60)
async def park(self, **kwargs: Any) -> None:
"""Park telescope.
Raises:
ParkError: If telescope could not be parked.
"""
# PARK, wait a little, then PARKED
log.info("Parking telescope...")
await self._change_motion_status(MotionStatus.PARKING)
await asyncio.sleep(5)
await self._change_motion_status(MotionStatus.PARKED)
log.info("Telescope parked.")
[docs]
async def set_offsets_radec(self, dra: float, ddec: float, **kwargs: Any) -> None:
"""Move an RA/Dec offset.
Args:
dra: RA offset in degrees.
ddec: Dec offset in degrees.
Raises:
MoveError: If telescope cannot be moved.
"""
log.info("Moving offset dra=%.5f, ddec=%.5f", dra, ddec)
await self.comm.send_event(OffsetsRaDecEvent(ra=dra, dec=ddec))
self._telescope.set_offsets(dra, ddec)
[docs]
async def get_offsets_radec(self, **kwargs: Any) -> tuple[float, float]:
"""Get RA/Dec offset.
Returns:
Tuple with RA and Dec offsets.
"""
return self._telescope.offsets
[docs]
async def get_radec(self, **kwargs: Any) -> tuple[float, float]:
"""Returns current RA and Dec.
Returns:
Tuple of current RA and Dec in degrees.
"""
return float(self._telescope.position.ra.degree), float(self._telescope.position.dec.degree)
[docs]
async def get_altaz(self, **kwargs: Any) -> tuple[float, float]:
"""Returns current Alt and Az.
Returns:
Tuple of current Alt and Az in degrees.
"""
if self._observer is not None:
alt_az = self.observer.altaz(Time.now(), self._telescope.position)
return float(alt_az.alt.degree), float(alt_az.az.degree)
else:
raise ValueError("No observer given.")
[docs]
async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None:
"""Stop the motion.
Args:
device: Name of device to stop, or None for all.
"""
pass
[docs]
async def get_focus_offset(self, **kwargs: Any) -> float:
"""Return current focus offset.
Returns:
Current focus offset.
"""
return 0
[docs]
async def get_temperatures(self, **kwargs: Any) -> dict[str, float]:
"""Returns all temperatures measured by this module.
Returns:
Dict containing temperatures.
"""
return {"M1": 10.0, "M2": 12.0}
[docs]
async def set_focus_offset(self, offset: float, **kwargs: Any) -> None:
log.error("Not implemented")
[docs]
async def is_ready(self, **kwargs: Any) -> bool:
log.error("Not implemented")
return True
__all__ = ["DummyTelescope"]