from __future__ import annotations
import asyncio
import inspect
import types
from collections.abc import Coroutine
from typing import TYPE_CHECKING, Any, Generic, TypeVar, get_type_hints
from pyobs.interfaces import Interface
if TYPE_CHECKING:
from pyobs.comm import Comm
"""Class of a proxy."""
# Type variable standing for the proxy type produced by the coroutine that _ProxyContext wraps.
ProxyType = TypeVar("ProxyType")
class Proxy:
    """A proxy for remote pyobs modules.

    On construction the instance's class is swapped for a dynamically created
    subclass that additionally inherits from the given interfaces, and every
    interface method is bound on the instance as a coroutine that forwards the
    call to the remote client via ``Comm.execute``.
    """

    __module__ = "pyobs.comm"

    def __init__(
        self,
        comm: Comm,
        client: str,
        interfaces: list[type[Interface]],
        capabilities: dict[type[Interface], Any] | None = None,
    ):
        """Creates a new proxy.

        Args:
            comm: Comm object to use for connection.
            client: Name of client to connect to.
            interfaces: List of interfaces supported by client.
            capabilities: Pre-populated capabilities dict, keyed by interface type.
        """
        # set client and comm
        self._comm: Comm = comm
        self._client = client

        # drop exact duplicates (order preserved); a repeated base class would make type() fail below
        unique = list(dict.fromkeys(interfaces))

        # remove interfaces that are implemented by others: an interface is dropped
        # as soon as another listed interface is a subclass of it
        self._interfaces = [
            iface
            for iface in unique
            if not any(other != iface and issubclass(other, iface) for other in unique)
        ]

        # add interfaces as base classes
        cls = self.__class__
        self.__class__ = type("Proxy", (cls, *self._interfaces), {})

        # create methods
        self._methods = self._create_methods()

        # store state and capabilities
        self._state: dict[type[Interface], Any] = {}
        self._capabilities: dict[type[Interface], Any] = capabilities if capabilities is not None else {}

    @property
    def name(self) -> str:
        """Name of the client."""
        return self._client

    @property
    def method_names(self) -> list[str]:
        """List of method names, sorted alphabetically."""
        return sorted(self._methods)

    @property
    def interfaces(self) -> list[type[Interface]]:
        """List of interfaces."""
        return self._interfaces

    def signature(self, method: str) -> inspect.Signature:
        """Returns the signature of a given method.

        Args:
            method: Name of the method.

        Returns:
            Signature of the given method.

        Raises:
            KeyError: If no method of that name is known to this proxy.
        """
        # the signature was already computed in _create_methods
        return self._methods[method][1]

    def interface_method(self, method: str) -> Any:
        """Returns the method of the given name from the interface and not from the object itself.

        Args:
            method: Name of method.

        Returns:
            The interface method.

        Raises:
            KeyError: If no method of that name is known to this proxy.
        """
        return self._methods[method][0]

    async def execute(self, method: str, *args: Any, **kwargs: Any) -> Any:
        """Execute a method on the remote client.

        Args:
            method: Name of method to call.
            *args: Parameters for method call.
            **kwargs: Parameters for method call.

        Returns:
            Result of method call.

        Raises:
            KeyError: If no method of that name is known to this proxy.
            TypeError: If the arguments do not match the interface method's signature.
        """
        # get signature and type hints
        _, signature, _, type_hints = self._methods[method]

        # bind with this proxy standing in for 'self', so that keywords and
        # defaults are resolved against the interface method's signature
        bound = signature.bind(self, *args, **kwargs)
        bound.apply_defaults()

        # do request; 'self' is stripped again before sending
        # NOTE(review): only bound.args is forwarded, so keyword-only parameters of an
        # interface method (which end up in bound.kwargs) would not be sent.
        return await self._comm.execute(self._client, method, type_hints, *bound.args[1:])

    def _create_methods(self) -> dict[str, Any]:
        """Create local methods for the remote client.

        Returns:
            Dictionary mapping each method name to a tuple of
            (interface function, signature, bound remote wrapper, type hints).
        """
        methods: dict[str, Any] = {}

        # loop all interfaces and get methods
        for interface in self._interfaces:
            for func_name, func in inspect.getmembers(interface, predicate=inspect.isfunction):
                # skip base Interface infrastructure methods — Proxy provides its own implementations
                if func_name in Interface.__dict__:
                    continue

                # bind the remote wrapper on this instance, shadowing the interface method
                remote = types.MethodType(self._remote_function_wrapper(func_name), self)
                setattr(self, func_name, remote)

                # store func together with everything needed to call it remotely
                type_hints = get_type_hints(func)
                signature = inspect.signature(func)
                methods[func_name] = (func, signature, remote, type_hints)

        return methods

    def _remote_function_wrapper(self, method: str) -> Any:
        """Function wrapper for remote calls.

        Args:
            method: Name of method to wrap.

        Returns:
            Wrapper.
        """

        async def inner(this: Proxy, *args: Any, **kwargs: Any) -> Any:
            return await this.execute(method, *args, **kwargs)

        return inner

    def update_state(self, interface: type[Interface], state: Any) -> None:
        """Called by Comm whenever a new state arrives. Not intended to be called directly by module code."""
        self._state[interface] = state

    def clear_state(self) -> None:
        """Clear all cached state. Called by Comm when the remote module disconnects."""
        self._state.clear()

    def get_state(self, interface: type[Interface]) -> Any | None:
        """Latest known state for the given interface, or None if nothing has arrived yet."""
        return self._state.get(interface)

    def get_capabilities(self, interface: type[Interface]) -> Any | None:
        """Capabilities for the given interface, populated once at Proxy construction."""
        return self._capabilities.get(interface)

    async def wait_for_state(
        self,
        interface: type[Interface],
        timeout: float = 10.0,
    ) -> Any:
        """Return state immediately if available, otherwise wait for the first update.

        Args:
            interface: Interface to get the state for.
            timeout: Maximum time in seconds to wait for the first update.

        Returns:
            The cached state for the interface.

        Raises:
            asyncio.TimeoutError: If no state arrived within the timeout.
        """
        cached = self._state.get(interface)
        if cached is not None:
            return cached

        event = asyncio.Event()

        def _notify(state: Any) -> None:
            self._state[interface] = state
            event.set()

        # patch in a one-shot notifier alongside the existing callback
        await self._comm.subscribe_state(self._client, interface, _notify)
        try:
            # a state may have arrived via update_state() while we were subscribing;
            # it would never trigger the event, so check the cache again before waiting
            if self._state.get(interface) is None:
                await asyncio.wait_for(event.wait(), timeout=timeout)
        finally:
            await self._comm.unsubscribe_state(self._client, interface, _notify)
        return self._state.get(interface)
class _ProxyContext(Generic[ProxyType]):
    """Async context manager wrapping a coroutine that yields a proxy.

    Returned by Comm.proxy() / Object.proxy() / Comm.safe_proxy(). Must be used as:

        async with self.proxy("camera", ICooling) as camera:
            ...
    """

    def __init__(self, coro: Coroutine[Any, Any, ProxyType]) -> None:
        # the coroutine is only stored here; it is awaited when the block is entered
        self._coro = coro

    async def __aenter__(self) -> ProxyType:
        """Await the stored coroutine and hand its result to the ``async with`` block."""
        proxy = await self._coro
        return proxy

    async def __aexit__(self, *exc_info: Any) -> None:
        """Do nothing on exit; exceptions raised inside the block propagate unchanged."""
        # intentionally a no-op: the underlying Proxy is owned and cached
        # by Comm, not by this block, and stays alive for other callers.
        return None
# Names exported by a star-import of this module; only Proxy is listed.
__all__ = ["Proxy"]