Source code for pyobs.robotic.scheduler.astroplanscheduler

from __future__ import annotations
import asyncio
import logging
import multiprocessing as mp
from typing import Any, TYPE_CHECKING
from collections.abc import AsyncIterator
import astroplan
import astropy.units as u
from astroplan import ObservingBlock, FixedTarget

from pyobs.object import Object
from .taskscheduler import TaskScheduler
from .targets import SiderealTarget
from pyobs.utils.time import Time

if TYPE_CHECKING:
    from pyobs.robotic import Observation, Task, ObservationList, Project

log = logging.getLogger(__name__)


[docs] class AstroplanScheduler(TaskScheduler): """Scheduler based on astroplan.""" def __init__( self, twilight: str = "astronomical", **kwargs: Any, ): """Initialize a new scheduler. Args: twilight: astronomical or nautical """ Object.__init__(self, **kwargs) # store self._twilight = twilight self._lock = asyncio.Lock() self._abort: asyncio.Event = asyncio.Event() self._is_running: bool = False async def schedule( self, tasks: list[Task], projects: list[Project], start: Time, end: Time ) -> AsyncIterator[Observation]: # is lock acquired? send abort signal if self._lock.locked(): await self.abort() # get lock async with self._lock: # clear abort event for this run self._abort.clear() # prepare scheduler blocks, start, end, constraints = await self._prepare_schedule(tasks, start, end) # schedule if not blocks: return scheduled_blocks = await self._schedule_blocks(blocks, start, end, constraints, self._abort) # convert scheduled_tasks = await self._convert_blocks(scheduled_blocks, tasks) # yield them for scheduled_task in scheduled_tasks: yield scheduled_task # clean up del blocks, constraints, scheduled_blocks, scheduled_tasks async def abort(self) -> None: self._abort.set() async def _prepare_schedule( self, tasks: list[Task], start: Time, end: Time ) -> tuple[list[ObservingBlock], Time, Time, list[Any]]: """TaskSchedule blocks.""" from pyobs.robotic.scheduler.dataprovider import DataProvider data = DataProvider(self.observer) # only global constraint is the night if self._twilight == "astronomical": constraints = [astroplan.AtNightConstraint.twilight_astronomical()] elif self._twilight == "nautical": constraints = [astroplan.AtNightConstraint.twilight_nautical()] else: raise ValueError("Unknown twilight type.") # create blocks from tasks blocks: list[ObservingBlock] = [] for task in tasks: # resolve dynamic target if not await task.resolve_target(start, task, data): log.warning("Could not resolve target for task '%s', skipping.", task.name) continue target = task.target if not isinstance(target, SiderealTarget): log.warning("Non-sidereal targets not supported.") continue priority = (1000.0 - task.priority) if task.priority is not None else 1000.0 if priority < 0: priority = 0 blocks.append( ObservingBlock( FixedTarget(target.coord, name=target.name), task.duration * u.second, priority, constraints=[c.to_astroplan() for c in task.constraints] if task.constraints else None, configuration={"request": task}, name=task.id, ) ) # return all return blocks, start, end, constraints async def _schedule_blocks( self, blocks: list[ObservingBlock], start: Time, end: Time, constraints: list[Any], abort: asyncio.Event ) -> list[ObservingBlock]: # run actual scheduler in separate process and wait for it queue_out: mp.Queue[ObservingBlock] = mp.Queue() p = mp.Process(target=self._schedule_process, args=(blocks, start, end, constraints, queue_out)) p.start() # wait for process to finish # note that the process only finishes, when the queue is empty! so we have to poll the queue first # and then the process. loop = asyncio.get_running_loop() future = loop.run_in_executor(None, queue_out.get, True) while not future.done(): if abort.is_set(): p.kill() return [] else: await asyncio.sleep(0.1) scheduled_blocks: list[ObservingBlock] = await future await loop.run_in_executor(None, p.join) return scheduled_blocks def _schedule_process( self, blocks: list[ObservingBlock], start: Time, end: Time, constraints: list[Any], scheduled_blocks: mp.Queue[ObservingBlock], ) -> None: """Actually do the scheduling, usually run in a separate process.""" # log it log.info("Calculating schedule for %d schedulable block(s) starting at %s...", len(blocks), start) # we don't need any transitions transitioner = astroplan.Transitioner() # create scheduler scheduler = astroplan.PriorityScheduler(constraints, self._observer, transitioner=transitioner) # run scheduler logging.disable(logging.WARNING) time_range = astroplan.Schedule(start, end) schedule = scheduler(blocks, time_range) logging.disable(logging.NOTSET) # put scheduled blocks in queue scheduled_blocks.put(schedule.scheduled_blocks) # clean up del transitioner, scheduler, schedule async def _convert_blocks(self, blocks: list[ObservingBlock], tasks: list[Task]) -> ObservationList: from pyobs.robotic import Observation, ObservationList scheduled_tasks = ObservationList() for block in blocks: # find task task_id = block.name for task in tasks: if task.id == task_id: break else: raise ValueError(f"Could not find task with id '{task_id}'") # create scheduled task scheduled_tasks.append(Observation(task=task, start=block.start_time, end=block.end_time)) return scheduled_tasks
__all__ = ["AstroplanScheduler"]