Source code for pyobs.modules.telescope.basetelescope

from __future__ import annotations

import asyncio
import logging
from abc import ABCMeta, abstractmethod
from typing import Any

import astropy.units as u
from astropy.coordinates import ICRS, SkyCoord

from pyobs.events import MoveAltAzEvent, MoveRaDecEvent
from pyobs.interfaces import (
    FitsHeaderEntry,
    IFitsHeaderBefore,
    IPointingAltAz,
    IPointingRaDec,
    ITelescope,
)
from pyobs.mixins import MotionStatusMixin, WaitForMotionMixin, WeatherAwareMixin
from pyobs.modules import Module, timeout
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from pyobs.utils.time import Time

log = logging.getLogger(__name__)


class BaseTelescope(
    WeatherAwareMixin, MotionStatusMixin, WaitForMotionMixin, ITelescope, IFitsHeaderBefore, Module, metaclass=ABCMeta
):
    """Base class for telescopes."""

    __module__ = "pyobs.modules.telescope"

    def __init__(
        self,
        fits_headers: dict[str, Any] | None = None,
        min_altitude: float = 10,
        wait_for_dome: str | None = None,
        **kwargs: Any,
    ):
        """Initialize a new base telescope.

        Args:
            fits_headers: Additional FITS headers to send.
            min_altitude: Minimal altitude for telescope.
            wait_for_dome: Name of dome module to wait for.
        """
        Module.__init__(self, **kwargs)

        # store
        self._fits_headers = fits_headers if fits_headers is not None else {}
        self._min_altitude = min_altitude

        # some multi-threading stuff
        self._lock_moving = asyncio.Lock()
        self._abort_move = asyncio.Event()

        # celestial status
        self._celestial_headers: dict[str, Any] = {}

        # add thread func
        self.add_background_task(self._celestial, True)

        # init mixins
        WeatherAwareMixin.__init__(self, **kwargs)
        MotionStatusMixin.__init__(self, **kwargs)
        WaitForMotionMixin.__init__(
            self,
            wait_for_modules=None if wait_for_dome is None else [wait_for_dome],
            wait_for_timeout=60000,
            wait_for_states=[MotionStatus.POSITIONED, MotionStatus.TRACKING],
        )

        # register exception
        exc.register_exception(exc.MotionError, 3, timespan=600, callback=self._default_remote_error_callback)

    @property
    def _position_radec(self) -> tuple[float, float] | None:
        """Current RA/Dec position in degrees, or None if unknown. Override in subclasses."""
        return None

