Source code for pyobs.utils.parallel

from __future__ import annotations

import asyncio
import contextlib
from asyncio import Task
from collections.abc import Coroutine
from typing import Any


async def event_wait(evt: asyncio.Event, timeout: float = 1.0) -> bool:
    # suppress TimeoutError because we'll return False in case of timeout
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(evt.wait(), timeout)
    return evt.is_set()


async def acquire_lock(lock: asyncio.Lock, timeout: float = 1.0) -> bool:
    # suppress TimeoutError because we'll return False in case of timeout
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
        return True
    except TimeoutError:
        return False


[docs] class Future(asyncio.Future[Any]): def __init__( self, empty: bool = False, annotation: dict[str, Any] | None = None, *args: Any, **kwargs: Any, ): asyncio.Future.__init__(self, *args, **kwargs) """Init new base future.""" self.timeout: float | None = None self.annotation = annotation self._timeout_handle: asyncio.TimerHandle | None = None # already set? if empty: # fire event self.set_result(None)
[docs] def set_timeout(self, timeout: float) -> None: """ Sets a new timeout for the method call. Cancels any existing timeout handle and schedules a new one at the extended deadline. """ self.timeout = timeout if self._timeout_handle is not None: self._timeout_handle.cancel() try: loop = asyncio.get_running_loop() self._timeout_handle = loop.call_later(timeout, self._on_timeout) except RuntimeError: pass # no running loop — handle will be set when __await__ runs
[docs] def get_timeout(self) -> float | None: """ Returns async timeout. """ return self.timeout
def __await__(self) -> Any: # not finished? need to wait. if not self.done(): loop = asyncio.get_running_loop() # schedule timeout timeout = self.timeout if self.timeout is not None else 10.0 self._timeout_handle = loop.call_later(timeout, self._on_timeout) self._asyncio_future_blocking = True yield self # suspend until done or timeout # cancel timeout handle if completed normally if self._timeout_handle is not None: self._timeout_handle.cancel() self._timeout_handle = None # still not done? raise exception. if not self.done(): raise RuntimeError("await wasn't used with future") return self.result()
[docs] @staticmethod async def wait_all(futures: list[Future | Coroutine[Any, Any, Any] | Task[Any] | None]) -> list[Any]: return [await fut for fut in futures if fut is not None]
def _on_timeout(self) -> None: if not self.done(): self.set_exception(TimeoutError())
__all__ = ["Future", "event_wait", "acquire_lock"]