Source code for pyobs.modules.roof.dummyroof

import asyncio
import logging
from typing import Any, Optional

from pyobs.events import RoofOpenedEvent, RoofClosingEvent
from pyobs.interfaces import IRoof
from pyobs.modules import timeout
from pyobs.modules.roof import BaseRoof
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort

log = logging.getLogger(__name__)


class DummyRoof(BaseRoof, IRoof):
    """A dummy camera for testing."""

    __module__ = "pyobs.modules.roof"

    _ROOF_CLOSED_PERCENTAGE = 0
    _ROOF_OPEN_PERCENTAGE = 100

    def __init__(self, **kwargs: Any):
        """Creates a new dummy root."""
        BaseRoof.__init__(self, **kwargs)

        # dummy state
        self._open_percentage: int = self._ROOF_CLOSED_PERCENTAGE

        self._lock_motion = asyncio.Lock()
        self._abort_motion = asyncio.Event()

[docs] async def open(self) -> None: """Open module.""" await BaseRoof.open(self) await self.comm.register_event(RoofOpenedEvent) await self.comm.register_event(RoofClosingEvent)
[docs] @timeout(15) async def init(self, **kwargs: Any) -> None: """Open the roof. Raises: AcquireLockFailed: If current motion could not be aborted. """ if self._is_open(): return async with LockWithAbort(self._lock_motion, self._abort_motion): await self._change_motion_status(MotionStatus.INITIALIZING) await self._move_roof(self._ROOF_OPEN_PERCENTAGE) await self._change_motion_status(MotionStatus.IDLE) self.comm.send_event(RoofOpenedEvent())
def _is_open(self): return self._open_percentage == self._ROOF_OPEN_PERCENTAGE
[docs] @timeout(15) async def park(self, **kwargs: Any) -> None: """Close the roof. Raises: AcquireLockFailed: If current motion could not be aborted. """ if self._is_closed(): return async with LockWithAbort(self._lock_motion, self._abort_motion): await self._change_motion_status(MotionStatus.PARKING) self.comm.send_event(RoofClosingEvent()) await self._move_roof(self._ROOF_CLOSED_PERCENTAGE) await self._change_motion_status(MotionStatus.PARKED)
def _is_closed(self): return self._open_percentage == self._ROOF_CLOSED_PERCENTAGE async def _move_roof(self, target_pos: int) -> None: step = 1 if target_pos > self._open_percentage else -1 while self._open_percentage != target_pos: if self._abort_motion.is_set(): await self._change_motion_status(MotionStatus.IDLE) return self._open_percentage += step await asyncio.sleep(0.1)
[docs] def get_percent_open(self) -> float: """Get the percentage the roof is open.""" return self._open_percentage
[docs] async def stop_motion(self, device: Optional[str] = None, **kwargs: Any) -> None: """Stop the motion. Args: device: Name of device to stop, or None for all. Raises: AcquireLockFailed: If current motion could not be aborted. """ await self._change_motion_status(MotionStatus.ABORTING) async with LockWithAbort(self._lock_motion, self._abort_motion): await self._change_motion_status(MotionStatus.IDLE)
__all__ = ["DummyRoof"]