import logging
import asyncio
from typing import Union, List, Any, Optional
from pyobs.modules import Module
from pyobs.events import NewImageEvent, Event
from pyobs.utils.fits import format_filename
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class ImageWriter(Module):
    """Writes new images to disk.

    Subscribes to ``NewImageEvent`` events, queues the filenames they announce and processes
    them one by one in a background task: each image is read through the VFS and written back
    under a filename built from its header and the configured filename pattern.
    """

    __module__ = "pyobs.modules.image"

    def __init__(
        self, filename: str = "/archive/{FNAME}", sources: Optional[Union[str, List[str]]] = None, **kwargs: Any
    ):
        """Creates a new image writer.

        Args:
            filename: Pattern for filename to store images at.
            sources: List of sources (e.g. cameras) to process images from or None for all.
            **kwargs: Passed on unchanged to ``Module.__init__``.
        """
        Module.__init__(self, **kwargs)

        # register the worker coroutine as a background task
        # NOTE(review): the meaning of the second argument (True) is defined by
        # Module.add_background_task -- confirm against its signature
        self.add_background_task(self._worker, True)

        # variables
        self._filename = filename
        # a single source name is wrapped into a one-element list; None means "accept every sender"
        self._sources = [sources] if isinstance(sources, str) else sources
        # filenames taken from accepted events, waiting to be processed by _worker
        self._queue = asyncio.Queue()

    async def open(self) -> None:
        """Open image writer and subscribe to new image events."""
        await Module.open(self)

        # subscribe to channel with new images
        if self.comm is not None:
            log.info("Subscribing to new image events...")
            await self.comm.register_event(NewImageEvent, self.process_new_image_event)

    async def process_new_image_event(self, event: Event, sender: str) -> bool:
        """Queues the file announced by a new image event for writing.

        Args:
            event: New image event
            sender: Who sent the event?

        Returns:
            True if the filename was queued, False if the event is not a NewImageEvent or the
            sender is not one of the configured sources.
        """
        if not isinstance(event, NewImageEvent):
            return False

        # filter by source
        if self._sources is not None and sender not in self._sources:
            return False

        # queue file; the actual download and storage happens in _worker
        log.info("Received new image event from %s.", sender)
        self._queue.put_nowait(event.filename)
        return True

    async def _worker(self) -> None:
        """Background task that takes queued filenames and stores the images.

        Runs forever. A failure in any step is logged and the loop moves on to the next
        queued filename.
        """
        # run forever
        while True:
            # get next filename, waiting until one is available
            filename = await self._queue.get()

            try:
                # download image
                log.info("Downloading file %s...", filename)
                img = await self.vfs.read_image(filename)
            except FileNotFoundError:
                log.error("Could not download image.")
                continue

            # output filename, built from the image header and the configured pattern
            try:
                output = format_filename(img.header, self._filename)
            except KeyError as e:
                log.error("Could not format filename: %s", e)
                continue

            try:
                # open output
                log.info("Storing image as %s...", output)
                await self.vfs.write_image(output, img)
            except Exception:
                # best effort: keep the worker running, but record the traceback so that the
                # cause of the failure is not lost
                log.exception("Could not store image.")
# Public API of this module: the names exported by a star-import.
__all__ = ["ImageWriter"]