import asyncio
import logging
from typing import Any
import astropy.units as u
from pyobs.events import TaskFailedEvent, TaskFinishedEvent, TaskStartedEvent
from pyobs.interfaces import IAutonomous, IFitsHeaderBefore
from pyobs.modules import Module
from pyobs.robotic import (
Observation,
ObservationArchive,
ObservationState,
Task,
TaskArchive,
TaskRunner,
)
from pyobs.utils.time import Time
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class Mastermind(Module, IAutonomous, IFitsHeaderBefore):
    """Mastermind for a full robotic mode.

    A background task repeatedly asks the observation archive for the next observation, checks with the
    task runner whether its task can run, runs it and writes the resulting state back to the archive.
    ``TaskStartedEvent``, ``TaskFinishedEvent`` and ``TaskFailedEvent`` are sent along the way.
    """

    # report this class under the package path instead of the defining file's module path
    __module__ = "pyobs.modules.robotic"

    def __init__(
        self,
        schedule: ObservationArchive | dict[str, Any],
        runner: TaskRunner | dict[str, Any],
        tasks: TaskArchive | dict[str, Any] | None = None,
        allowed_late_start: int = 300,
        allowed_overrun: int = 300,
        after_task_sleep: int = 0,
        **kwargs: Any,
    ):
        """Initialize a new mastermind.

        Args:
            schedule: Observation archive (or a dict passed to ``add_child_object`` to obtain one) that
                returns the next observation to run.
            runner: Task runner (or a dict passed to ``add_child_object`` to obtain one) that checks
                whether a task can run and runs it.
            tasks: Task archive (or a dict passed to ``add_child_object`` to obtain one); handed to the
                observation archive when asking for the next observation.
            allowed_late_start: Allowed seconds to start late.
            allowed_overrun: Allowed time for a task to exceed its window in seconds. Only stored here;
                nothing in this class reads it.
            after_task_sleep: Seconds to sleep after a task has finished successfully.
            **kwargs: Passed on to ``Module.__init__``.
        """
        Module.__init__(self, **kwargs)

        # store
        self._allowed_late_start = allowed_late_start
        self._allowed_overrun = allowed_overrun
        self._running = False
        self._after_task_sleep = after_task_sleep

        # last logged "cannot run" reason per task id, so that the same reason is not logged on every poll
        self._last_cant_run_reason: dict[Any, str] = {}

        # add thread func
        self.add_background_task(self._run_thread, True)

        # get schedule and runner
        self._task_archive = self.add_child_object(tasks, TaskArchive)
        self._observation_archive = self.add_child_object(schedule, ObservationArchive)
        self._task_runner = self.add_child_object(runner, TaskRunner, observation_archive=self._observation_archive)

        # task that is currently being run, None while idle
        self._task: Task | None = None

    async def open(self) -> None:
        """Open module, register the events sent by the run loop and switch the robotic mode on."""
        await Module.open(self)

        # register every event type that the run loop sends, including the failure event
        if self._comm:
            await self.comm.register_event(TaskStartedEvent)
            await self.comm.register_event(TaskFinishedEvent)
            await self.comm.register_event(TaskFailedEvent)

        # start
        self._running = True

    async def start(self, **kwargs: Any) -> None:
        """Starts a service."""
        log.info("Starting robotic system...")
        self._running = True

    async def stop(self, **kwargs: Any) -> None:
        """Stops a service.

        Only clears the running flag, which the run loop checks at the top of each iteration; a task that
        is already being run is not interrupted by this method.
        """
        log.info("Stopping robotic system...")
        self._running = False

    async def is_running(self, **kwargs: Any) -> bool:
        """Whether a service is running."""
        return self._running

    async def _run_thread(self) -> None:
        """Background loop that picks, runs and book-keeps observations.

        Each iteration fetches the next observation, skips it (after a 10 second pause) if the runner
        cannot run its task or if it would start too late, and otherwise runs the task. The observation
        is marked IN_PROGRESS before the run and COMPLETED or FAILED afterwards. The after-task sleep is
        only applied after a successful run.
        """
        # wait a little
        await asyncio.sleep(5)

        # flags
        first_late_start_warning = True

        # run until closed
        while True:
            # not running?
            if not self._running:
                # sleep a little and continue
                await asyncio.sleep(1)
                continue

            # get now
            now = Time.now()

            # find task that we want to run now
            observation: Observation | None = await self._observation_archive.get_next_observation(
                now, self._task_archive
            )
            if observation is None:
                await asyncio.sleep(10)
                continue

            if not await self._task_runner.can_run(observation.task):
                reason = self._task_runner.cant_run_reason(observation.task)
                if reason is not None:
                    # log a reason only when it differs from the one logged last for this task
                    last = self._last_cant_run_reason.get(observation.task.id)
                    if last != reason:
                        log.info("Task %s cannot run: %s", observation.task.name, reason)
                        self._last_cant_run_reason[observation.task.id] = reason
                await asyncio.sleep(10)
                continue

            # task can run — clear stored reason
            self._last_cant_run_reason.pop(observation.task.id, None)

            # starting too late?
            if not observation.task.can_start_late:
                late_start = now - observation.start
                if late_start > self._allowed_late_start * u.second:
                    # only warn once
                    if first_late_start_warning:
                        log.warning(
                            "Time since start of window (%.1f) too long (>%.1f), skipping task...",
                            late_start.to_value("second"),
                            self._allowed_late_start,
                        )
                    first_late_start_warning = False

                    # sleep a little and skip
                    await asyncio.sleep(10)
                    continue

            # reset warning
            first_late_start_warning = True

            # task is definitely not None here
            self._task = observation.task

            # ETA, based on a fresh timestamp since the checks above may have taken a while
            now = Time.now()
            eta = now + self._task.duration * u.second

            # send event and change state
            await self.comm.send_event(TaskStartedEvent(name=self._task.name, id=self._task.id, eta=eta))
            observation.state = ObservationState.IN_PROGRESS
            observation.start = now
            observation.end = eta
            await self._observation_archive.update_observation(observation)

            # run task in thread
            log.info("Running task %s...", self._task.name)
            try:
                await self._task_runner.run_task(self._task)
            except Exception:
                # something went wrong: record the failure, announce it and go on with the next observation
                log.exception("Task %s failed.", self._task.name)
                observation.end = Time.now()
                observation.state = ObservationState.FAILED
                await self._observation_archive.update_observation(observation)
                await self.comm.send_event(TaskFailedEvent(name=self._task.name, id=self._task.id))
                self._task = None
                continue

            # send event and change state
            await self.comm.send_event(TaskFinishedEvent(name=self._task.name, id=self._task.id))
            observation.end = Time.now()
            observation.state = ObservationState.COMPLETED
            await self._observation_archive.update_observation(observation)

            # finish
            log.info("Finished task %s.", self._task.name)
            self._task = None

            # sleep?
            await asyncio.sleep(self._after_task_sleep)
# Names exported by a wildcard import of this module.
__all__ = ["Mastermind"]