Source code for pyobs.mixins.motionstatus

import logging
from typing import Any, Optional, List

from pyobs.modules import Module
from import MotionStatusChangedEvent
from pyobs.utils.enums import MotionStatus

log = logging.getLogger(__name__)

class MotionStatusMixin:
    """Mixin for IMotion devices for handling status."""

    __module__ = "pyobs.mixins"

    def __init__(self, motion_status_interfaces: Optional[List[str]] = None, **kwargs: Any):
        """Initializes the mixin.

            interfaces: List of interfaces to handle or None
        self.__motion_status_interfaces = [] if motion_status_interfaces is None else motion_status_interfaces
        self.__motion_status = MotionStatus.UNKNOWN
        self.__motion_status_single = {i: MotionStatus.UNKNOWN for i in self.__motion_status_interfaces}

    async def open(self) -> None:
        # subscribe to events
        if isinstance(self, Module) and self.comm:
            await self.comm.register_event(MotionStatusChangedEvent)

[docs] async def _change_motion_status(self, status: MotionStatus, interface: Optional[str] = None) -> None: """Change motion status and send event, Args: status: New motion status interface: Interface to set motion status for """ # did something change? changed = False # global or individual? if interface is None: # did status change? if self.__motion_status != status: # set it changed = True self.__motion_status = status # also set all individual interfaces for i in self.__motion_status_interfaces: if self.__motion_status_single[i] != status: changed = True self.__motion_status_single[i] = status else: # does it exist? if interface not in self.__motion_status_interfaces: return # did status change? if self.__motion_status_single[interface] != status: # set it self.__motion_status_single[interface] = status changed = True # combine status self.__motion_status = self._combine_motion_status() # send event if changed: this = self # log it if interface is None:"Changed motion status to %s.", status) else:"Changed motion status of %s to %s.", interface, status) # send event if not isinstance(self, Module): raise ValueError("This is not a module.") await self.comm.send_event( MotionStatusChangedEvent(status=this.__motion_status, interfaces=this.__motion_status_single) )
[docs] def _combine_motion_status(self) -> MotionStatus: """Method for combining motion statuses for individual interfaces into the global one. Can be overriden.""" # none? if len(self.__motion_status_interfaces) == 0: return MotionStatus.UNKNOWN # if any interface is of state ERROR, UNKNOWN, INITIALIZING, PARKING, SLEWING # we use that as global status (in that order) for status in [ MotionStatus.ERROR, MotionStatus.UNKNOWN, MotionStatus.INITIALIZING, MotionStatus.PARKING, MotionStatus.SLEWING, ]: if status in self.__motion_status_single.values(): return status # otherwise just take status of first interface return self.__motion_status_single[self.__motion_status_interfaces[0]]
[docs] async def get_motion_status(self, device: Optional[str] = None, **kwargs: Any) -> MotionStatus: """Returns current motion status. Args: device: Name of device to get status for, or None. Returns: A string from the Status enumerator. """ # global or individual? if device is None: return self.__motion_status else: # does it exist? if device in self.__motion_status_single: return self.__motion_status_single[device] else: raise KeyError
__all__ = ["MotionStatusMixin"]