Source code for pyobs.robotic.lco.observationarchive

import datetime
import logging
from typing import Any, Literal

from pyobs.robotic.observation import Observation, ObservationList, ObservationState
from pyobs.utils.time import Time
from pyobs.robotic.observationarchive import ObservationArchive
from ._portal import Portal
from .configdb import ConfigDB
from .. import Task, TaskArchive

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


# Maps state names (as looked up in LcoObservationArchive.get_observations via
# ``STATE_MAP[obs.state]``) to the ObservationState member of the same name.
# Presumably these are the state strings reported by the LCO portal -- verify
# against the portal client.
STATE_MAP = {
    state_name: getattr(ObservationState, state_name)
    for state_name in (
        "CANCELED",
        "COMPLETED",
        "PENDING",
        "ABORTED",
        "IN_PROGRESS",
        "FAILED",
    )
}


class LcoObservationArchive(ObservationArchive):
    """Observation archive that talks to the LCO portal.

    Reading the schedule is delegated to an ``LcoScheduleReader``, writing it
    to an ``LcoScheduleWriter``; both share a single ``Portal`` child object.
    """

    def __init__(
        self,
        url: str,
        configdb: str,
        site: str,
        token: str,
        enclosure: str,
        telescope: str,
        period: int = 24,
        mode: Literal["read", "write", "readwrite"] = "readwrite",
        **kwargs: Any,
    ):
        """Creates a new LCO observation archive.

        Args:
            url: URL to portal.
            configdb: URL to configdb.
            site: Site, passed to the portal, the schedule reader and the schedule writer.
            token: Authorization token for portal.
            enclosure: Enclosure, passed to the portal and the schedule writer.
            telescope: Telescope, passed to the portal, the schedule reader and the schedule writer.
            period: Period to schedule in hours, passed to the schedule writer.
            mode: Access mode. NOTE(review): accepted but not used anywhere in this
                constructor -- reader and writer are always both created.
            **kwargs: Forwarded unchanged to ``ObservationArchive.__init__``.
        """
        # imported here rather than at module level (presumably to avoid circular imports)
        from ._schedulereader import LcoScheduleReader
        from ._schedulewriter import LcoScheduleWriter

        ObservationArchive.__init__(self, **kwargs)

        # portal and configdb clients
        portal = Portal(url, token, site, enclosure, telescope)
        self._portal = self.add_child_object(portal)
        self._configdb = ConfigDB(configdb)

        # schedule reader
        reader = LcoScheduleReader(self._portal, site, telescope)
        self._schedule_reader = self.add_child_object(reader)

        # schedule writer
        writer = LcoScheduleWriter(self._portal, self._configdb, site, enclosure, telescope, period)
        self._schedule_writer = self.add_child_object(writer)

    async def get_schedule(self) -> ObservationList:
        """Fetch the schedule via the schedule reader.

        Returns:
            Whatever the schedule reader's ``get_schedule()`` returns
            (annotated as an ObservationList).
        """
        schedule = await self._schedule_reader.get_schedule()
        return schedule

    async def get_current_observation(self, task_archive: TaskArchive | None = None) -> Observation | None:
        """Returns the observation the schedule reader reports for the current time.

        Args:
            task_archive: Not used by this implementation.

        Returns:
            Result of the schedule reader's ``get_task()`` for ``Time.now()``.
        """
        now = Time.now()
        return await self._schedule_reader.get_task(now)

    async def update_observation(self, observation: Observation) -> None:
        """Sends the observation's state to the portal for every configuration of its task.

        Does nothing if the observation's task is not a Task, has no id, or is
        not an LcoTask.

        Args:
            observation: Observation whose state should be reported.
        """
        task = observation.task
        if not (isinstance(task, Task) and task.id is not None):
            return

        # imported only after the first guard, as in the original flow
        from .task import LcoTask, ConfigStatus

        if not isinstance(task, LcoTask):
            return

        for configuration in task.request.configurations:
            # build a finished status carrying the observation's state value
            state_value = observation.state.value
            config_status = ConfigStatus(state=state_value)
            config_status.finish(state=state_value)
            await self.send_update(configuration.configuration_status, config_status.to_json())

    async def get_next_observation(self, time: Time, task_archive: TaskArchive | None = None) -> Observation | None:
        """Returns the observation the schedule reader reports for the given time.

        Args:
            time: Time to return an observation for.
            task_archive: Not used by this implementation.

        Returns:
            Result of the schedule reader's ``get_task()`` for ``time``.
        """
        observation = await self._schedule_reader.get_task(time)
        return observation

    async def send_update(self, status_id: int | None, status: dict[str, Any]) -> None:
        """Send a configuration status report to the LCO portal.

        Args:
            status_id: Id of config status; nothing is sent if this is None.
            status: Status dictionary.
        """
        if status_id is not None:
            await self._portal.update_configuration_status(status_id, status)
            # NOTE: refreshing the schedule reader afterwards is currently disabled:
            # await self._schedule_reader.update_now()

    async def add_observations(self, tasks: ObservationList) -> None:
        """Add the given observations to the schedule via the schedule writer.

        Args:
            tasks: Scheduled observations to add.
        """
        writer = self._schedule_writer
        await writer.add_schedule(tasks)

    async def clear_schedule(self, start_time: Time) -> None:
        """Clear the schedule via the schedule writer.

        Args:
            start_time: Start time to clear from.
        """
        writer = self._schedule_writer
        await writer.clear_schedule(start_time)

    async def get_observations(
        self,
        task: Task | None = None,
        state: ObservationState | None = None,
        start_before: Time | None = None,
        start_after: Time | None = None,
        end_before: Time | None = None,
        end_after: Time | None = None,
    ) -> ObservationList:
        """Returns the observations of a task that match the given filters.

        The portal is queried by the task's id, so a task is mandatory for
        this archive even though the parameter is optional in the signature.

        Args:
            task: Task to get observations for; must be an LcoTask with an integer id.
            state: Passed to ``ObservationList.filter`` as ``state``.
            start_before: Passed to ``ObservationList.filter`` as ``start_before``.
            start_after: Passed to ``ObservationList.filter`` as ``start_after``.
            end_before: Passed to ``ObservationList.filter`` as ``end_before``.
            end_after: Passed to ``ObservationList.filter`` as ``end_after``.

        Returns:
            Filtered list of observations.

        Raises:
            ValueError: If task is not an LcoTask or its id is not an integer.
            KeyError: If the portal reports a state that is not in STATE_MAP.
        """
        from .task import LcoTask

        # a None id is not an int, so this also rejects tasks without an id
        usable_task = isinstance(task, LcoTask) and isinstance(task.id, int)
        if not usable_task:
            raise ValueError("Task is not a LCO task.")

        portal_entries = await self._portal.observations(task.id)

        collected = ObservationList()
        for entry in portal_entries:
            # NOTE(review): an unmapped portal state raises KeyError here and aborts
            # the whole query -- confirm STATE_MAP covers every state the portal sends.
            mapped_state = STATE_MAP[entry.state]
            converted = Observation(
                id=entry.id,
                task=task,
                start=entry.start,
                end=entry.end,
                state=mapped_state,
            )
            collected.append(converted)

        filtered = collected.filter(
            state=state,
            start_before=start_before,
            start_after=start_after,
            end_before=end_before,
            end_after=end_after,
        )
        return filtered

    async def observations_for_night(self, date: datetime.date) -> ObservationList:
        """Returns the observations for the given night.

        This implementation does not query anything; it always returns an
        empty list, regardless of ``date``.

        Args:
            date: Date of night to get observations for (currently ignored).

        Returns:
            An empty ObservationList.
        """
        nothing = ObservationList()
        return nothing


# Names exported by ``from <this module> import *``.
__all__ = ["LcoObservationArchive"]