Source code for pyobs.robotic.utils.archive.pyobs_archive

import io
from typing import Any, TypedDict, cast
import aiohttp
import urllib.parse
import logging
from pydantic import PrivateAttr

from pyobs.utils.time import Time
from pyobs.images import Image
from .archive import Archive, FrameInfo
from pyobs.utils.enums import ImageType

log = logging.getLogger(__name__)


class PyobsArchiveFrameInfoDict(TypedDict):
    id: int
    basename: str
    DATE_OBS: str
    FILTER: str
    binning: str
    url: str


class PyobsArchiveFrameInfo(FrameInfo):
    """Frame info for pyobs archive."""

    def __init__(self, info: PyobsArchiveFrameInfoDict):
        FrameInfo.__init__(self)
        self.info = info
        self.id = self.info["id"]
        self.filename = self.info["basename"]
        self.dateobs = Time(self.info["DATE_OBS"])
        self.filter_name = self.info["FILTER"]
        self.binning = int(self.info["binning"][0])
        self.url = self.info["url"]


class PyobsArchive(Archive):
    """Connector class to running pyobs-archive instance."""

    __module__ = "pyobs.utils.archive"

    url: str
    token: str
    proxies: dict[str, str] | None = None

    _headers: dict[str, str] = PrivateAttr()
    _timeout: aiohttp.ClientTimeout = PrivateAttr()

    model_config = {"arbitrary_types_allowed": True}

[docs] def model_post_init(self, __context: Any) -> None: self._headers = {"Authorization": "Token " + self.token} self._timeout = aiohttp.ClientTimeout(total=30)
async def list_options( self, start: Time | None = None, end: Time | None = None, night: str | None = None, site: str | None = None, telescope: str | None = None, instrument: str | None = None, image_type: ImageType | None = None, binning: str | None = None, filter_name: str | None = None, rlevel: int | None = None, ) -> dict[str, list[Any]]: url = urllib.parse.urljoin(self.url, "frames/aggregate/") params = self._build_query( start, end, night, site, telescope, instrument, image_type, binning, filter_name, rlevel ) async with aiohttp.ClientSession() as session: async with session.get(url, params=params, headers=self._headers, timeout=self._timeout) as response: if response.status != 200: raise ValueError("Could not query frames: %s" % str(await response.text())) return cast(dict[str, list[Any]], await response.json()) async def list_frames( self, start: Time | None = None, end: Time | None = None, night: str | None = None, site: str | None = None, telescope: str | None = None, instrument: str | None = None, image_type: ImageType | None = None, binning: str | None = None, filter_name: str | None = None, rlevel: int | None = None, ) -> list[FrameInfo]: url = urllib.parse.urljoin(self.url, "frames/") params = self._build_query( start, end, night, site, telescope, instrument, image_type, binning, filter_name, rlevel ) frames: list[FrameInfo] = [] params["offset"] = 0 params["limit"] = 1000 async with aiohttp.ClientSession() as session: while True: async with session.get(url, params=params, headers=self._headers, timeout=self._timeout) as response: if response.status != 200: raise ValueError("Could not query frames") res = await response.json() new_frames = [PyobsArchiveFrameInfo(frame) for frame in res["results"]] frames.extend(new_frames) if len(frames) >= res["count"]: return frames params["offset"] += len(new_frames) @staticmethod def _build_query( start: Time | None = None, end: Time | None = None, night: str | None = None, site: str | None = None, telescope: str | None = None, instrument: str | None = None, image_type: ImageType | None = None, binning: str | None = None, filter_name: str | None = None, rlevel: int | None = None, ) -> dict[str, Any]: params: dict[str, Any] = {} if start is not None: params["start"] = start.isot if end is not None: params["end"] = end.isot if night is not None: params["night"] = night if site is not None: params["SITE"] = site if telescope is not None: params["TELESCOPE"] = telescope if instrument is not None: params["INSTRUMENT"] = instrument if image_type is not None: params["IMAGETYPE"] = image_type.value if binning is not None: params["binning"] = binning if filter_name is not None: params["FILTER"] = filter_name if rlevel is not None: params["RLEVEL"] = rlevel return params async def download_frames(self, infos: list[FrameInfo]) -> list[Image]: images = [] for info in infos: if not isinstance(info, PyobsArchiveFrameInfo): log.warning("Incorrect type for frame info.") continue url = urllib.parse.urljoin(self.url, info.url) async with aiohttp.ClientSession() as session: async with session.get(url, headers=self._headers, timeout=self._timeout) as response: if response.status != 200: log.exception("Error downloading file %s.", info.filename) try: image = Image.from_bytes(await response.read()) images.append(image) except OSError: log.exception("Error downloading file %s.", info.filename) return images async def download_headers(self, infos: list[PyobsArchiveFrameInfo]) -> list[dict[str, Any]]: headers = [] for info in infos: url = urllib.parse.urljoin(self.url, info.url).replace("download", "headers") async with aiohttp.ClientSession() as session: async with session.get(url, headers=self._headers, timeout=self._timeout) as response: if response.status != 200: log.error("Could not fetch headers for %s.", info.filename) try: results = (await response.json())["results"] headers.append(dict((d["key"], d["value"]) for d in results)) except KeyError: log.error("Could not fetch headers for %s.", info.filename) headers.append({}) return headers async def upload_frames(self, images: list[Image]) -> None: url = urllib.parse.urljoin(self.url, "frames/create/") async with aiohttp.ClientSession() as session: data = aiohttp.FormData() async with session.get(self.url, headers=self._headers) as response: token = response.cookies["csrftoken"].value data.add_field("csrfmiddlewaretoken", token) for i, img in enumerate(images, 1): filename = img.header["FNAME"] with io.BytesIO() as bio: img.writeto(bio) data.add_field(f"file{i}", bio.getvalue(), filename=filename) async with session.post(url, data=data, timeout=self._timeout, headers=self._headers) as response: if response.status != 200: raise ValueError("Cannot write file, received status_code %d." % response.status) json = await response.json() if "created" not in json or json["created"] == 0: if "errors" in json: raise ValueError("Could not create file in archive: " + str(json["errors"])) else: raise ValueError("Could not create file in archive.") __all__ = ["PyobsArchiveFrameInfo", "PyobsArchive"]