import asyncio
from abc import ABCMeta, abstractmethod
from typing import Dict, Any, Tuple, Union, List, Optional
from astropy.coordinates import SkyCoord, ICRS, AltAz
import astropy.units as u
import logging
from pyobs.events import MoveRaDecEvent, MoveAltAzEvent
from pyobs.interfaces import ITelescope, IFitsHeaderBefore
from pyobs.modules import Module
from pyobs.mixins import MotionStatusMixin, WeatherAwareMixin, WaitForMotionMixin
from pyobs.modules import timeout
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from pyobs.utils.time import Time
from pyobs.utils import exceptions as exc
log = logging.getLogger(__name__)
class BaseTelescope(
WeatherAwareMixin, MotionStatusMixin, WaitForMotionMixin, ITelescope, IFitsHeaderBefore, Module, metaclass=ABCMeta
):
"""Base class for telescopes."""
__module__ = "pyobs.modules.telescope"
def __init__(
self,
fits_headers: Optional[Dict[str, Any]] = None,
min_altitude: float = 10,
wait_for_dome: Optional[str] = None,
**kwargs: Any,
):
"""Initialize a new base telescope.
Args:
fits_headers: Additional FITS headers to send.
min_altitude: Minimal altitude for telescope.
wait_for_dome: Name of dome module to wait for.
"""
Module.__init__(self, **kwargs)
# store
self._fits_headers = fits_headers if fits_headers is not None else {}
self._min_altitude = min_altitude
# some multi-threading stuff
self._lock_moving = asyncio.Lock()
self._abort_move = asyncio.Event()
# celestial status
self._celestial_headers: Dict[str, Any] = {}
# add thread func
self.add_background_task(self._celestial, True)
# init mixins
WeatherAwareMixin.__init__(self, **kwargs)
MotionStatusMixin.__init__(self, **kwargs)
WaitForMotionMixin.__init__(
self,
wait_for_modules=None if wait_for_dome is None else [wait_for_dome],
wait_for_timeout=60000,
wait_for_states=[MotionStatus.POSITIONED, MotionStatus.TRACKING],
)
# register exception
exc.register_exception(exc.MotionError, 3, timespan=600, callback=self._default_remote_error_callback)
[docs] async def open(self) -> None:
"""Open module."""
await Module.open(self)
# open mixins
await WeatherAwareMixin.open(self)
await MotionStatusMixin.open(self)
@abstractmethod
async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None:
"""Actually starts tracking on given coordinates. Must be implemented by derived classes.
Args:
ra: RA in deg to track.
dec: Dec in deg to track.
abort_event: Event that gets triggered when movement should be aborted.
Raises:
MoveError: If telescope cannot be moved.
"""
...
[docs] @timeout(1200)
async def move_radec(self, ra: float, dec: float, **kwargs: Any) -> None:
"""Starts tracking on given coordinates.
Args:
ra: RA in deg to track.
dec: Dec in deg to track.
Raises:
MoveError: If telescope cannot be moved.
"""
# do nothing, if initializing, parking or parked
if await self.get_motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]:
return
# check observer
if self.observer is None:
raise ValueError("No observer given.")
# to alt/az
ra_dec = SkyCoord(ra * u.deg, dec * u.deg, frame=ICRS)
alt_az = self.observer.altaz(Time.now(), ra_dec)
# check altitude
if alt_az.alt.degree < self._min_altitude:
raise ValueError("Destination altitude below limit.")
# acquire lock
async with LockWithAbort(self._lock_moving, self._abort_move):
# log and event
await self._change_motion_status(MotionStatus.SLEWING)
log.info(
"Moving telescope to RA=%s (%.5f°), Dec=%s (%.5f°)...",
ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True),
ra,
ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True),
dec,
)
await self.comm.send_event(MoveRaDecEvent(ra=ra, dec=dec))
# track telescope
await self._move_radec(ra, dec, abort_event=self._abort_move)
log.info("Reached destination")
# move dome, if exists
await self._wait_for_motion(self._abort_move)
# finish slewing
await self._change_motion_status(MotionStatus.TRACKING)
# update headers now
asyncio.create_task(self._update_celestial_headers())
log.info("Finished moving telescope.")
@abstractmethod
async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None:
"""Actually moves to given coordinates. Must be implemented by derived classes.
Args:
alt: Alt in deg to move to.
az: Az in deg to move to.
abort_event: Event that gets triggered when movement should be aborted.
Raises:
MoveError: If telescope cannot be moved.
"""
...
[docs] @timeout(1200)
async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None:
"""Moves to given coordinates.
Args:
alt: Alt in deg to move to.
az: Az in deg to move to.
Raises:
MoveError: If telescope cannot be moved.
"""
# do nothing, if initializing, parking or parked
if await self.get_motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]:
return
# check altitude
if alt < self._min_altitude:
raise ValueError("Destination altitude below limit.")
# acquire lock
async with LockWithAbort(self._lock_moving, self._abort_move):
# log and event
log.info("Moving telescope to Alt=%.2f°, Az=%.2f°...", alt, az)
await self.comm.send_event(MoveAltAzEvent(alt=alt, az=az))
await self._change_motion_status(MotionStatus.SLEWING)
# move telescope
await self._move_altaz(alt, az, abort_event=self._abort_move)
log.info("Reached destination")
# move dome, if exists
await self._wait_for_motion(self._abort_move)
# finish slewing
await self._change_motion_status(MotionStatus.POSITIONED)
# update headers now
asyncio.create_task(self._update_celestial_headers())
log.info("Finished moving telescope.")
async def _celestial(self) -> None:
"""Thread for continuously calculating positions and distances to celestial objects like moon and sun."""
# wait a little
await asyncio.sleep(10)
# run until closing
while True:
# update headers
await self._update_celestial_headers()
# sleep a little
await asyncio.sleep(30)
async def _update_celestial_headers(self) -> None:
"""Calculate positions and distances to celestial objects like moon and sun."""
# get now
now = Time.now()
alt: Optional[float]
az: Optional[float]
# no observer?
if self.observer is None:
return
# get telescope alt/az
try:
alt, az = await self.get_altaz()
tel_altaz = SkyCoord(alt=alt * u.deg, az=az * u.deg, frame="altaz")
except:
alt, az, tel_altaz = None, None, None
# get current moon and sun information
moon_altaz = self.observer.moon_altaz(now)
moon_frac = self.observer.moon_illumination(now)
sun_altaz = self.observer.sun_altaz(now)
# calculate distance to telescope
moon_dist = tel_altaz.separation(moon_altaz) if tel_altaz is not None else None
sun_dist = tel_altaz.separation(sun_altaz) if tel_altaz is not None else None
# store it
self._celestial_headers = {
"MOONALT": (float(moon_altaz.alt.degree), "Lunar altitude"),
"MOONFRAC": (float(moon_frac), "Fraction of the moon illuminated"),
"MOONDIST": (None if moon_dist is None else float(moon_dist.degree), "Lunar distance from target"),
"SUNALT": (float(sun_altaz.alt.degree), "Solar altitude"),
"SUNDIST": (None if sun_dist is None else float(sun_dist.degree), "Solar Distance from Target"),
}
__all__ = ["BaseTelescope"]