Source code for pyobs.modules.robotic.scheduler

from __future__ import annotations

import asyncio
import json
import logging
import time
from typing import Any

import astropy.units as u
from astropy.time import TimeDelta

from pyobs.events import (
    Event,
    GoodWeatherEvent,
    TaskFailedEvent,
    TaskFinishedEvent,
    TaskStartedEvent,
)
from pyobs.interfaces import IRunnable, IStartStop
from pyobs.modules import Module
from pyobs.robotic import (
    ObservationArchive,
    ObservationList,
    Project,
    Task,
    TaskArchive,
)
from pyobs.robotic.scheduler import TaskScheduler
from pyobs.utils.time import Time

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class Scheduler(Module, IStartStop, IRunnable):
    """Module that re-calculates the observation schedule whenever it is asked to.

    A background worker watches an internal "update needed" flag. The flag is raised by changes in the
    task archive, by task started/finished/failed events, by good weather events, and by calling
    :meth:`run`. When it is raised, the worker runs the configured task scheduler and writes the
    resulting observations into the observation archive.
    """

    __module__ = "pyobs.modules.robotic"

    def __init__(
        self,
        scheduler: dict[str, Any] | TaskScheduler,
        tasks: dict[str, Any] | TaskArchive,
        schedule: dict[str, Any] | ObservationArchive,
        trigger_on_task_started: bool = False,
        trigger_on_task_finished: bool = False,
        trigger_on_every_update: bool = False,
        schedule_range: float = 24.0,
        safety_time: float = 300,
        min_safety_time: float = 20,
        **kwargs: Any,
    ):
        """Initialize a new scheduler.

        Args:
            scheduler: Scheduler to use.
            tasks: Task archive to use.
            schedule: Task schedule to use.
            trigger_on_task_started: Whether to trigger a re-calculation of schedule, when task has started.
            trigger_on_task_finished: Whether to trigger a re-calculation of schedule, when task has finished
                                      (or failed).
            trigger_on_every_update: Whether to trigger a re-calculation of schedule on every update from the
                                     task archive, even if the list of schedulable tasks did not change.
            schedule_range: Number of hours to schedule into the future
            safety_time: If no ETA for next task to start exists (from current task, weather became good, etc), use
                         this time in seconds to make sure that we don't schedule for a time when the scheduler is
                         still running
            min_safety_time: Minimum safety time in seconds.
        """
        Module.__init__(self, **kwargs)

        # create task archive, observation archive (the schedule) and the scheduler itself
        self._task_archive = self.add_child_object(tasks, TaskArchive, on_tasks_changed=self._update_schedule)
        self._schedule = self.add_child_object(schedule, ObservationArchive, auto_update=False)
        self._scheduler = self.add_child_object(scheduler, TaskScheduler, observation_archive=self._schedule)

        # store
        # NOTE(review): _running is only toggled by start()/stop() and reported by is_running();
        # the schedule worker below never reads it -- confirm whether that is intended.
        self._running = True
        self._initial_update_done = False
        self._need_update = False
        self._trigger_on_task_started = trigger_on_task_started
        self._trigger_on_task_finished = trigger_on_task_finished
        self._trigger_on_every_update = trigger_on_every_update
        self._schedule_range = schedule_range * u.hour
        self._safety_time = safety_time * u.second
        self._min_safety_time = min_safety_time * u.second

        # time to start next schedule from
        self._schedule_start: Time = Time.now()

        # ID of currently running task, and current (or last if finished) block
        self._current_task_id: Any = None
        self._last_task_id: Any = None

        # tasks and projects as last downloaded from the task archive
        self._tasks: list[Task] = []
        self._projects: list[Project] = []

        # background worker that performs the actual scheduling
        self.add_background_task(self._schedule_worker)

    async def open(self) -> None:
        """Open module."""
        await Module.open(self)

        # subscribe to events
        if self._comm:
            await self.comm.register_event(TaskStartedEvent, self._on_task_started)
            await self.comm.register_event(TaskFinishedEvent, self._on_task_finished)
            await self.comm.register_event(TaskFailedEvent, self._on_task_finished)
            await self.comm.register_event(GoodWeatherEvent, self._on_good_weather)

    async def start(self, **kwargs: Any) -> None:
        """Start scheduler."""
        self._running = True

    async def stop(self, **kwargs: Any) -> None:
        """Stop scheduler."""
        self._running = False

    async def is_running(self, **kwargs: Any) -> bool:
        """Whether scheduler is running."""
        return self._running

    async def _update_schedule(self) -> None:
        """Download the schedulable tasks and decide whether a new scheduler run is required.

        Passed to the task archive as its ``on_tasks_changed`` callback. The decision is made in a
        local variable and the shared ``_need_update`` flag is only ever raised here, never lowered:
        the schedule worker also uses that flag as its abort signal, and other code paths (``run()``,
        event handlers) raise it too, so toggling it around an ``await`` could abort a running pass
        and then silently drop the re-run.
        """
        # get schedulable tasks and sort them
        log.info("Found update in schedulable block, downloading them...")
        tasks = sorted(
            await self._task_archive.get_schedulable_tasks(),
            key=lambda x: json.dumps(x.id, sort_keys=True),
        )
        log.info("Downloaded %d schedulable tasks(s).", len(tasks))

        # download projects
        self._projects = await self._task_archive.get_projects()

        # compare new and old lists
        removed, added = self._compare_task_lists(self._tasks, tasks)

        # assume that we need an update, checks below may veto it
        need_update = True

        # no changes?
        if not self._trigger_on_every_update and not removed and not added:
            # no need to re-schedule
            log.info("No change in list of blocks detected.")
            need_update = False

        # has only the current block been removed?
        log.info("Removed: %d, added: %d", len(removed), len(added))
        if len(removed) == 1:
            # IDs are not necessarily integers, so format with %s
            log.info(
                "Found 1 removed block with ID %s. Last task ID was %s, current is %s.",
                removed[0],
                str(self._last_task_id),
                str(self._current_task_id),
            )
        if len(removed) == 1 and not added and removed[0] == self._last_task_id:
            # no need to re-schedule
            log.info("Only one removed block detected, which is the one currently running.")
            need_update = False

        # if tasks were only removed (nothing added), check whether any of them was actually in the
        # schedule; newly added tasks always warrant a new run
        if need_update and removed and not added:
            removed_ids = set(removed)
            schedule = await self._schedule.get_schedule()
            if not any(s.task.id in removed_ids for s in schedule):
                log.info("Found %s tasks, but none of them was scheduled.", len(removed))
                need_update = False

        # store blocks
        self._tasks = tasks

        # schedule update
        if need_update:
            log.info("Triggering scheduler run...")
            self._need_update = True

        # remember now
        self._initial_update_done = True

    @staticmethod
    def _compare_task_lists(tasks1: list[Task], tasks2: list[Task]) -> tuple[list[Any], list[Any]]:
        """Compares two lists of tasks by ID and returns the IDs that are unique to each list.

        Args:
            tasks1: First list of tasks.
            tasks2: Second list of tasks.

        Returns:
            (tuple): Tuple containing:
                unique1:  Sorted IDs of tasks that exist in tasks1, but not in tasks2.
                unique2:  Sorted IDs of tasks that exist in tasks2, but not in tasks1.
        """
        # collect task IDs of both lists
        ids1 = {t.id for t in tasks1}
        ids2 = {t.id for t in tasks2}

        # find IDs in ids1 that are missing in ids2 and vice versa
        return sorted(ids1 - ids2), sorted(ids2 - ids1)

    async def _schedule_worker(self) -> None:
        """Background loop that runs the scheduler whenever an update has been requested.

        A raised ``_need_update`` flag during a run is treated as an abort signal: the results of
        the current pass are discarded and a new pass starts immediately.
        """
        await asyncio.sleep(5)

        # run forever
        while True:
            # need update?
            if self._need_update and self._initial_update_done:
                # reset need for update
                self._need_update = False

                try:
                    # TODO: add abort (see old robotic/scheduler.py)

                    # start time, used to measure how long the scheduler needs
                    start_time = time.time()

                    # schedule start must be at least safety_time in the future
                    start = self._schedule_start
                    if start - Time.now() < self._safety_time:
                        start = Time.now() + TimeDelta(self._safety_time)
                    end = start + TimeDelta(self._schedule_range)

                    # do we have an observation running?
                    # NOTE(review): this moves start *earlier* if the running observation ends before
                    # it -- confirm that the comparison is not meant to be the other way around.
                    running_obs = await self._schedule.get_current_observation(self._task_archive)
                    if running_obs is not None and running_obs.end < start:
                        start = running_obs.end

                    # clear future schedule
                    await self._schedule.clear_schedule(start)

                    # schedule
                    scheduled_tasks = ObservationList()
                    first = True
                    async for scheduled_task in self._scheduler.schedule(self._tasks, self._projects, start, end):
                        # remember for later
                        scheduled_tasks.append(scheduled_task)

                        if self._need_update:
                            log.info("Not using scheduler results, since update was requested.")
                            break

                        # the first task is submitted right away, all others after the loop
                        if first:
                            first = False
                            log.info("Finished calculating next task:")
                            self._log_scheduled_task(ObservationList([scheduled_task]))

                            # set new safety_time as duration + 20%, but at least _min_safety_time
                            self._safety_time = max((time.time() - start_time) * 1.2 * u.second, self._min_safety_time)

                            # submit it
                            await self._schedule.add_observations(ObservationList([scheduled_task]))

                    if self._need_update:
                        log.info("Not using scheduler results, since update was requested.")
                        # skip the sleep below and start over right away
                        continue

                    # log it
                    log.info("Finished calculating schedule for %d block(s):", len(scheduled_tasks))
                    self._log_scheduled_task(scheduled_tasks)
                    log.info("Done.")

                    # submit all but the first one, which has already been submitted
                    await self._schedule.add_observations(scheduled_tasks[1:])

                    # clean up
                    del scheduled_tasks

                except asyncio.CancelledError:
                    return

                except Exception:
                    log.exception("Something went wrong")

            # sleep a little
            await asyncio.sleep(1)

    @staticmethod
    def _log_scheduled_task(scheduled_tasks: ObservationList) -> None:
        """Write one log line per scheduled observation with start, end, task name, ID and priority.

        Args:
            scheduled_tasks: Observations to log.
        """
        for scheduled_task in scheduled_tasks:
            msg = f" - {scheduled_task.start.strftime('%H:%M:%S')} to {scheduled_task.end.strftime('%H:%M:%S')}: "
            msg += f"{scheduled_task.task.name} ({scheduled_task.task.id}"
            if scheduled_task.priority is not None:
                msg += f", priority: {scheduled_task.priority}"
            msg += ")"
            log.info(msg)

    async def run(self, **kwargs: Any) -> None:
        """Trigger a re-schedule."""
        self._need_update = True

    async def _on_task_started(self, event: Event, sender: str) -> bool:
        """Re-schedule when task has started and we can predict its end.

        Args:
            event: The task started event.
            sender: Who sent it.

        Returns:
            False if the event is not a TaskStartedEvent, True otherwise.
        """
        if not isinstance(event, TaskStartedEvent):
            return False

        # store it
        self._current_task_id = event.id
        self._last_task_id = event.id

        # trigger?
        if self._trigger_on_task_started:
            if event.eta is not None:
                # get ETA in minutes
                eta = (event.eta - Time.now()).sec / 60
                log.info("Received task started event with ETA of %.0f minutes, triggering new scheduler run...", eta)
                self._schedule_start = event.eta
            else:
                # without an ETA, schedule from now on
                log.info("Received task started event without ETA, triggering new scheduler run...")
                self._schedule_start = Time.now()

            # set it
            self._need_update = True

        return True

    async def _on_task_finished(self, event: Event, sender: str) -> bool:
        """Reset current task, when it has finished or failed.

        Args:
            event: The task finished (or task failed) event.
            sender: Who sent it.

        Returns:
            False if the event is neither a TaskFinishedEvent nor a TaskFailedEvent, True otherwise.
        """
        # this handler is registered for both event types in open(), so accept both
        if not isinstance(event, (TaskFinishedEvent, TaskFailedEvent)):
            return False

        # reset current task
        self._current_task_id = None

        # trigger?
        if self._trigger_on_task_finished:
            log.info("Received task finished event, triggering new scheduler run...")

            # set it
            self._schedule_start = Time.now()
            self._need_update = True

        return True

    async def _on_good_weather(self, event: Event, sender: str) -> bool:
        """Re-schedule on incoming good weather event.

        Args:
            event: The good weather event.
            sender: Who sent it.

        Returns:
            False if the event is not a GoodWeatherEvent, True otherwise.
        """
        if not isinstance(event, GoodWeatherEvent):
            return False

        if event.eta is not None:
            # get ETA in minutes
            eta = (event.eta - Time.now()).sec / 60
            log.info("Received good weather event with ETA of %.0f minutes, triggering new scheduler run...", eta)
            self._schedule_start = event.eta
        else:
            # without an ETA, schedule from now on
            log.info("Received good weather event without ETA, triggering new scheduler run...")
            self._schedule_start = Time.now()

        # set it
        self._need_update = True
        return True

    async def abort(self, **kwargs: Any) -> None:
        """Abort the current action. Currently does nothing."""
        pass
# Public API of this module: only the Scheduler class is exported.
__all__ = ["Scheduler"]