Source code for pyobs.modules.telescope.basetelescope

from __future__ import annotations

import asyncio
import logging
from abc import ABCMeta, abstractmethod
from typing import Any

import astropy.units as u
from astropy.coordinates import ICRS, AltAz, SkyCoord

from pyobs.events import MoveAltAzEvent, MoveRaDecEvent
from pyobs.interfaces import (
    IFitsHeaderBefore,
    IPointingAltAz,
    IPointingRaDec,
    ITelescope,
)
from pyobs.mixins import MotionStatusMixin, WaitForMotionMixin, WeatherAwareMixin
from pyobs.modules import Module, timeout
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from pyobs.utils.time import Time

log = logging.getLogger(__name__)


class BaseTelescope(
    WeatherAwareMixin, MotionStatusMixin, WaitForMotionMixin, ITelescope, IFitsHeaderBefore, Module, metaclass=ABCMeta
):
    """Base class for telescopes."""

    __module__ = "pyobs.modules.telescope"

    def __init__(
        self,
        fits_headers: dict[str, Any] | None = None,
        min_altitude: float = 10,
        wait_for_dome: str | None = None,
        **kwargs: Any,
    ):
        """Initialize a new base telescope.

        Args:
            fits_headers: Additional FITS headers to send.
            min_altitude: Minimal altitude for telescope.
            wait_for_dome: Name of dome module to wait for.
        """
        Module.__init__(self, **kwargs)

        # store
        self._fits_headers = fits_headers if fits_headers is not None else {}
        self._min_altitude = min_altitude

        # some multi-threading stuff
        self._lock_moving = asyncio.Lock()
        self._abort_move = asyncio.Event()

        # celestial status
        self._celestial_headers: dict[str, Any] = {}

        # add thread func
        self.add_background_task(self._celestial, True)

        # init mixins
        WeatherAwareMixin.__init__(self, **kwargs)
        MotionStatusMixin.__init__(self, **kwargs)
        WaitForMotionMixin.__init__(
            self,
            wait_for_modules=None if wait_for_dome is None else [wait_for_dome],
            wait_for_timeout=60000,
            wait_for_states=[MotionStatus.POSITIONED, MotionStatus.TRACKING],
        )

        # register exception
        exc.register_exception(exc.MotionError, 3, timespan=600, callback=self._default_remote_error_callback)

