Source code for pyobs.object

"""
:class:`~pyobs.object.Object` is the base for almost all classes in *pyobs*. It adds some convenience methods
and helper methods for creating other Objects.

There are a few convenience functions:

    - :func:`~pyobs.object.create_object` creates objects from dictionaries.
    - :func:`~pyobs.object.get_object` is a wrapper around :func:`pyobs.object.create_object` that can do further checks.
    - :func:`~pyobs.object.get_safe_object` is a wrapper around :func:`~pyobs.object.get_object` that never raises
      exceptions.
"""

from __future__ import annotations

import copy
import datetime
import inspect
from collections.abc import Coroutine
from typing import Union, Callable, TypeVar, Optional, Type, List, Tuple, Any, overload, TYPE_CHECKING, Literal
import logging
import pytz
from astroplan import Observer
from astropy.coordinates import EarthLocation
from pydantic import BaseModel

from pyobs.background_task import BackgroundTask
from pyobs.comm import Comm
from pyobs.comm.dummy import DummyComm

if TYPE_CHECKING:
    from pyobs.vfs import VirtualFileSystem

log = logging.getLogger(__name__)


"""Class of an Object."""
ObjectClass = TypeVar("ObjectClass")

"""Class of a pydantic model."""
PydanticModel = TypeVar("PydanticModel", bound=BaseModel)


"""Class of a proxy."""
ProxyType = TypeVar("ProxyType")


@overload
def get_object(
    config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass],
    object_class: type[ObjectClass],
    **kwargs: Any,
) -> ObjectClass: ...


@overload
def get_object(config_or_object: dict[str, Any], object_class: Literal[None], **kwargs: Any) -> Any: ...


@overload
def get_object(
    config_or_object: ObjectClass | type[ObjectClass], object_class: Literal[None], **kwargs: Any
) -> ObjectClass: ...


@overload
def get_object(
    config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass],
    object_class: type[ObjectClass] | None = None,
    **kwargs: Any,
) -> ObjectClass | Any: ...


