Source code for pyobs.robotic.scheduler.merits.transit

from __future__ import annotations

from typing import TYPE_CHECKING, Self

from astropy.coordinates import EarthLocation, SkyCoord
from pydantic import Field, PrivateAttr, model_validator

from pyobs.utils.time import Time

from .merit import Merit

if TYPE_CHECKING:
    from pyobs.robotic import Task

    from ..dataprovider import DataProvider


class TransitMerit(Merit):
    """Merit function for observing transits.

    The merit is 1.0 while the orbital phase of the target lies inside the
    allowed start window ahead of first contact, and 0.0 otherwise. Transit
    times follow the linear ephemeris ``jd0 + n * period``; the phase itself is
    evaluated on barycentric Julian dates in the TDB scale (see
    :meth:`jd_to_hjd`).
    """

    # jd0 of first transit
    jd0: float = Field(default=2450000, ge=2400000, le=2499999, json_schema_extra={"decimals": 9})
    # period in days
    period: float = Field(default=1.0, ge=0.01, le=9999, json_schema_extra={"decimals": 9})
    # transit duration in seconds
    duration: int = Field(default=1, ge=1, le=99999)
    # start observation this ingress*duration before 1st contact
    ingress: float = Field(default=0.2, ge=0, le=5)
    # start observation not later than over*duration before 1st contact
    # NOTE: if over > ingress the start window is empty and the merit is always 0
    over: float = Field(default=0.0, ge=0, le=5)
    # minimum scheduling window in minutes; ensures the transit can be picked even for short transits
    min_window: float = Field(default=5.0, ge=0, le=60)

    # derived quantities, all expressed as a fraction of the period (phase units)
    _duration: float = PrivateAttr(default=0.0)
    _ingress: float = PrivateAttr(default=0.0)
    _over: float = PrivateAttr(default=0.0)
    _min_window: float = PrivateAttr(default=0.0)

    @model_validator(mode="after")
    def calculate_derived(self) -> Self:
        """Convert the user-facing fields into fractions of the period.

        Returns:
            The model instance itself, as required for an "after" validator.
        """
        # seconds -> days -> phase
        self._duration = self.duration / 86400.0 / self.period
        self._ingress = self.ingress * self.duration / 86400.0 / self.period
        self._over = self.over * self.duration / 86400.0 / self.period
        # minutes -> phase
        self._min_window = self.min_window / (24.0 * 60.0 * self.period)
        return self

    async def __call__(self, time: Time, task: Task, data: DataProvider) -> float:
        """Evaluate the merit for a task at a given time.

        Args:
            time: Time at which to evaluate the merit.
            task: Task to evaluate; its target provides the coordinates.
            data: Data provider; its observer provides the site location.

        Returns:
            1.0 if the phase at ``time`` lies inside the start window, else 0.0.
            Always 0.0 if the task has no target.
        """
        if task.target is None:
            return 0.0

        # fast-path: find nearest mid-transit to the given time and check distance
        n = round((time.jd - self.jd0) / self.period)
        mid_jd = self.jd0 + n * self.period

        # The pre-filter works on the uncorrected JD while the precise check below
        # uses barycentric TDB dates. Pad the window by 600 s, which covers the
        # barycentric light travel time plus the TDB-UTC offset, so that valid times
        # at the edge of the window are never rejected here.
        bary_margin = 600.0 / 86400.0  # days

        # _duration, _ingress and _min_window are all phase fractions, so scale the
        # larger one by the period to obtain the half-width in days.
        window_half = max(self._duration / 2.0 + self._ingress, self._min_window) * self.period + bary_margin
        if abs(time.jd - mid_jd) > window_half:
            return 0.0

        # current phase (expensive barycentric correction only when near transit)
        phi = self.phase_for_jd(task.target.coordinates(time), data.observer.location, time)

        # start window in phase units, measured backwards from mid-transit at phase 1.0
        earliest = 1.0 - self._duration / 2.0 - self._ingress
        latest = 1.0 - self._duration / 2.0 - self._over
        return float(earliest <= phi <= latest)

    def transit_time(self) -> Time:
        """Returns the time of the mid-transit nearest to now.

        The transit number is rounded, so the result may lie up to half a period
        in the past as well as in the future.
        """
        n = self.periods_since_jd0()
        return Time(self.jd0 + n * self.period, format="jd", scale="tdb")

    def end_time(self) -> Time:
        """Returns the time until which observations should run: mid-transit + duration/2 + ingress buffer."""
        mid = self.transit_time()
        # duration is in seconds, ingress is a multiple of the duration; convert to days
        end_offset = (self.duration / 2.0 + self.ingress * self.duration) / 86400.0
        return Time(mid.jd + end_offset, format="jd", scale="tdb")

    def days_since_jd0(self) -> float:
        """Returns the number of days elapsed between ``jd0`` and now."""
        return float(Time.now().jd - self.jd0)

    def periods_since_jd0(self) -> int:
        """Returns the number of periods between ``jd0`` and now, rounded to the nearest integer."""
        p = self.days_since_jd0() / self.period
        return round(p)

    def phase_for_jd(self, target: SkyCoord, location: EarthLocation, time: Time) -> float:
        """Returns the orbital phase at the given time.

        Args:
            target: Coordinates of the target.
            location: Location of the observer.
            time: Time to calculate the phase for.

        Returns:
            Fraction of the period elapsed since the preceding mid-transit.
        """
        bjd = self.jd_to_hjd(target, location, time)
        # Python's % takes the sign of the divisor, so the quotient is never negative
        return ((bjd - self.jd0) % self.period) / self.period

    @staticmethod
    def jd_to_hjd(target: SkyCoord, location: EarthLocation, time: Time) -> float:
        """Returns the barycentric Julian date (TDB scale) for the given time.

        Despite its name, this applies a barycentric rather than a heliocentric
        light travel time correction.

        Args:
            target: Coordinates of the target.
            location: Location of the observer.
            time: Time to convert.

        Returns:
            Barycentric Julian date in the TDB scale.
        """
        t_bjd = time.light_travel_time(target, kind="barycentric", location=location)
        bjd = (time + t_bjd).tdb.jd
        return float(bjd)
# Names exported by a star-import of this module.
__all__ = ["TransitMerit"]