Source code for pyobs_sbig.sbigfiltercamera

import asyncio
import logging
from typing import List, Optional, Any

from pyobs.images import Image
from pyobs.mixins import MotionStatusMixin
from pyobs.events import FilterChangedEvent
from pyobs.interfaces import IFilters
from pyobs.utils.enums import MotionStatus
from pyobs.utils.threads import LockWithAbort
from .sbigcamera import SbigCamera


# module-level logger, named after this module
log = logging.getLogger(__name__)


class SbigFilterCamera(MotionStatusMixin, SbigCamera, IFilters):
    """A pyobs module for SBIG cameras with a filter wheel."""

    __module__ = "pyobs_sbig"

    def __init__(self, filter_wheel: str, filter_names: Optional[List[str]] = None, **kwargs: Any):
        """Initializes a new SbigFilterCamera.

        Args:
            filter_wheel: Name of the filter wheel model; looked up by name in FilterWheelModel.
            filter_names: List of filter names. They are assigned, in order, to the filter wheel
                positions following the first FilterWheelPosition member. Defaults to "1".."10".
        """
        SbigCamera.__init__(self, **kwargs)
        from .sbigudrv import FilterWheelPosition, FilterWheelModel  # type: ignore

        # filter wheel
        self.filter_wheel = FilterWheelModel[filter_wheel]

        # and filter names
        if filter_names is None:
            filter_names = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]
        # first enum member is skipped (presumably UNKNOWN -- it gets its own name below);
        # zip() silently truncates to the shorter of positions and names
        positions = list(FilterWheelPosition)
        self._filter_names = dict(zip(positions[1:], filter_names))
        self._filter_names[FilterWheelPosition.UNKNOWN] = "UNKNOWN"

        # allow to abort motion (filter wheel)
        self._lock_motion = asyncio.Lock()
        self._abort_motion = asyncio.Event()

        # current position, used as fallback in get_filter() if the wheel cannot be queried
        self._position = FilterWheelPosition.UNKNOWN

        # init mixins
        MotionStatusMixin.__init__(self, **kwargs, motion_status_interfaces=["IFilters"])

    async def open(self) -> None:
        """Open module.

        Raises:
            ValueError: If cannot connect to camera or set filter wheel.
        """
        from .sbigudrv import FilterWheelModel

        # set filter wheel model
        if self.filter_wheel != FilterWheelModel.UNKNOWN:
            log.info("Initialising filter wheel...")
            try:
                self._cam.set_filter_wheel(self.filter_wheel)
            except ValueError as e:
                # keep the original exception as cause for easier debugging
                raise ValueError("Could not set filter wheel: %s" % str(e)) from e

        # open camera
        await SbigCamera.open(self)

        # init status of filter wheel
        if self.filter_wheel != FilterWheelModel.UNKNOWN:
            await self._change_motion_status(MotionStatus.POSITIONED, interface="IFilters")

        # subscribe to events
        if self.comm:
            await self.comm.register_event(FilterChangedEvent)

    async def _expose(self, exposure_time: float, open_shutter: bool, abort_event: asyncio.Event) -> Image:
        """Actually do the exposure and add the current filter to the image header.

        Args:
            exposure_time: The requested exposure time, passed on to SbigCamera._expose.
            open_shutter: Whether or not to open the shutter.
            abort_event: Event that gets triggered when exposure should be aborted.

        Returns:
            The actual image.

        Raises:
            pyobs.utils.exceptions.GrabImageError: If exposure was not successful.
        """
        from .sbigudrv import FilterWheelModel

        # do exposure
        img = await SbigCamera._expose(self, exposure_time, open_shutter, abort_event)

        # add filter to FITS headers
        if self.filter_wheel != FilterWheelModel.UNKNOWN:
            img.header["FILTER"] = (await self.get_filter(), "Current filter")

        # finished
        return img

    async def set_filter(self, filter_name: str, **kwargs: Any) -> None:
        """Set the current filter.

        Args:
            filter_name: Name of filter to set.

        Raises:
            ValueError: If the given filter name is unknown.
            NotImplementedError: If camera doesn't have a filter wheel.
            InterruptedError: If the filter change was aborted.
        """
        from .sbigudrv import FilterWheelModel, FilterWheelStatus

        # do we have a filter wheel?
        if self.filter_wheel == FilterWheelModel.UNKNOWN:
            raise NotImplementedError

        # reverse dict and search for name
        filters = {name: position for position, name in self._filter_names.items()}
        if filter_name not in filters:
            raise ValueError("Unknown filter: %s" % filter_name)
        target = filters[filter_name]

        # there already?
        position, status = self._cam.get_filter_position_and_status()
        if position == target and status == FilterWheelStatus.IDLE:
            log.info("Filter changed.")
            return

        # set status
        await self._change_motion_status(MotionStatus.SLEWING, interface="IFilters")

        # acquire lock
        async with LockWithAbort(self._lock_motion, self._abort_motion):
            # set it
            log.info("Changing filter to %s...", filter_name)
            self._cam.set_filter(target)

            # wait for it
            while True:
                # break, if wheel is idle and filter is set
                position, status = self._cam.get_filter_position_and_status()
                if position == target and status == FilterWheelStatus.IDLE:
                    break

                # abort?
                if self._abort_motion.is_set():
                    raise InterruptedError("Filter change aborted.")

                # sleep a little, yielding control to the event loop
                await asyncio.sleep(0.1)

            # send event; send_event must be awaited, otherwise the event is never sent.
            # Guard on self.comm the same way open() does.
            log.info("Filter changed.")
            if self.comm:
                await self.comm.send_event(FilterChangedEvent(filter_name))

        # set status
        await self._change_motion_status(MotionStatus.POSITIONED, interface="IFilters")

    async def get_filter(self, **kwargs: Any) -> str:
        """Get currently set filter.

        Returns:
            Name of currently set filter. If the position cannot be queried, the name for
            the last known position is returned.

        Raises:
            NotImplementedError: If camera doesn't have a filter wheel.
        """
        from .sbigudrv import FilterWheelModel

        # do we have a filter wheel?
        if self.filter_wheel == FilterWheelModel.UNKNOWN:
            raise NotImplementedError

        try:
            self._position, _ = self._cam.get_filter_position_and_status()
        except ValueError:
            # deliberate best-effort: use existing position
            pass
        return self._filter_names[self._position]

    async def list_filters(self, **kwargs: Any) -> List[str]:
        """List available filters.

        Returns:
            List of all configured filter names (this includes the "UNKNOWN" entry).

        Raises:
            NotImplementedError: If camera doesn't have a filter wheel.
        """
        from .sbigudrv import FilterWheelModel

        # do we have a filter wheel?
        if self.filter_wheel == FilterWheelModel.UNKNOWN:
            raise NotImplementedError

        # return names
        return [f for f in self._filter_names.values() if f is not None]

    async def init(self, **kwargs: Any) -> None:
        """Initialize device. Does nothing for this module.

        Raises:
            pyobs.utils.exceptions.InitError: If device could not be initialized.
        """
        pass

    async def park(self, **kwargs: Any) -> None:
        """Park device. Does nothing for this module.

        Raises:
            pyobs.utils.exceptions.ParkError: If device could not be parked.
        """
        pass

    async def stop_motion(self, device: Optional[str] = None, **kwargs: Any) -> None:
        """Stop the motion. Does nothing for this module.

        Args:
            device: Name of device to stop, or None for all.
        """
        pass

    async def is_ready(self, **kwargs: Any) -> bool:
        """Returns the device is "ready", whatever that means for the specific device.

        Returns:
            Whether device is ready; always True for this module.
        """
        return True
# public API of this module
__all__ = ["SbigFilterCamera"]