[docs] async def open(self) -> None: """Open module.""" await Module.open(self) # open mixins await WeatherAwareMixin.open(self) await MotionStatusMixin.open(self)
@abstractmethod async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None: """Actually starts tracking on given coordinates. Must be implemented by derived classes. Args: ra: RA in deg to track. dec: Dec in deg to track. abort_event: Event that gets triggered when movement should be aborted. Raises: MoveError: If telescope cannot be moved. """ ...
[docs] @timeout(1200) async def move_radec(self, ra: float, dec: float, **kwargs: Any) -> None: """Starts tracking on given coordinates. Args: ra: RA in deg to track. dec: Dec in deg to track. Raises: MoveError: If telescope cannot be moved. """ # no RA/Dec telescope? if not isinstance(self, IPointingRaDec): raise NotImplementedError # do nothing, if initializing, parking or parked if await self.get_motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]: return # check observer if self.observer is None: raise ValueError("No observer given.") # to alt/az ra_dec = SkyCoord(ra * u.deg, dec * u.deg, frame=ICRS) alt_az = self.observer.altaz(Time.now(), ra_dec) # check altitude if alt_az.alt.degree < self._min_altitude: raise ValueError("Destination altitude below limit.") # acquire lock async with LockWithAbort(self._lock_moving, self._abort_move): # log and event await self._change_motion_status(MotionStatus.SLEWING) log.info( "Moving telescope to RA=%s (%.5f°), Dec=%s (%.5f°)...", ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True), ra, ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True), dec, ) await self.comm.send_event(MoveRaDecEvent(ra=ra, dec=dec)) # track telescope await self._move_radec(ra, dec, abort_event=self._abort_move) log.info("Reached destination") # move dome, if exists await self._wait_for_motion(self._abort_move) # finish slewing await self._change_motion_status(MotionStatus.TRACKING) # update headers now asyncio.create_task(self._update_celestial_headers()) log.info("Finished moving telescope.")
@abstractmethod async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None: """Actually moves to given coordinates. Must be implemented by derived classes. Args: alt: Alt in deg to move to. az: Az in deg to move to. abort_event: Event that gets triggered when movement should be aborted. Raises: MoveError: If telescope cannot be moved. """ ...
[docs] @timeout(1200) async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None: """Moves to given coordinates. Args: alt: Alt in deg to move to. az: Az in deg to move to. Raises: MoveError: If telescope cannot be moved. """ # no Alt/Az telescope? if not isinstance(self, IPointingAltAz): raise NotImplementedError # do nothing, if initializing, parking or parked if await self.get_motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]: return # check altitude if alt < self._min_altitude: raise ValueError("Destination altitude below limit.") # acquire lock async with LockWithAbort(self._lock_moving, self._abort_move): # log and event log.info("Moving telescope to Alt=%.2f°, Az=%.2f°...", alt, az) await self.comm.send_event(MoveAltAzEvent(alt=alt, az=az)) await self._change_motion_status(MotionStatus.SLEWING) # move telescope await self._move_altaz(alt, az, abort_event=self._abort_move) log.info("Reached destination") # move dome, if exists await self._wait_for_motion(self._abort_move) # finish slewing await self._change_motion_status(MotionStatus.POSITIONED) # update headers now asyncio.create_task(self._update_celestial_headers()) log.info("Finished moving telescope.")
[docs] async def get_fits_header_before( self, namespaces: list[str] | None = None, **kwargs: Any ) -> dict[str, tuple[Any, str]]: """Returns FITS header for the current status of this module. Args: namespaces: If given, only return FITS headers for the given namespaces. Returns: Dictionary containing FITS headers. """ # define base header hdr: dict[str, Any | tuple[Any, str]] = {} # positions coords_ra_dec = None if isinstance(self, IPointingRaDec): try: ra, dec = await self.get_radec() coords_ra_dec = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame=ICRS) except Exception as e: log.warning("Could not fetch telescope RA/Dec: %s", e) coords_alt_az = None if isinstance(self, IPointingAltAz): try: alt, az = await self.get_altaz() coords_alt_az = SkyCoord(alt=alt * u.deg, az=az * u.deg, frame=AltAz) except Exception as e: log.warning("Could not fetch telescope Alt/Az: %s", e) # set coordinate headers if coords_ra_dec is not None: hdr["TEL-RA"] = (float(coords_ra_dec.ra.degree), "Right ascension of telescope [degrees]") hdr["TEL-DEC"] = (float(coords_ra_dec.dec.degree), "Declination of telescope [degrees]") if coords_alt_az is not None: hdr["TEL-ALT"] = (float(coords_alt_az.alt.degree), "Telescope altitude [degrees]") hdr["TEL-AZ"] = (float(coords_alt_az.az.degree), "Telescope azimuth [degrees]") hdr["TEL-ZD"] = (90.0 - hdr["TEL-ALT"][0], "Telescope zenith distance [degrees]") hdr["AIRMASS"] = (float(coords_alt_az.secz.value), "Airmass of observation start") # convert to sexagesimal if coords_ra_dec is not None: hdr["RA"] = (str(coords_ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True)), "Right ascension of object") hdr["DEC"] = (str(coords_ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True)), "Declination of object") # site location if self._observer is not None: hdr["LATITUDE"] = (float(self.observer.location.lat.degree), "Latitude of telescope [deg N]") hdr["LONGITUD"] = (float(self.observer.location.lon.degree), "Longitude of telescope [deg E]") hdr["HEIGHT"] = (float(self.observer.location.height.value), "Altitude of telescope [m]") # add static fits headers for key, value in self._fits_headers.items(): hdr[key] = tuple(value) # add celestial headers for key, value in self._celestial_headers.items(): hdr[key] = tuple(value) # finish return hdr
async def _celestial(self) -> None: """Thread for continuously calculating positions and distances to celestial objects like moon and sun.""" # wait a little await asyncio.sleep(10) # run until closing while True: # update headers try: await self._update_celestial_headers() except Exception: log.exception("Something went wrong.") # sleep a little await asyncio.sleep(30) async def _update_celestial_headers(self) -> None: """Calculate positions and distances to celestial objects like moon and sun.""" # get now now = Time.now() alt: float | None az: float | None # no observer? if self._observer is None: return # get telescope alt/az tel_altaz = None observer = self.observer if isinstance(self, IPointingAltAz): try: alt, az = await self.get_altaz() tel_altaz = SkyCoord( alt=alt * u.deg, az=az * u.deg, location=observer.location, obstime=now, frame="altaz" ) except Exception: log.exception("Could not fetch telescope Alt/Az: %s", self) return # get current moon and sun information moon_altaz = self.observer.moon_altaz(now) moon_frac = self.observer.moon_illumination(now) sun_altaz = self.observer.sun_altaz(now) # store it self._celestial_headers = { "MOONALT": (float(moon_altaz.alt.degree), "Lunar altitude"), "MOONFRAC": (float(moon_frac), "Fraction of the moon illuminated"), "SUNALT": (float(sun_altaz.alt.degree), "Solar altitude"), } # calculate distance to telescope if tel_altaz is not None: moon_dist = tel_altaz.separation(moon_altaz) if tel_altaz is not None else None sun_dist = tel_altaz.separation(sun_altaz) if tel_altaz is not None else None self._celestial_headers["MOONDIST"] = ( None if moon_dist is None else float(moon_dist.degree), "Lunar distance from target", ) self._celestial_headers["SUNDIST"] = ( None if sun_dist is None else float(sun_dist.degree), "Solar Distance from Target", ) def _calculate_derotator_position(self, ra: float, dec: float, alt: float, obstime: Time) -> float: target = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="gcrs") if self._observer is None: raise ValueError("No observer.") parallactic = self.observer.parallactic_angle(time=obstime, target=target).deg return float(parallactic - alt) __all__ = ["BaseTelescope"]