import logging
from typing import Any
import threading
import numpy as np
from pyobs.interfaces import IAutoFocus, ICamera
from pyobs.events import FocusFoundEvent, BadWeatherEvent, Event
from pyobs.interfaces import IExposureTime, IImageType, IFocuser, IFilters, IData
from pyobs.object import get_object
from pyobs.mixins import CameraSettingsMixin
from pyobs.modules import Module, timeout, raises
from pyobs.utils.enums import ImageType
from pyobs.utils.focusseries import FocusSeries
from pyobs.utils import exceptions as exc
log = logging.getLogger(__name__)
class AutoFocusSeries(Module, CameraSettingsMixin, IAutoFocus):
"""Module for auto-focusing a telescope."""
__module__ = "pyobs.modules.focus"
def __init__(
self,
focuser: str | IFocuser,
camera: str | IData,
series: dict[str, Any] | FocusSeries,
offset: bool = False,
init_offset_to_zero: bool = True,
filters: str | IFilters | None = None,
filter_name: str | None = None,
binning: int | None = None,
broadcast: bool = False,
final_image: bool = True,
**kwargs: Any,
):
"""Initialize a new auto focus system.
Args:
focuser: Name of IFocuser.
camera: Name of ICamera.
filters: Name of IFilters, if any.
filter_name: Name of filter to set.
offset: If True, offsets are used instead of absolute focus values.
init_offset_to_zero: If True, a zero offset is used as initial guess. Otherwise, the current value is used.
broadcast: Whether to broadcast focus series images.
final_image: Whether to take final image with optimal focus, which is always broadcasted.
"""
Module.__init__(self, **kwargs)
# store focuser and camera
self._focuser = focuser
self._camera = camera
self._filters = filters
self._offset = offset
self._init_offset_to_zero = init_offset_to_zero
self._abort = threading.Event()
self._running = False
self._broadcast = broadcast
self._final_image = final_image
# create focus series
self._series: FocusSeries = get_object(series, FocusSeries)
# init camera settings mixin
CameraSettingsMixin.__init__(self, filters=filters, filter_name=filter_name, binning=binning, **kwargs)
# register exceptions
if isinstance(camera, str):
exc.register_exception(
exc.RemoteError, 3, timespan=600, module=camera, callback=self._default_remote_error_callback
)
if isinstance(focuser, str):
exc.register_exception(
exc.RemoteError, 3, timespan=600, module=focuser, callback=self._default_remote_error_callback
)
[docs]
async def open(self) -> None:
"""Open module"""
await Module.open(self)
# register event
await self.comm.register_event(FocusFoundEvent)
await self.comm.register_event(BadWeatherEvent, self._on_bad_weather)
# check focuser and camera
try:
await self.proxy(self._focuser, IFocuser)
await self.proxy(self._camera, IData)
except ValueError:
log.warning("Either camera or focuser do not exist or are not of correct type at the moment.")
[docs]
@raises(exc.AbortedError, exc.FocusError)
@timeout(600)
async def auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
"""Perform an auto-focus series.
This method performs an auto-focus series with "count" images on each side of the initial guess and the given
step size. With count=3, step=1 and guess=10, this takes images at the following focus values:
7, 8, 9, 10, 11, 12, 13
Args:
count: Number of images to take on each side of the initial guess. Should be an odd number.
step: Step size.
exposure_time: Exposure time for images.
Returns:
Tuple of obtained best focus value and its uncertainty. Or Nones, if focus series failed.
Raises:
FileNotFoundException: If image could not be downloaded.
"""
try:
log.info("Performing auto-focus...")
self._running = True
return await self._auto_focus(count, step, exposure_time, **kwargs)
finally:
self._running = False
async def _auto_focus(self, count: int, step: float, exposure_time: float, **kwargs: Any) -> tuple[float, float]:
# get focuser
log.info("Getting proxy for focuser...")
focuser = await self.proxy(self._focuser, IFocuser)
# get camera
log.info("Getting proxy for camera...")
camera = await self.proxy(self._camera, IData)
# do camera settings
await self._do_camera_settings(camera)
if isinstance(camera, IImageType):
await camera.set_image_type(ImageType.FOCUS)
# get filter wheel and current filter
filter_name = "unknown"
try:
filter_wheel = await self.proxy(self._filters, IFilters)
filter_name = await filter_wheel.get_filter()
except ValueError:
log.warning("Filter module is not of type IFilters. Could not get filter.")
# get focus as first guess
try:
if self._offset:
guess = 0.0 if self._init_offset_to_zero else await focuser.get_focus_offset()
log.info(f"Using focus offset of {guess:.2f}mm as initial guess.")
else:
guess = await focuser.get_focus()
log.info("Using current focus of %.2fmm as initial guess.", guess)
except exc.RemoteError:
raise exc.FocusError("Could not fetch current focus value.")
# define array of focus values to iterate
focus_values = np.linspace(guess - count * step, guess + count * step, 2 * count + 1)
# reset
self._series.reset()
self._abort = threading.Event()
# loop focus values
log.info("Starting focus series...")
for foc in focus_values:
# set focus
if self._offset:
log.info("Changing focus offset to %.2fmm...", foc)
else:
log.info("Changing focus to %.2fmm...", foc)
if self._abort.is_set():
raise exc.AbortedError()
try:
if self._offset:
await focuser.set_focus_offset(float(foc))
else:
await focuser.set_focus(float(foc))
except exc.RemoteError:
raise exc.FocusError("Could not set new focus value.")
# do exposure
log.info("Taking picture...")
if self._abort.is_set():
raise exc.AbortedError()
try:
filename = await self._take_image(camera, exposure_time)
except exc.RemoteError:
log.error("Could not take image.")
continue
# download image
log.info("Downloading image...")
image = await self.vfs.read_image(filename)
# get actual focus
if self._offset:
actual_focus = await focuser.get_focus_offset()
else:
actual_focus = await focuser.get_focus()
# analyse
log.info("Analysing picture...")
try:
await self._series.analyse_image(image, actual_focus)
except:
# do nothing...
log.info("Could not analyse image.")
continue
# fit focus
if self._abort.is_set():
raise exc.AbortedError()
try:
focus = self._series.fit_focus()
except Exception as e:
# restore initial guess
if self._offset:
await focuser.set_focus_offset(float(guess))
else:
await focuser.set_focus(float(guess))
raise exc.FocusError(f"Could not calculate best focus: {e}")
# did focus series fail?
if focus is None or focus[0] is None or np.isnan(focus[0]):
log.warning("Focus series failed.")
# reset to initial values
if self._offset:
log.info("Resetting focus offset to 0.")
await focuser.set_focus_offset(0)
else:
log.info("Resetting focus to initial guess of %.3f mm.", guess)
await focuser.set_focus(guess)
# raise error
raise exc.FocusError("Could not find best focus.")
# log and set focus
if self._offset:
log.info("Setting new focus offset of (%.3f+-%.3f) mm.", focus[0], focus[1])
absolute = focus[0] + await focuser.get_focus()
await focuser.set_focus_offset(focus[0])
else:
log.info("Setting new focus value of (%.3f+-%.3f) mm.", focus[0], focus[1])
absolute = focus[0] + await focuser.get_focus_offset()
await focuser.set_focus(focus[0])
# send event
await self.comm.send_event(FocusFoundEvent(absolute, focus[1], filter_name))
# take final image?
if self._final_image:
log.info("Exposing final image at %.2f mm and broadcasting it...", absolute)
if isinstance(camera, IExposureTime):
await camera.set_exposure_time(exposure_time)
if isinstance(camera, IData):
await camera.grab_data()
# return result
return focus[0], focus[1]
async def _take_image(self, camera: ICamera, exposure_time: float) -> str:
if isinstance(camera, IExposureTime):
await camera.set_exposure_time(exposure_time)
if isinstance(camera, IData):
return await camera.grab_data(broadcast=self._broadcast)
else:
raise exc.GeneralError("Cannot grab data from camera.")
[docs]
async def auto_focus_status(self, **kwargs: Any) -> dict[str, Any]:
"""Returns current status of auto focus.
Returned dictionary contains a list of focus/fwhm pairs in X and Y direction.
Returns:
Dictionary with current status.
"""
return {}
[docs]
@timeout(20)
async def abort(self, **kwargs: Any) -> None:
"""Abort current actions."""
self._abort.set()
async def _on_bad_weather(self, event: Event, sender: str) -> bool:
"""Abort series if a bad weather event occurs.
Args:
event: The bad weather event.
sender: Who sent it.
"""
# check
if not isinstance(event, BadWeatherEvent):
raise ValueError("Wrong event type.")
# park
if self._running:
log.warning("Aborting focus series due to bad weather.")
self._abort.set()
return True
return False
__all__ = ["AutoFocusSeries"]