# Source code for pyobs_aravis.araviscamera

import asyncio
import logging
import time
from typing import Any

import numpy.typing as npt
from pyobs.interfaces import ExposureTimeState, IExposureTime
from pyobs.modules.camera import BaseVideo

log = logging.getLogger(__name__)


class AravisCamera(BaseVideo, IExposureTime):
    """A pyobs module for Aravis cameras.

    A background task (``_capture``) continuously pops frames from the camera
    and hands them to ``_set_image``. Opening and closing of the camera is
    serialized through ``_camera_lock``.
    """

    __module__ = "pyobs_aravis"

    def __init__(
        self,
        device: str,
        settings: dict[str, Any] | None = None,
        buffers: int = 5,
        **kwargs: Any,
    ):
        """Initializes a new AravisCamera.

        Args:
            device: Name of camera to connect to.
            settings: Dictionary of camera settings to apply on connect.
            buffers: Number of acquisition buffers.
            **kwargs: Forwarded unchanged to ``BaseVideo.__init__``.
        """
        BaseVideo.__init__(self, **kwargs)
        from . import aravis

        self._device_name = device
        self._camera: aravis.Camera | None = None
        self._settings: dict[str, Any] = {} if settings is None else settings
        self._camera_lock = asyncio.Lock()
        self._buffers = buffers

        # The capture loop is only registered when a device name was given.
        if device is not None:
            self.add_background_task(self._capture)
        else:
            log.error("No device name given, not connecting to any camera.")

    async def open(self) -> None:
        """Open module.

        Raises:
            ValueError: If the configured device name is not in the list of
                device ids reported by ``aravis.get_device_ids()``.
        """
        from . import aravis

        await BaseVideo.open(self)

        ids: list[str] = aravis.get_device_ids()  # type: ignore[assignment]
        if self._device_name not in ids:
            raise ValueError("Could not find given device name in list of available cameras.")

        await self.activate_camera()

    async def close(self) -> None:
        """Close the module."""
        await BaseVideo.close(self)
        async with self._camera_lock:
            self._close_camera()

    def _open_camera(self) -> None:
        """Open camera.

        Connects to the configured device, applies all entries of the
        ``settings`` dictionary as camera features and starts continuous
        acquisition with the configured number of buffers.
        """
        from . import aravis

        log.info("Connecting to camera %s...", self._device_name)
        self._camera = aravis.Camera(self._device_name)  # type: ignore[assignment]
        log.info("Connected.")

        for key, value in self._settings.items():
            log.info("Setting value %s=%s...", key, value)
            self._camera.set_feature(key, value)  # type: ignore[union-attr]

        self._camera.start_acquisition_continuous(nb_buffers=self._buffers)  # type: ignore[union-attr]

    def _close_camera(self) -> None:
        """Close camera.

        Does nothing if no camera is open. Otherwise acquisition is stopped
        and the camera is shut down; the stored camera reference is cleared
        even if stopping the acquisition fails.
        """
        camera = self._camera
        if camera is None:
            return

        log.info("Closing camera...")
        # Clear the reference up front so a failure below cannot leave a
        # stale, half-closed camera object stored on the module.
        self._camera = None
        try:
            camera.stop_acquisition()  # type: ignore[union-attr]
        finally:
            # Always attempt the shutdown, even if stopping acquisition raised.
            camera.shutdown()  # type: ignore[union-attr]

    async def _activate_camera(self) -> None:
        """Open camera on activation."""
        async with self._camera_lock:
            self._open_camera()

    async def _deactivate_camera(self) -> None:
        """Close camera on deactivation."""
        async with self._camera_lock:
            self._close_camera()

    async def _capture(self) -> None:
        """Take new images in loop.

        Runs forever as a background task. Frames arriving sooner than
        ``self._interval`` seconds after the last delivered one are dropped.
        """
        # Monotonic clock: the interval check must not be affected by
        # adjustments of the system wall clock.
        last = time.monotonic()
        while True:
            try:
                camera = self._camera
                if camera is None or not self.camera_active:
                    await asyncio.sleep(0.1)
                    continue

                frame: npt.NDArray[Any] | None = camera.pop_frame()  # type: ignore[union-attr]
                if frame is None:
                    # No frame available yet. Go back to the top of the loop
                    # after a short wait so the camera state is re-checked;
                    # the camera may have been closed while we were sleeping.
                    await asyncio.sleep(0.01)
                    continue

                if time.monotonic() - last < self._interval:
                    # Too early for the next image, drop this frame.
                    await asyncio.sleep(0.01)
                    continue

                last = time.monotonic()
                await self._set_image(frame)

            except Exception:
                # Keep the loop alive, but do not hide the error. Task
                # cancellation is not an ``Exception`` and still propagates.
                log.exception("Error while capturing image from camera %s.", self._device_name)
                await asyncio.sleep(1)

    async def set_exposure_time(self, exposure_time: float, **kwargs: Any) -> None:
        """Set the exposure time in seconds.

        Args:
            exposure_time: Exposure time in seconds.
        """
        await self.activate_camera()
        # The camera object is given the value scaled by 1e6
        # (presumably microseconds -- confirm against the aravis wrapper).
        self._camera.set_exposure_time(exposure_time * 1e6)  # type: ignore[union-attr]
        await self.comm.set_state(IExposureTime, ExposureTimeState(exposure_time=exposure_time))

# Public names exported by this module.
__all__ = ["AravisCamera"]