# Source code for pyobs.mixins.waitformotion

import logging
import time
from asyncio import Event
from typing import Union, List, Any, Optional

from pyobs.interfaces import IMotion
from pyobs.modules import Module
from pyobs.utils.enums import MotionStatus
from pyobs.utils.parallel import event_wait

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class WaitForMotionMixin:
    """Mixin for a device that should wait for the motion status of another device."""

    __module__ = "pyobs.mixins"

    def __init__(
        self,
        wait_for_modules: Optional[List[str]] = None,
        wait_for_states: Optional[List[Union[MotionStatus, str]]] = None,
        wait_for_timeout: float = 0,
        **kwargs: Any,
    ):
        """Initializes the mixin.

        Args:
            wait_for_modules: One or more modules to wait for.
            wait_for_states: List of states to wait for.
            wait_for_timeout: Wait timeout in seconds.
        """

        # store
        self.__wait_for_modules = wait_for_modules if wait_for_modules is not None else []
        self.__wait_for_states = (
            [s if isinstance(s, MotionStatus) else MotionStatus(s) for s in wait_for_states]
            if wait_for_states is not None
            else []
        )
        self.__wait_for_timeout = wait_for_timeout

[docs] async def _wait_for_motion(self, abort: Event) -> None: """Wait until all devices are in one of the given motion states. Args: abort: Abort event. Raises: TimeoutError: If wait timed out. """ # no device? if len(self.__wait_for_modules) == 0: return # check type this = self if not isinstance(self, Module): raise ValueError("This is not a module.") # get all proxies proxies = [await self.proxy(device) for device in this.__wait_for_modules] # all need to be derived from IMotion if not all([isinstance(p, IMotion) for p in proxies]): raise ValueError("Not all given devices are derived from IMotion!") # run until timeout start = time.time() log.info("Waiting for motion of other modules...") while not abort.is_set(): # timeout? if time.time() > start + this.__wait_for_timeout: raise TimeoutError # get all states and compare them states = [await p.get_motion_status() for p in proxies] # in a good state? good = [s in this.__wait_for_states for s in states] # if all good, we're finished waiting if all(good): log.info("All other modules have finished moving.") break # sleep a little await event_wait(abort, 1)
# Names exported by `from <this module> import *`.
__all__ = ["WaitForMotionMixin"]