# Source code for pyobs.modules.focus.focusseries

import asyncio
import logging
from typing import Any

import numpy as np

from pyobs.events import BadWeatherEvent, Event, FocusFoundEvent
from pyobs.interfaces import (
    IAutoFocus,
    ICamera,
    IData,
    IExposureTime,
    IFilters,
    IFocuser,
    IImageType,
)
from pyobs.mixins import CameraSettingsMixin
from pyobs.modules import Module, raises, timeout
from pyobs.object import get_object
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import ImageType
from pyobs.utils.focusseries import FocusSeries

log = logging.getLogger(__name__)


class AutoFocusSeries(Module, CameraSettingsMixin, IAutoFocus):
    """Module for auto-focusing a telescope.

    Steps a focuser through a symmetric range of values around an initial
    guess, takes one image per step, feeds the images to a ``FocusSeries``
    analyser and finally moves the focuser to the fitted best focus.
    """

    __module__ = "pyobs.modules.focus"

    def __init__(
        self,
        focuser: str | IFocuser,
        camera: str | IData,
        series: dict[str, Any] | FocusSeries,
        offset: bool = False,
        init_offset_to_zero: bool = True,
        filters: str | IFilters | None = None,
        filter_name: str | None = None,
        binning: int | None = None,
        broadcast: bool = False,
        final_image: bool = True,
        **kwargs: Any,
    ):
        """Initialize a new auto focus system.

        Args:
            focuser: Name of IFocuser (or the object itself).
            camera: Name of the IData module used for taking images (or the object itself).
            series: FocusSeries instance, or a configuration dictionary from which one is created.
            offset: If True, offsets are used instead of absolute focus values.
            init_offset_to_zero: If True, a zero offset is used as initial guess. Otherwise, the current value is used.
            filters: Name of IFilters, if any.
            filter_name: Name of filter to set.
            binning: Binning passed on to the camera settings mixin.
            broadcast: Whether to broadcast focus series images.
            final_image: Whether to take a final image with optimal focus.
        """
        Module.__init__(self, **kwargs)

        # store focuser and camera
        self._focuser = focuser
        self._camera = camera
        self._filters = filters
        self._offset = offset
        self._init_offset_to_zero = init_offset_to_zero
        self._abort = asyncio.Event()
        self._running = False
        self._broadcast = broadcast
        self._final_image = final_image

        # create focus series
        self._series: FocusSeries = get_object(series, FocusSeries)

        # init camera settings mixin
        CameraSettingsMixin.__init__(self, filters=filters, filter_name=filter_name, binning=binning, **kwargs)

        # register exceptions; only possible for modules referenced by name
        for module in (camera, focuser):
            if isinstance(module, str):
                exc.register_exception(
                    exc.RemoteError, 3, timespan=600, module=module, callback=self._default_remote_error_callback
                )

    async def open(self) -> None:
        """Open module, register events and check that focuser and camera are reachable."""
        await Module.open(self)

        # register event
        await self.comm.register_event(FocusFoundEvent)
        await self.comm.register_event(BadWeatherEvent, self._on_bad_weather)

        # check focuser and camera; a failure here is only reported, not fatal
        try:
            await self.proxy(self._focuser, IFocuser)
            await self.proxy(self._camera, IData)
        except ValueError:
            log.warning("Either camera or focuser do not exist or are not of correct type at the moment.")

    @raises(exc.AbortedError, exc.FocusError)
    @timeout(600)
    async def auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
        """Perform an auto-focus series.

        This method performs an auto-focus series with "count" images on each side of the initial guess and the given
        step size, i.e. 2*count+1 images in total. With count=3, step=1 and guess=10, this takes images at the
        following focus values:
        7, 8, 9, 10, 11, 12, 13

        Args:
            count: Number of images to take on each side of the initial guess.
            step: Step size.
            exposure_time: Exposure time for images.

        Returns:
            Tuple of obtained best focus value and its uncertainty.

        Raises:
            AbortedError: If the series was aborted.
            FocusError: If the focus could not be read or set, or no best focus could be determined.
        """
        try:
            log.info("Performing auto-focus...")
            self._running = True
            # Reset the abort flag before the first await, so that an abort arriving
            # while proxies and camera settings are being set up is not lost.
            self._abort.clear()
            return await self._auto_focus(count, step, exposure_time, **kwargs)
        finally:
            self._running = False

    def _check_abort(self) -> None:
        """Raise AbortedError if an abort has been requested."""
        if self._abort.is_set():
            raise exc.AbortedError()

    async def _get_focus_value(self, focuser: IFocuser) -> float:
        """Return the focus offset in offset mode, the absolute focus otherwise."""
        if self._offset:
            return await focuser.get_focus_offset()
        return await focuser.get_focus()

    async def _set_focus_value(self, focuser: IFocuser, value: float) -> None:
        """Set the focus offset in offset mode, the absolute focus otherwise."""
        if self._offset:
            await focuser.set_focus_offset(float(value))
        else:
            await focuser.set_focus(float(value))

    async def _auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
        """Run the actual focus series; see auto_focus() for parameters and return value."""
        # get focuser
        log.info("Getting proxy for focuser...")
        focuser = await self.proxy(self._focuser, IFocuser)

        # get camera
        log.info("Getting proxy for camera...")
        camera = await self.proxy(self._camera, IData)

        # do camera settings
        await self._do_camera_settings(camera)
        if isinstance(camera, IImageType):
            await camera.set_image_type(ImageType.FOCUS)

        # get filter wheel and current filter
        filter_name = "unknown"
        try:
            filter_wheel = await self.proxy(self._filters, IFilters)
            filter_name = await filter_wheel.get_filter()
        except ValueError:
            log.warning("Filter module is not of type IFilters. Could not get filter.")

        # get focus as first guess
        try:
            if self._offset:
                guess = 0.0 if self._init_offset_to_zero else await focuser.get_focus_offset()
                log.info("Using focus offset of %.2fmm as initial guess.", guess)
            else:
                guess = await focuser.get_focus()
                log.info("Using current focus of %.2fmm as initial guess.", guess)
        except exc.RemoteError as e:
            raise exc.FocusError("Could not fetch current focus value.") from e

        # define array of focus values to iterate
        focus_values = np.linspace(guess - count * step, guess + count * step, 2 * count + 1)

        # reset
        self._series.reset()

        # loop focus values
        log.info("Starting focus series...")
        for foc in focus_values:
            # set focus
            if self._offset:
                log.info("Changing focus offset to %.2fmm...", foc)
            else:
                log.info("Changing focus to %.2fmm...", foc)
            self._check_abort()
            try:
                await self._set_focus_value(focuser, foc)
            except exc.RemoteError as e:
                raise exc.FocusError("Could not set new focus value.") from e

            # do exposure; a failed exposure only skips this focus position
            log.info("Taking picture...")
            self._check_abort()
            try:
                filename = await self._take_image(camera, exposure_time)
            except exc.RemoteError:
                log.error("Could not take image.")
                continue

            # download image
            log.info("Downloading image...")
            image = await self.vfs.read_image(filename)

            # get actual focus, which is what the analysis is run against
            actual_focus = await self._get_focus_value(focuser)

            # analyse; a failed analysis only skips this image, but is reported with its cause
            log.info("Analysing picture...")
            try:
                await self._series.analyse_image(image, actual_focus)
            except Exception:
                log.warning("Could not analyse image.", exc_info=True)
                continue

        # fit focus
        self._check_abort()
        try:
            focus = self._series.fit_focus()
        except Exception as e:
            # restore initial guess
            await self._set_focus_value(focuser, guess)
            raise exc.FocusError(f"Could not calculate best focus: {e}") from e

        # did focus series fail?
        if focus is None or focus[0] is None or np.isnan(focus[0]):
            log.warning("Focus series failed.")

            # reset to initial values, consistent with the fit-error path above
            if self._offset:
                log.info("Resetting focus offset to initial guess of %.3f mm.", guess)
            else:
                log.info("Resetting focus to initial guess of %.3f mm.", guess)
            await self._set_focus_value(focuser, guess)

            # raise error
            raise exc.FocusError("Could not find best focus.")

        # log and set focus; "absolute" is the sum of fitted value and the complementary focuser value
        best, error = focus[0], focus[1]
        if self._offset:
            log.info("Setting new focus offset of (%.3f+-%.3f) mm.", best, error)
            absolute = best + await focuser.get_focus()
            await focuser.set_focus_offset(float(best))
        else:
            log.info("Setting new focus value of (%.3f+-%.3f) mm.", best, error)
            absolute = best + await focuser.get_focus_offset()
            await focuser.set_focus(float(best))

        # send event
        await self.comm.send_event(FocusFoundEvent(absolute, error, filter_name))

        # take final image? It is grabbed with grab_data() defaults, independent of self._broadcast.
        if self._final_image:
            log.info("Exposing final image at %.2f mm and broadcasting it...", absolute)
            if isinstance(camera, IExposureTime):
                await camera.set_exposure_time(exposure_time)
            if isinstance(camera, IData):
                await camera.grab_data()

        # return result
        return best, error

    async def _take_image(self, camera: ICamera | IData, exposure_time: float) -> str:
        """Take a single focus series image and return its filename.

        Args:
            camera: Camera to take the image with.
            exposure_time: Exposure time, only set if the camera implements IExposureTime.

        Returns:
            Filename of the new image as returned by grab_data().

        Raises:
            GeneralError: If the camera does not implement IData.
        """
        if isinstance(camera, IExposureTime):
            await camera.set_exposure_time(exposure_time)
        if isinstance(camera, IData):
            return await camera.grab_data(broadcast=self._broadcast)
        raise exc.GeneralError("Cannot grab data from camera.")

    async def auto_focus_status(self, **kwargs: Any) -> dict[str, Any]:
        """Returns current status of auto focus.

        Returns:
            Dictionary with current status. This implementation always returns an empty dictionary.
        """
        return {}

    @timeout(20)
    async def abort(self, **kwargs: Any) -> None:
        """Abort current actions."""
        self._abort.set()

    async def _on_bad_weather(self, event: Event, sender: str) -> bool:
        """Abort series if a bad weather event occurs.

        Args:
            event: The bad weather event.
            sender: Who sent it.

        Returns:
            True if a running series was asked to abort, False otherwise.

        Raises:
            ValueError: If the event is not a BadWeatherEvent.
        """
        # check
        if not isinstance(event, BadWeatherEvent):
            raise ValueError("Wrong event type.")

        # abort, but only if a series is actually running
        if self._running:
            log.warning("Aborting focus series due to bad weather.")
            self._abort.set()
            return True
        return False


__all__ = ["AutoFocusSeries"]