Source code for pyobs.robotic.utils.archive.local_archive

import glob
import logging
from pathlib import Path
from typing import Any

import pandas as pd
from astropy.io import fits
from pydantic import PrivateAttr

from pyobs.images import Image
from pyobs.utils.enums import ImageType
from pyobs.utils.time import Time

from .archive import Archive, FrameInfo

log = logging.getLogger(__name__)


class LocalArchive(Archive):
    """Connector class to a local image archive."""

    __module__ = "pyobs.utils.archive"

    root: str

    _root_path: Path = PrivateAttr()
    _data: pd.DataFrame = PrivateAttr(default_factory=pd.DataFrame)

    model_config = {"arbitrary_types_allowed": True}

[docs] def model_post_init(self, __context: Any) -> None: self._root_path = Path(self.root) self._data = pd.DataFrame() self._update_root()
def _update_root(self) -> None: """Update files in root directory.""" filenames = sorted(glob.glob(str(self._root_path / "*.fits"))) columns: dict[str, Any] = { h: [] for h in [ "date-obs", "day-obs", "binning", "filter", "image_type", "instrument", "site", "telescope", "rlevel", ] } for filename in filenames: hdr = fits.getheader(filename) columns["date-obs"].append(Time(hdr["DATE-OBS"]) if "DATE-OBS" in hdr else None) columns["day-obs"].append(Time(hdr["DAY-OBS"]) if "DAY-OBS" in hdr else None) columns["binning"].append(f"{hdr['XBINNING']}x{hdr['YBINNING']}" if "XBINNING" in hdr else None) columns["filter"].append(hdr["FILTER"] if "FILTER" in hdr else None) columns["image_type"].append(hdr["IMAGETYP"] if "IMAGETYP" in hdr else None) columns["instrument"].append(hdr["INSTRUME"] if "INSTRUME" in hdr else None) columns["site"].append(hdr["SITEID"] if "SITEID" in hdr else None) columns["telescope"].append(hdr["TELID"] if "TELID" in hdr else None) columns["rlevel"].append(hdr["RLEVEL"] if "RLEVEL" in hdr else None) columns["filename"] = filenames self._data = pd.DataFrame(columns) def _filter_data( self, start: Time | None = None, end: Time | None = None, night: str | None = None, site: str | None = None, telescope: str | None = None, instrument: str | None = None, image_type: ImageType | None = None, binning: str | None = None, filter_name: str | None = None, rlevel: int | None = None, ) -> pd.DataFrame: data = self._data if start is not None: data = data[data["date-obs"] > start] if end is not None: data = data[data["date-obs"] < end] if night is not None: data = data[data["day-obs"] == night] if site is not None: data = data[data["site"] == site] if telescope is not None: data = data[data["telescope"] == telescope] if instrument is not None: data = data[data["instrument"] == instrument] if image_type is not None: data = data[data["image_type"] == image_type] if binning is not None: data = data[data["binning"] == binning] if filter_name is not None: data = data[data["filter"] == filter_name] if rlevel is not None: data = data[data["rlevel"] == rlevel] return data async def list_options( self, start: Time | None = None, end: Time | None = None, night: str | None = None, site: str | None = None, telescope: str | None = None, instrument: str | None = None, image_type: ImageType | None = None, binning: str | None = None, filter_name: str | None = None, rlevel: int | None = None, ) -> dict[str, list[Any]]: data = self._filter_data( start, end, night, site, telescope, instrument, image_type, binning, filter_name, rlevel ) return { "binnings": list(data["binning"].unique()), "filters": list(data["filter"].unique()), "imagetypes": list(data["image_type"].unique()), "instruments": list(data["instrument"].unique()), "sites": list(data["site"].unique()), "telescopes": list(data["telescope"].unique()), } async def list_frames( self, start: Time | None = None, end: Time | None = None, night: str | None = None, site: str | None = None, telescope: str | None = None, instrument: str | None = None, image_type: ImageType | None = None, binning: str | None = None, filter_name: str | None = None, rlevel: int | None = None, ) -> list[FrameInfo]: data = self._filter_data( start, end, night, site, telescope, instrument, image_type, binning, filter_name, rlevel ) infos: list[FrameInfo] = [] for _, row in data.iterrows(): info = FrameInfo() info.id = row["filename"] info.filename = row["filename"] info.filter_name = row["filter"] info.binning = row["binning"] info.dateobs = row["date-obs"] infos.append(info) return infos async def download_frames(self, frames: list[FrameInfo]) -> list[Image]: images: list[Image] = [] for frame in frames: if frame.filename is not None: images.append(Image.from_file(frame.filename)) return images async def download_headers(self, infos: list[FrameInfo]) -> list[dict[str, Any]]: headers = [] for frame in infos: headers.append({k: v for k, v in fits.getheader(frame.filename).items()}) return headers async def upload_frames(self, images: list[Image]) -> None: pass __all__ = ["LocalArchive"]