import asyncio
import logging
from typing import Any
import numpy as np
from pyobs.events import BadWeatherEvent, Event, FocusFoundEvent
from pyobs.interfaces import (
IAutoFocus,
ICamera,
IData,
IExposureTime,
IFilters,
IFocuser,
IImageType,
)
from pyobs.interfaces.IAutoFocus import AutoFocusResult, AutoFocusState
from pyobs.mixins import CameraSettingsMixin
from pyobs.modules import Module, raises, timeout
from pyobs.object import get_object
from pyobs.utils import exceptions as exc
from pyobs.utils.enums import ImageType
from pyobs.utils.focusseries import FocusSeries
log = logging.getLogger(__name__)
class AutoFocusSeries(Module, CameraSettingsMixin, IAutoFocus):
"""Module for auto-focusing a telescope."""
__module__ = "pyobs.modules.focus"
def __init__(
self,
focuser: str | IFocuser,
camera: str | IData,
series: dict[str, Any] | FocusSeries,
offset: bool = False,
init_offset_to_zero: bool = True,
filters: str | IFilters | None = None,
filter_name: str | None = None,
binning: int | None = None,
broadcast: bool = False,
final_image: bool = True,
**kwargs: Any,
):
"""Initialize a new auto focus system.
Args:
focuser: Name of IFocuser.
camera: Name of ICamera.
filters: Name of IFilters, if any.
filter_name: Name of filter to set.
offset: If True, offsets are used instead of absolute focus values.
init_offset_to_zero: If True, a zero offset is used as initial guess. Otherwise, the current value is used.
broadcast: Whether to broadcast focus series images.
final_image: Whether to take final image with optimal focus, which is always broadcasted.
"""
Module.__init__(self, **kwargs)
# store focuser and camera
self._focuser = focuser
self._camera = camera
self._filters = filters
self._offset = offset
self._init_offset_to_zero = init_offset_to_zero
self._abort = asyncio.Event()
self._running = False
self._broadcast = broadcast
self._final_image = final_image
# create focus series
self._series: FocusSeries = get_object(series, FocusSeries)
# init camera settings mixin
CameraSettingsMixin.__init__(self, filters=filters, filter_name=filter_name, binning=binning, **kwargs)
# register exceptions
if isinstance(camera, str):
exc.register_exception(
exc.RemoteError, 3, timespan=600, module=camera, callback=self._default_remote_error_callback
)
if isinstance(focuser, str):
exc.register_exception(
exc.RemoteError, 3, timespan=600, module=focuser, callback=self._default_remote_error_callback
)
[docs]
async def open(self) -> None:
"""Open module"""
await Module.open(self)
# register event
await self.comm.register_event(FocusFoundEvent)
await self.comm.register_event(BadWeatherEvent, self._on_bad_weather)
# check focuser and camera
if not await self.has_proxy(self._focuser, IFocuser) or not await self.has_proxy(self._camera, IData):
log.warning("Either camera or focuser do not exist or are not of correct type at the moment.")
# publish initial states
await self.comm.set_state(IAutoFocus, AutoFocusState())
[docs]
@raises(exc.AbortedError, exc.FocusError)
@timeout(600)
async def auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> AutoFocusResult:
"""Perform an autofocus series.
This method performs an autofocus series with "count" images on each side of the initial guess and the given
step size. With count=3, step=1 and guess=10, this takes images at the following focus values:
7, 8, 9, 10, 11, 12, 13
Args:
count: Number of images to take on each side of the initial guess. Should be an odd number.
step: Step size.
exposure_time: Exposure time for images.
Returns:
Result of autofocus.
Raises:
ValueError: If focus could not be obtained.
"""
try:
log.info("Performing auto-focus...")
self._running = True
focus, error = await self._auto_focus(count, step, exposure_time, **kwargs)
return AutoFocusResult(focus=focus, focus_err=error)
finally:
self._running = False
async def _auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
# do camera settings
async with self.proxy(self._camera, ICamera) as camera:
await self._do_camera_settings(camera)
async with self.proxy(self._camera, IImageType) as proxy:
await proxy.set_image_type(ImageType.FOCUS)
# get filter wheel and current filter
filter_name = "unknown"
try:
async with self.proxy(self._filters, IFilters) as proxy:
filter_state = proxy.get_state(IFilters)
filter_name = filter_state.filter if filter_state is not None else "unknown"
except ValueError:
log.warning("Filter module is not of type IFilters. Could not get filter.")
# get focus as first guess
try:
async with self.proxy(self._focuser, IFocuser) as focuser:
if self._offset:
foc_state = focuser.get_state(IFocuser)
guess = (
0.0 if self._init_offset_to_zero else (foc_state.focus_offset if foc_state is not None else 0.0)
)
log.info("Using focus offset of %.2fmm as initial guess.", guess)
else:
foc_state = focuser.get_state(IFocuser)
guess = foc_state.focus if foc_state is not None else 0.0
log.info("Using current focus of %.2fmm as initial guess.", guess)
except exc.RemoteError:
raise exc.FocusError("Could not fetch current focus value.")
# define array of focus values to iterate
focus_values = np.linspace(guess - count * step, guess + count * step, 2 * count + 1)
# reset
self._series.reset()
self._abort = asyncio.Event()
# loop focus values
log.info("Starting focus series...")
for foc in focus_values:
# set focus
if self._offset:
log.info("Changing focus offset to %.2fmm...", foc)
else:
log.info("Changing focus to %.2fmm...", foc)
if self._abort.is_set():
raise exc.AbortedError()
try:
async with self.proxy(self._focuser, IFocuser) as focuser:
if self._offset:
await focuser.set_focus_offset(float(foc))
else:
await focuser.set_focus(float(foc))
except exc.RemoteError:
raise exc.FocusError("Could not set new focus value.")
# do exposure
log.info("Taking picture...")
if self._abort.is_set():
raise exc.AbortedError()
try:
filename = await self._take_image(exposure_time)
except exc.RemoteError:
log.error("Could not take image.")
continue
# download image
log.info("Downloading image...")
image = await self.vfs.read_image(filename)
# get actual focus
async with self.proxy(self._focuser, IFocuser) as focuser:
foc_state = focuser.get_state(IFocuser)
if self._offset:
actual_focus = foc_state.focus_offset if foc_state is not None else 0.0
else:
actual_focus = foc_state.focus if foc_state is not None else 0.0
# analyse
log.info("Analysing picture...")
try:
await self._series.analyse_image(image, actual_focus)
except Exception:
# do nothing...
log.info("Could not analyse image.")
continue
# publish state
await self.comm.set_state(IAutoFocus, AutoFocusState(points=self._series.get_data_points()))
# fit focus
if self._abort.is_set():
raise exc.AbortedError()
try:
focus = self._series.fit_focus()
except Exception as e:
# restore initial guess
async with self.proxy(self._focuser, IFocuser) as focuser:
if self._offset:
await focuser.set_focus_offset(float(guess))
else:
await focuser.set_focus(float(guess))
raise exc.FocusError(f"Could not calculate best focus: {e}")
# did focus series fail?
if focus is None or focus[0] is None or np.isnan(focus[0]):
log.warning("Focus series failed.")
# reset to initial values
async with self.proxy(self._focuser, IFocuser) as focuser:
if self._offset:
log.info("Resetting focus offset to 0.")
await focuser.set_focus_offset(0)
else:
log.info("Resetting focus to initial guess of %.3f mm.", guess)
await focuser.set_focus(guess)
# raise error
raise exc.FocusError("Could not find best focus.")
# log and set focus
async with self.proxy(self._focuser, IFocuser) as focuser:
foc_state = focuser.get_state(IFocuser)
if self._offset:
log.info("Setting new focus offset of (%.3f+-%.3f) mm.", focus[0], focus[1])
absolute = focus[0] + (foc_state.focus if foc_state is not None else 0.0)
await focuser.set_focus_offset(focus[0])
else:
log.info("Setting new focus value of (%.3f+-%.3f) mm.", focus[0], focus[1])
absolute = focus[0] + (foc_state.focus_offset if foc_state is not None else 0.0)
await focuser.set_focus(focus[0])
# send event
await self.comm.send_event(FocusFoundEvent(absolute, focus[1], filter_name))
# take final image?
if self._final_image:
log.info("Exposing final image at %.2f mm and broadcasting it...", absolute)
async with self.proxy(self._camera, IExposureTime) as camera:
await camera.set_exposure_time(exposure_time)
async with self.proxy(self._camera, IData) as camera:
await camera.grab_data()
# return result
return focus[0], focus[1]
async def _take_image(self, exposure_time: float) -> str:
async with self.proxy(self._camera, IExposureTime) as camera:
await camera.set_exposure_time(exposure_time)
async with self.proxy(self._camera, IData) as camera:
return await camera.grab_data(broadcast=self._broadcast)
[docs]
@timeout(20)
async def abort(self, **kwargs: Any) -> None:
"""Abort current actions."""
self._abort.set()
async def _on_bad_weather(self, event: Event, sender: str) -> bool:
"""Abort series if a bad weather event occurs.
Args:
event: The bad weather event.
sender: Who sent it.
"""
# check
if not isinstance(event, BadWeatherEvent):
raise ValueError("Wrong event type.")
# park
if self._running:
log.warning("Aborting focus series due to bad weather.")
self._abort.set()
return True
return False
__all__ = ["AutoFocusSeries"]