Source code for pyobs_weather.weather.stations.mysql

import logging
import MySQLdb
import pytz
from astropy.time import Time, TimeDelta
import astropy.units as u

from pyobs_weather.weather.models import Sensor, SensorType
from .station import WeatherStation

log = logging.getLogger(__name__)


[docs]class MySQL(WeatherStation): """The MySQL weather station reads current weather information from a MySQL database.""" def __init__(self, connect, table, fields, time: str = 'time', time_offset: int = 0, *args, **kwargs): """Creates a new Database station. A typical JSON configuration for this weather station might look like this: | { | "connect": { | "host": "database.com", | "user": "weather", | "password": "weather", | "db": "weather" | }, | "table": "weather_table", | "time": "sensor_datetime", | "time_offset": -7200, | "fields": { | "temp": {"code": "temp", "name": "Temperature", "unit": "°C"}, | "rel_hum": {"code": "humid", "name": "Relative humidity", "unit": "%"}, | "temp_dew": {"code": "dewpoint", "name": "Dew point", "unit": "°C"}, | "windsp": {"code": "windspeed", "name": "Wind speed", "unit": "km/h"}, | "winddir": {"code": "winddir", "name": "Wind direction", "unit": "°E of N"}, | "pressure": {"code": "press", "name": "Pressure", "unit": "hPa"} | } | } Args: connect: Dictionary with fields required by MySQLd.connect table: Database table. time: Name of column containing time. fields: Dictionary with field->SensorType data, e.g. {column: {code="temp", name="Temperature", unit="C"}} column: table column code: field code name: field name unit: field unit bool_true: if given, evaluate value to 1 if equal to this, otherwise 0 bool_false: if given, evaluate value to 0 if equal to this, otherwise 1 time_offset: Offset in seconds to add to current time to get UTC.""" WeatherStation.__init__(self, *args, **kwargs) self.connect = connect self.table = table self.time = time self.time_offset = time_offset self.fields = fields
[docs] def create_sensors(self): """Entry point for creating sensors for this station. New sensors are created based on the configuration.""" # loop all fields for field, typ in self.fields.items(): if 'name' in typ and 'unit' in typ: # get or create sensor type sensor_type, _ = SensorType.objects.get_or_create(code=typ['code'], name=typ['name'], unit=typ['unit']) else: sensor_type = self._add_sensor(typ['code']) # get or create sensor Sensor.objects.get_or_create(station=self._station, type=sensor_type)
[docs] def update(self): """Entry point for updating sensor values for this station. This method connects to the database and reads the latest values.""" log.info('Updating Database station %s...' % self._station.code) # connect to DB db = MySQLdb.connect(**self.connect) # get all columns to query columns = [self.time] + list(self.fields.keys()) # build query sql = 'SELECT %s FROM %s ORDER BY %s DESC LIMIT 1' % (','.join(columns), self.table, self.time) # and query cur = db.cursor() cur.execute(sql) row = cur.fetchone() cur.close() # nothing? if row is None: logging.error('Result from database is empty.') return # evaluate row time = (Time(row[0]) + TimeDelta(self.time_offset * u.second)).to_datetime(pytz.UTC) # other values for cfg, value in zip(self.fields.values(), row[1:]): # boolean? if 'bool_true' in cfg: value = 1 if cfg['bool_true'] == value else 0 elif 'bool_false' in cfg: value = 0 if cfg['bool_false'] == value else 1 # add value self._add_value(cfg['code'], time, value)
__all__ = ['MySQL']