Source code for pyobs.comm.proxy

from __future__ import annotations

import asyncio
import inspect
import types
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Any, Generic, TypeVar, get_type_hints

from pyobs.interfaces import Interface

if TYPE_CHECKING:
    from pyobs.comm import Comm

"""Class of a proxy."""
ProxyType = TypeVar("ProxyType")


class Proxy:
    """A proxy for remote pyobs modules."""

    __module__ = "pyobs.comm"

    def __init__(
        self,
        comm: Comm,
        client: str,
        interfaces: list[type[Interface]],
        capabilities: dict[type[Interface], Any] | None = None,
    ):
        """Creates a new proxy.

        Args:
            comm: Comm object to use for connection.
            client: Name of client to connect to.
            interfaces: List of interfaces supported by client.
            capabilities: Pre-populated capabilities dict, keyed by interface type.
        """

        # set client and interfaces
        self._comm: Comm = comm
        self._client = client

        # remove interfaces that are implemented by others
        to_delete = []
        for i1 in interfaces:
            for i2 in interfaces:
                if i1 != i2 and issubclass(i1, i2):
                    # i1 implements i2, so remove i2
                    to_delete.append(i2)
        interfaces = [i for i in interfaces if i not in to_delete]

        # store deduplicated interfaces
        self._interfaces = interfaces

        # add interfaces as base classes
        cls = self.__class__
        self.__class__ = type("Proxy", tuple([cls] + interfaces), {})

        # create methods
        self._methods = self._create_methods()

        # store state and capabilities
        self._state: dict[type[Interface], Any] = {}
        self._capabilities: dict[type[Interface], Any] = capabilities if capabilities is not None else {}

    @property
    def name(self) -> str:
        """Name of the client."""
        return self._client

    @property
    def method_names(self) -> list[str]:
        """List of method names."""
        return list(sorted(self._methods.keys()))

    @property
    def interfaces(self) -> list[type[Interface]]:
        """List of interfaces."""
        return self._interfaces

[docs] def signature(self, method: str) -> inspect.Signature: """Returns the signature of a given method. Args: method: Name of the method. Returns: Signature of the given method. """ return inspect.signature(self._methods[method][0])
[docs] def interface_method(self, method: str) -> Any: """Returns the method of the given name from the interface and not from the object itself. Args: method: Name of method. Returns: The interface method. """ return self._methods[method][0]
[docs] async def execute(self, method: str, *args: Any, **kwargs: Any) -> Any: """Execute a method on the remote client. Args: method: Name of method to call. *args: Parameters for method call. **kwargs: Parameters for method call. Returns: Result of method call. """ # add 'self' to args args = tuple([self] + list(args)) # get signature and type hints _, signature, _, type_hints = self._methods[method] # bind signature ba = signature.bind(*args, **kwargs) ba.apply_defaults() # do request and return future return await self._comm.execute(self._client, method, type_hints, *ba.args[1:])
def _create_methods(self) -> dict[str, Any]: """Create local methods for the remote client.""" # loop all interfaces and get methods methods = {} for interface in self._interfaces: # loop all methods: for func_name, func in inspect.getmembers(interface, predicate=inspect.isfunction): # skip base Interface infrastructure methods — Proxy provides its own implementations if func_name in Interface.__dict__: continue # set method my_func = types.MethodType(self._remote_function_wrapper(func_name), self) setattr(self, func_name, my_func) type_hints = get_type_hints(func) signature = inspect.signature(func) # store func methods[func_name] = (func, signature, getattr(self, func_name), type_hints) # return methods return methods def _remote_function_wrapper(self, method: str) -> Any: """Function wrapper for remote calls. Args: method: Name of method to wrap. Returns: Wrapper. """ async def inner(this: Proxy, *args: Any, **kwargs: Any) -> Any: return await this.execute(method, *args, **kwargs) return inner
[docs] def update_state(self, interface: type[Interface], state: Any) -> None: """Called by Comm whenever a new state arrives. Not intended to be called directly by module code.""" self._state[interface] = state
[docs] def clear_state(self) -> None: """Clear all cached state. Called by Comm when the remote module disconnects.""" self._state.clear()
[docs] def get_state(self, interface: type[Interface]) -> Any | None: """Latest known state for the given interface, or None if nothing has arrived yet.""" return self._state.get(interface)
[docs] def get_capabilities(self, interface: type[Interface]) -> Any | None: """Capabilities for the given interface, populated once at Proxy construction.""" return self._capabilities.get(interface)
[docs] async def wait_for_state( self, interface: type[Interface], timeout: float = 10.0, ) -> Any: """Return state immediately if available, otherwise wait for the first update.""" if self._state.get(interface) is not None: return self._state[interface] event = asyncio.Event() def _notify(state: Any) -> None: self._state[interface] = state event.set() # patch in a one-shot notifier alongside the existing callback await self._comm.subscribe_state(self._client, interface, _notify) try: await asyncio.wait_for(event.wait(), timeout=timeout) finally: await self._comm.unsubscribe_state(self._client, interface, _notify) return self._state.get(interface)
class _ProxyContext(Generic[ProxyType]): """Returned by Comm.proxy() / Object.proxy() / Comm.safe_proxy(). Must be used as: async with self.proxy("camera", ICooling) as camera: ... """ def __init__(self, coro: Coroutine[Any, Any, ProxyType]) -> None: self._coro = coro async def __aenter__(self) -> ProxyType: return await self._coro async def __aexit__(self, *exc_info: Any) -> None: # intentionally a no-op: the underlying Proxy is owned and cached # by Comm, not by this block, and stays alive for other callers. pass __all__ = ["Proxy"]