import asyncio
import logging
from typing import Any
from pyobs.interfaces import IRunnable
from pyobs.interfaces import IFlatField, IFilters, IBinning
from pyobs.modules import Module
from pyobs.modules import timeout
from pyobs.object import get_object
from pyobs.utils.parallel import event_wait
from pyobs.robotic.utils.skyflats.priorities.base import SkyflatPriorities
from pyobs.robotic.utils.skyflats.scheduler import Scheduler
from pyobs.utils.time import Time
log = logging.getLogger(__name__)
class FlatFieldScheduler(Module, IRunnable):
"""Run the flat-field scheduler."""
__module__ = "pyobs.modules.flatfield"
def __init__(
self,
flatfield: str | IFlatField,
functions: str | dict[str, str | dict[str, str]],
priorities: dict[str, Any] | SkyflatPriorities,
min_exptime: float = 0.5,
max_exptime: float = 5,
timespan: float = 7200,
filter_change: float = 30,
count: int = 20,
**kwargs: Any,
):
"""Initialize a new flat field scheduler.
Args:
flatfield: Flat field module to use
functions: Dict with flat functions
priorities: Class handling priorities
min_exptime: Minimum exposure time [s]
max_exptime: Maximum exposure time [s]
timespan: Time to scheduler after start [s]
filter_change: Time required for filter change [s]
count: Number of flats to take per filter/binning
"""
Module.__init__(self, **kwargs)
# store
self._flatfield = flatfield
self._count = count
self._running = False
# abort
self._abort = asyncio.Event()
# priorities
prio = get_object(priorities, SkyflatPriorities)
# create scheduler
self._scheduler = Scheduler(
functions,
prio,
self._observer,
min_exptime=min_exptime,
max_exptime=max_exptime,
timespan=timespan,
filter_change=filter_change,
count=count,
)
[docs]
async def open(self) -> None:
"""Open module"""
await Module.open(self)
# check flat field
try:
await self.proxy(self._flatfield, IFlatField)
except ValueError:
log.warning("Flatfield module does not exist or is not of correct type at the moment.")
[docs]
@timeout(7200)
async def run(self, **kwargs: Any) -> None:
"""Perform flat-fielding"""
# check
if self._running:
raise ValueError("Already running.")
self._running = True
try:
# start
log.info("Performing flat fielding...")
self._abort = asyncio.Event()
# get flat fielder
log.info("Getting proxy for flat fielder...")
flatfield = await self.proxy(self._flatfield, IFlatField)
# do schedule
log.info("Scheduling flats...")
await self._scheduler(Time.now())
# do flat fields
for item in self._scheduler:
# aborted?
if self._abort.is_set():
log.info("Scheduler aborted.")
break
# start
log.info("Taking %d flats in %s %dx%d...", self._count, item.filter_name, item.binning, item.binning)
if isinstance(flatfield, IFilters):
await flatfield.set_filter(item.filter_name)
if isinstance(flatfield, IBinning):
await flatfield.set_binning(*item.binning)
future = asyncio.create_task(flatfield.flat_field(self._count))
# wait for it
while not future.done():
# aborted?
if self._abort.is_set():
log.info("Aborting current flat field...")
await flatfield.abort()
# sleep a little
await event_wait(self._abort, 1)
# finished
log.info("Finished.")
finally:
self._running = False
[docs]
@timeout(20)
async def abort(self, **kwargs: Any) -> None:
"""Abort current actions."""
self._abort.set()
__all__ = ["FlatFieldScheduler"]