Source code for pyobs_alpaca.dome

import asyncio
import logging
import time
from typing import Any

from pyobs.events import RoofClosingEvent, RoofOpenedEvent
from pyobs.interfaces import AltAzState, IPointingAltAz, IReady, ReadyState
from pyobs.mixins import FollowMixin
from pyobs.modules import timeout
from pyobs.modules.roof import BaseDome
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import MotionStatus
from pyobs.utils.parallel import event_wait
from pyobs.utils.threads import LockWithAbort

from .device import AlpacaDevice

log = logging.getLogger("pyobs")


class AlpacaDome(FollowMixin, BaseDome):
    __module__ = "pyobs_alpaca"

    def __init__(self, tolerance: float = 3, park_az: float = 180, follow: str | None = None, **kwargs: Any):
        """Initializes a new ASCOM Alpaca dome.

        Args:
            tolerance: Tolerance for azimuth.
            park_az: Azimuth for park position.
            follow: Name of other device (e.g. telescope) to follow.
        """
        BaseDome.__init__(self, **kwargs, motion_status_interfaces=["IDome"])

        # device
        self._device = self.add_child_object(AlpacaDevice, **kwargs)

        # store
        self._tolerance = tolerance
        self._park_az = park_az

        # move locks
        self._lock_shutter = asyncio.Lock()
        self._abort_shutter = asyncio.Event()
        self._lock_move = asyncio.Lock()
        self._abort_move = asyncio.Event()

        # status
        self._shutter = None
        self._altitude = 0.0
        self._azimuth = 0.0
        self._set_az = 0.0

        # start thread
        self.add_background_task(self._update_status)

        # mixins
        FollowMixin.__init__(
            self, device=follow, interval=10, tolerance=tolerance, mode=IPointingAltAz, only_follow_when_ready=False
        )

[docs] async def open(self) -> None: """Open module.""" await BaseDome.open(self) await self._change_motion_status(MotionStatus.IDLE) await self.comm.set_state(IReady, ReadyState(ready=self._device.connected))
async def _send_open_dome(self) -> None: """Send command to open dome.""" try: await self._device.put("OpenShutter") except ConnectionError: await self._change_motion_status(MotionStatus.UNKNOWN) raise exc.InitError("Could not open dome.")
[docs] @timeout(1200000) async def init(self, **kwargs: Any) -> None: """Open dome.""" if not self.is_weather_good(): raise exc.InitError("Weather seems to be bad.") if self.motion_status() == MotionStatus.INITIALIZING: return async with LockWithAbort(self._lock_shutter, self._abort_shutter): log.info("Opening dome...") await self._change_motion_status(MotionStatus.INITIALIZING) await self._send_open_dome() time_attempt = time.time() status = None while status != 0: if status == 4: log.error("Could not open dome.") await self._change_motion_status(MotionStatus.UNKNOWN) return await event_wait(self._abort_shutter, 1) try: status = await self._device.get("ShutterStatus") except ConnectionError: await self._change_motion_status(MotionStatus.UNKNOWN) raise exc.InitError("Could not open dome.") if time.time() - time_attempt > 10: await self._send_open_dome() time_attempt = time.time() log.info("Dome opened.") await self._change_motion_status(MotionStatus.POSITIONED) await self.comm.send_event(RoofOpenedEvent())
async def _send_close_dome(self) -> None: """Send command to close dome.""" try: await self._device.put("CloseShutter") await self._device.put("SlewToAzimuth", Azimuth=0) except ConnectionError: await self._change_motion_status(MotionStatus.UNKNOWN) raise exc.ParkError("Could not close dome.")
[docs] @timeout(1200000) async def park(self, **kwargs: Any) -> None: """Close dome.""" if self.motion_status() == MotionStatus.PARKING: return async with LockWithAbort(self._lock_shutter, self._abort_shutter): log.info("Closing dome...") await self._change_motion_status(MotionStatus.PARKING) await self.comm.send_event(RoofClosingEvent()) await self._send_close_dome() time_attempt = time.time() status = None while status != 1: if status == 4: log.error("Could not close dome.") await self._change_motion_status(MotionStatus.UNKNOWN) raise exc.ParkError("Could not close dome.") await event_wait(self._abort_shutter, 1) try: status = await self._device.get("ShutterStatus") except ConnectionError: await self._change_motion_status(MotionStatus.UNKNOWN) raise exc.ParkError("Could not close dome.") if time.time() - time_attempt > 10: await self._send_close_dome() time_attempt = time.time() log.info("Dome closed.") await self._change_motion_status(MotionStatus.PARKED)
async def _move(self, az: float, abort: asyncio.Event) -> None: """Move the dome to the given azimuth.""" try: await self._device.put("SlewToAzimuth", Azimuth=self._adjust_azimuth(az)) except ConnectionError: await self._change_motion_status(MotionStatus.UNKNOWN) raise exc.MoveError("Could not move dome.") log_timer = 0 while 180 - abs(abs(az - self._azimuth) - 180) > self._tolerance: if abort.is_set(): raise InterruptedError("Moving dome aborted.") if log_timer == 0: log.info( "Moving dome from current az=%.2f° to %.2f° (%.2f° left)...", self._azimuth, az, 180 - abs(abs(az - self._azimuth) - 180), ) log_timer = (log_timer + 1) % 10 await event_wait(abort, 1) log.info("Moved to az=%.2f.", az)
[docs] @timeout(1200000) async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None: """Moves to given coordinates.""" # do nothing if not ready bad_states = [ MotionStatus.PARKED, MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.ERROR, MotionStatus.UNKNOWN, ] if not self._device.connected or self.motion_status() in bad_states: return if az == self._set_az: return self._set_az = az large_move = abs(az - self._azimuth) > 2.0 * self._tolerance tracking = self.is_following and not large_move async with LockWithAbort(self._lock_move, self._abort_move): self._altitude = alt await self._change_motion_status(MotionStatus.TRACKING if tracking else MotionStatus.SLEWING) await self._move(az, self._abort_move) await self._change_motion_status(MotionStatus.TRACKING if self.is_following else MotionStatus.POSITIONED)
[docs] async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None: """Stop the motion.""" pass
async def _update_status(self) -> None: """Update status from dome.""" while True: try: self._azimuth = self._adjust_azimuth(await self._device.get("Azimuth")) await self.comm.set_state(IPointingAltAz, AltAzState(alt=self._altitude, az=self._azimuth)) except ConnectionError: pass bad_states = [ MotionStatus.PARKED, MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.ERROR, MotionStatus.UNKNOWN, ] ready = self._device.connected and self.motion_status() not in bad_states await self.comm.set_state(IReady, ReadyState(ready=ready)) await asyncio.sleep(2) @staticmethod def _adjust_azimuth(az: float) -> float: """Baader measures azimuth as West of South — convert both ways.""" az += 180 if az >= 360: az -= 360 return az __all__ = ["AlpacaDome"]