from __future__ import annotations
import contextlib
import time
import asyncio
import inspect
from asyncio import Task
from collections.abc import Coroutine
from typing import TypeVar, Optional, List, Any, cast, Union
from pyobs.utils.types import cast_response_to_real
async def event_wait(evt: asyncio.Event, timeout: float = 1.0) -> bool:
    """Wait for an event to be set, giving up after ``timeout`` seconds.

    Args:
        evt: Event to wait for.
        timeout: Maximum time to wait in seconds.

    Returns:
        The state of the event after waiting: True if it is set, False otherwise.
    """
    try:
        await asyncio.wait_for(evt.wait(), timeout)
    except asyncio.TimeoutError:
        # Running out of time is not an error here; the state check below reports it.
        pass
    return evt.is_set()
async def acquire_lock(lock: asyncio.Lock, timeout: float = 1.0) -> bool:
    """Try to acquire a lock, giving up after ``timeout`` seconds.

    Args:
        lock: Lock to acquire.
        timeout: Maximum time to wait for the lock in seconds.

    Returns:
        True if the lock was acquired, False if the attempt timed out.
    """
    acquired = True
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
    except asyncio.TimeoutError:
        # A timeout is reported through the return value instead of an exception.
        acquired = False
    return acquired
class Future(asyncio.Future):
    """An asyncio future with an adjustable timeout and optional result casting.

    When awaited, the future first waits an initial 10 seconds. If a longer
    timeout has been set via :meth:`set_timeout`, it keeps waiting until that
    timeout has elapsed in total and then raises ``TimeoutError``. If a
    signature was given, the result is passed through ``cast_response_to_real``
    before being returned.
    """

    # Time in seconds that is always waited before an extended timeout is considered.
    _INITIAL_WAIT = 10.0

    def __init__(self, empty: bool = False, signature: Optional[inspect.Signature] = None, *args: Any, **kwargs: Any):
        """Init new base future.

        Args:
            empty: If True, the future is completed immediately with a result of None.
            signature: Optional signature that is handed to ``cast_response_to_real``
                together with the result when the future is awaited.
            *args: Passed on to ``asyncio.Future``.
            **kwargs: Passed on to ``asyncio.Future``.
        """
        super().__init__(*args, **kwargs)

        self.timeout: Optional[float] = None
        self.signature: Optional[inspect.Signature] = signature

        # already set?
        if empty:
            # fire event
            self.set_result(None)

    def set_timeout(self, timeout: float) -> None:
        """
        Sets a new timeout for the method call.

        Args:
            timeout: New timeout in seconds.
        """
        self.timeout = timeout

    def get_timeout(self) -> Optional[float]:
        """
        Returns async timeout.

        Returns:
            The timeout in seconds, or None if none has been set.
        """
        return self.timeout

    def _wait_for_time(self, timeout: float = 0) -> Any:
        """Cooperatively waits until the future is done or the timeout has elapsed.

        This is a generator that must be driven with ``yield from`` inside
        ``__await__``, so that the event loop keeps running while waiting. A
        blocking wait would stall the loop that is supposed to complete the
        future.

        Args:
            timeout: Time to wait in seconds.

        Raises:
            TimeoutError: If the future is still not done after the timeout.
        """
        if not self.done():
            try:
                # asyncio.wait() only attaches callbacks to the future; on timeout
                # it neither cancels the future nor raises.
                yield from asyncio.wait([self], timeout=timeout).__await__()
            except asyncio.CancelledError:
                # Cancelling the awaiting task cancels the future as well, just as
                # it does when a task waits on a plain asyncio future.
                self.cancel()
                raise

        if not self.done():
            raise TimeoutError

    def __await__(self) -> Any:
        """Waits for the future and returns its (optionally cast) result.

        Raises:
            TimeoutError: If a timeout longer than the initial wait was set and
                the future did not finish within it.
            RuntimeError: If the generator is resumed while the future is not done.
        """
        # not finished? need to wait.
        if not self.done():
            timed_out = False
            try:
                # wait the initial period first
                yield from self._wait_for_time(self._INITIAL_WAIT)
            except TimeoutError:
                timed_out = True

            # got an additional timeout? Handled outside the except block so that a
            # real timeout is not reported as a chained exception.
            if timed_out and self.timeout is not None and self.timeout > self._INITIAL_WAIT:
                # we already waited the initial period, so subtract it
                yield from self._wait_for_time(self.timeout - self._INITIAL_WAIT)

        # not done? yield!
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.

        # still not done? raise exception.
        if not self.done():
            raise RuntimeError("await wasn't used with future")

        # get result
        result = self.result()

        # all ok, return value
        if self.signature is not None:
            # cast response to real types
            return cast_response_to_real(result, self.signature)
        return result

    @staticmethod
    async def wait_all(futures: List[Optional[Union[Future, Coroutine[Any, Any, Any], Task[Any]]]]) -> List[Any]:
        """Awaits all given awaitables one after another.

        Args:
            futures: Awaitables to wait for; None entries are skipped.

        Returns:
            Results of the non-None entries, in their original order.
        """
        return [await fut for fut in futures if fut is not None]
# Names exported by ``from <module> import *``.
__all__ = ["Future", "event_wait", "acquire_lock"]