import asyncio
import logging
from typing import Tuple, Any, Dict, List, Optional
import astropy.units as u
from pyobs.events import BadWeatherEvent, GoodWeatherEvent
from pyobs.interfaces import IWeather, IFitsHeaderBefore
from pyobs.modules import Module
from pyobs.modules.weather.weather_api import WeatherApi
from pyobs.modules.weather.weather_state import WeatherState
from pyobs.utils.enums import WeatherSensors
from pyobs.utils.time import Time
log = logging.getLogger(__name__)

# Per-sensor FITS header description: (keyword, comment, value type).
# NOTE(review): the third element is presumably the Python type the sensor value is
# converted to before it is written to the header; this table is not referenced by
# any code visible in this file, so confirm against its consumer.
FITS_HEADERS: Dict[WeatherSensors, Tuple[str, str, type]] = {
    WeatherSensors.TEMPERATURE: ("WS-TEMP", "Ambient temperature average during exposure, C", float),
    WeatherSensors.HUMIDITY: ("WS-HUMID", "Ambient rel. humidity average, %", float),
    WeatherSensors.PRESSURE: ("WS-PRESS", "Average atmospheric pressure, hPa", float),
    WeatherSensors.WINDDIR: ("WS-AZ", "Average wind direction, not corrected for overlap, deg", float),
    WeatherSensors.WINDSPEED: ("WS-WIND", "Ambient average wind speed, km/h", float),
    WeatherSensors.RAIN: ("WS-PREC", "Ambient precipitation [0/1]", bool),
    WeatherSensors.SKYTEMP: ("WS-SKY", "Average sky temperature, C", float),
    WeatherSensors.DEWPOINT: ("WS-TDEW", "Ambient dewpoint average during exposure, C", float),
    WeatherSensors.PARTICLES: ("WS-DUST", "Average particle count during exposure, ppcm", float),
    WeatherSensors.SKYMAG: ("SKYMAG", "Sky brightness, mag/arcsec^2", float),
}
class Weather(Module, IWeather, IFitsHeaderBefore):
    """Connection to pyobs-weather.

    Polls the weather API from a background task, caches the latest state and
    sends a GoodWeatherEvent/BadWeatherEvent whenever the overall verdict flips
    while the module is active.
    """

    # NOTE(review): IFitsHeaderBefore is listed as a base, but no FITS-header method
    # is defined in the visible source -- confirm it is provided elsewhere or was
    # lost when this file was extracted.

    __module__ = "pyobs.modules.weather"

    def __init__(self, url: str, system_init_time: int = 300, **kwargs: Any):
        """Initialize a new pyobs-weather connector.

        Args:
            url: URL to weather station
            system_init_time: Time in seconds the full system needs to initialize
        """
        Module.__init__(self, **kwargs)

        # store and create session
        self._system_init_time = system_init_time
        self._api = WeatherApi(url)

        # whether module is active; while inactive, is_weather_good() always returns True
        self._active = True

        # whole status
        self._weather = WeatherState()

        # register the polling loop as a background task
        self.add_background_task(self._run, True)

    async def open(self) -> None:
        """Open module."""
        await Module.open(self)

        # subscribe to events
        if self._comm:
            await self.comm.register_event(BadWeatherEvent)
            await self.comm.register_event(GoodWeatherEvent)

    async def start(self, **kwargs: Any) -> None:
        """Starts a service."""
        # No events are sent while inactive, so if the weather is bad at the moment
        # of (re-)activation, announce it now.
        if not self._active and not self._weather.is_good:
            # send event!
            await self.comm.send_event(BadWeatherEvent())

        # activate
        self._active = True

    async def stop(self, **kwargs: Any) -> None:
        """Stops a service."""
        self._active = False

    async def is_running(self, **kwargs: Any) -> bool:
        """Whether a service is running."""
        return self._active

    async def _run(self) -> None:
        """Background task: run update cycles forever."""
        while True:
            await self._loop()

    async def _loop(self) -> None:
        """Run a single update cycle followed by a pause.

        Sleeps 5 seconds after a successful update and 60 seconds after a failed one.
        """
        try:
            await self._update()
        except Exception:
            # Deliberately not a bare ``except``: that would also swallow
            # asyncio.CancelledError/KeyboardInterrupt and keep this task alive
            # after it has been cancelled.
            log.exception("Updating weather failed.")
            sleep = 60
        else:
            sleep = 5
        await asyncio.sleep(sleep)

    async def _update(self) -> None:
        """Update weather info and send an event if the good/bad state changed."""
        # remember previous verdict to detect a change
        was_good = self._weather.is_good

        try:
            self._weather.status = await self._api.get_current_status()
        except Exception as e:
            log.warning("Request failed: %s", str(e))
            self._weather.is_good = False  # on error, we're always bad

        # only announce changes while the module is active
        if was_good != self._weather.is_good and self._active:
            if self._weather.is_good:
                log.info("Weather is now good.")
                eta = self._calc_system_init_eta()
                await self.comm.send_event(GoodWeatherEvent(eta=eta))
            else:
                log.info("Weather is now bad.")
                await self.comm.send_event(BadWeatherEvent())

    def _calc_system_init_eta(self) -> Time:
        """Return the current time plus the configured system init time."""
        return Time(Time.now() + self._system_init_time * u.second)

    async def get_weather_status(self, **kwargs: Any) -> Dict[str, Any]:
        """Returns status of object in form of a dictionary. See other interfaces for details.

        Raises:
            NotImplementedError: Always; this method is not implemented here.
        """
        raise NotImplementedError

    async def is_weather_good(self, **kwargs: Any) -> bool:
        """Whether the weather is good to observe."""
        # if not active, weather is always good
        if not self._active:
            return True
        return self._weather.is_good

    async def get_current_weather(self, **kwargs: Any) -> Dict[str, Any]:
        """Returns current weather.

        Returns:
            Dictionary containing entries for time, good, and sensor, with the latter being another dictionary
            with sensor information, which contain a value and a good flag.
        """
        return self._weather.status

    async def get_sensor_value(self, station: str, sensor: WeatherSensors, **kwargs: Any) -> Tuple[str, float]:
        """Return value for given sensor.

        Args:
            station: Name of weather station to get value from.
            sensor: Name of sensor to get value from.

        Returns:
            Tuple of time of measurement and current value of given sensor, in that order.

        Raises:
            ValueError: If the response lacks a "time" or a "value" entry.
        """
        # do request
        status = await self._api.get_sensor_value(station, sensor)

        # validate response
        if "time" not in status or "value" not in status:
            raise ValueError("Time and/or value parameters not found in response from weather station.")

        # return time and value
        return status["time"], status["value"]
# names exported by ``from <this module> import *``
__all__ = ["Weather"]