Source code for pyobs_brot.brotdome

import asyncio
import logging
from typing import Any

from pybrotlib import BROT
from pybrotlib.components.dome import DomeShutterStatus, DomeStatus
from pybrotlib.transport import MQTTTransport
from pyobs.events import RoofClosingEvent, RoofOpenedEvent
from pyobs.interfaces import AltAzState, IDome, IPointingAltAz
from pyobs.modules import timeout
from pyobs.modules.roof.basedome import BaseDome
from pyobs.utils.enums import MotionStatus

log = logging.getLogger(__name__)


[docs] class BrotDome(BaseDome, IDome): def __init__( self, host: str, name: str, port: int = 1883, keepalive: int = 60, **kwargs: Any, ): BaseDome.__init__(self, **kwargs) self.mqtt = MQTTTransport(host, port) self.brot = BROT(self.mqtt, name) self.add_background_task(self._update_status)
[docs] async def open(self) -> None: await BaseDome.open(self) asyncio.create_task(self.mqtt.run()) await asyncio.sleep(2) await self.comm.register_event(RoofOpenedEvent) await self.comm.register_event(RoofClosingEvent) await self.comm.set_state(IPointingAltAz, AltAzState(alt=0.0, az=self.brot.dome.azimuth)) if self.brot.dome.status == DomeStatus.ERROR: await self._error_state() elif self.brot.dome.in_motion: await self._change_motion_status(MotionStatus.SLEWING) log.info("Dome is already in motion. Please make sure it is not used by another instance!") elif self.brot.dome.status == DomeStatus.PARKED and self.brot.dome.shutter == DomeShutterStatus.CLOSED: await self._change_motion_status(MotionStatus.PARKED) log.info("Dome is closed and parked.") else: await self._change_motion_status(MotionStatus.POSITIONED) log.info("Dome is already online. Please make sure it is not used by another instance!")
async def _update_status(self) -> None: while True: try: current_state = self.motion_status() new_state = current_state if current_state == MotionStatus.INITIALIZING: pass elif current_state == MotionStatus.PARKING: pass elif self.brot.dome.status == DomeStatus.ERROR: new_state = MotionStatus.ERROR elif self.brot.dome.status == DomeStatus.PARKED and self.brot.dome.shutter == DomeShutterStatus.CLOSED: new_state = MotionStatus.PARKED elif self.brot.dome.in_motion: new_state = MotionStatus.SLEWING else: new_state = MotionStatus.POSITIONED if new_state != current_state: await self._change_motion_status(new_state) await self.comm.set_state(IPointingAltAz, AltAzState(alt=0.0, az=self.brot.dome.azimuth)) except asyncio.CancelledError: return except Exception: pass await asyncio.sleep(1)
[docs] @timeout(300) async def init(self, **kwargs: Any) -> None: log.info("Opening roof") if self.brot.dome.shutter == DomeShutterStatus.OPEN: return elif self.brot.dome.status == DomeStatus.ERROR: await self._error_state("Dome is in error state. Cannot open.") return await self._change_motion_status(MotionStatus.INITIALIZING) await self.brot.dome.open() while True: match self.brot.dome.shutter: case DomeShutterStatus.OPEN: log.info("Dome is open.") break case _: pass await asyncio.sleep(1) await self.brot.dome.start_tracking() while True: match self.brot.dome.status: case DomeStatus.TRACKING: log.info("Dome is tracking the telescope azimuth.") break case DomeStatus.ERROR: await self._error_state() return case _: pass await asyncio.sleep(1) await self._change_motion_status(MotionStatus.POSITIONED) await self.comm.send_event(RoofOpenedEvent())
[docs] @timeout(300) async def park(self, **kwargs: Any) -> None: if self.brot.dome.status == DomeStatus.PARKED and self.brot.dome.shutter == DomeShutterStatus.CLOSED: return elif self.brot.dome.status == DomeStatus.ERROR: await self._error_state("Dome is in error state. Cannot close/park.") return await self._change_motion_status(MotionStatus.PARKING) await self.comm.send_event(RoofClosingEvent()) await self.brot.dome.stop_tracking() while True: match self.brot.dome.status: case DomeStatus.TRACKING: pass case DomeStatus.ERROR: await self._error_state() return case _: break await asyncio.sleep(1) await self.brot.dome.close() while True: match self.brot.dome.shutter: case DomeShutterStatus.CLOSED: log.info("Dome shutter is closed.") break case _: pass await asyncio.sleep(1) await self.brot.dome.park() while True: match self.brot.dome.status: case DomeStatus.PARKED: log.info("Dome is parked.") break case DomeStatus.ERROR: await self._error_state() return case _: pass await asyncio.sleep(1) await self._change_motion_status(MotionStatus.PARKED)
[docs] async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None: pass # no stopping of the roof possible
[docs] async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None: pass # dome tracks telescope automatically
async def _error_state(self, mess: str = "Dome is in error state.") -> None: log.error(mess) await self._change_motion_status(MotionStatus.ERROR)
__all__ = ["BrotDome"]