from __future__ import annotations
import asyncio
import logging
from abc import ABCMeta, abstractmethod
from typing import Any
import astropy.units as u
from astropy.coordinates import ICRS, AltAz, SkyCoord
from pyobs.events import MoveAltAzEvent, MoveRaDecEvent
from pyobs.interfaces import (
IFitsHeaderBefore,
IPointingAltAz,
IPointingRaDec,
ITelescope,
)
from pyobs.mixins import MotionStatusMixin, WaitForMotionMixin, WeatherAwareMixin
from pyobs.modules import Module, timeout
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from pyobs.utils.time import Time
log = logging.getLogger(__name__)
class BaseTelescope(
WeatherAwareMixin, MotionStatusMixin, WaitForMotionMixin, ITelescope, IFitsHeaderBefore, Module, metaclass=ABCMeta
):
"""Base class for telescopes."""
__module__ = "pyobs.modules.telescope"
def __init__(
self,
fits_headers: dict[str, Any] | None = None,
min_altitude: float = 10,
wait_for_dome: str | None = None,
**kwargs: Any,
):
"""Initialize a new base telescope.
Args:
fits_headers: Additional FITS headers to send.
min_altitude: Minimal altitude for telescope.
wait_for_dome: Name of dome module to wait for.
"""
Module.__init__(self, **kwargs)
# store
self._fits_headers = fits_headers if fits_headers is not None else {}
self._min_altitude = min_altitude
# some multi-threading stuff
self._lock_moving = asyncio.Lock()
self._abort_move = asyncio.Event()
# celestial status
self._celestial_headers: dict[str, Any] = {}
# add thread func
self.add_background_task(self._celestial, True)
# init mixins
WeatherAwareMixin.__init__(self, **kwargs)
MotionStatusMixin.__init__(self, **kwargs)
WaitForMotionMixin.__init__(
self,
wait_for_modules=None if wait_for_dome is None else [wait_for_dome],
wait_for_timeout=60000,
wait_for_states=[MotionStatus.POSITIONED, MotionStatus.TRACKING],
)
# register exception
exc.register_exception(exc.MotionError, 3, timespan=600, callback=self._default_remote_error_callback)
[docs]
async def open(self) -> None:
"""Open module."""
await Module.open(self)
# open mixins
await WeatherAwareMixin.open(self)
await MotionStatusMixin.open(self)
@abstractmethod
async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None:
"""Actually starts tracking on given coordinates. Must be implemented by derived classes.
Args:
ra: RA in deg to track.
dec: Dec in deg to track.
abort_event: Event that gets triggered when movement should be aborted.
Raises:
MoveError: If telescope cannot be moved.
"""
...
[docs]
@timeout(1200)
async def move_radec(self, ra: float, dec: float, **kwargs: Any) -> None:
"""Starts tracking on given coordinates.
Args:
ra: RA in deg to track.
dec: Dec in deg to track.
Raises:
MoveError: If telescope cannot be moved.
"""
# no RA/Dec telescope?
if not isinstance(self, IPointingRaDec):
raise NotImplementedError
# do nothing, if initializing, parking or parked
if await self.get_motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]:
return
# check observer
if self.observer is None:
raise ValueError("No observer given.")
# to alt/az
ra_dec = SkyCoord(ra * u.deg, dec * u.deg, frame=ICRS)
alt_az = self.observer.altaz(Time.now(), ra_dec)
# check altitude
if alt_az.alt.degree < self._min_altitude:
raise ValueError("Destination altitude below limit.")
# acquire lock
async with LockWithAbort(self._lock_moving, self._abort_move):
# log and event
await self._change_motion_status(MotionStatus.SLEWING)
log.info(
"Moving telescope to RA=%s (%.5f°), Dec=%s (%.5f°)...",
ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True),
ra,
ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True),
dec,
)
await self.comm.send_event(MoveRaDecEvent(ra=ra, dec=dec))
# track telescope
await self._move_radec(ra, dec, abort_event=self._abort_move)
log.info("Reached destination")
# move dome, if exists
await self._wait_for_motion(self._abort_move)
# finish slewing
await self._change_motion_status(MotionStatus.TRACKING)
# update headers now
asyncio.create_task(self._update_celestial_headers())
log.info("Finished moving telescope.")
@abstractmethod
async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None:
"""Actually moves to given coordinates. Must be implemented by derived classes.
Args:
alt: Alt in deg to move to.
az: Az in deg to move to.
abort_event: Event that gets triggered when movement should be aborted.
Raises:
MoveError: If telescope cannot be moved.
"""
...
[docs]
@timeout(1200)
async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None:
"""Moves to given coordinates.
Args:
alt: Alt in deg to move to.
az: Az in deg to move to.
Raises:
MoveError: If telescope cannot be moved.
"""
# no Alt/Az telescope?
if not isinstance(self, IPointingAltAz):
raise NotImplementedError
# do nothing, if initializing, parking or parked
if await self.get_motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]:
return
# check altitude
if alt < self._min_altitude:
raise ValueError("Destination altitude below limit.")
# acquire lock
async with LockWithAbort(self._lock_moving, self._abort_move):
# log and event
log.info("Moving telescope to Alt=%.2f°, Az=%.2f°...", alt, az)
await self.comm.send_event(MoveAltAzEvent(alt=alt, az=az))
await self._change_motion_status(MotionStatus.SLEWING)
# move telescope
await self._move_altaz(alt, az, abort_event=self._abort_move)
log.info("Reached destination")
# move dome, if exists
await self._wait_for_motion(self._abort_move)
# finish slewing
await self._change_motion_status(MotionStatus.POSITIONED)
# update headers now
asyncio.create_task(self._update_celestial_headers())
log.info("Finished moving telescope.")
async def _celestial(self) -> None:
"""Thread for continuously calculating positions and distances to celestial objects like moon and sun."""
# wait a little
await asyncio.sleep(10)
# run until closing
while True:
# update headers
try:
await self._update_celestial_headers()
except Exception:
log.exception("Something went wrong.")
# sleep a little
await asyncio.sleep(30)
async def _update_celestial_headers(self) -> None:
"""Calculate positions and distances to celestial objects like moon and sun."""
# get now
now = Time.now()
alt: float | None
az: float | None
# no observer?
if self._observer is None:
return
# get telescope alt/az
tel_altaz = None
observer = self.observer
if isinstance(self, IPointingAltAz):
try:
alt, az = await self.get_altaz()
tel_altaz = SkyCoord(
alt=alt * u.deg, az=az * u.deg, location=observer.location, obstime=now, frame="altaz"
)
except Exception:
log.exception("Could not fetch telescope Alt/Az: %s", self)
return
# get current moon and sun information
moon_altaz = self.observer.moon_altaz(now)
moon_frac = self.observer.moon_illumination(now)
sun_altaz = self.observer.sun_altaz(now)
# store it
self._celestial_headers = {
"MOONALT": (float(moon_altaz.alt.degree), "Lunar altitude"),
"MOONFRAC": (float(moon_frac), "Fraction of the moon illuminated"),
"SUNALT": (float(sun_altaz.alt.degree), "Solar altitude"),
}
# calculate distance to telescope
if tel_altaz is not None:
moon_dist = tel_altaz.separation(moon_altaz) if tel_altaz is not None else None
sun_dist = tel_altaz.separation(sun_altaz) if tel_altaz is not None else None
self._celestial_headers["MOONDIST"] = (
None if moon_dist is None else float(moon_dist.degree),
"Lunar distance from target",
)
self._celestial_headers["SUNDIST"] = (
None if sun_dist is None else float(sun_dist.degree),
"Solar Distance from Target",
)
def _calculate_derotator_position(self, ra: float, dec: float, alt: float, obstime: Time) -> float:
target = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="gcrs")
if self._observer is None:
raise ValueError("No observer.")
parallactic = self.observer.parallactic_angle(time=obstime, target=target).deg
return float(parallactic - alt)
__all__ = ["BaseTelescope"]