feat: implement get_remote_address via delegation pattern

This commit is contained in:
acul71
2025-03-13 14:08:13 +01:00
committed by Paul Robinson
parent 798229cd3a
commit fabf2cefc4
8 changed files with 72 additions and 1 deletions

View File

@ -1,4 +1,7 @@
import logging
from typing import (
Optional,
)
import trio
@ -42,3 +45,11 @@ class TrioTCPStream(ReadWriteCloser):
async def close(self) -> None:
await self.stream.aclose()
def get_remote_address(self) -> Optional[tuple[str, int]]:
"""Return the remote address as (host, port) tuple."""
try:
return self.stream.socket.getpeername()
except (AttributeError, OSError) as e:
logger.error("Error getting remote address: %s", e)
return None