Source code for pyobs_brot.brottelescope

import asyncio
import logging
from typing import Any

import numpy as np
from pybrotlib import BROT  # type: ignore
from pybrotlib.components.telescope import MotionState, TelescopeStatus  # type: ignore
from pybrotlib.transport import MQTTTransport  # type: ignore
from pyobs.interfaces import (
    AltAzOffsetState,
    AltAzState,
    FocuserState,
    IFocuser,
    IOffsetsAltAz,
    IOffsetsRaDec,
    IPointingAltAz,
    IPointingRaDec,
    IPointingSeries,
    IReady,
    ITemperatures,
    RaDecOffsetState,
    RaDecState,
    ReadyState,
    SensorReading,
    TemperaturesState,
)
from pyobs.mixins import FitsNamespaceMixin
from pyobs.modules import timeout
from pyobs.modules.telescope.basetelescope import BaseTelescope
from pyobs.utils.enums import MotionStatus
from pyobs.utils.publisher import CsvPublisher
from pyobs.utils.time import Time

log = logging.getLogger(__name__)


[docs] class BrotBaseTelescope( BaseTelescope, IFocuser, ITemperatures, IPointingRaDec, IPointingAltAz, IPointingSeries, FitsNamespaceMixin, ): def __init__( self, host: str, name: str, port: int = 1883, temperatures: dict[str, str] | None = None, keepalive: int = 60, roof: str = "None", dome: str = "None", **kwargs: Any, ): BaseTelescope.__init__( self, **kwargs, motion_status_interfaces=["ITelescope", "IFocuser"], wait_for_dome=None if dome == "None" else dome, ) self.mqtt = MQTTTransport(host, port) self.brot = BROT(self.mqtt, name) self.temperatures: dict[str, str] = {} if temperatures is None else temperatures self.focus_offset = 0.0 self._roof = roof FitsNamespaceMixin.__init__(self, **kwargs) self.add_background_task(self._update_task) @property def _position_radec(self) -> tuple[float, float] | None: try: ra = self.brot.telescope._telemetry.POSITION.EQUATORIAL.RA_ICRS * 15 dec = self.brot.telescope._telemetry.POSITION.EQUATORIAL.DEC_ICRS return ra, dec except Exception: return None
[docs] async def open(self) -> None: await BaseTelescope.open(self) asyncio.create_task(self.mqtt.run()) await asyncio.sleep(2) # publish initial states await self.comm.set_state(IReady, ReadyState(ready=False)) await self.comm.set_state( IFocuser, FocuserState(focus=float(self.brot.focus.position), focus_offset=self.focus_offset) ) await self.comm.set_state(IOffsetsRaDec, RaDecOffsetState(ra=0.0, dec=0.0)) await self.comm.set_state(IOffsetsAltAz, AltAzOffsetState(alt=0.0, az=0.0))
[docs] async def close(self) -> None: await BaseTelescope.close(self) await self.mqtt.close()
async def _update_task(self) -> None: await asyncio.sleep(2) while True: try: await self._update() await asyncio.sleep(0.1) except asyncio.CancelledError: break async def _update(self) -> None: match self.brot.telescope.status: case TelescopeStatus.PARKED: await self._change_motion_status(MotionStatus.PARKED) await self.comm.set_state(IReady, ReadyState(ready=False)) case TelescopeStatus.INITIALIZING: await self._change_motion_status(MotionStatus.INITIALIZING) await self.comm.set_state(IReady, ReadyState(ready=False)) case TelescopeStatus.PARKING: await self._change_motion_status(MotionStatus.PARKING) await self.comm.set_state(IReady, ReadyState(ready=False)) case TelescopeStatus.ONLINE: if self.brot.telescope.motion_state == MotionState.SLEWING: await self._change_motion_status(MotionStatus.SLEWING) elif self.brot.telescope.motion_state == MotionState.TRACKING: await self._change_motion_status(MotionStatus.TRACKING) else: await self._change_motion_status(MotionStatus.POSITIONED) await self.comm.set_state(IReady, ReadyState(ready=True)) case TelescopeStatus.ERROR: await self._error_state() await self.comm.set_state(IReady, ReadyState(ready=False)) # publish pointing state ra = self.brot.telescope._telemetry.POSITION.EQUATORIAL.RA_ICRS * 15 dec = self.brot.telescope._telemetry.POSITION.EQUATORIAL.DEC_ICRS await self.comm.set_state(IPointingRaDec, RaDecState(ra=ra, dec=dec)) alt = self.brot.telescope._telemetry.POSITION.HORIZONTAL.ALT az = self.brot.telescope._telemetry.POSITION.HORIZONTAL.AZ await self.comm.set_state(IPointingAltAz, AltAzState(alt=alt, az=az)) # publish temperatures readings = [] for name, loc in self.temperatures.items(): for sensor in self.mqtt.telemetry.AUXILIARY.SENSOR.values(): if sensor.NAME == loc: readings.append(SensorReading(name=name, value=sensor.VALUE)) if readings: await self.comm.set_state(ITemperatures, TemperaturesState(readings=readings)) async def _error_state(self, mess: str = "Telescope is in error state.") -> None: log.error(mess) await self._change_motion_status(MotionStatus.ERROR) async def _wait_for_tracking(self) -> None: await asyncio.sleep(2) while True: match self.brot.telescope._telemetry.TELESCOPE.MOTION_STATE: case 0.0, 1.0: pass case 8.0: return case -1.0: await self._error_state("The telescope experienced an error during track command.") return case _: pass await asyncio.sleep(1) @timeout(5 * 60) async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -> None: await self.brot.telescope.track(ra, dec) await asyncio.sleep(10) await self._wait_for_tracking() @timeout(120) async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None: await self.brot.telescope.move(alt, az) await asyncio.sleep(5) while True: match self.brot.telescope._telemetry.TELESCOPE.MOTION_STATE: case 0.0, 1.0: break case 1.0, 8.0: pass case _: await self._change_motion_status(MotionStatus.ERROR) return await asyncio.sleep(1)
[docs] async def set_focus(self, focus: float, **kwargs: Any) -> None: await self._change_motion_status(MotionStatus.SLEWING, interface="IFocuser") await self.brot.focus.set(focus + self.focus_offset) await self._wait_for_focus() await self._change_motion_status(MotionStatus.POSITIONED, interface="IFocuser") await self.comm.set_state(IFocuser, FocuserState(focus=focus, focus_offset=self.focus_offset))
[docs] async def set_focus_offset(self, offset: float, **kwargs: Any) -> None: await self._change_motion_status(MotionStatus.SLEWING, interface="IFocuser") focus = self.brot.focus.position await self.brot.focus.set(focus + offset) await self._wait_for_focus() self.focus_offset = offset await self._change_motion_status(MotionStatus.POSITIONED, interface="IFocuser") await self.comm.set_state( IFocuser, FocuserState(focus=float(self.brot.focus.position - self.focus_offset), focus_offset=offset) )
async def _wait_for_focus(self) -> None: await asyncio.sleep(2.0) MAX_TARGET_DISTANCE = 0.01 while self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.FOCUS.TARGETDISTANCE > MAX_TARGET_DISTANCE: await asyncio.sleep(0.1)
[docs] @timeout(120) async def init(self, **kwargs: Any) -> None: match self.brot.telescope.status: case TelescopeStatus.PARKED: pass case TelescopeStatus.ONLINE: log.info("Telescope is already online.") return case TelescopeStatus.ERROR: await self._error_state("Telescope can not be initialized, it has errors.") return log.info("Initializing telescope...") await self.brot.telescope.power_on() while True: match self.brot.telescope._telemetry.TELESCOPE.READY_STATE: case 1.0: log.info("Telescope powered up and initialized.") return case -1.0: await self._error_state("Error during powerup of telescope.") return case 0.0: pass await asyncio.sleep(1)
[docs] @timeout(180) async def park(self, **kwargs: Any) -> None: match self.brot.telescope.status: case TelescopeStatus.PARKED: log.info("Telescope is already parked.") return case TelescopeStatus.ONLINE: pass case TelescopeStatus.ERROR: await self._error_state("Telescope can not be parked, it has errors.") return log.info("Parking telescope...") await self.brot.telescope.park() while True: match self.brot.telescope._telemetry.TELESCOPE.READY_STATE: case 0.0: log.info("Parked telescope.") return case -1.0: await self._error_state("Error during parking of the telescope.") return case _: pass await asyncio.sleep(1)
[docs] @timeout(20) async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None: await self.brot.telescope.stop() while True: match self.brot.telescope._telemetry.TELESCOPE.MOTION_STATE: case 0.0: log.info("Stopped telescope.") return case 1.0 | 8.0: pass case _: await self._error_state("Error during stopping of the telescope.") return await asyncio.sleep(1)
async def start_pointing_series(self, **kwargs: Any) -> None: pass async def stop_pointing_series(self, **kwargs: Any) -> None: pass
[docs] class BrotRaDecTelescope(BrotBaseTelescope, IOffsetsRaDec): def __init__( self, pointing_file: str = "/pyobs/pointing.csv", **kwargs: Any, ): BrotBaseTelescope.__init__(self, **kwargs) self._pointing_log = None if pointing_file is None else CsvPublisher(pointing_file)
[docs] async def set_offsets_radec(self, dra: float, ddec: float, **kwargs: Any) -> None: await self.brot.telescope.set_offset_ha(-1.0 * dra * 3600) await self.brot.telescope.set_offset_dec(ddec * 3600) MAX_TARGET_DISTANCE = 2.0 / 3600.0 while ( self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.HA.TARGETDISTANCE < MAX_TARGET_DISTANCE and self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.DEC.TARGETDISTANCE < MAX_TARGET_DISTANCE ): await asyncio.sleep(0.1) await self.comm.set_state(IOffsetsRaDec, RaDecOffsetState(ra=dra, dec=ddec))
[docs] async def get_fits_header_before( self, namespaces: list[str] | None = None, **kwargs: Any ) -> dict[str, tuple[Any, str]]: hdr = await BrotBaseTelescope.get_fits_header_before(self) hdr["TEL-FOCU"] = (self.brot.focus.position, "Focus position [mm]") hdr["HAOFF"] = (self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.HA.OFFSET, "Hour Angle offset") hdr["DECOFF"] = (self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.DEC.OFFSET, "Declination offset") return self._filter_fits_namespace(hdr, namespaces=namespaces, **kwargs)
[docs] async def add_pointing_measurement(self, **kwargs: Any) -> None: telemetry = self.brot.telescope._telemetry data = { "Time": Time.now().isot, "Ha": telemetry.OBJECT.EQUATORIAL.HA_APPARENT, "Dec": telemetry.OBJECT.EQUATORIAL.DEC_APPARENT, "HaOff": telemetry.POSITION.INSTRUMENTAL.HA.OFFSET / np.cos(np.radians(telemetry.OBJECT.EQUATORIAL.DEC_APPARENT)) + telemetry.POINTING.OFFSETS.HA, "DecOff": telemetry.POSITION.INSTRUMENTAL.DEC.OFFSET + telemetry.POINTING.OFFSETS.DEC, } if self._pointing_log is not None: await self._pointing_log(**data) log.info("Pointing measurement written.")
class BrotAltAzTelescope(BrotBaseTelescope, IOffsetsAltAz, IPointingSeries): def __init__( self, pointing_file: str = "/pyobs/pointing.csv", **kwargs: Any, ): BrotBaseTelescope.__init__(self, **kwargs) self._pointing_log = None if pointing_file is None else CsvPublisher(pointing_file) @timeout(120) async def set_offsets_altaz(self, dalt: float, daz: float, **kwargs: Any) -> None: await self.brot.telescope.set_offset_alt(dalt * 3600) await self.brot.telescope.set_offset_az(daz * 3600) await asyncio.sleep(1.0) MAX_TARGET_DISTANCE = 2.0 / 3600.0 while ( self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.ALT.TARGETDISTANCE > MAX_TARGET_DISTANCE or self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.AZ.TARGETDISTANCE > MAX_TARGET_DISTANCE ): await asyncio.sleep(0.1) await self.comm.set_state(IOffsetsAltAz, AltAzOffsetState(alt=dalt, az=daz)) async def get_fits_header_before( self, namespaces: list[str] | None = None, **kwargs: Any ) -> dict[str, tuple[Any, str]]: hdr = await BrotBaseTelescope.get_fits_header_before(self) hdr["TEL-FOCU"] = (self.brot.focus.position, "Focus position [mm]") hdr["HAOFF"] = (self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.HA.OFFSET, "Hour Angle offset") hdr["DECOFF"] = (self.brot.telescope._telemetry.POSITION.INSTRUMENTAL.DEC.OFFSET, "Declination offset") return self._filter_fits_namespace(hdr, namespaces=namespaces, **kwargs) async def add_pointing_measurement(self, **kwargs: Any) -> None: telemetry = self.brot.telescope._telemetry data = { "Time": Time.now().isot, "Az": telemetry.OBJECT.HORIZONTAL.AZ, "Alt": telemetry.OBJECT.HORIZONTAL.ALT, "AzOff": telemetry.POSITION.INSTRUMENTAL.AZ.OFFSET / np.cos(np.radians(telemetry.POSITION.HORIZONTAL.ALT)) + telemetry.POINTING.OFFSETS.AZ, "AltOff": telemetry.POSITION.INSTRUMENTAL.ALT.OFFSET + telemetry.POINTING.OFFSETS.ALT, } if self._pointing_log is not None: await self._pointing_log(**data) log.info("Pointing measurement written.") __all__ = ["BrotRaDecTelescope", "BrotAltAzTelescope"]