Source code for pyobs.robotic.lco.taskarchive

import asyncio
import logging
import astropy.units as u
from typing import Dict, Optional, Any
from astropy.time import TimeDelta

from pyobs.utils.time import Time
from pyobs.robotic.taskarchive import TaskArchive
from ._portal import Portal
from .task import LcoTask
from .. import Task
from ..task import Project

log = logging.getLogger(__name__)


[docs] class LcoTaskArchive(TaskArchive): """Scheduler for using the LCO portal""" def __init__( self, url: str, token: str, instrument_type: str | list[str], **kwargs: Any, ): """Creates a new LCO scheduler. Args: url: URL to portal token: Authorization token for portal instrument_type: Type of instrument to use. scripts: External scripts """ TaskArchive.__init__(self, **kwargs) # portal self._portal = self.add_child_object(Portal(url, token, "", "", "")) # store stuff instrument_type = [instrument_type] if isinstance(instrument_type, str) else instrument_type self._instrument_type = [it.lower() for it in instrument_type] # buffers in case of errors self._last_changed: Optional[Time] = None # task list self._tasks: Dict[str, LcoTask] = {} # update task self.add_background_task(self._update_worker) async def _update_worker(self) -> None: # time of last change in blocks last_change = None # run forever while True: # got new time of last change? t = await self._portal.last_changed() more_1day = (Time.now() - t) > TimeDelta(1 * u.day) if last_change is None or last_change < t and not more_1day: try: last_change = t if self._on_tasks_changed is not None: asyncio.create_task(self._on_tasks_changed()) except asyncio.CancelledError: return except: log.exception("Something went wrong when updating schedule.") # sleep a little await asyncio.sleep(5)
[docs] async def last_changed(self) -> Optional[Time]: """Returns time when last time any blocks changed.""" # try to update time try: self._last_changed = await self._portal.last_changed() except: log.debug("Could not get last changed time") # even in case of errors, return last time return self._last_changed
[docs] async def get_projects(self) -> list[Project]: """Returns list of projects from the LCO portal.""" proposals = await self._portal.proposals() return [Project(code=p["id"], name=p["id"], priority=p.get("tac_priority", 1.0)) for p in proposals]
[docs] async def get_task(self, id: Any) -> Task | None: """Returns the task with the given ID.""" for task in self._tasks.values(): if task.id == id: return task return None
[docs] async def get_schedulable_tasks(self) -> list[Task]: """Returns a list of schedulable tasks. Returns: List of schedulable tasks """ # get data schedulable_requests = await self._portal.schedulable_requests() # get proposal priorities proposals = await self._portal.proposals() tac_priorities = {p["id"]: p["tac_priority"] for p in proposals} # to LcoTasks all_tasks: list[Task] = [] for schedulable_request in schedulable_requests: tasks = LcoTask.from_schedulable_request(schedulable_request, {}) for task in tasks: task.priority = schedulable_request.ipp_value * tac_priorities[schedulable_request.proposal] if task.request.state != "PENDING": continue if task.request.configurations[0].instrument_type.lower() not in self._instrument_type: continue all_tasks.append(task) # return tasks return all_tasks
__all__ = ["LcoTaskArchive"]