Source code for pyobs.modules.image.imagewatcher

import glob
import logging
import os
import asyncio
from typing import Any, Optional, List
from astropy.io import fits

from pyobs.modules import Module
from pyobs.utils.fits import format_filename

log = logging.getLogger(__name__)


class ImageWatcher(Module):
    """Watch for new images and write them to all given destinations.

    Watches a path for new images and stores them in all given destinations. Only if all operations were successful,
    the file is deleted.
    """

    __module__ = "pyobs.modules.image"

    def __init__(self, watchpath: Optional[str] = None, destinations: Optional[List[str]] = None, **kwargs: Any):
        """Create a new image watcher.

        Args:
            watchpath: Path to watch.
            destinations: Filename patterns for destinations.
        """
        Module.__init__(self, **kwargs)

        # test import
        import pyinotify

        # add thread func
        self.add_background_task(self._worker)

        # variables
        self._watchpath = watchpath
        self._notifier = None
        self._queue = asyncio.Queue[str]()

        # filename patterns
        if not destinations:
            raise ValueError("No filename patterns given for the destinations.")
        self._destinations = destinations

[docs] async def open(self) -> None: """Open module.""" await Module.open(self) import pyinotify class EventHandler(pyinotify.ProcessEvent): """Event handler for file watcher.""" def __init__(self, main, *args: Any, **kwargs: Any) -> None: """Create event handler.""" pyinotify.ProcessEvent.__init__(self, *args, **kwargs) self.main = main def process_IN_CLOSE_WRITE(self, event: Any) -> None: """React to IN_CLOSE_WRITE events.""" self.main.add_image(event.pathname) # start watching directory if self._watchpath: log.info("Start watching directory %s for changes...", self._watchpath) wm = pyinotify.WatchManager() wm.add_watch(self._watchpath, pyinotify.IN_CLOSE_WRITE) self._notifier = pyinotify.ThreadedNotifier(wm, default_proc_fun=EventHandler(self)) # , name='observer') self._notifier.start()
[docs] async def close(self) -> None: """Close image watcher.""" await Module.close(self) # stop watching if self._notifier: log.info("Stop watching directory...") self._notifier.stop()
[docs] def add_image(self, filename: str) -> None: """Add an image to the image database. Args: filename (str): Local filename of new image. """ # log file log.info("Adding new image %s...", filename) self._queue.put_nowait(filename)
async def _clear_queue(self) -> None: """Clear the queue with new files.""" # clear queue while not self._queue.empty(): await self._queue.get() async def _worker(self) -> None: """Worker thread.""" # first, add all files from directory to queue await self._clear_queue() for filename in sorted(glob.glob(os.path.join(self._watchpath, "*"))): self.add_image(filename) # run forever while True: # get next filename filename = await self._queue.get() log.info("Working on file %s...", filename) # better safe than sorry try: # open file fits_file = fits.open(filename) # loop archive and upload success = True for pattern in self._destinations: # create filename out_filename = format_filename(fits_file["SCI"].header, pattern) if out_filename is None: raise ValueError("Could not create name for file.") # store it log.info("Storing file as %s...", out_filename) try: await self.vfs.write_fits(out_filename, fits_file) except: log.exception("Error while copying file, skipping for now.") success = False # no success_ if not success: continue # close and delete files log.info("Removing file from watch directory...") os.remove(filename) except: log.exception("Something went wrong.") __all__ = ["ImageWatcher"]