Source code for pyobs.robotic.utils.skyflats.flatfielder

from __future__ import annotations

import asyncio
from enum import Enum
import logging
from typing import Dict, Union, Callable, Optional, Tuple, Any, cast, Coroutine
import astropy.units as u
from astropy.time import TimeDelta
import numpy as np

from pyobs.object import Object
from pyobs.utils.enums import ImageType
from pyobs.utils.fits import fitssec
from pyobs.utils.parallel import Future, event_wait
from pyobs.utils.time import Time
from .exptimeeval import ExpTimeEval
from .pointing import SkyFlatsBasePointing
from pyobs.interfaces import (
    ITelescope,
    ICamera,
    IFilters,
    IExposureTime,
    IBinning,
    IWindow,
    IImageType,
    IPointingAltAz,
)
from pyobs.images import Image

log = logging.getLogger(__name__)


class FlatFielder(Object):
    """Automatized flat-fielding."""

    __module__ = "pyobs.utils.skyflats"

[docs] class Twilight(Enum): DUSK = "dusk" DAWN = "dawn"
[docs] class State(Enum): INIT = "init" WAITING = "waiting" TESTING = "testing" RUNNING = "running" FINISHED = "finished"
def __init__( self, functions: Union[str, Dict[str, Union[str, Dict[str, str]]]], target_count: float = 30000, min_exptime: float = 0.5, max_exptime: float = 5, test_frame: Optional[Tuple[float, float, float, float]] = None, counts_frame: Optional[Tuple[float, float, float, float]] = None, allowed_offset_frac: float = 0.2, min_counts: int = 100, pointing: Optional[Union[Dict[str, Any], SkyFlatsBasePointing]] = None, callback: Optional[Callable[..., Coroutine[Any, Any, None]]] = None, **kwargs: Any, ): """Initialize a new flat fielder. Args: functions: Function f(h) for each filter to describe ideal exposure time as a function of solar elevation h, i.e. something like exp(-0.9*(h+3.9)). See ExpTimeEval for details. target_count: Count rate to aim for. min_exptime: Minimum exposure time. max_exptime: Maximum exposure time. test_frame: Tupel (left, top, width, height) in percent that describe the frame for on-sky testing. counts_frame: Tupel (left, top, width, height) in percent that describe the frame for calculating mean count rate. allowed_offset_frac: Offset from target_count (given in fraction of it) that's still allowed for good flat-field min_counts: Minimum counts in frames. observer: Observer to use. vfs: VFS to use. callback: Callback function for statistics. """ Object.__init__(self, **kwargs) # store stuff self._target_count = target_count self._min_exptime = min_exptime self._max_exptime = max_exptime self._test_frame = (45, 45, 10, 10) if test_frame is None else test_frame self._counts_frame = (25, 25, 75, 75) if counts_frame is None else counts_frame self._allowed_offset_frac = allowed_offset_frac self._min_counts = min_counts self._callback = callback # parse function self._eval = ExpTimeEval(self._observer, functions) # abort event self._abort = asyncio.Event() # pointing self._pointing: Optional[SkyFlatsBasePointing] = None if pointing is not None: self._pointing = ( pointing if isinstance(pointing, SkyFlatsBasePointing) else self.pyobs_model_validate(SkyFlatsBasePointing, pointing) ) # state machine self._state = FlatFielder.State.INIT # current exposure time self._exptime = 0.0 # median of last image self._median = 0.0 # exposures to do self._exposures_total = 0 self._exposures_done = 0 self._exptime_done = 0.0 # bias level self._bias_level = 0.0 # which twilight are we in? just init something self._twilight = FlatFielder.Twilight.DAWN # current request self._cur_filter: Optional[str] = None self._cur_binning: Tuple[int, int] = (1, 1) async def __call__( self, telescope: ITelescope, camera: ICamera, filters: Optional[IFilters] = None, filter_name: Optional[str] = None, count: int = 20, binning: Tuple[int, int] = (1, 1), ) -> State: """Calls next step in state machine. Args: telescope: Telescope to use. camera: Camera to use. filters: Filter wheel to use. filter_name: Name of filter to do flat fields in. count: Number of flat fields to take. binning: Binning for flat fields. Returns: Current state of flat fielder. """ # camera must support exposure times if not isinstance(camera, IExposureTime): raise ValueError("Camera must support exposure times.") # store self._cur_filter = filter_name self._cur_binning = binning self._exposures_total = count # which state are we in? if self._state == FlatFielder.State.INIT: # init task await self._init_system(telescope, camera, filters) elif self._state == FlatFielder.State.WAITING: # wait until exposure time reaches good time await self._wait() elif self._state == FlatFielder.State.TESTING: # do actual tests on sky for exposure time await self._testing(cast(ICamera, camera)) elif self._state == FlatFielder.State.RUNNING: # take flat fields await self._flat_field(telescope, cast(ICamera, camera)) # return current state return self._state
[docs] async def reset(self) -> None: """Reset flat fielder""" self._state = FlatFielder.State.INIT self._exposures_done = 0 self._exptime_done = 0 if self._pointing is not None: await self._pointing.reset()
@property def has_filters(self) -> bool: """Returns True, if functions are based on filters.""" return len(self._eval.filters) > 0 @property def image_count(self) -> int: return self._exposures_done @property def total_exptime(self) -> float: return self._exptime_done def _initial_check(self) -> bool: """Do a quick initial check. Returns: False, if flat-field time for this filter is over, True otherwise. """ # get solar elevation and evaluate function sun_alt, self._exptime = self._eval_function(Time.now()) log.info( "Calculated optimal exposure time of %.2fs in %dx%d at solar elevation of %.2f°.", self._exptime, self._cur_binning[0], self._cur_binning[1], sun_alt, ) # then evaluate exposure time within a larger range state = self._eval_exptime(self._min_exptime * 0.5, self._max_exptime * 2.0) if state > 0: log.info("Missed flat-fielding time, finishing task...") self._state = FlatFielder.State.FINISHED return False else: log.info("Flat-field time is still coming, keep going...") return True async def _init_system( self, telescope: ITelescope, camera: Union[ICamera, IExposureTime], filters: Optional[IFilters] = None ) -> None: """Initialize whole system.""" # which twilight are we in? if self._observer is None: raise ValueError("No observer given.") sun = self.observer.sun_altaz(Time.now()) sun_10min = self.observer.sun_altaz(Time.now() + TimeDelta(10 * u.minute)) self._twilight = ( FlatFielder.Twilight.DUSK if sun_10min.alt.degree < sun.alt.degree else FlatFielder.Twilight.DAWN ) log.info("We are currently in %s twilight.", self._twilight.value) # do initial check if not self._initial_check(): return # set binning if isinstance(camera, IBinning): log.info("Setting binning to %dx%d...", self._cur_binning[0], self._cur_binning[1]) await camera.set_binning(*self._cur_binning) # get bias level self._bias_level = await self._get_bias(camera) if isinstance(camera, ICamera) else 0.0 # move telescope future_track = ( self._pointing(telescope) if self._pointing is not None and isinstance(telescope, IPointingAltAz) else None ) # get filter from first step and set it future_filter: asyncio.Task[Any] | Future if filters is not None and self._cur_filter is not None: log.info("Setting filter to %s...", self._cur_filter) future_filter = asyncio.create_task(filters.set_filter(self._cur_filter)) else: future_filter = Future(empty=True) # wait for both await Future.wait_all([future_track, future_filter]) log.info("Finished initializing system.") # change stats log.info("Waiting for flat-field time...") self._state = FlatFielder.State.WAITING async def _get_bias(self, camera: ICamera) -> float: """Take bias image to determine bias level. Returns: Median bias level. """ log.info("Taking BIAS image to determine median level...") # set full frame cam = camera if isinstance(camera, IWindow): full_frame = await camera.get_full_frame() await camera.set_window(*full_frame) # take image if isinstance(camera, IExposureTime): await camera.set_exposure_time(0.0) if isinstance(camera, IImageType): await camera.set_image_type(ImageType.BIAS) filename = await cam.grab_data(broadcast=False) # download image bias = await self.vfs.read_image(filename) avg = float(np.median(bias.data)) # return it log.info("Found median BIAS level of %.2f...", avg) return avg async def _wait(self) -> None: """Wait for flat-field time.""" # get solar elevation and evaluate function sun_alt, self._exptime = self._eval_function(Time.now()) log.info( "Calculated optimal exposure time of %.2fs in %dx%d at solar elevation of %.2f°.", self._exptime, self._cur_binning[0], self._cur_binning[1], sun_alt, ) # then evaluate exposure time within a larger range state = self._eval_exptime(self._min_exptime * 0.5, self._max_exptime * 2.0) if state < 0: log.info("Sleeping a little...") await event_wait(self._abort, 10) elif state == 0: log.info("Starting to take test flat-fields...") self._state = FlatFielder.State.TESTING else: log.info("Missed flat-fielding time, finish task...") self._state = FlatFielder.State.FINISHED def _eval_function(self, time: Time) -> Tuple[float, float]: """Evaluate function for given filter at given time. Args: time: Time to evaluate function at. Returns: Estimated exposure time. """ # get solar elevation and evaluate exptime if self._observer is None: raise ValueError("No observer given.") sun = self.observer.sun_altaz(time) exptime = self._eval(sun.alt.degree, binning=self._cur_binning, filter_name=self._cur_filter) # return solar altitude and exposure time return float(sun.alt.degree), exptime def _eval_exptime(self, min_exptime: Optional[float] = None, max_exptime: Optional[float] = None) -> int: """Evaluates current exposure time. Sets new state or waits of necessary. Returns: -1, if we have to wait, 0 during flat-field time and 1 if it has passed. """ # no min/max given? if min_exptime is None: min_exptime = self._min_exptime if max_exptime is None: max_exptime = self._max_exptime # need to wait, change status or are we finished? if (self._twilight == FlatFielder.Twilight.DUSK and self._exptime > max_exptime) or ( self._twilight == FlatFielder.Twilight.DAWN and self._exptime < min_exptime ): # in DUSK, if exptime is greater than max exptime, we're past flatfielding time # in DAWN, if exptime is less than min exptime, we're past flatfielding time return 1 elif (self._twilight == FlatFielder.Twilight.DUSK and self._exptime < min_exptime) or ( self._twilight == FlatFielder.Twilight.DAWN and self._exptime > max_exptime ): # in DUSK, if exptime is less than max exptime, we still need to wait # in DAWN, if exptime is greater than min exptime, we still need to wait return -1 else: # otherwise it seems that we're in the middle of flat-fielding time return 0 async def _testing(self, camera: ICamera) -> None: """Take flat-fields but don't store them.""" # set window await self._set_window(camera, testing=True) # do exposures, do not broadcast while testing log.info("Exposing test flat field for %.2fs...", self._exptime) cam = camera if isinstance(camera, IExposureTime): await camera.set_exposure_time(float(self._exptime)) if isinstance(camera, IImageType): await camera.set_image_type(ImageType.SKYFLAT) filename = await cam.grab_data(broadcast=False) # analyse image await self._analyse_image(filename) # then evaluate exposure time state = self._eval_exptime() if state < 0: log.info("Sleeping a little...") await event_wait(self._abort, 10) elif state == 0: log.info("Starting to store flat-fields...") self._state = FlatFielder.State.RUNNING else: log.info("Missed flat-fielding time, finish task...") self._state = FlatFielder.State.FINISHED async def _set_window(self, camera: Union[ICamera, IExposureTime], testing: bool) -> None: """Set camera window. Args: testing: Whether we're in testing mode or not. """ if isinstance(camera, IWindow): # get full frame left, top, width, height = await camera.get_full_frame() # if testing, take test frame, otherwise use full frame if testing: left, top, width, height = ( int(left + self._test_frame[0] / 100 * width), int(top + self._test_frame[1] / 100 * width), int(self._test_frame[2] / 100 * width), int(self._test_frame[3] / 100 * height), ) # set it log.info("Setting camera window to %dx%d at %d,%d...", width, height, left, top) await camera.set_window(left, top, width, height) async def _analyse_image(self, filename: str) -> bool: """Analyze image and return whether it's okay. Args: filename: Filename of image. Returns: Whether flat-field is okay. """ # download image flat_field = await self.vfs.read_image(filename) if flat_field is None: return False # get median from image self._median = self._get_image_median(flat_field, self._counts_frame) log.info("Got a flat field with median counts of %.2f.", self._median) # if count rate is too low, don't use this image to calculate new exposure time if self._median < self._min_counts: log.warning("Median counts (%d) too low, retrying last image with same exposure time...", self._median) return False else: # calculate deviation from target counts frac = abs(1.0 - self._median / self._target_count) # calculate new exposure time self._calc_new_exptime() # log and return if frac > self._target_count: log.warning("Deviation from target count (%.1f%%) is larger than allowed, retrying last image...", frac) return False else: log.info("Calculated new exposure time to be %.2fs.", self._exptime) return True @staticmethod def _get_image_median(image: Image, frame: Optional[Tuple[float, float, float, float]] = None) -> float: """Returns median of image after trimming it to TRIMSEC and to given frame. Args: image: Image to calculate median for. frame: Frame coordinates as (width, top, width, height) in percent of full frame. Returns: Median of image. """ # trim image to TRIMSEC data = fitssec(image, "TRIMSEC") # cut to frame if frame is not None: width, height = data.shape data = data[ int(frame[0] / 100 * width) : int((frame[0] + frame[2]) / 100 * width), int(frame[1] / 100 * height) : int((frame[1] + frame[3]) / 100 * height), ] # return median return float(np.median(data)) def _calc_new_exptime(self) -> None: """Calculate new exposure time.""" # calculate factor for new exposure time factor = (self._target_count - self._bias_level) / (self._median - self._bias_level) # limit factor to 0.1-10 factor = min(10.0, max(0.1, factor)) # calculate next exposure time self._exptime *= factor async def _flat_field(self, telescope: ITelescope, camera: ICamera) -> None: """Take flat-fields.""" # set window await self._set_window(camera, testing=False) # move telescope if self._pointing is not None and isinstance(telescope, IPointingAltAz): await self._pointing(telescope) # do exposures, do not broadcast while testing now = Time.now() log.info( "Exposing flat field %d/%d for %.2fs...", self._exposures_done + 1, self._exposures_total, self._exptime ) if isinstance(camera, IExposureTime): await camera.set_exposure_time(float(self._exptime)) if isinstance(camera, IImageType): await camera.set_image_type(ImageType.SKYFLAT) filename = await camera.grab_data() # analyse image if await self._analyse_image(filename): # increase count and quite here, if finished self._exptime_done += self._exptime self._exposures_done += 1 if self._exposures_done >= self._exposures_total: log.info("Finished all requested flat-fields..") self._state = FlatFielder.State.FINISHED return # call callback if self._callback is not None: if self._observer is None: raise ValueError("No observer given.") sun = self.observer.sun_altaz(now) await self._callback( datetime=now.isot, solalt=sun.alt.degree, exptime=self._exptime, counts=self._target_count, filter_name=self._cur_filter, binning=self._cur_binning, ) # then evaluate exposure time state = self._eval_exptime() if state < 0: log.info("Going back to testing...") self._state = FlatFielder.State.TESTING elif state == 0: pass else: log.info("Missed flat-fielding time, finish task...") self._state = FlatFielder.State.FINISHED
[docs] async def abort(self) -> None: """Abort current actions.""" self._abort.set()
__all__ = ["FlatFielder"]