Communication between modules (pyobs.comm)

The Comm object is responsible for all communication between modules (see pyobs.modules). The base class for all implementations is Comm.

The user usually only has contact with the Comm object when writing the configuration for an existing module or when developing a new module that needs to communicate with other modules.

In a configuration file, the Comm object is defined at top-level like this:

comm:
    class: pyobs.comm.xmpp.XmppComm

Except for a single parameter defined in Comm’s constructor, all parameters are defined in derived classes.

The most convenient way to access other modules’ methods is through a Proxy object, which can be obtained with the proxy() method or the [] operator like this (if the module named ‘camera’ implements the ICamera interface):

camera = comm['camera']
camera.expose().wait()

Note that camera is now not of type ICamera.

Each Module that was configured with a Comm object (see modules) has an attribute comm for easy access.

The most commonly used implementation of the Comm interface is:

  • XmppComm uses the XMPP protocol for communication.

See also

Module modules

Description for modules, to which Comm objects are usually assigned.

Modules in a pyobs system rarely run in isolation — an autofocus module needs to tell the camera to take a picture, a guiding module needs to send corrections to the telescope, and so on. All of this inter-module communication goes through a Comm object.

The Comm layer has three responsibilities:

  • Method calls — calling a method on a remote module as if it were local, via a Proxy

  • Events — broadcasting typed event objects to all interested modules

  • Discovery — finding out which modules are online and what interfaces they expose

Configuration

A Comm object is configured at the top level of any module YAML file:

comm:
  class: pyobs.comm.xmpp.XmppComm
  jid: camera@my.observatory.org

Once configured, it is available inside any Object or Module via self.comm.

Proxies and remote method calls

A Proxy is a local stand-in for a remote module. It exposes the same interface methods as the real module, but executes them remotely over the network. Obtain one with proxy() (available on every Object and Module):

from pyobs.interfaces import ITelescope, ICamera

# get a proxy to the telescope module
telescope = await self.proxy("telescope", ITelescope)

# call its methods just like local ones
await telescope.move_radec(ra=83.8, dec=-5.4)

The second argument is the expected interface. If the named module does not implement that interface, proxy() raises a ValueError, which makes type errors visible immediately rather than at the point of the actual call.

For cases where a module might or might not be present, use safe_proxy(), which returns None instead of raising:

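from pyobs.interfaces import IFocuser

# safe_proxy() returns None where proxy() would raise an exception
focuser = await self.comm.safe_proxy("focuser", IFocuser)
if focuser is not None:
    # focus_value is assumed to be defined by the caller
    await focuser.set_focus(focus_value)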

To find all currently connected modules that implement a given interface:

cameras = await self.comm.clients_with_interface(ICamera)

Events

Events are typed objects broadcast to all modules that have registered an interest in them. They are used for loosely coupled notification — a module that finishes an exposure fires a NewImageEvent; any module that wants to react to new images subscribes to it.

Subscribing to an event is done in open():

import logging

from pyobs.events import NewImageEvent, GoodWeatherEvent
from pyobs.modules import Module

log = logging.getLogger(__name__)

async def open(self) -> None:
    await Module.open(self)
    await self.comm.register_event(NewImageEvent, self._on_new_image)
    await self.comm.register_event(GoodWeatherEvent, self._on_good_weather)

async def _on_new_image(self, event: NewImageEvent, sender: str) -> bool:
    log.info("New image from %s: %s", sender, event.filename)
    return True

The handler must be an async coroutine that accepts the event object and the sender’s name, and returns a boolean indicating whether the event was handled.
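The handler contract can be tried out without a running pyobs system. The following is a minimal sketch using only the standard library. ToyComm and ToyEvent are hypothetical stand-ins, not pyobs classes, and the dispatch logic (including the explicit sender argument to send_event) is an assumption made for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Type


class ToyEvent:
    """Hypothetical stand-in for a pyobs event; not the real class."""

    def __init__(self, filename: str) -> None:
        self.filename = filename


Handler = Callable[[ToyEvent, str], Awaitable[bool]]


class ToyComm:
    """Hypothetical in-memory registry that mimics the handler contract."""

    def __init__(self) -> None:
        self._handlers: Dict[Type[ToyEvent], List[Handler]] = {}

    async def register_event(
        self, event_class: Type[ToyEvent], handler: Optional[Handler] = None
    ) -> None:
        # Registering without a handler only declares the event type.
        handlers = self._handlers.setdefault(event_class, [])
        if handler is not None:
            handlers.append(handler)

    async def send_event(self, event: ToyEvent, sender: str) -> List[bool]:
        # Call every handler registered for this event's class.
        results = []
        for handler in self._handlers.get(type(event), []):
            results.append(await handler(event, sender))
        return results


async def on_new_image(event: ToyEvent, sender: str) -> bool:
    # Return True to signal that the event was handled.
    return event.filename.endswith(".fits")


async def demo() -> List[bool]:
    comm = ToyComm()
    await comm.register_event(ToyEvent, on_new_image)
    return await comm.send_event(ToyEvent("image_0001.fits"), sender="camera")


handled = asyncio.run(demo())
```

The point of the sketch is the handler shape: an async function that takes the event and the sender's name and returns a bool.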

Sending an event requires registering the event type first (even without a handler), then sending:

from pyobs.events import ExposureStatusChangedEvent
from pyobs.utils.enums import ExposureStatus

async def open(self) -> None:
    await Module.open(self)
    await self.comm.register_event(ExposureStatusChangedEvent)

async def _expose(self) -> None:
    await self.comm.send_event(ExposureStatusChangedEvent(ExposureStatus.EXPOSING))

See Events (pyobs.events) for the full list of available event types.

Implementations

The available classes and their use cases:

  • XmppComm: Production use. Requires an XMPP server (e.g. ejabberd). All modules connect to the server and communicate via XMPP’s RPC and publish-subscribe extensions. This is the standard choice for real observatories.

  • DbusComm: Single-machine setups on Linux using D-Bus for inter-process communication. No external server required, but modules must run on the same machine.

  • LocalComm: In-process communication for use in MultiModule setups and tests. All modules share the same Python process.

  • DummyComm: No-op implementation used when no comm is configured. A module with DummyComm runs in isolation: it cannot call remote methods or receive events.

API reference

class Comm(cache_proxies: bool = True)

Base class for all Comm modules in pyobs.

Creates a comm module.

cast_to_real_post(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them after being sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

cast_to_real_pre(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them after being sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

cast_to_simple_post(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them to be sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

cast_to_simple_pre(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them to be sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.
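The return convention of the four cast hooks can be sketched as follows. This is an illustration only: it reads the description literally (True means "keep processing", False means "use the returned value as is"), and both the Enum handling and the default tuple-to-list step are assumptions, not the conversions pyobs actually performs.

```python
from enum import Enum
from typing import Any, Optional, Tuple


class Color(Enum):
    """Hypothetical value type that needs special treatment."""

    RED = "red"


def cast_to_simple_pre(value: Any, annotation: Optional[Any] = None) -> Tuple[bool, Any]:
    # Hypothetical hook: convert Enum members here and stop further processing.
    if isinstance(value, Enum):
        return False, value.value
    # Everything else is passed on unchanged for default processing.
    return True, value


def cast_to_simple(value: Any) -> Any:
    """Run the pre hook, then an assumed default conversion if requested."""
    proceed, value = cast_to_simple_pre(value)
    if not proceed:
        return value
    # Assumed default step: tuples become lists so they can be serialized.
    if isinstance(value, tuple):
        return list(value)
    return value


enum_result = cast_to_simple(Color.RED)
tuple_result = cast_to_simple((1, 2))
```

A subclass would override only the hooks it needs and leave the rest of the conversion to the base class.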

property clients: List[str]

Returns list of currently connected clients.

Returns:

(list) List of currently connected clients.

async clients_with_interface(interface: Type[Interface]) -> List[str]

Returns list of currently connected clients that implement the given interface.

Parameters:

interface – Interface to search for.

Returns:

(list) List of currently connected clients that implement the given interface.
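The filtering this method performs can be pictured with plain classes. The sketch below is not the pyobs implementation: the interface classes are empty stand-ins, and the dictionary of clients is a hypothetical replacement for what the Comm object learns from the network.

```python
from typing import Dict, List, Type


class Interface:
    """Hypothetical stand-in for the interface base class."""


class ICamera(Interface):
    pass


class ITelescope(Interface):
    pass


class IFocuser(Interface):
    pass


def clients_with_interface(
    clients: Dict[str, List[Type[Interface]]], interface: Type[Interface]
) -> List[str]:
    """Return the names of all clients that list the given interface."""
    return [
        name
        for name, interfaces in clients.items()
        if any(issubclass(i, interface) for i in interfaces)
    ]


# Hypothetical snapshot of connected clients and their interfaces.
online = {
    "camera": [ICamera],
    "telescope": [ITelescope, IFocuser],
    "guider": [ICamera],
}

cameras = clients_with_interface(online, ICamera)
focusers = clients_with_interface(online, IFocuser)
```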

async close() -> None

Close module.

async execute(client: str, method: str, annotation: Dict[str, Any], *args: Any) -> Any

Execute a given method on a remote client.

Parameters:
  • client (str) – ID of client.

  • method (str) – Method to call.

  • annotation – Method annotation.

  • *args – List of parameters for given method.

Returns:

Passes through return from method call.

async get_interfaces(client: str) -> List[Type[Interface]]

Returns list of interfaces for given client.

Parameters:

client – Name of client.

Returns:

List of supported interfaces.

Raises:

IndexError – If client cannot be found.

log_message(entry: LogEvent) -> None

Send a log message to other clients.

Parameters:

entry (LogEvent) – Log event to send.

property module: Module

The module that this Comm object is attached to.

property name: str | None

Name of this client.

async open() -> None

Open module.

async proxy(name_or_object: str | object, obj_type: Type[ProxyType]) -> ProxyType
async proxy(name_or_object: str | object, obj_type: Type[ProxyType] | None = None) -> Any

Returns object directly if it is of given type. Otherwise get proxy of client with given name and check type.

If name_or_object is an object:
  • If it is of type (or derived), return object.

  • Otherwise raise exception.

If name_or_object is a string:
  • Create proxy from name and raise exception, if it doesn’t exist.

  • Check type and raise exception if wrong.

  • Return object.

Parameters:
  • name_or_object – Name of object or object itself.

  • obj_type – Expected class of object.

Returns:

Object or proxy to object.

Raises:

ValueError – If proxy does not exist or wrong type.
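The decision steps listed above can be sketched in a few lines. This is a simplified, synchronous illustration: resolve() and the registry dictionary are hypothetical, the interface classes are empty stand-ins, and the real proxy() is asynchronous and returns Proxy objects for remote clients.

```python
from typing import Any, Dict, Type, TypeVar, Union

T = TypeVar("T")


class ICamera:
    """Hypothetical stand-in for an interface class."""


class ITelescope:
    """Hypothetical stand-in for an interface class."""


class FakeCamera(ICamera):
    """Hypothetical module implementing ICamera."""


def resolve(
    name_or_object: Union[str, object], obj_type: Type[T], registry: Dict[str, Any]
) -> T:
    """Follow the documented steps using a plain dict as the client registry."""
    if isinstance(name_or_object, str):
        # String: look up the client and fail if it does not exist.
        if name_or_object not in registry:
            raise ValueError(f"Could not find client {name_or_object!r}.")
        candidate = registry[name_or_object]
    else:
        # Object: use it directly.
        candidate = name_or_object
    # In both cases the type is checked before returning.
    if not isinstance(candidate, obj_type):
        raise ValueError(f"Object is not of type {obj_type.__name__}.")
    return candidate


registry = {"camera": FakeCamera()}
camera = resolve("camera", ICamera, registry)
same = resolve(camera, ICamera, registry)

try:
    resolve("camera", ITelescope, registry)
    wrong_type_error = None
except ValueError as exc:
    wrong_type_error = str(exc)
```

Accepting either a name or an object lets the same code path serve configurations that pass a module name and tests that pass an object directly.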

async register_event(event_class: Type[Event], handler: Callable[[Event, str], Coroutine[Any, Any, bool]] | None = None) -> None

Register an event type. If a handler is given, we also receive those events, otherwise we just send them.

Parameters:
  • event_class – Class of event to register.

  • handler – Event handler method.

async safe_proxy(name_or_object: str | object, obj_type: Type[ProxyType] | None = None) -> Any | ProxyType | None

Calls proxy() in a safe way and returns None instead of raising an exception.

async send_event(event: Event) -> None

Send an event to other clients.

Parameters:

event (Event) – Event to send

class Proxy(comm: Comm, client: str, interfaces: List[Type[Interface]])

A proxy for remote pyobs modules.

Creates a new proxy.

Parameters:
  • comm – Comm object to use for connection.

  • client – Name of client to connect to.

  • interfaces – List of interfaces supported by client.

async execute(method: str, *args: Any, **kwargs: Any) -> Any

Execute a method on the remote client.

Parameters:
  • method – Name of method to call.

  • *args – Parameters for method call.

  • **kwargs – Parameters for method call.

Returns:

Result of method call.
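How a proxy can turn an ordinary-looking method call into a call to execute() is sketched below. ToyProxy is a hypothetical stand-in that records calls instead of sending them over the network; the use of __getattr__ is an assumption chosen for illustration and says nothing about how the real Proxy class is built.

```python
import asyncio
from typing import Any, List, Tuple


class ToyProxy:
    """Hypothetical stand-in; forwards known method names to execute()."""

    def __init__(self, client: str, method_names: List[str]) -> None:
        self.client = client
        self.method_names = method_names
        self.calls: List[Tuple[str, tuple, dict]] = []

    async def execute(self, method: str, *args: Any, **kwargs: Any) -> Any:
        # A real transport would send the call to the remote client here.
        self.calls.append((method, args, kwargs))
        return f"{self.client}.{method} called"

    def __getattr__(self, name: str) -> Any:
        # Only reached for attributes that are not found the normal way.
        if name.startswith("_") or name not in self.method_names:
            raise AttributeError(name)

        async def remote_call(*args: Any, **kwargs: Any) -> Any:
            return await self.execute(name, *args, **kwargs)

        return remote_call


async def demo() -> Tuple[str, list]:
    telescope = ToyProxy("telescope", ["move_radec"])
    result = await telescope.move_radec(ra=83.8, dec=-5.4)
    return result, telescope.calls


result, calls = asyncio.run(demo())
```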

interface_method(method: str) -> Any

Returns the method of the given name from the interface and not from the object itself.

Parameters:

method – Name of method.

Returns:

The interface method.

property interfaces: List[Type[Interface]]

List of interfaces.

property method_names: List[str]

List of method names.

property name: str

Name of the client.

signature(method: str) -> Signature

Returns the signature of a given method.

Parameters:

method – Name of the method.

Returns:

Signature of the given method.

class XmppComm(jid: str | None = None, user: str | None = None, domain: str | None = None, resource: str = 'pyobs', password: str = '', server: str | None = None, use_tls: bool = False, ignore_cert_errors: bool = False, *args: Any, **kwargs: Any)

Bases: Comm

A Comm class using XMPP.

This Comm class uses an XMPP server (e.g. ejabberd) for communication between modules. A connection to the server requires a JID (Jabber ID). It can be specified in the configuration like this:

comm:
    class: pyobs.comm.xmpp.XmppComm
    jid:  someuser@example.com/pyobs

Using this, pyobs tries to connect to example.com as user someuser with resource pyobs. Since pyobs is the default resource, it can be omitted:

jid:  someuser@example.com
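The structure of a JID and the effect of the default resource can be sketched with a small helper. split_jid() is a hypothetical function written for this illustration, not part of pyobs, and it ignores the finer points of XMPP address validation.

```python
from typing import Tuple


def split_jid(jid: str, default_resource: str = "pyobs") -> Tuple[str, str, str]:
    """Split 'user@domain/resource' into its three parts."""
    user, _, rest = jid.partition("@")
    domain, _, resource = rest.partition("/")
    # Fall back to the default resource if none was given.
    return user, domain, resource or default_resource


full = split_jid("someuser@example.com/pyobs")
short = split_jid("someuser@example.com")
```

Both spellings yield the same user, domain, and resource, which is why the resource can be left out.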

Alternatively, one can split the user, domain, and resource (if required) into three different parameters:

user: someuser
domain: example.com

This comes in handy if one wants to put the basic Comm configuration into a separate file. Imagine a _comm.yaml in the same directory as the module config:

comm_cfg: &comm
    class: pyobs.comm.xmpp.XmppComm
    domain: example.com

Now in the module configuration, one can simply do this:

{include _comm.yaml}

comm:
    <<: *comm
    user: someuser
    password: supersecret

This makes it easy to change the domain for all configurations at once, which is especially helpful when developing on different machines.
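The effect of the YAML merge key can be pictured with plain dictionaries: keys from the shared block are copied in, and keys given in the module configuration are added or take precedence. The sketch below only mimics that behaviour in Python; it does not parse YAML and does not reflect how pyobs loads its configuration.

```python
# Shared settings, as they would come from the included file.
shared = {"class": "pyobs.comm.xmpp.XmppComm", "domain": "example.com"}

# Module-specific keys are added on top of the shared ones.
module_comm = {**shared, "user": "someuser", "password": "supersecret"}

# A key given in the module configuration overrides the shared value.
dev_comm = {**shared, "domain": "localhost", "user": "someuser"}
```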

The server parameter can be used when the server’s hostname differs from the XMPP domain. This might be the case, for example, when connecting to a server via SSH port forwarding:

jid:  someuser@example.com/pyobs
server: localhost:52222
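A value such as localhost:52222 combines a host name and a port. The helper below sketches one way to take it apart; split_server() is hypothetical, and falling back to 5222 (the standard XMPP client port) when no port is given is an assumption about sensible behaviour, not a statement about what pyobs does. IPv6 literals are not handled.

```python
from typing import Tuple


def split_server(server: str, default_port: int = 5222) -> Tuple[str, int]:
    """Split 'host:port' into host and port, using a default port if absent."""
    host, sep, port = server.rpartition(":")
    if not sep:
        # No colon found: the whole string is the host name.
        return server, default_port
    return host, int(port)


forwarded = split_server("localhost:52222")
plain = split_server("example.com")
```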

Finally, always make sure that use_tls matches the server’s settings: if the server uses TLS, this parameter must be True, and False otherwise. Cryptic error messages will follow if it is not set properly.

Create a new XMPP Comm module.

Either a full JID needs to be provided, or a set of user/domain/resource, from which a JID is built.

Parameters:
  • jid – JID to connect as.

  • user – Username part of the JID.

  • domain – Domain part of the JID.

  • resource – Resource part of the JID.

  • password – Password for given JID.

  • server – Server to connect to. If not given, domain from JID is used.

  • use_tls – Whether to use TLS.

cast_to_real_post(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them after being sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

cast_to_simple_pre(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them to be sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

property client: XmppClient

Returns the XMPP client.

Returns:

The XMPP client.

property clients: List[str]

Returns list of currently connected clients.

Returns:

(list) List of currently connected clients.

async close() -> None

Close connection.

async execute(client: str, method: str, annotation: Dict[str, Any], *args: Any) -> Any

Execute a given method on a remote client.

Parameters:
  • client (str) – ID of client.

  • method (str) – Method to call.

  • annotation – Method annotation.

  • *args – List of parameters for given method.

Returns:

Passes through return from method call.

async get_interfaces(client: str) -> List[Type[Interface]]

Returns list of interfaces for given client.

Parameters:

client – Name of client.

Returns:

List of supported interfaces.

Raises:

IndexError – If client cannot be found.

property name: str | None

Name of this client.

async open() -> None

Open the connection to the XMPP server.

Returns:

Whether opening was successful.

async send_event(event: Event) -> None

Send an event to other clients.

Parameters:

event (Event) – Event to send

class DbusComm(name: str, domain: str = 'org.pyobs.module', *args: Any, **kwargs: Any)

Bases: Comm

A Comm class using dbus.

This Comm class uses dbus for communication between modules and is therefore available on most Linux systems. The interface name for the bus is built as <domain>.<name>, and this class will only find other modules with the same domain, so the domain should be kept constant in a closed system. The name, on the other hand, should be unique in the system.

A basic configuration looks like this:

comm:
    class: pyobs.comm.dbus.DbusComm
    name: example
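How the bus name is formed, and why modules only see peers in the same domain, can be sketched with two small helpers. Both functions are hypothetical illustrations; the list of bus names is made up, and the real class obtains such names from the dbus daemon.

```python
from typing import List


def bus_name(name: str, domain: str = "org.pyobs.module") -> str:
    """Build the interface name as <domain>.<name>."""
    return f"{domain}.{name}"


def peers_in_domain(bus_names: List[str], domain: str = "org.pyobs.module") -> List[str]:
    """Keep only names inside the given domain and strip the domain prefix."""
    prefix = domain + "."
    return [b[len(prefix):] for b in bus_names if b.startswith(prefix)]


# Hypothetical list of names currently present on the bus.
names = [
    "org.pyobs.module.camera",
    "org.pyobs.module.telescope",
    "org.freedesktop.DBus",
]

own = bus_name("example")
peers = peers_in_domain(names)
```

Two systems that use different domains on the same machine would therefore not see each other's modules.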

Create a new dbus Comm module.

Parameters:
  • name – Name for export.

  • domain – Domain for export.

cast_to_real_pre(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them after being sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

cast_to_simple_pre(value: Any, annotation: Any | None = None) -> Tuple[bool, Any]

Special treatment of single parameters when converting them to be sent via Comm.

Parameters:
  • value – Value to be treated.

  • annotation – Annotation for value.

Returns:

A tuple containing a bool that indicates whether this value should be further processed, and the new value.

property clients: List[str]

Returns list of currently connected clients.

Returns:

(list) List of currently connected clients.

async close() -> None

Close connection.

async execute(client: str, method: str, annotation: Dict[str, Any], *args: Any) -> Any

Execute a given method on a remote client.

Parameters:
  • client (str) – ID of client.

  • method (str) – Method to call.

  • annotation – Method annotation.

  • *args – List of parameters for given method.

Returns:

Passes through return from method call.

async get_dbus_owner(bus: str, attempts: int = 3) -> str

Gets the owning module name for a given bus.

Parameters:

bus – Name of bus to find owner for.

Returns:

Owning module.

async get_interfaces(client: str) -> List[Type[Interface]]

Returns list of interfaces for given client.

Parameters:

client – Name of client.

Returns:

List of supported interfaces.

Raises:

IndexError – If client cannot be found.

async handle_event(event: Event, sender: str) -> None

Handle event locally, i.e. send it to the module.

Parameters:
  • event – Event to handle.

  • sender – Sender of event.

property name: str | None

Name of this client.

async open() -> None

Creates the dbus connection.

async send_event(event: Event) -> None

Send an event to other clients.

Parameters:

event (Event) – Event to send

set_timeout(uid: str, timeout: float) -> None

Set timeout, usually received from other module.

Parameters:
  • uid – UID of remote call.

  • timeout – Timeout in seconds.