import glob
from pathlib import Path
from typing import Any
import logging
from pydantic import PrivateAttr
import pandas as pd
from astropy.io import fits
from pyobs.utils.time import Time
from pyobs.images import Image
from .archive import Archive, FrameInfo
from pyobs.utils.enums import ImageType
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class LocalArchive(Archive):
    """Connector class to a local image archive.

    When the model is initialised, the ``*.fits`` files located directly inside ``root``
    are scanned once and a fixed set of header keywords is cached in a DataFrame. All
    listing methods filter that cached table instead of touching the files again.
    """

    __module__ = "pyobs.utils.archive"

    # Directory that is searched (non-recursively) for ``*.fits`` files.
    root: str

    # ``root`` as a Path; assigned in model_post_init().
    _root_path: Path = PrivateAttr()
    # One row per indexed file; rebuilt by _update_root().
    _data: pd.DataFrame = PrivateAttr(default_factory=pd.DataFrame)

    model_config = {"arbitrary_types_allowed": True}

    def model_post_init(self, __context: Any) -> None:
        """Resolve the root directory and build the header index.

        Args:
            __context: Context object passed in by pydantic; not used here.
        """
        self._root_path = Path(self.root)
        self._update_root()

    def _update_root(self) -> None:
        """Update files in root directory.

        Reads the FITS header of every ``*.fits`` file directly inside the root directory
        and replaces the cached table with one row per readable file. Keywords missing
        from a header are stored as ``None``. Files whose header cannot be read are
        logged and left out of the table.
        """
        # Cached column name -> FITS keyword whose value is copied verbatim.
        plain_keywords = {
            "filter": "FILTER",
            "image_type": "IMAGETYP",
            "instrument": "INSTRUME",
            "site": "SITEID",
            "telescope": "TELID",
            "rlevel": "RLEVEL",
        }
        column_names = ["date-obs", "day-obs", "binning", *plain_keywords, "filename"]
        columns: dict[str, list[Any]] = {name: [] for name in column_names}

        for filename in sorted(glob.glob(str(self._root_path / "*.fits"))):
            try:
                hdr = fits.getheader(filename)
            except OSError as exc:
                # A single broken file must not make the whole archive unusable.
                log.warning("Could not read FITS header of %s, skipping file: %s", filename, exc)
                continue

            columns["date-obs"].append(Time(hdr["DATE-OBS"]) if "DATE-OBS" in hdr else None)
            columns["day-obs"].append(Time(hdr["DAY-OBS"]) if "DAY-OBS" in hdr else None)

            # Both keywords are interpolated below, so both have to be present.
            if "XBINNING" in hdr and "YBINNING" in hdr:
                columns["binning"].append(f"{hdr['XBINNING']}x{hdr['YBINNING']}")
            else:
                columns["binning"].append(None)

            for column, keyword in plain_keywords.items():
                columns[column].append(hdr[keyword] if keyword in hdr else None)

            # Appended per file so the column stays aligned when files are skipped.
            columns["filename"].append(filename)

        self._data = pd.DataFrame(columns)

    def _filter_data(
        self,
        start: Time | None = None,
        end: Time | None = None,
        night: str | None = None,
        site: str | None = None,
        telescope: str | None = None,
        instrument: str | None = None,
        image_type: ImageType | None = None,
        binning: str | None = None,
        filter_name: str | None = None,
        rlevel: int | None = None,
    ) -> pd.DataFrame:
        """Return the rows of the cached table that match all given criteria.

        Every argument is optional; ``None`` disables the corresponding filter.

        Args:
            start: Keep rows whose ``date-obs`` is strictly later than this.
            end: Keep rows whose ``date-obs`` is strictly earlier than this.
            night: Compared for equality with the ``day-obs`` column.
            site: Compared for equality with the ``site`` column.
            telescope: Compared for equality with the ``telescope`` column.
            instrument: Compared for equality with the ``instrument`` column.
            image_type: Its ``value`` is compared with the ``image_type`` column.
            binning: Compared for equality with the ``binning`` column (``"<x>x<y>"``).
            filter_name: Compared for equality with the ``filter`` column.
            rlevel: Compared for equality with the ``rlevel`` column.

        Returns:
            The filtered rows; the cached table itself if no filter is given.
        """
        data = self._data

        # Time window, exclusive on both ends.
        if start is not None:
            data = data[data["date-obs"] > start]
        if end is not None:
            data = data[data["date-obs"] < end]

        # Column name -> value it has to equal; entries that are None are ignored.
        equality_filters: dict[str, Any] = {
            "day-obs": night,
            "site": site,
            "telescope": telescope,
            "instrument": instrument,
            "image_type": None if image_type is None else image_type.value,
            "binning": binning,
            "filter": filter_name,
            "rlevel": rlevel,
        }
        for column, wanted in equality_filters.items():
            if wanted is not None:
                data = data[data[column] == wanted]

        return data

    async def list_options(
        self,
        start: Time | None = None,
        end: Time | None = None,
        night: str | None = None,
        site: str | None = None,
        telescope: str | None = None,
        instrument: str | None = None,
        image_type: ImageType | None = None,
        binning: str | None = None,
        filter_name: str | None = None,
        rlevel: int | None = None,
    ) -> dict[str, list[Any]]:
        """List the distinct values found in the frames matching the given filters.

        All arguments are optional filters that are passed on to ``_filter_data``;
        ``None`` disables a filter.

        Returns:
            Dictionary with the keys ``binnings``, ``filters``, ``imagetypes``,
            ``instruments``, ``sites`` and ``telescopes``, each holding the unique values
            of the corresponding column among the matching rows.
        """
        data = self._filter_data(
            start, end, night, site, telescope, instrument, image_type, binning, filter_name, rlevel
        )

        # Key in the result -> column of the cached table it is derived from.
        option_columns = {
            "binnings": "binning",
            "filters": "filter",
            "imagetypes": "image_type",
            "instruments": "instrument",
            "sites": "site",
            "telescopes": "telescope",
        }
        return {option: list(data[column].unique()) for option, column in option_columns.items()}

    async def list_frames(
        self,
        start: Time | None = None,
        end: Time | None = None,
        night: str | None = None,
        site: str | None = None,
        telescope: str | None = None,
        instrument: str | None = None,
        image_type: ImageType | None = None,
        binning: str | None = None,
        filter_name: str | None = None,
        rlevel: int | None = None,
    ) -> list[FrameInfo]:
        """List the frames matching the given filters.

        All arguments are optional filters that are passed on to ``_filter_data``;
        ``None`` disables a filter.

        Returns:
            One ``FrameInfo`` per matching row. The file path is used both as ``id`` and
            as ``filename``; ``filter_name``, ``binning`` and ``dateobs`` are copied from
            the cached header values.
        """
        data = self._filter_data(
            start, end, night, site, telescope, instrument, image_type, binning, filter_name, rlevel
        )

        infos: list[FrameInfo] = []
        for _, row in data.iterrows():
            info = FrameInfo()
            info.id = row["filename"]
            info.filename = row["filename"]
            info.filter_name = row["filter"]
            info.binning = row["binning"]
            info.dateobs = row["date-obs"]
            infos.append(info)
        return infos

    async def download_frames(self, frames: list[FrameInfo]) -> list[Image]:
        """Load the images for the given frames from disk.

        Args:
            frames: Frames to load; entries without a filename are skipped.

        Returns:
            The loaded images, in the order of the frames that have a filename.
        """
        return [Image.from_file(frame.filename) for frame in frames if frame.filename is not None]

    async def download_headers(self, infos: list[FrameInfo]) -> list[dict[str, Any]]:
        """Read the FITS headers for the given frames from disk.

        Args:
            infos: Frames to read; entries without a filename are skipped, as in
                ``download_frames``.

        Returns:
            One ``keyword -> value`` dictionary per frame that has a filename.
        """
        return [
            dict(fits.getheader(frame.filename).items()) for frame in infos if frame.filename is not None
        ]

    async def upload_frames(self, images: list[Image]) -> None:
        """Accept images for upload; currently a no-op.

        Args:
            images: Images to upload. They are ignored and nothing is written.
        """
# Names exported by ``from <module> import *``.
__all__ = ["LocalArchive"]