Source code for pyobs.vfs.httpfile

import io
import uuid
from typing import Any
from urllib.parse import urljoin
import logging
import aiohttp

from .bufferedfile import BufferedFile

log = logging.getLogger(__name__)


class HttpFile(BufferedFile):
    """Wraps a file on a HTTP server that can be accessed via GET/POST.
    Especially useful in combination with :class:`~pyobs.modules.utils.HttpFileCache`."""

    __module__ = "pyobs.vfs"

    def __init__(
        self,
        name: str,
        mode: str = "r",
        download: str | None = None,
        upload: str | None = None,
        username: str | None = None,
        password: str | None = None,
        verify_tls: bool = False,
        timeout: int = 30,
        **kwargs: Any,
    ):
        """Creates a new HTTP file.

        Args:
            name: Name of file.
            mode: Open mode (r/w).
            download: Base URL for downloading files. If None, no read access possible.
            upload: Base URL for uploading files. If None, no write access possible.
            username: Username for accessing the HTTP server.
            password: Password for accessing the HTTP server.
            verify_tls: Whether to verify TLS certificates.
            timeout: Timeout in seconds for uploading/downloading files.
        """

        # init
        io.RawIOBase.__init__(self)
        BufferedFile.__init__(self)
        self._verify_tls = verify_tls
        self._timeout = aiohttp.ClientTimeout(total=timeout)

        # auth
        self._auth = None
        if username is not None and password is not None:
            self._auth = aiohttp.BasicAuth(username, password)

        # filename is not allowed to start with a / or contain ..
        if name.startswith("/") or ".." in name:
            raise ValueError("Only files within root directory are allowed.")

        # build filename
        self.filename = name
        self.mode = mode
        self._pos = 0
        self._open = True

        # URLs given?
        self._download_path = download
        self._upload_path = upload
        if "r" in self.mode and self._download_path is None:
            raise ValueError("No download URL given.")
        if "w" in self.mode and self._upload_path is None:
            raise ValueError("No upload URL given.")

        # clear cache on write?
        if "w" in self.mode:
            self._clear_buffer(self.filename)

    @property
    def url(self) -> str:
        """Returns URL of file."""
        if self._download_path is None:
            raise ValueError("No download URL given.")
        return urljoin(self._download_path, self.filename)

    async def _download(self) -> None:
        """For read access, download the file into a local buffer.

        Raises:
            FileNotFoundError: If file could not be found.
        """

        # do request
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url, auth=self._auth, timeout=self._timeout) as response:
                # check response
                if response.status == 200:
                    # get data and return it
                    self._set_buffer(self.filename, await response.read())
                elif response.status == 401:
                    log.error("Wrong credentials for downloading file.")
                    raise FileNotFoundError
                else:
                    log.error("Could not download file from filecache.")
                    raise FileNotFoundError

[docs] async def read(self, n: int = -1) -> str | bytes: """Read number of bytes from stream. Args: n: Number of bytes to read. Read until end, if -1. Returns: Read bytes. """ # load file if not self._buffer_exists(self.filename): await self._download() buf = self._buffer(self.filename) # check size if n == -1: data = buf self._pos = len(buf) - 1 else: # extract data to read data = buf[self._pos : self._pos + n] self._pos += n # return data return data
[docs] async def write(self, s: str | bytes) -> None: """Write data into the stream. Args: s: Bytes of data to write. """ self._append_to_buffer(self.filename, s)
[docs] async def close(self) -> None: """Close stream.""" # write it? if "w" in self.mode and self._open: await self._upload() # clear buffer self._clear_buffer(self.filename) # set flag self._open = False
async def _upload(self) -> None: """If in write mode, actually send the file to the HTTP server.""" # filename given? filename = str(uuid.uuid4()) if self.filename is None else self.filename # check if self._upload_path is None: raise ValueError("No upload URL given.") # send data and return image ID async with aiohttp.ClientSession() as session: data = aiohttp.FormData() data.add_field("file", self._buffer(filename), filename=filename) async with session.post(self._upload_path, auth=self._auth, data=data, timeout=self._timeout) as response: if response.status == 401: log.error("Wrong credentials for uploading file.") raise FileNotFoundError elif response.status != 200: log.error(f"Could not upload file to filecache: {response.status} {response.reason}") raise FileNotFoundError __all__ = ["HttpFile"]