Source code for pyobs.vfs.vfs

from __future__ import annotations

import io
import logging
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, Literal, cast, overload

import yaml

from .file import VFSFile

if TYPE_CHECKING:
    import pandas as pd
    from astropy.io import fits

    from pyobs.images import Image

log = logging.getLogger(__name__)


class VirtualFileSystem:
    """Base for a virtual file system."""

    __module__ = "pyobs.vfs"

    def __init__(self, roots: dict[str, Any] | None = None, **kwargs: Any):
        """Create a new VFS.

        Args:
            roots: Dictionary containing roots, see :mod:`~pyobs.vfs` for examples.
        """

        # if no root for 'pyobs' is given, add one
        self._roots: dict[str, Any] = {
            "pyobs": {"class": "pyobs.vfs.LocalFile", "root": "/opt/pyobs/storage/"},
            "robotic": {"class": "pyobs.vfs.LocalFile", "root": "/opt/pyobs/robotic/"},
        }
        if roots is not None:
            self._roots.update(roots)

[docs] @staticmethod def split_root(path: str) -> tuple[str, str]: """Splits the root from the rest of the path. Args: path (str): Path to split. Returns: (tuple) Tuple (root, filename). """ # remove leading slash if path.startswith("/"): path = path[1:] # no more slash left? if "/" not in path: raise ValueError("No valid path with a root.") # get position of first slash and split pos = path.index("/") root = path[:pos] filename = path[pos + 1 :] # return it return root, filename
[docs] def open_file(self, filename: str, mode: str) -> VFSFile: """Open a file. The handling class is chosen depending on the rootse in the filename. Args: filename (str): Name of file to open. mode (str): Opening mode. Returns: (IOBase) File like object for given file. """ # split root root, filename = VirtualFileSystem.split_root(filename) # does root exist? if root not in self._roots: raise ValueError(f"Could not find root {root} for file.") # create file object from pyobs.object import get_object fd = get_object(self._roots[root], object_class=VFSFile, name=filename, mode=mode) # return it return cast(VFSFile, fd)
[docs] async def read_fits(self, filename: str) -> fits.HDUList: """Convenience function that wraps around open_file() to read a FITS file and put it into a astropy FITS structure. Args: filename: Name of file to download. Returns: A PrimaryHDU containing the FITS file. """ from astropy.io import fits async with self.open_file(filename, "rb") as f: data = await f.read() return fits.HDUList.fromstring(data)
[docs] async def write_fits(self, filename: str, hdulist: fits.HDUList, *args: Any, **kwargs: Any) -> None: """Convenience function for writing an Image to a FITS file. Args: filename: Name of file to write. hdulist: hdu list to write. """ # open file async with self.open_file(filename, "wb") as f: with io.BytesIO() as bio: hdulist.writeto(bio, *args, **kwargs) await f.write(bio.getvalue())
[docs] async def read_image(self, filename: str) -> Image: """Convenience function that wraps around open_file() to read an Image. Args: filename: Name of file to download. Returns: An image object """ from pyobs.images import Image async with self.open_file(filename, "rb") as f: data = await f.read() if isinstance(data, str): data = data.encode("utf-8") return Image.from_bytes(data)
[docs] async def write_image(self, filename: str, image: Image, *args: Any, **kwargs: Any) -> None: """Convenience function for writing an Image to a FITS file. Args: filename: Name of file to write. image: Image to write. """ # open file async with self.open_file(filename, "wb") as f: with io.BytesIO() as bio: image.writeto(bio, *args, **kwargs) await f.write(bio.getvalue())
[docs] async def write_bytes(self, filename: str, data: bytes, *args: Any, **kwargs: Any) -> None: """Convenience function for writing bytes to a file. Args: filename: Name of file to write. data: Bytes to write. """ # open file async with self.open_file(filename, "wb") as f: await f.write(data)
[docs] async def read_csv(self, filename: str, *args: Any, **kwargs: Any) -> pd.DataFrame: """Convenience function for reading a CSV file into a DataFrame. Args: filename: Name of file to read. Returns: DataFrame with content of file. """ import pandas as pd try: # open file async with self.open_file(filename, "r") as f: data = await f.read() if isinstance(data, bytes): data = data.decode("utf-8") return cast(pd.DataFrame, pd.read_csv(io.StringIO(data), *args, **kwargs)) except pd.errors.EmptyDataError: # on error, return empty dataframe return pd.DataFrame()
[docs] async def write_csv(self, filename: str, df: pd.DataFrame, *args: Any, **kwargs: Any) -> None: """Convenience function for writing a CSV file from a DataFrame. Args: filename: Name of file to write. df: DataFrame to write. """ async with self.open_file(filename, "w") as f: # create a StringIO as temporary write target with io.StringIO() as sio: # write table to sio df.to_csv(sio, *args, **kwargs) # and write all content to file await f.write(sio.getvalue())
[docs] async def read_yaml(self, filename: str) -> Any: """Convenience function for reading a YAML file into a dict. Args: filename: Name of file to read. Returns: Content of file. """ # open file async with self.open_file(filename, "r") as f: # read YAML data = await f.read() if isinstance(data, bytes): data = data.decode("utf-8") return yaml.safe_load(io.StringIO(data))
[docs] async def write_yaml(self, filename: str, data: Any) -> None: """Convenience function for writing a YAML file from a dict. Args: data: dict to write. filename: Name of file to write. """ # open file async with self.open_file(filename, "w") as f: # create StringIO as temp storage with io.StringIO() as sio: # dump to StringIO yaml.dump(data, sio) # write file from StringIO await f.write(sio.getvalue())
[docs] async def local_path(self, path: str) -> str: """Returns a local filename, but only, if path leads to a LocalFile. Args: path: Path to get local path for. Returns: Local path. Raises: ValueError if path does not lead to LocalFile. """ from .localfile import LocalFile # get class klass, root, path = self._get_class(path) # local file? if not issubclass(klass, LocalFile): raise ValueError(f"Given path {path} is not a local path.") # get local path return await klass.local_path(path, **self._roots[root])
def _get_class(self, path: str) -> tuple[type[VFSFile], str, str]: from pyobs.object import get_class_from_string # split root root, path = VirtualFileSystem.split_root(path) # get root class return get_class_from_string(self._roots[root]["class"]), root, path @overload def _get_method( self, path: str, method: Literal["find"] ) -> tuple[Callable[..., Awaitable[list[str]]], str, str]: ... @overload def _get_method( self, path: str, method: Literal["listdir"] ) -> tuple[Callable[..., Awaitable[list[str]]], str, str]: ... @overload def _get_method(self, path: str, method: Literal["exists"]) -> tuple[Callable[..., Awaitable[bool]], str, str]: ... @overload def _get_method(self, path: str, method: Literal["remove"]) -> tuple[Callable[..., Awaitable[bool]], str, str]: ... def _get_method( self, path: str, method: Literal["find", "listdir", "exists", "remove"] ) -> tuple[Callable[..., Any], str, str]: # split root klass, root, path = self._get_class(path) # get find method return getattr(klass, method), root, path
[docs] async def find(self, path: str, pattern: str) -> list[str]: """Find a file in the given path. Args: path: Path to search in. pattern: Pattern to search for. Returns: List of found files. """ # get method find, root, path = self._get_method(path, "find") # and call it return await find(path, pattern, **self._roots[root])
[docs] async def listdir(self, path: str) -> list[str]: """Find a file in the given path. Args: path: Path to search in. pattern: Pattern to search for. Returns: List of found files. """ # get method listdir, root, path = self._get_method(path, "listdir") # and call it return await listdir(path, **self._roots[root])
[docs] async def exists(self, path: str) -> bool: """Checks, whether a given path or file exists. Args: path: Path to check. Returns: Whether it exists or not """ # get method exists, root, path = self._get_method(path, "exists") # and call it return await exists(path, **self._roots[root])
[docs] async def remove(self, path: str) -> bool: """Removes file with given path. Args: path: Path to delete. Returns: Success of deletion. """ # get method remove, root, path = self._get_method(path, "remove") # and call it return await remove(path, **self._roots[root])
__all__ = ["VirtualFileSystem", "VFSFile"]