Source code for pyobs.modules.utils.kiosk

import asyncio
import io
import logging
from typing import Union, Any, Optional
import numpy as np
from aiohttp import web

from pyobs.interfaces import ICamera, IExposureTime, IWindow
from pyobs.modules import Module
from pyobs.interfaces import IStartStop

log = logging.getLogger(__name__)


class Kiosk(Module, IStartStop):
    """A kiosk mode for a pyobs camera that takes images and publishes them via HTTP.

    While running, a background task repeatedly grabs an image from the
    configured camera, converts it to JPEG and keeps the latest result in
    memory. The built-in HTTP server returns that JPEG (or a placeholder, if no
    image has been taken yet) at ``/image.jpg``.
    """

    __module__ = "pyobs.modules.utils"

    # Auto-exposure settings: the exposure time is rescaled after every image so
    # that the brightest pixel approaches _TARGET_MAX_COUNTS, and the result is
    # capped at _MAX_EXP_TIME.
    _TARGET_MAX_COUNTS = 40000
    _MAX_EXP_TIME = 30

    def __init__(self, camera: Union[ICamera, str], port: int = 37077, **kwargs: Any):
        """Initializes the kiosk.

        Args:
            camera: Camera to use for kiosk mode.
            port: Port for HTTP server.
        """
        Module.__init__(self, **kwargs)

        # register the image-taking loop as a background task
        self.add_background_task(self._camera_thread)

        # store stuff
        self._is_listening = False
        self._camera = camera
        self._port = port
        self._exp_time = 2
        self._running = False
        self._image: Optional[bytes] = None

        # create placeholder image that is served until the first exposure is done
        from PIL import Image, ImageDraw

        img = Image.new("RGB", (300, 300), color=(0, 0, 0))
        d = ImageDraw.Draw(img)
        d.text((110, 150), "No image taken yet", fill=(255, 255, 255))

        # encode placeholder as JPEG
        with io.BytesIO() as bio:
            img.save(bio, format="jpeg")
            self._empty = bio.getvalue()

        # define web server
        self._app = web.Application()
        self._app.add_routes([web.get("/image.jpg", self.image_handler)])
        self._runner = web.AppRunner(self._app)
        self._site: Optional[web.TCPSite] = None

    async def open(self) -> None:
        """Open module and start the HTTP server."""
        await Module.open(self)

        # start listening
        log.info("Starting HTTP server on port %d...", self._port)
        await self._runner.setup()
        self._site = web.TCPSite(self._runner, "localhost", self._port)
        await self._site.start()
        self._is_listening = True

    async def close(self) -> None:
        """Close module and stop the HTTP server."""
        await Module.close(self)

        # stop server
        await self._runner.cleanup()

        # server is down, so `opened` must not report it as started any more
        self._site = None
        self._is_listening = False

    async def image_handler(self, request: web.Request) -> web.Response:
        """Handles access to /image.jpg and returns the latest image.

        Args:
            request: Request to respond to.

        Returns:
            Response containing the latest JPEG image, or the placeholder image
            if none has been taken yet.
        """
        # get image
        image = self._empty if self._image is None else self._image

        # both the placeholder and the camera images are JPEG encoded
        return web.Response(body=image, content_type="image/jpeg")

    @property
    def opened(self) -> bool:
        """Whether the server is started."""
        return self._is_listening

    async def start(self, **kwargs: Any) -> None:
        """Start kiosk mode."""
        self._running = True

    async def stop(self, **kwargs: Any) -> None:
        """Stop kiosk mode."""
        self._running = False

    async def is_running(self, **kwargs: Any) -> bool:
        """Whether kiosk mode is running."""
        return self._running

    async def _camera_thread(self) -> None:
        """Background task for taking images while kiosk mode is running."""
        # loop until closing
        while True:
            # are we running?
            if not self._running:
                # no, so wait a little and continue
                await asyncio.sleep(1)
                continue

            # get camera, retry later if it is not available
            try:
                camera = await self.proxy(self._camera, ICamera)
            except ValueError:
                await asyncio.sleep(10)
                continue

            # do settings
            if isinstance(camera, IExposureTime):
                # set exposure time
                await camera.set_exposure_time(self._exp_time)
            if isinstance(camera, IWindow):
                # set full frame
                full_frame = await camera.get_full_frame()
                await camera.set_window(*full_frame)

            # do exposure
            filename = await camera.grab_data(False)

            # download image
            try:
                image = await self.vfs.read_image(filename)
            except FileNotFoundError:
                log.warning("Could not download image %s.", filename)
                continue

            # convert it to JPEG in an executor, so the event loop is not blocked
            self._image = await asyncio.get_running_loop().run_in_executor(None, image.to_jpeg)

            # adjust exposure time?
            if isinstance(camera, IExposureTime):
                self._update_exposure_time(np.max(image.data))

    def _update_exposure_time(self, max_val: Any) -> None:
        """Rescale the exposure time from the brightest pixel of the last image.

        Args:
            max_val: Maximum pixel value of the last image.
        """
        level = float(max_val)

        # a zero, negative or NaN maximum cannot be scaled (division by zero
        # would yield an infinite exposure time), so keep the current value
        if not np.isfinite(level) or level <= 0:
            return

        # scale towards the target level, but never exceed the upper limit
        scaled = self._exp_time / level * self._TARGET_MAX_COUNTS
        self._exp_time = min(scaled, self._MAX_EXP_TIME)


__all__ = ["Kiosk"]