# Source code for pyobs.robotic.scheduler.ondemandscheduler

from __future__ import annotations
import asyncio
import logging
from typing import Any, TYPE_CHECKING
from collections.abc import AsyncIterator
import numpy as np
from astropy.time import TimeDelta
import astropy.units as u

from pyobs.object import Object
from . import DataProvider
from .constraints import Constraint
from .taskscheduler import TaskScheduler
from pyobs.utils.time import Time
from ..observationarchive import ObservationArchive
from .observationarchiveevolution import ObservationArchiveEvolution

if TYPE_CHECKING:
    from pyobs.robotic import Task, Project
    from pyobs.robotic import Observation


log = logging.getLogger(__name__)


class OnDemandScheduler(TaskScheduler):
    """Scheduler based on merits.

    Steps forward through the requested interval. At each point in time the task with the highest combined
    merit (task merits multiplied by task and project priorities) is picked. Before it is scheduled, the
    scheduler looks ahead over the task's duration for a task that reaches a higher merit and, if one is found,
    either runs both (when the better task can be postponed) or schedules the better task and fills the gap
    before it.
    """

    __module__ = "pyobs.modules.robotic"

    def __init__(
        self,
        twilight: str = "astronomical",
        observation_archive: ObservationArchive | dict[str, Any] | None = None,
        constraints: list[Constraint] | list[dict[str, Any]] | None = None,
        **kwargs: Any,
    ):
        """Initialize a new scheduler.

        Args:
            twilight: astronomical or nautical
            observation_archive: Observation archive (or a configuration dict for one). If None, no archive
                is attached.
            constraints: Global constraints (or configuration dicts for them) that are evaluated for every
                task in addition to the task's own constraints.
            **kwargs: Forwarded to Object.__init__.
        """
        Object.__init__(self, **kwargs)

        # get obs archive
        self._obs_archive = (
            self.add_child_object(observation_archive, ObservationArchive) if observation_archive is not None else None
        )

        # store
        self._twilight = twilight
        # set by abort(); NOTE(review): nothing in this class reads the event -- confirm whether the base
        # class or callers are expected to check it
        self._abort: asyncio.Event = asyncio.Event()

        # global constraints
        constraints = constraints or []
        self._global_constraints: list[Constraint] = [Constraint.create(self, c) for c in constraints]

    async def schedule(
        self, tasks: list[Task], projects: list[Project], start: Time, end: Time
    ) -> AsyncIterator[Observation]:
        """Schedule the given tasks between start and end.

        Args:
            tasks: Tasks to choose from.
            projects: Projects the tasks may belong to; they are looked up by their code.
            start: Start of the interval to schedule.
            end: End of the interval to schedule.

        Yields:
            The scheduled observations, in the order in which they are found.

        Raises:
            RuntimeError: If no observer is set.
        """
        if self._observer is None:
            raise RuntimeError("No observer given.")

        archive = ObservationArchiveEvolution(self._observer, self._obs_archive)
        data = DataProvider(self._observer, archive)
        projects_dict = {project.code: project for project in projects}

        # schedule from start to end
        async for observation in self.schedule_in_interval(tasks, projects_dict, start, end, data):
            # evolve archive, so that later evaluations see what has been scheduled so far
            await data.archive.evolve(observation)

            # yield to caller
            yield observation

    async def abort(self) -> None:
        """Set the abort event."""
        self._abort.set()

    async def schedule_in_interval(
        self,
        tasks: list[Task],
        projects: dict[str, Project],
        start: Time,
        end: Time,
        data: DataProvider,
        step: float = 300,
    ) -> AsyncIterator[Observation]:
        """Fill the interval from start to end with observations.

        Args:
            tasks: Tasks to choose from.
            projects: Projects by code.
            start: Start of the interval.
            end: End of the interval.
            data: Data provider.
            step: Step size in seconds, used both for advancing when nothing could be scheduled and for the
                look-ahead for better tasks.

        Yields:
            The scheduled observations.
        """
        step_delta = TimeDelta(step * u.second)
        time = start
        while time < end:
            # latest end time of everything scheduled in this iteration, None if nothing was scheduled
            latest_end: Time | None = None

            # schedule first in this interval, could be one or two
            async for scheduled_task in self.schedule_first_in_interval(tasks, projects, time, end, data, step=step):
                # yield it to caller
                yield scheduled_task

                # check end
                if latest_end is None or scheduled_task.end > latest_end:
                    latest_end = scheduled_task.end

            if latest_end is None or latest_end <= time:
                # no task found (or none that moves us forward), try one step later;
                # this guarantees that the loop always makes progress
                time = time + step_delta
            else:
                # set new time from scheduled task
                time = latest_end

    async def schedule_first_in_interval(
        self,
        tasks: list[Task],
        projects: dict[str, Project],
        start: Time,
        end: Time,
        data: DataProvider,
        step: float = 300,
    ) -> AsyncIterator[Observation]:
        """Schedule whatever should run first in the given interval.

        Yields nothing if no task has a non-zero merit at start. Otherwise yields either the best task at
        start, or the best task followed by a postponed better task, or a better task starting later followed
        by the observations that fit in between.

        Args:
            tasks: Tasks to choose from.
            projects: Projects by code.
            start: Start of the interval.
            end: End of the interval.
            data: Data provider.
            step: Step size in seconds for the look-ahead.

        Yields:
            The scheduled observations.
        """
        # find current best task
        task, merit = await self.find_next_best_task(tasks, projects, start, end, data)
        if task is None or merit is None:
            return

        # check, whether there is another task within its duration that will have a higher merit
        better_task, better_time, better_merit = await self.check_for_better_task(
            task, projects, merit, tasks, start, end, data, step=step
        )
        if better_task is None or better_time is None or better_merit is None:
            # this seems to be the best task for now, schedule it
            yield self.create_scheduled_task(task, merit, start)
            return

        # can we maybe postpone the better task to run both?
        postpone_time = await self.can_postpone_task(task, projects, better_task, better_merit, start, end, data)
        if postpone_time is not None:
            # yes, we can! schedule both
            yield self.create_scheduled_task(task, merit, start)
            yield self.create_scheduled_task(better_task, better_merit, postpone_time)
            return

        # just schedule better_task
        yield self.create_scheduled_task(better_task, better_merit, better_time)

        # and find other tasks for in between, new end time is better_time
        async for between_task in self.schedule_in_interval(tasks, projects, start, better_time, data, step=step):
            yield between_task

    def create_scheduled_task(self, task: Task, merit: float, time: Time) -> Observation:
        """Create an observation for a task starting at the given time.

        Args:
            task: Task to schedule.
            merit: Merit of the task, stored as priority of the observation.
            time: Start time; the end time is the start time plus the task's duration in seconds.

        Returns:
            The new observation.
        """
        # imported here, since the module-level import is only done under TYPE_CHECKING
        from pyobs.robotic import Observation

        return Observation(
            task=task,
            start=time,
            end=time + TimeDelta(task.duration * u.second),
            priority=merit,
            target=task.target,
        )

    async def evaluate_constraints(self, task: Task, start: Time, end: Time, data: DataProvider) -> bool:
        """Loops all constraints. If any evaluates to False, return False. Otherwise, return True.

        Both the global constraints of this scheduler and the constraints of the task are evaluated.

        Args:
            task: Task to evaluate.
            start: Start time.
            end: End time (currently not used by this method).
            data: Data provider.

        Returns:
            True if all constraints evaluate True, False otherwise.
        """
        for constraint in self._global_constraints + task.constraints:
            if not await constraint(start, task, data):
                return False
        return True

    async def evaluate_merits(self, task: Task, start: Time, end: Time, data: DataProvider) -> float:
        """Loop all merits, evaluate them and multiply the results. If any evaluates to 0, abort and return 0.

        Args:
            task: Task to evaluate.
            start: Start time.
            end: End time (currently not used by this method).
            data: Data provider.

        Returns:
            The final merit for this task.
        """
        # loop merits
        total_merit = 1.0
        for merit in task.merits:
            total_merit *= await merit(start, task, data)

            # if zero, abort and return it
            if total_merit == 0.0:
                return 0.0

        # done
        return total_merit

    async def evaluate_constraints_and_merits(
        self, tasks: list[Task], projects: dict[str, Project], start: Time, end: Time, data: DataProvider
    ) -> list[float]:
        """Evaluate the combined merit of every task at the given start time.

        A task gets a merit of zero if its target cannot be resolved, if any constraint fails, or if it does
        not fit into the slot between start and end. Otherwise its merits are multiplied (1 if it has none).
        The result is then multiplied with the priority of the task and of its project, where given.

        Args:
            tasks: Tasks to evaluate.
            projects: Projects by code.
            start: Time at which the tasks would start.
            end: End of the available slot.
            data: Data provider.

        Returns:
            One merit per task, in the same order as tasks.
        """
        # evaluate all merit functions at given time
        merits: list[float] = []
        for task in tasks:
            # resolve dynamic target -- skip task if no valid target found
            if not await task.resolve_target(start, task, data):
                merits.append(0.0)
                continue

            if not await self.evaluate_constraints(task, start, end, data):
                # some constraint failed...
                merit = 0.0
            elif start + TimeDelta(task.duration * u.second) > end:
                # if task is too long for the given slot, we evaluate its merits to zero;
                # this is checked before the "no merits" case, so that tasks without merits
                # cannot be scheduled beyond the end of the slot
                merit = 0.0
            elif len(task.merits) == 0:
                # no merits? evaluate to 1
                merit = 1.0
            else:
                # now we can evaluate the merits
                merit = await self.evaluate_merits(task, start, end, data)

            # multiply with priorities
            if task.priority is not None:
                merit *= task.priority
            if task.project in projects:
                project = projects[task.project]
                if project.priority is not None:
                    merit *= project.priority

            # store it
            merits.append(merit)

        return merits

    async def find_next_best_task(
        self, tasks: list[Task], projects: dict[str, Project], start: Time, end: Time, data: DataProvider
    ) -> tuple[Task | None, float]:
        """Find the task with the highest merit at the given start time.

        Args:
            tasks: Tasks to choose from.
            projects: Projects by code.
            start: Time at which the task would start.
            end: End of the available slot.
            data: Data provider.

        Returns:
            Tuple of best task and its merit. The task is None if the highest merit is zero or if there are
            no tasks at all.
        """
        # nothing to choose from; np.argmax would raise a ValueError on an empty sequence
        if not tasks:
            return None, 0.0

        # evaluate all merit functions at given time
        merits = await self.evaluate_constraints_and_merits(tasks, projects, start, end, data)

        # find max one
        idx = int(np.argmax(merits))
        best_merit = merits[idx]

        # if merit is zero, return nothing
        return (None if best_merit == 0.0 else tasks[idx]), best_merit

    async def check_for_better_task(
        self,
        task: Task,
        projects: dict[str, Project],
        merit: float,
        tasks: list[Task],
        start: Time,
        end: Time,
        data: DataProvider,
        step: float = 300,
    ) -> tuple[Task | None, Time | None, float | None]:
        """Look for a task that reaches a higher merit while the given task would be running.

        Starting one step after start, all tasks are evaluated every step seconds until the end of the given
        task's duration. The search stops at the first task (in list order) whose merit exceeds the given one.

        Args:
            task: Task that would be scheduled at start.
            projects: Projects by code.
            merit: Merit of that task at start.
            tasks: Tasks to choose from.
            start: Time at which the task would start.
            end: End of the available slot.
            data: Data provider.
            step: Step size in seconds.

        Returns:
            Tuple of better task, the time at which it was found and its merit, or (None, None, None) if no
            better task was found.
        """
        step_delta = TimeDelta(step * u.second)
        t = start + step_delta
        while t < start + TimeDelta(task.duration * u.second):
            merits = await self.evaluate_constraints_and_merits(tasks, projects, t, end, data)
            for candidate, candidate_merit in zip(tasks, merits):
                if candidate_merit > merit:
                    return candidate, t, candidate_merit
            t = t + step_delta
        return None, None, None

    async def can_postpone_task(
        self,
        task: Task,
        projects: dict[str, Project],
        better_task: Task,
        better_merit: float,
        start: Time,
        end: Time,
        data: DataProvider,
    ) -> Time | None:
        """Check whether the better task can run after the given task instead of replacing it.

        Args:
            task: Task that would run first, starting at start.
            projects: Projects by code.
            better_task: Task with the higher merit.
            better_merit: Merit of the better task at the time it was found.
            start: Start time of the first task.
            end: End of the available slot.
            data: Data provider.

        Returns:
            The postponed start time for better_task, if its merit at that time is at least better_merit,
            otherwise None.
        """
        # new start time of better_task would be after the execution of task
        better_start: Time = start + TimeDelta(task.duration * u.second)

        # evaluate merit of better_task at new start time
        merit = (await self.evaluate_constraints_and_merits([better_task], projects, better_start, end, data))[0]

        # if it did not get worse, return the new start time, otherwise None
        if merit >= better_merit:
            return better_start
        return None


__all__ = ["OnDemandScheduler"]