Source code for pyobs.utils.parallel

from __future__ import annotations
import contextlib
import time
import asyncio
import inspect
from asyncio import Task
from collections.abc import Coroutine
from typing import TypeVar, Optional, List, Any, cast, Union

from pyobs.utils.types import cast_response_to_real


async def event_wait(evt: asyncio.Event, timeout: float = 1.0) -> bool:
    # suppress TimeoutError because we'll return False in case of timeout
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(evt.wait(), timeout)
    return evt.is_set()


async def acquire_lock(lock: asyncio.Lock, timeout: float = 1.0) -> bool:
    # suppress TimeoutError because we'll return False in case of timeout
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


[docs]class Future(asyncio.Future): def __init__(self, empty: bool = False, signature: Optional[inspect.Signature] = None, *args: Any, **kwargs: Any): asyncio.Future.__init__(self, *args, **kwargs) """Init new base future.""" self.timeout: Optional[float] = None self.signature: Optional[inspect.Signature] = signature # already set? if empty: # fire event self.set_result(None)
[docs] def set_timeout(self, timeout: float) -> None: """ Sets a new timeout for the method call. """ self.timeout = timeout
[docs] def get_timeout(self) -> Optional[float]: """ Returns async timeout. """ return self.timeout
def _wait_for_time(self, timeout: float = 0) -> None: """Waits a little. Args: time: Time to wait in seconds. """ start = time.time() while not self.done() or time.time() - start > timeout: return raise TimeoutError def __await__(self) -> Any: # not finished? need to wait. if not self.done(): try: # wait some 10s first self._wait_for_time(10) except TimeoutError: # got an additional timeout? if self.timeout is not None and self.timeout > 10: # we already waited 10s, so subtract it self._wait_for_time(self.timeout - 10.0) # not done? yield! if not self.done(): self._asyncio_future_blocking = True yield self # This tells Task to wait for completion. # still not done? raise exception. if not self.done(): raise RuntimeError("await wasn't used with future") # get result result = self.result() # all ok, return value if self.signature is not None: # cast response to real types return cast_response_to_real(result, self.signature) else: return result
[docs] @staticmethod async def wait_all(futures: List[Optional[Union[Future, Coroutine[Any, Any, Any], Task[Any]]]]) -> List[Any]: return [await fut for fut in futures if fut is not None]
__all__ = ["Future", "event_wait", "acquire_lock"]