Source code for pyobs.modules.camera.basevideo

from abc import ABCMeta
from collections.abc import Coroutine
from datetime import datetime
import io
import logging
import time
import asyncio
from typing import Dict, Any, Tuple, NamedTuple, Optional, List
import numpy as np
import PIL.Image
from aiohttp import web
from numpy.typing import NDArray

from pyobs.modules import Module, timeout
from pyobs.interfaces import IVideo, IImageType, IExposureTime
from pyobs.events import NewImageEvent
from pyobs.images import Image
from pyobs.mixins.fitsheader import ImageFitsHeaderMixin
from pyobs.utils.cache import DataCache
from pyobs.utils.enums import ImageType

log = logging.getLogger(__name__)

INDEX_HTML = """
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8">
    <title>Title</title>
  </head>
  <body>
    <img src="/video.mjpg" width="100%">
  </body>
</html>

"""


async def calc_expose_timeout(webcam: IExposureTime, *args: Any, **kwargs: Any) -> float:
    """Calculates timeout for grabe_image()."""
    if hasattr(webcam, "get_exposure_time"):
        return 2.0 * await webcam.get_exposure_time() + 30.0
    else:
        return 30.0


class NextImage(NamedTuple):
    date_obs: str
    image_type: ImageType
    header_futures: Dict[str, Coroutine[Dict[str, Tuple[Any, str]], None, None]]
    broadcast: bool


class ImageRequest:
    def __init__(self, broadcast: bool = True):
        self.broadcast: bool = broadcast
        self.image: Optional[Image] = None
        self.filename: Optional[str] = None


class LastImage(NamedTuple):
    data: NDArray[Any]
    image: Optional[Image]
    jpeg: Optional[bytes]
    filename: Optional[str]


class BaseVideo(Module, ImageFitsHeaderMixin, IVideo, IImageType, metaclass=ABCMeta):
    """Base class for all webcam modules."""

    __module__ = "pyobs.modules.camera"

    def __init__(
        self,
        http_port: int = 37077,
        interval: float = 0.5,
        video_path: str = "/webcam/video.mjpg",
        filenames: str = "/webcam/pyobs-{DAY-OBS|date:}-{FRAMENUM|string:04d}.fits",
        fits_namespaces: Optional[List[str]] = None,
        fits_headers: Optional[Dict[str, Any]] = None,
        centre: Optional[Tuple[float, float]] = None,
        rotation: float = 0.0,
        cache_size: int = 5,
        live_view: bool = True,
        flip: bool = False,
        sleep_time: int = 600,
        **kwargs: Any,
    ):
        """Creates a new BaseWebcam.

        On the receiving end, a VFS root with a HTTPFile must exist with the same name as in image_path and video_path,
        i.e. "webcam" in the default settings.

        Args:
            http_port: HTTP port for webserver.
            exposure_time: Initial exposure time.
            interval: Min interval for grabbing images.
            video_path: VFS path to video.
            filename: Filename pattern for FITS images.
            fits_namespaces: List of namespaces for FITS headers that this camera should request.
            fits_headers: Additional FITS headers.
            centre: (x, y) tuple of camera centre.
            rotation: Rotation east of north.
            cache_size: Size of cache for previous images.
            live_view: If True, live view is served via web server.
            flip: Whether to flip around Y axis.
            sleep_time: Time in s with inactivity after which the camera should go to sleep.
        """
        Module.__init__(self, **kwargs)
        ImageFitsHeaderMixin.__init__(
            self,
            fits_namespaces=fits_namespaces,
            fits_headers=fits_headers,
            centre=centre,
            rotation=rotation,
            filenames=filenames,
        )

        # store
        self._is_listening = False
        self._port = http_port
        self._interval = interval
        self._new_image_event = asyncio.Event()
        self._video_path = video_path
        self._frame_num = 0
        self._live_view = live_view
        self._image_type = ImageType.OBJECT
        self._image_request_lock = asyncio.Lock()
        self._image_requests: List[ImageRequest] = []
        self._next_image: Optional[NextImage] = None
        self._last_image: Optional[LastImage] = None
        self._last_time = 0.0
        self._flip = flip
        self._sleep_time = sleep_time

        # active
        self._active = False
        self._active_time = 0.0
        self.add_background_task(self._active_update)

        # image cache
        self._cache = DataCache(cache_size)

        # define web server
        self._app = web.Application()
        self._app.add_routes(
            [
                web.get("/", self.web_handler),
                web.get("/video.mjpg", self.video_handler),
                web.get("/{filename}", self.image_handler),
            ]
        )
        self._runner = web.AppRunner(self._app)
        self._site: Optional[web.TCPSite] = None

