Files
py-libp2p/tests/utils/interop/process.py
2025-06-01 19:30:25 +05:30

79 lines
2.3 KiB
Python

from abc import (
ABC,
abstractmethod,
)
from collections.abc import (
Iterable,
)
import subprocess
import trio
TIMEOUT_DURATION = 30
class AbstractInterativeProcess(ABC):
@abstractmethod
async def start(self) -> None:
...
@abstractmethod
async def close(self) -> None:
...
class BaseInteractiveProcess(AbstractInterativeProcess):
proc: trio.Process = None
cmd: str
args: list[str]
bytes_read: bytearray
patterns: Iterable[bytes] = None
event_ready: trio.Event
async def wait_until_ready(self) -> None:
patterns_occurred = {pat: False for pat in self.patterns}
buffers = {pat: bytearray() for pat in self.patterns}
async def read_from_daemon_and_check() -> None:
async for data in self.proc.stdout:
self.bytes_read.extend(data)
for pat, occurred in patterns_occurred.items():
if occurred:
continue
# Check if pattern is in new data or spans across chunks
buf = buffers[pat]
buf.extend(data)
if pat in buf:
patterns_occurred[pat] = True
else:
keep = min(len(pat) - 1, len(buf))
buffers[pat] = buf[-keep:] if keep > 0 else bytearray()
if all(patterns_occurred.values()):
return
with trio.fail_after(TIMEOUT_DURATION):
await read_from_daemon_and_check()
self.event_ready.set()
# Sleep a little bit to ensure the listener is up after logs are emitted.
await trio.sleep(0.01)
async def start(self) -> None:
if self.proc is not None:
return
# mypy says that `open_process` is not an attribute of trio, suggests run_process instead. # noqa: E501
self.proc = await trio.open_process( # type: ignore[attr-defined]
[self.cmd] + self.args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Redirect stderr to stdout, which makes parsing easier # noqa: E501
bufsize=0,
)
await self.wait_until_ready()
async def close(self) -> None:
if self.proc is None:
return
self.proc.terminate()
await self.proc.wait()