[docs] def get_object( config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass], object_class: type[ObjectClass] | None = None, **kwargs: Any, ) -> ObjectClass | Any: """Creates object from config or returns object directly, both optionally after check of type. Args: config_or_object: A configuration dict or an object itself to create/check. If a dict with a class key is given, a new object is created. object_class: Class to check object against. Returns: (New) object (created from config) that optionally passed class check. Raises: TypeError: If the object does not match the given class. """ if config_or_object is None: raise TypeError("No config or object given.") elif isinstance(config_or_object, dict): # copy kwargs to config_or_object, so that we don't have any duplicates for k, v in kwargs.items(): config_or_object[k] = v # a dict is given, so create object obj = create_object(config_or_object) elif inspect.isclass(config_or_object): # config_or_object is a type, so create it using its constructor obj = config_or_object(**kwargs) else: # just use given object obj = config_or_object # do we need a type check and does the given object pass? if object_class is not None and not isinstance(obj, object_class): raise TypeError("Provided object is not of requested type %s." % object_class.__name__) return obj
@overload def get_safe_object( config_or_object: ObjectClass | dict[str, Any], object_class: type[ObjectClass], **kwargs: Any ) -> ObjectClass: ... @overload def get_safe_object(config_or_object: ObjectClass | Any, object_class: None, **kwargs: Any) -> Any | None: ...
[docs] def get_safe_object( config_or_object: dict[str, Any] | Any, object_class: type[ObjectClass] | None = None, **kwargs: Any ) -> ObjectClass | Any | None: """Calls get_object in a safe way and returns None, if an exceptions thrown. Args: config_or_object: A configuration dict or an object itself to create/check. If a dict with a class key is given, a new object is created. object_class: Class to check object against. Returns: (New) object (created from config) that optionally passed class check or None. """ try: return get_object(config_or_object, object_class, **kwargs) except Exception: return None
[docs] def get_class_from_string(class_name: str) -> Any: """Get class from a given string. Args: class_name: Name of class as string. Returns: Actual class. """ parts = class_name.split(".") module_name = ".".join(parts[:-1]) cls = __import__(module_name) for comp in parts[1:]: cls = getattr(cls, comp) return cls
[docs] def create_object(config: dict[str, Any], *args: Any, **kwargs: Any) -> Any: """Create object from dict config. Args: config: Config to create object from *args: Parameters to be passed to object. **kwargs: Parameters to be passed to object. Returns: Created object. """ # get class name class_name = config["class"] # create class klass = get_class_from_string(class_name) # remove class from kwargs cfg = copy.copy(config) del cfg["class"] # create object return klass(*args, **cfg, **kwargs)
[docs] class PrivateAttrMixin: _comm: Comm | None _vfs: VirtualFileSystem | None _observer: Observer | None _location: EarthLocation | None _timezone: datetime.tzinfo | None @property def comm(self) -> Comm: if self._comm is None: raise AttributeError("No comm available.") return self._comm @property def vfs(self) -> VirtualFileSystem: if self._vfs is None: raise AttributeError("No VFS available.") return self._vfs @property def observer(self) -> Observer: if self._observer is None: raise AttributeError("No Observer available.") return self._observer @property def location(self) -> EarthLocation: if self._location is None: raise AttributeError("No location available.") return self._location @property def timezone(self) -> datetime.tzinfo: if self._timezone is None: raise AttributeError("No timezone available.") return self._timezone
[docs] def pyobs_model_validate(self, cls: type[PydanticModel], *args: Any, **kwargs: Any) -> PydanticModel: """Validate a pydantic model with additional fields.""" return cls.model_validate( *args, context={ "comm": self._comm, "observer": self._observer, "vfs": self._vfs, "timezone": self._timezone, "location": self._location, }, **kwargs, )
[docs] class Object(PrivateAttrMixin): """Base class for all objects in *pyobs*.""" def __init__( self, vfs: VirtualFileSystem | dict[str, Any] | None = None, comm: Comm | dict[str, Any] | None = None, timezone: str | datetime.tzinfo | None = "utc", location: str | dict[str, Any] | EarthLocation | None = None, observer: Observer | None = None, **kwargs: Any, ): """ .. note:: Objects must always be opened and closed using :meth:`~pyobs.object.Object.open` and :meth:`~pyobs.object.Object.close`, respectively. This class provides a :class:`~pyobs.vfs.VirtualFileSystem`, a timezone and a location. From the latter two, an observer object is automatically created. Object also adds support for easily adding threads using the :meth:`~pyobs.object.Object.add_background_task` method as well as a watchdog thread that automatically restarts threads, if requested. Using :meth:`~pyobs.object.Object.add_child_object`, other objects can be (created an) attached to this object, which then automatically handles calls to :meth:`~pyobs.object.Object.open` and :meth:`~pyobs.object.Object.close` on those objects. Args: vfs: VFS to use (either object or config) comm: Comm object to use timezone: Timezone at observatory. location: Location of observatory, either a name or a dict containing latitude, longitude, and elevation. """ from pyobs.vfs import VirtualFileSystem # child objects self._child_objects: list[Any] = [] # create vfs self._vfs: VirtualFileSystem | None if vfs: self._vfs = get_object(vfs, VirtualFileSystem) else: self._vfs = VirtualFileSystem() # timezone self._timezone: datetime.tzinfo | None if isinstance(timezone, datetime.tzinfo): self._timezone = timezone elif isinstance(timezone, str): self._timezone = pytz.timezone(timezone) else: raise ValueError(f"Unknown format for timezone: {type(timezone)}") # location if location is None: self._location = None elif isinstance(location, EarthLocation): self._location = location elif isinstance(location, str): self._location = EarthLocation.of_site(location) elif isinstance(location, dict): self._location = EarthLocation.from_geodetic( location["longitude"], location["latitude"], location["elevation"] ) else: raise ValueError("Unknown format for location.") # create observer self._observer = observer if self._observer is None and self._location is not None and self._timezone is not None: log.info( "Setting location to longitude=%s, latitude=%s, and elevation=%s.", self._location.lon, self._location.lat, self._location.height, ) self._observer = Observer(location=self._location, timezone=timezone) # comm object self._comm: Comm | None if comm is None: self._comm = DummyComm() elif isinstance(comm, Comm): self._comm = comm elif isinstance(comm, dict): log.info("Creating comm object...") self._comm = get_object(comm, Comm) else: raise ValueError("Invalid Comm object") # opened? self._opened = False # background tasks self._background_tasks: List[Tuple[BackgroundTask, bool]] = []
[docs] def add_background_task( self, func: Callable[..., Coroutine[Any, Any, None]], restart: bool = True, autostart: bool = True ) -> BackgroundTask: """Add a new function that should be run in the background. MUST be called in constructor of derived class or at least before calling open() on the object. Args: func: Func to add. restart: Whether to restart this function. autostart: Whether to start this function when the module is opened Returns: Background task """ background_task = BackgroundTask(func, restart, self) self._background_tasks.append((background_task, autostart)) return background_task
[docs] async def open(self) -> None: """Open module.""" self._perform_background_task_autostart() # open child objects for obj in self._child_objects: if hasattr(obj, "open"): if inspect.iscoroutinefunction(obj.open): await obj.open() else: obj.open() # success self._opened = True
def _perform_background_task_autostart(self) -> None: todo = filter(lambda b: b[1] is True, self._background_tasks) for task, _ in todo: task.start() @property def opened(self) -> bool: """Whether object has been opened.""" return self._opened
[docs] async def close(self) -> None: """Close module.""" # close child objects for obj in self._child_objects: if hasattr(obj, "close"): await obj.close() self._stop_background_tasks()
def _stop_background_tasks(self) -> None: for task, _ in self._background_tasks: task.stop()
[docs] def quit(self) -> None: """Can be overloaded to quit program.""" pass
@overload def get_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass], object_class: type[ObjectClass], copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass: ... @overload def get_object( self, config_or_object: dict[str, Any], object_class: Literal[None], copy_comm: bool = True, **kwargs: Any, ) -> Any: ... @overload def get_object( self, config_or_object: ObjectClass | type[ObjectClass], object_class: Literal[None], copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass: ... @overload def get_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass], object_class: type[ObjectClass] | None = None, copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass | Any: ...
[docs] def get_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass], object_class: type[ObjectClass] | None = None, copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass | Any: """Creates object from config or returns object directly, both optionally after check of type. Args: config_or_object: A configuration dict or an object itself to create/check. If a dict with a class key is given, a new object is created. object_class: Class to check object against. copy_comm: Copy comm from this object to the new one. Returns: (New) object (created from config) that optionally passed class check. Raises: TypeError: If the object does not match the given class. """ # set parameters params = copy.copy(kwargs) # copy comm? if copy_comm: params["comm"] = self._comm # copy timezone, location, vfs, and observer, if not exists for p in ["_timezone", "_location", "_vfs", "_observer"]: if self.config_or_object_get_param(config_or_object, p) is None: params[p[1:]] = getattr(self, p) # get it return get_object(config_or_object, object_class, **params)
[docs] @staticmethod def config_or_object_get_param(config_or_object: dict[str, Any] | Any, param: str) -> Any: """Checks, whether a config_or_object has the given parameter. Args: config_or_object: Dict config or object. param: Parameter name to check. Returns: """ is_dict = isinstance(config_or_object, dict) if is_dict and param in config_or_object: return config_or_object[param] if not is_dict and hasattr(config_or_object, param): return getattr(config_or_object, param) return None
@overload def get_safe_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass] | Any, object_class: Type[ObjectClass], copy_comm: bool = True, **kwargs: Any, ) -> Optional[ObjectClass]: ... @overload def get_safe_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass] | Any, object_class: None, copy_comm: bool = True, **kwargs: Any, ) -> Optional[Any]: ...
[docs] def get_safe_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass] | Any, object_class: type[ObjectClass] | None = None, copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass | Any | None: """Calls get_object in a safe way and returns None, if an exceptions thrown.""" try: return self.get_object(config_or_object, object_class=object_class, copy_comm=copy_comm, **kwargs) except Exception: return None
@overload def add_child_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass] | Any, object_class: type[ObjectClass], copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass: ... @overload def add_child_object( self, config_or_object: ObjectClass, **kwargs: Any, ) -> ObjectClass: ... @overload def add_child_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass] | Any, object_class: Literal[None], copy_comm: bool = True, **kwargs: Any, ) -> Any: ...
[docs] def add_child_object( self, config_or_object: dict[str, Any] | ObjectClass | type[ObjectClass] | Any, object_class: type[ObjectClass] | None = None, copy_comm: bool = True, **kwargs: Any, ) -> ObjectClass | Any: """Create a new sub-module, which will automatically be opened and closed. Args: config_or_object: Module definition object_class: Class for new module copy_comm: Copy comm from this object to the new one. Returns: The created module. """ # get object obj = self.get_object(config_or_object, object_class=object_class, copy_comm=copy_comm, **kwargs) # add to list self._child_objects.append(obj) # return it return obj
@overload async def proxy(self, name_or_object: str | object, obj_type: type[ProxyType]) -> ProxyType: ... @overload async def proxy(self, name_or_object: str | object, obj_type: type[ProxyType] | None = None) -> Any: ...
[docs] async def proxy( self, name_or_object: str | object, obj_type: type[ProxyType] | None = None ) -> Union[Any, ProxyType]: """Returns object directly if it is of given type. Otherwise get proxy of client with given name and check type. If name_or_object is an object: - If it is of type (or derived), return object. - Otherwise raise exception. If name_name_or_object is string: - Create proxy from name and raise exception, if it doesn't exist. - Check type and raise exception if wrong. - Return object. Args: name_or_object: Name of object or object itself. obj_type: Expected class of object. Returns: Object or proxy to object. Raises: ValueError: If proxy does not exist or wrong type. """ return await self.comm.proxy(name_or_object, obj_type)
__all__ = ["get_object", "get_class_from_string", "create_object", "Object", "PrivateAttrMixin"]