[docs] async def open(self) -> None: """Open module.""" await Module.open(self) # start listening log.info("Starting HTTP file cache on port %d...", self._port) await self._runner.setup() self._site = web.TCPSite(self._runner, "0.0.0.0", self._port) await self._site.start() self._is_listening = True
[docs] async def close(self) -> None: """Close server""" await Module.close(self) # stop server await self._runner.cleanup()
@property def opened(self) -> bool: """Whether the server is started.""" return self._is_listening
[docs] async def web_handler(self, request: web.Request) -> web.Response: """Handles access to / and returns HTML page. Args: request: Request to respond to. Returns: Response containing web page. """ return web.Response(text=INDEX_HTML, content_type="text/html")
[docs] async def video_handler(self, request: web.Request) -> web.StreamResponse: """Handles access to /video.mjpg and returns the video. Args: request: Request to respond to. Returns: Response containing video stream. """ # create response response = web.StreamResponse() response.content_type = "multipart/x-mixed-replace; boundary=--jpgboundary" await response.prepare(request) last_num = None last_time = 0.0 interval = 1.0 while True: # not reached interval? if time.time() < last_time + interval: await asyncio.sleep(0.01) continue # get image num, image = await self.image_jpeg() if image is None: await asyncio.sleep(0.01) continue # is it actually a new image? if num == last_num: await asyncio.sleep(0.01) continue # now send image! last_num = num last_time = time.time() await response.write(b"--jpgboundary\r\nContent-type: image/jpeg\r\n\r\n" + image + b"\r\n") # return response return response
[docs] async def image_handler(self, request: web.Request) -> web.Response: """Handles access to /* and returns a specified image. Args: request: Request to respond to. Returns: Response containing image. """ # get filename filename = request.match_info["filename"] # get data if filename not in self._cache: raise web.HTTPNotFound() data = self._cache[filename] # send it log.info(f"Serving file {filename}.") return web.Response(body=data, content_type="image/fits")
@property def camera_active(self) -> bool: """Whether camera is currently active.""" return self._active
[docs] async def activate_camera(self) -> None: """Activate camera.""" self._active_time = time.time() if not self._active: await self._activate_camera() self._active = True
[docs] async def deactivate_camera(self) -> None: """Deactivate camera.""" self._active_time = 0 if self._active: await self._deactivate_camera() self._active = False
async def _activate_camera(self) -> None: """Can be overridden by derived class to implement inactivity sleep""" pass async def _deactivate_camera(self) -> None: """Can be overridden by derived class to implement inactivity sleep""" pass async def _active_update(self) -> None: """Checking active status regularly.""" self._active_time = time.time() while True: # go to sleep? if time.time() - self._active_time > self._sleep_time and self._active: await self.deactivate_camera() # wait a little for next check await asyncio.sleep(1)
[docs] async def image_jpeg(self) -> Tuple[Optional[int], Optional[bytes]]: """Return image as jpeg.""" # activate camera, first image will most probably be None await self.activate_camera() # return what we got return self._frame_num, None if self._last_image is None else self._last_image.jpeg
[docs] @staticmethod def create_jpeg(data: NDArray[Any]) -> bytes: """Create a JPEG ge from a numpy array and return as bytes. Args: data: Numpy array to convert to JPEG. Returns: Bytes containing JPEG image. """ # uint16? if data.dtype == np.uint16: # TODO: find a better way to convert to uint8 data = (data / 256).astype(np.uint8) # write to jpeg with io.BytesIO() as output: PIL.Image.fromarray(data).save(output, format="jpeg") return output.getvalue()
async def _set_image(self, data: NDArray[Any]) -> None: """Create FITS and JPEG images from data.""" # flip image? if self._flip: data: NDArray[Any] = np.flip(data, axis=0) # type: ignore # got a requested image in the queue? image, filename = None, None if self._next_image is not None: # create image and reset image, filename = await self._create_image(data, self._next_image) self._next_image = None async with self._image_request_lock: for req in self._image_requests: req.image = image req.filename = filename # convert to jpeg only if we need live view now = time.time() jpeg = None if self._live_view: # check interval if now - self._last_time > self._interval: # write to buffer and reset interval loop = asyncio.get_running_loop() jpeg = await loop.run_in_executor(None, self.create_jpeg, data) self._last_time = now # store both self._last_image = LastImage(data=data, image=image, jpeg=jpeg, filename=filename) self._frame_num += 1 # signal it self._new_image_event.set() self._new_image_event = asyncio.Event() # prepare next image if len(self._image_requests) > 0: # broadcast? broadcast = any([req.broadcast for req in self._image_requests]) # store everything logging.info("Preparing to catch next image...") self._next_image = NextImage( date_obs=datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f"), image_type=self._image_type, header_futures=await self.request_fits_headers(), broadcast=broadcast, ) # reset self._image_request = None async def _create_image(self, data: NDArray[Any], next_image: NextImage) -> Tuple[Image, str]: """Create an Image object from numpy array. Args: data: Numpy array to convert to Image. Returns: Tuple with image itself and the filename. """ # create image flipped: NDArray[Any] = np.flip(data, axis=0) # type: ignore image = Image(flipped) image.header["DATE-OBS"] = next_image.date_obs image.header["IMAGETYP"] = next_image.image_type.value # add fits headers and format filename await self.add_requested_fits_headers(image, next_image.header_futures) await self.add_fits_headers(image) # finish it up return await self._finish_image(image, next_image.broadcast, next_image.image_type) async def _finish_image(self, image: Image, broadcast: bool, image_type: ImageType) -> Tuple[Image, str]: """Finish up an image at the end of _create_image. Args: image: Image to finish up. broadcast: Whether to broadcast it. image_type: Type of image. Returns: Tuple with image itself and the filename. """ # format filename filename = self.format_filename(image) if filename is None: filename = "image.fits" # store it and return filename log.info("Writing image %s to cache...", filename) self._cache[image.header["FNAME"]] = image.to_bytes() # broadcast image path if broadcast and self.comm: log.info("Broadcasting image ID...") await self.comm.send_event(NewImageEvent(filename, image_type)) # finished return image, filename
[docs] @timeout(calc_expose_timeout) async def grab_image(self, broadcast: bool = True, **kwargs: Any) -> str: """Grabs an image ans returns reference. Args: broadcast: Broadcast existence of image. Returns: Name of image that was taken. """ # activate camera await self.activate_camera() # acquire lock async with self._image_request_lock: # request new image image_request = ImageRequest(broadcast) self._image_requests.append(image_request) # we want an image that starts exposing AFTER now, so we wait for the current image to finish. log.info("Waiting for image to finish...") while image_request.image is None: await asyncio.sleep(0.1) # remove from list self._image_requests.remove(image_request) # no image? if image_request.image is None or image_request.filename is None: raise ValueError("Could not take image.") # finished return image_request.filename
[docs] async def get_video(self, **kwargs: Any) -> str: """Returns path to video. Returns: Path to video. """ return self._video_path
[docs] async def set_image_type(self, image_type: ImageType, **kwargs: Any) -> None: """Set the image type. Args: image_type: New image type. """ log.info("Setting image type to %s...", image_type) self._image_type = image_type
[docs] async def get_image_type(self, **kwargs: Any) -> ImageType: """Returns the current image type. Returns: Current image type. """ return self._image_type
__all__ = ["BaseVideo", "NextImage"]