from __future__ import annotations
from typing import TYPE_CHECKING, Self
from astropy.coordinates import EarthLocation, SkyCoord
from pydantic import Field, PrivateAttr, model_validator
from pyobs.utils.time import Time
from .merit import Merit
if TYPE_CHECKING:
from pyobs.robotic import Task
from ..dataprovider import DataProvider
# (stray Sphinx "[docs]" source-link marker removed; it is not part of the module)
class TransitMerit(Merit):
    """Merit function for observing transits.

    The merit is 1.0 while an observation of a transit may be *started*, i.e. while
    the barycentric phase lies between ``ingress * duration`` and ``over * duration``
    before first contact, and 0.0 otherwise. Phase 0 (equivalently 1) corresponds to
    mid-transit as defined by ``jd0`` and ``period``.
    """

    # Julian date of the first (reference) mid-transit
    jd0: float = Field(default=2450000, ge=2400000, le=2499999, json_schema_extra={"decimals": 9})
    # period in days
    period: float = Field(default=1.0, ge=0.01, le=9999, json_schema_extra={"decimals": 9})
    # transit duration in seconds
    duration: int = Field(default=1, ge=1, le=99999)
    # earliest start of the observation: ingress * duration before 1st contact
    ingress: float = Field(default=0.2, ge=0, le=5)
    # latest start of the observation: over * duration before 1st contact
    over: float = Field(default=0.0, ge=0, le=5)
    # minimum scheduling window in minutes; ensures the transit can be picked even for short transits
    min_window: float = Field(default=5.0, ge=0, le=60)

    # The public settings above, converted to fractions of the period (phase units).
    # They are filled in by calculate_derived() after validation.
    _duration: float = PrivateAttr(default=0.0)
    _ingress: float = PrivateAttr(default=0.0)
    _over: float = PrivateAttr(default=0.0)
    _min_window: float = PrivateAttr(default=0.0)

    @model_validator(mode="after")
    def calculate_derived(self) -> Self:
        """Convert the time-based settings into fractions of the period.

        Returns:
            The model itself, as required for an ``after`` model validator.
        """
        # seconds -> days (/ 86400) -> phase (/ period)
        self._duration = self.duration / 86400.0 / self.period
        self._ingress = self.ingress * self.duration / 86400.0 / self.period
        self._over = self.over * self.duration / 86400.0 / self.period
        # minutes -> days (/ 1440) -> phase (/ period)
        self._min_window = self.min_window / (24.0 * 60.0 * self.period)
        return self

    async def __call__(self, time: Time, task: Task, data: DataProvider) -> float:
        """Evaluate the merit for the given task at the given time.

        Args:
            time: Time at which the merit is evaluated.
            task: Task to evaluate; its target provides the sky coordinates.
            data: Data provider; its observer location is used for the barycentric correction.

        Returns:
            1.0 if the phase at ``time`` lies inside the start window, 0.0 otherwise
            (also 0.0 if the task has no target).
        """
        if task.target is None:
            return 0.0

        # fast-path: find nearest mid-transit to the given time and check distance
        n = round((time.jd - self.jd0) / self.period)
        mid_jd = self.jd0 + n * self.period

        # Both operands of max() are fractions of the period; only afterwards is the
        # result converted to days. (Comparing a value in days against the phase-valued
        # _min_window would apply the wrong floor whenever period != 1.)
        half_window_phase = max(self._duration / 2.0 + self._ingress, self._min_window)
        window_half = half_window_phase * self.period  # days
        # NOTE(review): this pre-check uses the uncorrected time.jd, while the phase
        # below is computed from the barycentric TDB time; close to the window edges the
        # two can differ by the light travel time -- confirm this is acceptable.
        if abs(time.jd - mid_jd) > window_half:
            return 0.0

        # current phase (expensive barycentric correction only when near transit)
        phi = self.phase_for_jd(task.target.coordinates(time), data.observer.location, time)

        # start window, in phase: from `ingress` to `over` before first contact,
        # where first contact is at phase 1 - duration / 2
        earliest = 1.0 - self._duration / 2.0 - self._ingress
        latest = 1.0 - self._duration / 2.0 - self._over
        return float(earliest <= phi <= latest)

    def transit_time(self) -> Time:
        """Return the mid-transit time nearest to the current time.

        The number of elapsed periods is rounded to the nearest integer, so the
        returned mid-transit can lie up to half a period in the past.
        """
        n = self.periods_since_jd0()
        return Time(self.jd0 + n * self.period, format="jd", scale="tdb")

    def end_time(self) -> Time:
        """Return the time until which observations should run.

        This is the mid-transit from :meth:`transit_time` plus half the transit
        duration plus the ingress buffer (``ingress * duration``).
        """
        mid = self.transit_time()
        # duration is given in seconds; convert the offset to days
        end_offset = (self.duration / 2.0 + self.ingress * self.duration) / 86400.0
        return Time(mid.jd + end_offset, format="jd", scale="tdb")

    def days_since_jd0(self) -> float:
        """Return the number of days elapsed between ``jd0`` and now."""
        return float(Time.now().jd - self.jd0)

    def periods_since_jd0(self) -> int:
        """Return the number of periods elapsed since ``jd0``, rounded to the nearest integer."""
        p = self.days_since_jd0() / self.period
        return round(p)

    def phase_for_jd(self, target: SkyCoord, location: EarthLocation, time: Time) -> float:
        """Return the phase of the given time, as a fraction of the period.

        Args:
            target: Sky coordinates of the target.
            location: Location of the observer.
            time: Time to calculate the phase for.

        Returns:
            Phase of the barycentric-corrected time relative to ``jd0``.
        """
        hjd = self.jd_to_hjd(target, location, time)
        phi = ((hjd - self.jd0) % self.period) / self.period
        # guard against a negative remainder, which can only occur for a non-positive period
        return phi if phi >= 0 else phi + 1.0

    @staticmethod
    def jd_to_hjd(target: SkyCoord, location: EarthLocation, time: Time) -> float:
        """Return the barycentric Julian date (TDB) for the given time.

        Despite its name, this applies the *barycentric* light travel time correction
        and returns the result on the TDB scale.

        Args:
            target: Sky coordinates of the target.
            location: Location of the observer.
            time: Time to convert.

        Returns:
            Barycentric Julian date in TDB as a float.
        """
        t_bjd = time.light_travel_time(target, kind="barycentric", location=location)
        bjd = (time + t_bjd).tdb.jd
        return float(bjd)
# Names exported by ``from <module> import *``.
__all__ = ["TransitMerit"]