# Source code for pyobs_zwoeaf.eaffocuser

import asyncio
import logging
from typing import Any

import pyobs.utils.exceptions as exc
from pyobs.interfaces import IFocuser, IReady, ITemperatures
from pyobs.interfaces.IFocuser import FocuserState
from pyobs.interfaces.IReady import ReadyState
from pyobs.interfaces.ITemperatures import SensorReading, TemperaturesState
from pyobs.mixins import MotionStatusMixin
from pyobs.modules import Module
from pyobs.utils.enums import MotionStatus

log = logging.getLogger(__name__)

_STEP_TO_MM = 0.00242105263


class EAFFocuser(Module, MotionStatusMixin, IFocuser, ITemperatures):
    """A pyobs module for the ZWO EAF electronic auto focuser.

    Focus values are handled in millimetres on the pyobs side and converted
    to/from motor steps with the module-level ``_STEP_TO_MM`` factor. The
    position actually sent to the motor is ``focus + focus_offset``.
    """

    __module__ = "pyobs_zwoeaf"

    def __init__(
        self,
        device_number: int = 0,
        max_steps: int = 60000,
        backlash: int = 0,
        direction: bool = True,
        sound: bool = True,
        **kwargs: Any,
    ) -> None:
        """Store the configuration and register the temperature polling task.

        The device itself is not touched here; the connection is made in
        :meth:`open`.

        Args:
            device_number: Passed to ``EAF.connect()`` to select the device.
            max_steps: Passed to ``EAF.setMaximalStep()``.
            backlash: Passed to ``EAF.setBacklash()``.
            direction: Passed to ``EAF.setDirection()``.
            sound: Passed to ``EAF.setSound()``.
            **kwargs: Forwarded to ``Module.__init__``.
        """
        Module.__init__(self, **kwargs)
        MotionStatusMixin.__init__(self)

        self._device_number = device_number
        self._max_steps = max_steps
        self._backlash = backlash
        self._direction = direction
        self._sound = sound
        # Driver handle; None while not connected.
        self._eaf: Any | None = None
        # Last requested focus (mm, without offset) and the current offset (mm).
        self._focus_setpoint = 0.0
        self._focus_offset = 0.0

        self.add_background_task(self._poll_temperature)

    async def open(self) -> None:
        """Open module.

        Connects to the focuser, applies the configured settings, reads the
        current position as the initial focus setpoint and publishes the
        focuser and ready states.

        Raises:
            ValueError: If the connection to the device fails.
        """
        await Module.open(self)

        # Imported lazily so the driver is only required once the module is opened.
        from .EAF_focuser import EAF  # type: ignore[import-untyped]

        self._eaf = EAF()
        if not self._eaf.connect(self._device_number):
            raise ValueError("EAF focuser failed to connect. Is the device connected via USB? Correct device_number?")

        self._eaf.setMaximalStep(self._max_steps)
        self._eaf.setBacklash(self._backlash)
        self._eaf.setDirection(self._direction)
        self._eaf.setSound(self._sound)
        log.info("Connected to EAF focuser, temperature: %.2f°C", self._eaf.getTemperature())

        await MotionStatusMixin.open(self)
        await self._change_motion_status(MotionStatus.IDLE)

        # Start from wherever the motor currently is.
        self._focus_setpoint = self._eaf.getPosition() * _STEP_TO_MM
        await self.comm.set_state(IFocuser, FocuserState(focus=self._focus_setpoint, focus_offset=self._focus_offset))
        await self.comm.set_state(IReady, ReadyState(ready=True))

    async def close(self) -> None:
        """Close module.

        Disconnects from the focuser (if connected). The driver handle is
        dropped and the base module is closed even if disconnecting fails.
        """
        try:
            if self._eaf is not None:
                self._eaf.disconnect()
        finally:
            self._eaf = None
            await Module.close(self)

    async def init(self, **kwargs: Any) -> None:
        """Do nothing; this focuser needs no initialisation step."""

    async def park(self, **kwargs: Any) -> None:
        """Park focuser at position zero."""
        await self.stop_motion()
        await self.set_focus(0.0)

    async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None:
        """Stop focuser motion.

        Args:
            device: Not used by this implementation.
        """
        if self._eaf is not None:
            self._eaf.stop()
        await self._change_motion_status(MotionStatus.IDLE)

    async def set_focus(self, focus: float, **kwargs: Any) -> None:
        """Move focuser to given position.

        The current focus offset is added to ``focus`` before converting to
        motor steps. The call returns once the motor reports it has stopped.

        Args:
            focus: New focus position in mm.

        Raises:
            ValueError: If the focuser is not connected.
            MoveError: If focuser cannot be moved.
        """
        if self._eaf is None:
            raise ValueError("Not connected.")

        total_mm = focus + self._focus_offset
        # Round to the nearest step: truncating would lose a step whenever the
        # float division lands just below an integer (e.g. when re-applying a
        # setpoint that was itself computed as steps * _STEP_TO_MM).
        step = round(total_mm / _STEP_TO_MM)
        log.info("Moving EAF to %.4f mm (offset %.4f mm, step %d)...", focus, self._focus_offset, step)

        await self._change_motion_status(MotionStatus.SLEWING)
        if not self._eaf.move(step):
            await self._change_motion_status(MotionStatus.ERROR)
            raise exc.MoveError("Could not move EAF motor.")

        # Poll until the motor has stopped.
        while self._eaf.isMoving():
            await asyncio.sleep(0.5)

        self._focus_setpoint = focus
        await self._change_motion_status(MotionStatus.POSITIONED)
        await self.comm.set_state(IFocuser, FocuserState(focus=self._focus_setpoint, focus_offset=self._focus_offset))

    async def set_focus_offset(self, offset: float, **kwargs: Any) -> None:
        """Set focus offset and re-apply.

        Stores the new offset and moves to the current focus setpoint again,
        so that the motor ends up at ``setpoint + offset``.

        Args:
            offset: New focus offset in mm.

        Raises:
            MoveError: If focuser cannot be moved.
        """
        log.info("Setting focus offset to %.4f mm.", offset)
        self._focus_offset = offset
        await self.set_focus(self._focus_setpoint)

    async def _poll_temperature(self) -> None:
        """Background task: periodically reads EAF temperature.

        Every 10 seconds, while connected, the temperature is read and
        published as a single ``"EAF"`` sensor reading. Failures are logged
        and polling continues.
        """
        while True:
            try:
                if self._eaf is not None:
                    temp = self._eaf.getTemperature()
                    await self.comm.set_state(
                        ITemperatures,
                        TemperaturesState(readings=[SensorReading(name="EAF", value=temp)]),
                    )
            except Exception as e:
                # Best effort: a failed reading must not kill the polling task,
                # but it should not go unnoticed either.
                log.warning("Could not update EAF temperature: %s", e)
            await asyncio.sleep(10)


__all__ = ["EAFFocuser"]