mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
32 lines
706 B
Python
32 lines
706 B
Python
from unittest.mock import (
|
|
MagicMock,
|
|
)
|
|
|
|
import trio
|
|
|
|
from libp2p.abc import IHost
|
|
|
|
|
|
def create_mock_connections(count: int = 50) -> dict:
|
|
connections = {}
|
|
|
|
for i in range(1, count):
|
|
peer_id = f"peer-{i}"
|
|
mock_conn = MagicMock(name=f"INetConn-{i}")
|
|
connections[peer_id] = mock_conn
|
|
|
|
return connections
|
|
|
|
|
|
async def run_host_forever(host: IHost, addr):
|
|
async with host.run([addr]):
|
|
await trio.sleep_forever()
|
|
|
|
|
|
async def wait_until_listening(host, timeout=3):
|
|
with trio.move_on_after(timeout):
|
|
while not host.get_addrs():
|
|
await trio.sleep(0.05)
|
|
return
|
|
raise RuntimeError("Timed out waiting for host to get an address")
|