import logging
from typing import Any
from pyobs.events import ModeChangedEvent
from pyobs.interfaces import IMode, IMotion, IReady, ModeCapabilities, ModeState, MotionState, ReadyState
from pyobs.modules import Module
from pyobs.utils.enums import MotionStatus
from pyobs_zaber.zaberdriver import ZaberDriver
log = logging.getLogger(__name__)
_GROUP = "Mode"
class ZaberModeSelector(Module, IMode, IMotion):
"""Class for the Selection of Modus with a linear Motor (e.g. Spectroscopy or Photometry)."""
__module__ = "pyobs_zaber.ZaberModeSelector"
def __init__(
self,
modes: dict,
**kwargs: Any,
):
"""Creates a new ZaberModeSelector.
Args:
modes: dictionary of available modes in the form {name: position}
"""
Module.__init__(self, **kwargs)
self.driver = ZaberDriver(**kwargs)
self.modes = modes
self.current_mode = "undefined"
[docs]
async def open(self) -> None:
"""Open module."""
await Module.open(self)
await self.driver.open()
if self._comm:
await self.comm.register_event(ModeChangedEvent)
await self.comm.set_capabilities(IMode, ModeCapabilities(modes={_GROUP: list(self.modes.keys())}))
await self.comm.set_state(IMode, ModeState(modes={_GROUP: self.current_mode}))
await self.comm.set_state(IReady, ReadyState(ready=True))
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.PARKED))
[docs]
async def set_mode(self, mode: str, group: int = 0, **kwargs: Any) -> None:
"""Set the current mode.
Args:
mode: Name of mode to set.
group: Group number (unused, single group only).
Raises:
ValueError: If an invalid mode was given.
MoveError: If mode selector cannot be moved.
"""
if mode not in self.modes:
log.warning("Unknown mode %s. Available modes are: %s", mode, list(self.modes.keys()))
return
if self.current_mode == mode:
log.info("Mode %s already selected.", mode)
return
log.info("Moving mode selector ...")
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.SLEWING))
await self.driver.move_to(self.modes[mode])
self.current_mode = mode
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.POSITIONED))
await self.comm.send_event(ModeChangedEvent(_GROUP, mode))
await self.comm.set_state(IMode, ModeState(modes={_GROUP: self.current_mode}))
log.info("Mode %s ready.", mode)
[docs]
async def init(self, **kwargs: Any) -> None:
"""Initialize device."""
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.INITIALIZING))
await self.driver.home()
self.current_mode = "undefined"
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.IDLE))
[docs]
async def park(self, **kwargs: Any) -> None:
"""Park device."""
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.PARKING))
await self.driver.home()
self.current_mode = "undefined"
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.PARKED))
[docs]
async def stop_motion(self, device: str | None = None, **kwargs: Any) -> None:
"""Stop the motion."""
await self.driver.stop()
await self.comm.set_state(IMotion, MotionState(status=MotionStatus.IDLE))