from __future__ import annotations
import asyncio
import json
import logging
import time
from typing import Union, Any, Dict
import astropy.units as u
from astropy.time import TimeDelta
from pyobs.events import GoodWeatherEvent, Event, TaskFailedEvent, TaskStartedEvent, TaskFinishedEvent
from pyobs.robotic.scheduler import TaskScheduler
from pyobs.utils.time import Time
from pyobs.interfaces import IStartStop, IRunnable
from pyobs.modules import Module
from pyobs.robotic import TaskArchive, ObservationArchive, Task, ObservationList, Project
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class Scheduler(Module, IStartStop, IRunnable):
    """Module that re-calculates an observation schedule whenever an update is requested.

    An update can be requested by the task archive callback (:meth:`_update_schedule`), by incoming
    task/weather events, or manually via :meth:`run`. The actual scheduling runs in a background task
    (:meth:`_schedule_worker`) and writes its results into the observation archive.
    """

    __module__ = "pyobs.modules.robotic"

    def __init__(
        self,
        scheduler: dict[str, Any] | TaskScheduler,
        tasks: Union[Dict[str, Any], TaskArchive],
        schedule: Union[Dict[str, Any], ObservationArchive],
        trigger_on_task_started: bool = False,
        trigger_on_task_finished: bool = False,
        trigger_on_every_update: bool = False,
        schedule_range: float = 24.0,
        safety_time: float = 300,
        min_safety_time: float = 20,
        **kwargs: Any,
    ):
        """Initialize a new scheduler.

        Args:
            scheduler: Scheduler to use.
            tasks: Task archive to use.
            schedule: Task schedule to use.
            trigger_on_task_started: Whether to trigger a re-calculation of schedule, when task has started.
            trigger_on_task_finished: Whether to trigger a re-calculation of schedule, when task has finished.
            trigger_on_every_update: Whether to trigger a re-calculation of schedule on every update of the
                task list, even if the list of tasks has not changed.
            schedule_range: Number of hours to schedule into the future
            safety_time: If no ETA for next task to start exists (from current task, weather became good, etc), use
                this time in seconds to make sure that we don't schedule for a time when the scheduler is
                still running
            min_safety_time: Minimum safety time in seconds.
        """
        Module.__init__(self, **kwargs)

        # create child objects: task archive (calls back on changes), schedule archive and scheduler
        self._task_archive = self.add_child_object(tasks, TaskArchive, on_tasks_changed=self._update_schedule)
        self._schedule = self.add_child_object(schedule, ObservationArchive, auto_update=False)
        self._scheduler = self.add_child_object(scheduler, TaskScheduler, observation_archive=self._schedule)

        # store
        self._running = True
        self._initial_update_done = False
        self._need_update = False
        self._trigger_on_task_started = trigger_on_task_started
        self._trigger_on_task_finished = trigger_on_task_finished
        self._trigger_on_every_update = trigger_on_every_update
        self._schedule_range = schedule_range * u.hour
        self._safety_time = safety_time * u.second
        self._min_safety_time = min_safety_time * u.second

        # time to start next schedule from
        self._schedule_start: Time = Time.now()

        # ID of currently running task, and current (or last if finished) block
        self._current_task_id: Any = None
        self._last_task_id: Any = None

        # tasks
        self._tasks: list[Task] = []
        self._projects: list[Project] = []

        # update threads
        self.add_background_task(self._schedule_worker)

    async def open(self) -> None:
        """Open module and subscribe to task and weather events."""
        await Module.open(self)

        # subscribe to events
        if self._comm:
            await self.comm.register_event(TaskStartedEvent, self._on_task_started)
            await self.comm.register_event(TaskFinishedEvent, self._on_task_finished)
            await self.comm.register_event(TaskFailedEvent, self._on_task_finished)
            await self.comm.register_event(GoodWeatherEvent, self._on_good_weather)

    async def start(self, **kwargs: Any) -> None:
        """Start scheduler."""
        self._running = True

    async def stop(self, **kwargs: Any) -> None:
        """Stop scheduler."""
        self._running = False

    async def is_running(self, **kwargs: Any) -> bool:
        """Whether scheduler is running."""
        return self._running

    async def _update_schedule(self) -> None:
        """Download the schedulable tasks and request a scheduler run if the change is relevant.

        The decision is taken in a local variable and the shared ``_need_update`` flag is only ever
        *raised* here. That way a request made elsewhere (``run()``, event handlers) is never cancelled,
        and a scheduler run in progress is not aborted by a change that turns out to be irrelevant.
        """
        # get schedulable tasks and sort them
        log.info("Found update in schedulable block, downloading them...")
        tasks = sorted(
            await self._task_archive.get_schedulable_tasks(),
            key=lambda x: json.dumps(x.id, sort_keys=True),
        )
        log.info("Downloaded %d schedulable tasks(s).", len(tasks))

        # download projects
        self._projects = await self._task_archive.get_projects()

        # compare new and old lists
        removed, added = self._compare_task_lists(self._tasks, tasks)

        # assume we need an update, unless one of the checks below says otherwise
        need_update = True

        # no changes?
        if not self._trigger_on_every_update and not removed and not added:
            log.info("No change in list of blocks detected.")
            need_update = False

        # has only the current block been removed?
        log.info("Removed: %d, added: %d", len(removed), len(added))
        if len(removed) == 1:
            # IDs are not necessarily integers, so format them with %s
            log.info(
                "Found 1 removed block with ID %s. Last task ID was %s, current is %s.",
                removed[0],
                self._last_task_id,
                self._current_task_id,
            )
            if not added and removed[0] == self._last_task_id:
                log.info("Only one removed block detected, which is the one currently running.")
                need_update = False

        # if tasks were only removed, we can skip the update if none of them was actually scheduled;
        # newly added tasks always need a new schedule
        if need_update and removed and not added:
            removed_ids = set(removed)
            schedule = await self._schedule.get_schedule()
            if not any(s.task.id in removed_ids for s in schedule):
                log.info("Found %d tasks, but none of them was scheduled.", len(removed))
                need_update = False

        # store blocks before raising the flag, so that the worker always sees the new list
        self._tasks = tasks

        # schedule update
        if need_update:
            log.info("Triggering scheduler run...")
            self._need_update = True

        # remember now
        self._initial_update_done = True

    @staticmethod
    def _compare_task_lists(tasks1: list[Task], tasks2: list[Task]) -> tuple[list[Any], list[Any]]:
        """Compares two lists of tasks and returns the IDs that are unique to each of them.

        Args:
            tasks1: First list of tasks.
            tasks2: Second list of tasks.

        Returns:
            (tuple): Tuple containing:
                unique1: Sorted IDs of tasks that exist in tasks1, but not in tasks2.
                unique2: Sorted IDs of tasks that exist in tasks2, but not in tasks1.
        """
        ids1 = {t.id for t in tasks1}
        ids2 = {t.id for t in tasks2}
        return sorted(ids1 - ids2), sorted(ids2 - ids1)

    async def _schedule_worker(self) -> None:
        """Background task that runs the scheduler whenever an update has been requested.

        A run only starts if the module is running, an update was requested and the task list has been
        downloaded at least once. A run is abandoned as soon as a new update is requested.
        """
        await asyncio.sleep(5)

        # run forever
        while True:
            # need update? a pending request is kept while the module is stopped
            if self._running and self._need_update and self._initial_update_done:
                # reset need for update
                self._need_update = False

                try:
                    # TODO: add abort (see old robotic/scheduler.py)

                    # start time, used for measuring how long the scheduler takes
                    start_time = time.time()

                    # schedule start must be at least safety_time in the future
                    start = self._schedule_start
                    if start - Time.now() < self._safety_time:
                        start = Time.now() + TimeDelta(self._safety_time)
                    end = start + TimeDelta(self._schedule_range)

                    # do we have an observation running?
                    # NOTE(review): this moves the start *earlier*, to the end of the running observation;
                    # confirm that `<` (and not `>`) is intended here.
                    running_obs = await self._schedule.get_current_observation(self._task_archive)
                    if running_obs is not None and running_obs.end < start:
                        start = running_obs.end

                    # clear future schedule
                    await self._schedule.clear_schedule(start)

                    # schedule
                    scheduled_tasks = ObservationList()
                    first = True
                    async for scheduled_task in self._scheduler.schedule(self._tasks, self._projects, start, end):
                        # remember for later
                        scheduled_tasks.append(scheduled_task)

                        # new update requested? then these results are outdated
                        if self._need_update:
                            break

                        # the first task is submitted immediately, so that it is available early
                        if first:
                            first = False
                            log.info("Finished calculating next task:")
                            self._log_scheduled_task(ObservationList([scheduled_task]))

                            # set new safety_time as duration + 20%, but at least _min_safety_time
                            self._safety_time = max((time.time() - start_time) * 1.2 * u.second, self._min_safety_time)

                            # submit it
                            await self._schedule.add_observations(ObservationList([scheduled_task]))

                    # start over right away, if a new update was requested in the meantime
                    if self._need_update:
                        log.info("Not using scheduler results, since update was requested.")
                        continue

                    # log it
                    log.info("Finished calculating schedule for %d block(s):", len(scheduled_tasks))
                    self._log_scheduled_task(scheduled_tasks)
                    log.info("Done.")

                    # submit everything but the first task, which has already been submitted above
                    await self._schedule.add_observations(scheduled_tasks[1:])

                    # clean up
                    del scheduled_tasks

                except asyncio.CancelledError:
                    return

                except Exception:
                    # keep the worker alive on errors, but let KeyboardInterrupt/SystemExit pass
                    log.exception("Something went wrong")

            # sleep a little
            await asyncio.sleep(1)

    @staticmethod
    def _log_scheduled_task(scheduled_tasks: ObservationList) -> None:
        """Log one line per scheduled task with start/end time, name, ID and (if set) priority.

        Args:
            scheduled_tasks: Scheduled tasks to log.
        """
        for scheduled_task in scheduled_tasks:
            msg = f"  - {scheduled_task.start.strftime('%H:%M:%S')} to {scheduled_task.end.strftime('%H:%M:%S')}: "
            msg += f"{scheduled_task.task.name} ({scheduled_task.task.id}"
            if scheduled_task.priority is not None:
                msg += f", priority: {scheduled_task.priority}"
            msg += ")"
            log.info(msg)

    async def run(self, **kwargs: Any) -> None:
        """Trigger a re-schedule."""
        self._need_update = True

    async def _on_task_started(self, event: Event, sender: str) -> bool:
        """Re-schedule when task has started and we can predict its end.

        Args:
            event: The task started event.
            sender: Who sent it.

        Returns:
            False if the event is not a TaskStartedEvent, True otherwise.
        """
        if not isinstance(event, TaskStartedEvent):
            return False

        # store it
        self._current_task_id = event.id
        self._last_task_id = event.id

        # trigger?
        if self._trigger_on_task_started:
            # events may come without an ETA, in which case we schedule from now
            schedule_start = event.eta if event.eta is not None else Time.now()

            # get ETA in minutes
            eta = (schedule_start - Time.now()).sec / 60
            log.info("Received task started event with ETA of %.0f minutes, triggering new scheduler run...", eta)

            # set it
            self._need_update = True
            self._schedule_start = schedule_start

        return True

    async def _on_task_finished(self, event: Event, sender: str) -> bool:
        """Reset current task, when it has finished or failed.

        Args:
            event: The task finished (or task failed) event.
            sender: Who sent it.

        Returns:
            False if the event is neither a TaskFinishedEvent nor a TaskFailedEvent, True otherwise.
        """
        # this handler is registered for both event types, so accept both
        if not isinstance(event, (TaskFinishedEvent, TaskFailedEvent)):
            return False

        # reset current task
        self._current_task_id = None

        # trigger?
        if self._trigger_on_task_finished:
            log.info("Received task finished event, triggering new scheduler run...")

            # set it
            self._need_update = True
            self._schedule_start = Time.now()

        return True

    async def _on_good_weather(self, event: Event, sender: str) -> bool:
        """Re-schedule on incoming good weather event.

        Args:
            event: The good weather event.
            sender: Who sent it.

        Returns:
            False if the event is not a GoodWeatherEvent, True otherwise.
        """
        if not isinstance(event, GoodWeatherEvent):
            return False

        # events may come without an ETA, in which case we schedule from now
        schedule_start = event.eta if event.eta is not None else Time.now()

        # get ETA in minutes
        eta = (schedule_start - Time.now()).sec / 60
        log.info("Received good weather event with ETA of %.0f minutes, triggering new scheduler run...", eta)

        # set it
        self._need_update = True
        self._schedule_start = schedule_start
        return True

    async def abort(self, **kwargs: Any) -> None:
        """Abort current actions. Currently does nothing."""
        pass
# Names exported by `from <module> import *`.
__all__ = ["Scheduler"]