import asyncio
import logging
from typing import Any
import numpy as np
from pyobs.events import BadWeatherEvent, Event, FocusFoundEvent
from pyobs.interfaces import (
IAutoFocus,
ICamera,
IData,
IExposureTime,
IFilters,
IFocuser,
IImageType,
)
from pyobs.mixins import CameraSettingsMixin
from pyobs.modules import Module, raises, timeout
from pyobs.object import get_object
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import ImageType
from pyobs.utils.focusseries import FocusSeries
log = logging.getLogger(__name__)
class AutoFocusSeries(Module, CameraSettingsMixin, IAutoFocus):
"""Module for auto-focusing a telescope."""
__module__ = "pyobs.modules.focus"
def __init__(
self,
focuser: str | IFocuser,
camera: str | IData,
series: dict[str, Any] | FocusSeries,
offset: bool = False,
init_offset_to_zero: bool = True,
filters: str | IFilters | None = None,
filter_name: str | None = None,
binning: int | None = None,
broadcast: bool = False,
final_image: bool = True,
**kwargs: Any,
):
"""Initialize a new auto focus system.
Args:
focuser: Name of IFocuser.
camera: Name of ICamera.
filters: Name of IFilters, if any.
filter_name: Name of filter to set.
offset: If True, offsets are used instead of absolute focus values.
init_offset_to_zero: If True, a zero offset is used as initial guess. Otherwise, the current value is used.
broadcast: Whether to broadcast focus series images.
final_image: Whether to take final image with optimal focus, which is always broadcasted.
"""
Module.__init__(self, **kwargs)
# store focuser and camera
self._focuser = focuser
self._camera = camera
self._filters = filters
self._offset = offset
self._init_offset_to_zero = init_offset_to_zero
self._abort = asyncio.Event()
self._running = False
self._broadcast = broadcast
self._final_image = final_image
# create focus series
self._series: FocusSeries = get_object(series, FocusSeries)
# init camera settings mixin
CameraSettingsMixin.__init__(self, filters=filters, filter_name=filter_name, binning=binning, **kwargs)
# register exceptions
if isinstance(camera, str):
exc.register_exception(
exc.RemoteError, 3, timespan=600, module=camera, callback=self._default_remote_error_callback
)
if isinstance(focuser, str):
exc.register_exception(
exc.RemoteError, 3, timespan=600, module=focuser, callback=self._default_remote_error_callback
)
[docs]
async def open(self) -> None:
"""Open module"""
await Module.open(self)
# register event
await self.comm.register_event(FocusFoundEvent)
await self.comm.register_event(BadWeatherEvent, self._on_bad_weather)
# check focuser and camera
try:
await self.proxy(self._focuser, IFocuser)
await self.proxy(self._camera, IData)
except ValueError:
log.warning("Either camera or focuser do not exist or are not of correct type at the moment.")
[docs]
@raises(exc.AbortedError, exc.FocusError)
@timeout(600)
async def auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
"""Perform an auto-focus series.
This method performs an auto-focus series with "count" images on each side of the initial guess and the given
step size. With count=3, step=1 and guess=10, this takes images at the following focus values:
7, 8, 9, 10, 11, 12, 13
Args:
count: Number of images to take on each side of the initial guess. Should be an odd number.
step: Step size.
exposure_time: Exposure time for images.
Returns:
Tuple of obtained best focus value and its uncertainty. Or Nones, if focus series failed.
Raises:
FileNotFoundException: If image could not be downloaded.
"""
try:
log.info("Performing auto-focus...")
self._running = True
return await self._auto_focus(count, step, exposure_time, **kwargs)
finally:
self._running = False
async def _auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
# get focuser
log.info("Getting proxy for focuser...")
focuser = await self.proxy(self._focuser, IFocuser)
# get camera
log.info("Getting proxy for camera...")
camera = await self.proxy(self._camera, IData)
# do camera settings
await self._do_camera_settings(camera)
if isinstance(camera, IImageType):
await camera.set_image_type(ImageType.FOCUS)
# get filter wheel and current filter
filter_name = "unknown"
try:
filter_wheel = await self.proxy(self._filters, IFilters)
filter_name = await filter_wheel.get_filter()
except ValueError:
log.warning("Filter module is not of type IFilters. Could not get filter.")
# get focus as first guess
try:
if self._offset:
guess = 0.0 if self._init_offset_to_zero else await focuser.get_focus_offset()
log.info("Using focus offset of %.2fmm as initial guess.", guess)
else:
guess = await focuser.get_focus()
log.info("Using current focus of %.2fmm as initial guess.", guess)
except exc.RemoteError:
raise exc.FocusError("Could not fetch current focus value.")
# define array of focus values to iterate
focus_values = np.linspace(guess - count * step, guess + count * step, 2 * count + 1)
# reset
self._series.reset()
self._abort = asyncio.Event()
# loop focus values
log.info("Starting focus series...")
for foc in focus_values:
# set focus
if self._offset:
log.info("Changing focus offset to %.2fmm...", foc)
else:
log.info("Changing focus to %.2fmm...", foc)
if self._abort.is_set():
raise exc.AbortedError()
try:
if self._offset:
await focuser.set_focus_offset(float(foc))
else:
await focuser.set_focus(float(foc))
except exc.RemoteError:
raise exc.FocusError("Could not set new focus value.")
# do exposure
log.info("Taking picture...")
if self._abort.is_set():
raise exc.AbortedError()
try:
filename = await self._take_image(camera, exposure_time)
except exc.RemoteError:
log.error("Could not take image.")
continue
# download image
log.info("Downloading image...")
image = await self.vfs.read_image(filename)
# get actual focus
if self._offset:
actual_focus = await focuser.get_focus_offset()
else:
actual_focus = await focuser.get_focus()
# analyse
log.info("Analysing picture...")
try:
await self._series.analyse_image(image, actual_focus)
except Exception:
# do nothing...
log.info("Could not analyse image.")
continue
# fit focus
if self._abort.is_set():
raise exc.AbortedError()
try:
focus = self._series.fit_focus()
except Exception as e:
# restore initial guess
if self._offset:
await focuser.set_focus_offset(float(guess))
else:
await focuser.set_focus(float(guess))
raise exc.FocusError(f"Could not calculate best focus: {e}")
# did focus series fail?
if focus is None or focus[0] is None or np.isnan(focus[0]):
log.warning("Focus series failed.")
# reset to initial values
if self._offset:
log.info("Resetting focus offset to 0.")
await focuser.set_focus_offset(0)
else:
log.info("Resetting focus to initial guess of %.3f mm.", guess)
await focuser.set_focus(guess)
# raise error
raise exc.FocusError("Could not find best focus.")
# log and set focus
if self._offset:
log.info("Setting new focus offset of (%.3f+-%.3f) mm.", focus[0], focus[1])
absolute = focus[0] + await focuser.get_focus()
await focuser.set_focus_offset(focus[0])
else:
log.info("Setting new focus value of (%.3f+-%.3f) mm.", focus[0], focus[1])
absolute = focus[0] + await focuser.get_focus_offset()
await focuser.set_focus(focus[0])
# send event
await self.comm.send_event(FocusFoundEvent(absolute, focus[1], filter_name))
# take final image?
if self._final_image:
log.info("Exposing final image at %.2f mm and broadcasting it...", absolute)
if isinstance(camera, IExposureTime):
await camera.set_exposure_time(exposure_time)
if isinstance(camera, IData):
await camera.grab_data()
# return result
return focus[0], focus[1]
async def _take_image(self, camera: ICamera, exposure_time: float) -> str:
if isinstance(camera, IExposureTime):
await camera.set_exposure_time(exposure_time)
if isinstance(camera, IData):
return await camera.grab_data(broadcast=self._broadcast)
else:
raise exc.GeneralError("Cannot grab data from camera.")
[docs]
async def auto_focus_status(self, **kwargs: Any) -> dict[str, Any]:
"""Returns current status of auto focus.
Returned dictionary contains a list of focus/fwhm pairs in X and Y direction.
Returns:
Dictionary with current status.
"""
return {}
[docs]
@timeout(20)
async def abort(self, **kwargs: Any) -> None:
"""Abort current actions."""
self._abort.set()
async def _on_bad_weather(self, event: Event, sender: str) -> bool:
"""Abort series if a bad weather event occurs.
Args:
event: The bad weather event.
sender: Who sent it.
"""
# check
if not isinstance(event, BadWeatherEvent):
raise ValueError("Wrong event type.")
# park
if self._running:
log.warning("Aborting focus series due to bad weather.")
self._abort.set()
return True
return False
__all__ = ["AutoFocusSeries"]