import fnmatch
import glob
import logging
import os
import asyncio
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Any, Optional, List, Tuple, AnyStr
from astropy.io import fits
from pyobs.modules import Module
from pyobs.utils.fits import format_filename
log = logging.getLogger(__name__)
@dataclass
class CurrentFile:
filename: str
data: AnyStr
out_filename: Optional[str] = None
hdu_list: Optional[fits.HDUList] = None
class ImageWatcher(Module):
"""Watch for new files and write them to all given destinations.
Watches a path for new files and stores them in all given destinations. Only if all operations were successful,
the file is deleted.
"""
__module__ = "pyobs.modules.image"
def __init__(
self,
watchpath: str,
destinations: Optional[List[str]] = None,
poll: bool = False,
poll_interval: int = 5,
wait_time: int = 10,
pattern: str = "*",
**kwargs: Any,
):
"""Create a new image watcher.
Args:
watchpath: Path to watch.
destinations: Filename patterns for destinations.
poll: If True, watchpath is polled instead of watched by inotify.
poll_interval: Interval for polling in seconds, if poll is True.
wait_time: Time in seconds between adding file to list and processing it.
"""
Module.__init__(self, **kwargs)
# add thread func
self.add_background_task(self._worker)
if poll:
self.add_background_task(self._watch_poll)
else:
self.add_background_task(self._watch_inotify)
# variables
self._watchpath = watchpath
self._notifier: Optional[Any] = None
self._queue = asyncio.Queue[Tuple[str, asyncio.Future[None]]]()
self._poll = poll
self._poll_interval = poll_interval
self._wait_time = wait_time
self._pattern = pattern
self.current_file: Optional[CurrentFile] = None
# filename patterns
if not destinations:
raise ValueError("No filename patterns given for the destinations.")
self._destinations = destinations
async def _watch_inotify(self) -> None:
from asyncinotify import Inotify, Mask # type: ignore
# get local directory
local = await self.vfs.local_path(self._watchpath)
# Context manager to close the inotify handle after use
with Inotify() as inotify:
# add watch on local directory
inotify.add_watch(local, Mask.CLOSE_WRITE)
# iterate events forever
async for event in inotify:
# get filename by replacing local with watchpath
filename = str(event.path).replace(local, self._watchpath)
# add file
self.add_file(filename)
async def _watch_poll(self) -> None:
# init list
files = set(await self.vfs.listdir(self._watchpath))
# run forever
path = PurePosixPath(self._watchpath)
while True:
# get new list
new_files = await self.vfs.listdir(self._watchpath)
# find all new files and add them
for f in new_files:
if f not in files:
print(str(path / f))
self.add_file(str(path / f))
# store new list
files = set(new_files)
[docs] async def open(self) -> None:
"""Open image watcher."""
await Module.open(self)
# add all files from directory to queue
for filename in await self.vfs.listdir(self._watchpath):
self.add_file(os.path.join(self._watchpath, filename))
[docs] async def close(self) -> None:
"""Close image watcher."""
await Module.close(self)
# stop watching
if self._notifier:
log.info("Stop watching directory...")
self._notifier.stop()
[docs] def add_file(self, filename: str) -> None:
"""Add a file to the file queue.
Args:
filename (str): Local filename of new file.
"""
# check pattern
if not fnmatch.fnmatch(filename, self._pattern):
return
# log and add file
log.info("Adding new file %s...", filename)
self._queue.put_nowait((filename, asyncio.create_task(asyncio.sleep(self._wait_time))))
async def _worker(self) -> None:
"""Worker thread."""
# run forever
while True:
# get next filename
filename, future = await self._queue.get()
# waiting for future, which is the wait time for new files
await future
log.info("Working on file %s...", filename)
# better safe than sorry
try:
# get file data
async with self.vfs.open_file(filename, "rb") as fd:
data = await fd.read()
# try to load as fits file
try:
fits_file = fits.HDUList.fromstring(data)
except:
fits_file = None
# fill current file
self.current_file = CurrentFile(filename=filename, data=data, hdu_list=fits_file)
# loop archive and upload
success = True
for pattern in self._destinations:
# if it contains {placeholders}, we assume it's a FITS file and format filename
if "{" in pattern and "}" in pattern and fits_file is not None:
# format filename
out_filename = format_filename(fits_file["SCI"].header, pattern)
if out_filename is None:
raise ValueError("Could not create name for file.")
else:
# no formatting, so just add filename to destination
out_filename = os.path.join(pattern, os.path.basename(filename))
# store it
log.info("Storing file as %s...", out_filename)
self.current_file.out_filename = out_filename
try:
async with self.vfs.open_file(out_filename, "wb") as fd:
await fd.write(data)
except:
log.warning("Error while copying file, skipping for now.")
success = False
break
# do extra processing
if not await self.process_extra(filename):
success = False
break
# no success?
if not success:
# re-queue file and skip file for now
self._queue.put_nowait((filename, asyncio.create_task(asyncio.sleep(self._wait_time))))
continue
# close and delete files
log.info("Removing file from watch directory...")
if not await self.vfs.remove(filename):
log.warning("Could not delete %s.", filename)
# cleanup extra
await self.cleanup_extra(filename)
except:
log.exception("Something went wrong.")
__all__ = ["ImageWatcher"]