[docs] async def open(self) -> None: """Open module.""" await Module.open(self) # open mixins await WeatherAwareMixin.open(self) await MotionStatusMixin.open(self)
@abstractmethod async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None: """Actually starts tracking on given coordinates. Must be implemented by derived classes. Args: ra: RA in deg to track. dec: Dec in deg to track. abort_event: Event that gets triggered when movement should be aborted. Raises: MoveError: If telescope cannot be moved. """ ...
[docs] @timeout(1200) async def move_radec(self, ra: float, dec: float, **kwargs: Any) -> None: """Starts tracking on given coordinates. Args: ra: RA in deg to track. dec: Dec in deg to track. Raises: MoveError: If telescope cannot be moved. """ # no RA/Dec telescope? if not isinstance(self, IPointingRaDec): raise NotImplementedError # do nothing, if initializing, parking or parked if self.motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]: return # check observer if self.observer is None: raise ValueError("No observer given.") # to alt/az ra_dec = SkyCoord(ra * u.deg, dec * u.deg, frame=ICRS) alt_az = self.observer.altaz(Time.now(), ra_dec) # check altitude if alt_az.alt.degree < self._min_altitude: raise ValueError("Destination altitude below limit.") # acquire lock async with LockWithAbort(self._lock_moving, self._abort_move): # log and event await self._change_motion_status(MotionStatus.SLEWING) log.info( "Moving telescope to RA=%s (%.5f°), Dec=%s (%.5f°)...", ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True), ra, ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True), dec, ) await self.comm.send_event(MoveRaDecEvent(ra=ra, dec=dec)) # track telescope await self._move_radec(ra, dec, abort_event=self._abort_move) log.info("Reached destination") # move dome, if exists await self._wait_for_motion(self._abort_move) # finish slewing await self._change_motion_status(MotionStatus.TRACKING) # update headers now asyncio.create_task(self._update_celestial_headers()) log.info("Finished moving telescope.")
@abstractmethod async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None: """Actually moves to given coordinates. Must be implemented by derived classes. Args: alt: Alt in deg to move to. az: Az in deg to move to. abort_event: Event that gets triggered when movement should be aborted. Raises: MoveError: If telescope cannot be moved. """ ...
[docs] @timeout(1200) async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None: """Moves to given coordinates. Args: alt: Alt in deg to move to. az: Az in deg to move to. Raises: MoveError: If telescope cannot be moved. """ # no Alt/Az telescope? if not isinstance(self, IPointingAltAz): raise NotImplementedError # do nothing, if initializing, parking or parked if self.motion_status() in [MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.PARKED]: return # check altitude if alt < self._min_altitude: raise ValueError("Destination altitude below limit.") # acquire lock async with LockWithAbort(self._lock_moving, self._abort_move): # log and event log.info("Moving telescope to Alt=%.2f°, Az=%.2f°...", alt, az) await self.comm.send_event(MoveAltAzEvent(alt=alt, az=az)) await self._change_motion_status(MotionStatus.SLEWING) # move telescope await self._move_altaz(alt, az, abort_event=self._abort_move) log.info("Reached destination") # move dome, if exists await self._wait_for_motion(self._abort_move) # finish slewing await self._change_motion_status(MotionStatus.POSITIONED) # update headers now asyncio.create_task(self._update_celestial_headers()) log.info("Finished moving telescope.")
[docs] async def get_fits_header_before( self, namespaces: list[str] | None = None, **kwargs: Any ) -> dict[str, FitsHeaderEntry]: """Returns FITS header for the current status of this module. Args: namespaces: If given, only return FITS headers for the given namespaces. Returns: Dictionary containing FITS headers. """ # define base header hdr: dict[str, FitsHeaderEntry] = {} # positions coords_ra_dec = None if isinstance(self, IPointingRaDec) and self._position_radec is not None: ra, dec = self._position_radec coords_ra_dec = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame=ICRS) coords_alt_az = None if isinstance(self, IPointingAltAz) and coords_ra_dec is not None and self._observer is not None: coords_alt_az = self.observer.altaz(Time.now(), coords_ra_dec) # set coordinate headers if coords_ra_dec is not None: hdr["TEL-RA"] = FitsHeaderEntry(float(coords_ra_dec.ra.degree), "Right ascension of telescope [degrees]") hdr["TEL-DEC"] = FitsHeaderEntry(float(coords_ra_dec.dec.degree), "Declination of telescope [degrees]") if coords_alt_az is not None: hdr["TEL-ALT"] = FitsHeaderEntry(float(coords_alt_az.alt.degree), "Telescope altitude [degrees]") hdr["TEL-AZ"] = FitsHeaderEntry(float(coords_alt_az.az.degree), "Telescope azimuth [degrees]") hdr["TEL-ZD"] = FitsHeaderEntry(90.0 - hdr["TEL-ALT"].value, "Telescope zenith distance [degrees]") hdr["AIRMASS"] = FitsHeaderEntry(float(coords_alt_az.secz.value), "Airmass of observation start") # convert to sexagesimal if coords_ra_dec is not None: hdr["RA"] = FitsHeaderEntry( str(coords_ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True)), "Right ascension of object" ) hdr["DEC"] = FitsHeaderEntry( str(coords_ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True)), "Declination of object" ) # site location if self._observer is not None: hdr["LATITUDE"] = FitsHeaderEntry(float(self.observer.location.lat.degree), "Latitude of telescope [deg N]") hdr["LONGITUD"] = FitsHeaderEntry( float(self.observer.location.lon.degree), "Longitude of telescope [deg E]" ) hdr["HEIGHT"] = FitsHeaderEntry(float(self.observer.location.height.value), "Altitude of telescope [m]") # add static fits headers for key, value in self._fits_headers.items(): hdr[key] = FitsHeaderEntry(value[0], value[1]) # add celestial headers for key, value in self._celestial_headers.items(): hdr[key] = FitsHeaderEntry(value[0], value[1]) # finish return hdr
async def _celestial(self) -> None: """Thread for continuously calculating positions and distances to celestial objects like moon and sun.""" # wait a little await asyncio.sleep(10) # run until closing while True: # update headers try: await self._update_celestial_headers() except Exception: log.exception("Something went wrong.") # sleep a little await asyncio.sleep(30) async def _update_celestial_headers(self) -> None: """Calculate positions and distances to celestial objects like moon and sun.""" # get now now = Time.now() # no observer? if self._observer is None: return # get telescope alt/az tel_altaz = None if isinstance(self, IPointingAltAz) and self._position_radec is not None: radec = SkyCoord(ra=self._position_radec[0] * u.deg, dec=self._position_radec[1] * u.deg, frame=ICRS) tel_altaz = self.observer.altaz(now, radec) # get current moon and sun information moon_altaz = self.observer.moon_altaz(now) moon_frac = self.observer.moon_illumination(now) sun_altaz = self.observer.sun_altaz(now) # store it self._celestial_headers = { "MOONALT": (float(moon_altaz.alt.degree), "Lunar altitude"), "MOONFRAC": (float(moon_frac), "Fraction of the moon illuminated"), "SUNALT": (float(sun_altaz.alt.degree), "Solar altitude"), } # calculate distance to telescope if tel_altaz is not None: moon_dist = tel_altaz.separation(moon_altaz) if tel_altaz is not None else None sun_dist = tel_altaz.separation(sun_altaz) if tel_altaz is not None else None self._celestial_headers["MOONDIST"] = ( None if moon_dist is None else float(moon_dist.degree), "Lunar distance from target", ) self._celestial_headers["SUNDIST"] = ( None if sun_dist is None else float(sun_dist.degree), "Solar Distance from Target", ) def _calculate_derotator_position(self, ra: float, dec: float, alt: float, obstime: Time) -> float: target = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="gcrs") if self._observer is None: raise ValueError("No observer.") parallactic = self.observer.parallactic_angle(time=obstime, target=target).deg return float(parallactic - alt) __all__ = ["BaseTelescope"]