mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
- Fix type annotation errors in transport_registry.py and __init__.py - Fix line length violations in test files (E501 errors) - Fix missing return type annotations - Fix cryptography NameAttribute type errors with type: ignore - Fix ExceptionGroup import for cross-version compatibility - Fix test failure in test_wss_listen_without_tls_config by handling ExceptionGroup - Fix len() calls with None arguments in test_tcp_data_transfer.py - Fix missing attribute access errors on interface types - Fix boolean type expectation errors in test_js_ws_ping.py - Fix nursery context manager type errors All tests now pass and linting is clean.
58 lines
2.0 KiB
Python
58 lines
2.0 KiB
Python
from typing import Any
|
|
|
|
from .tcp.tcp import TCP
|
|
from .websocket.transport import WebsocketTransport
|
|
from .transport_registry import (
|
|
TransportRegistry,
|
|
create_transport_for_multiaddr,
|
|
get_transport_registry,
|
|
register_transport,
|
|
get_supported_transport_protocols,
|
|
)
|
|
from .upgrader import TransportUpgrader
|
|
from libp2p.abc import ITransport
|
|
|
|
def create_transport(protocol: str, upgrader: TransportUpgrader | None = None, **kwargs: Any) -> ITransport:
|
|
"""
|
|
Convenience function to create a transport instance.
|
|
|
|
:param protocol: The transport protocol ("tcp", "ws", "wss", or custom)
|
|
:param upgrader: Optional transport upgrader (required for WebSocket)
|
|
:param kwargs: Additional arguments for transport construction (e.g., tls_client_config, tls_server_config)
|
|
:return: Transport instance
|
|
"""
|
|
# First check if it's a built-in protocol
|
|
if protocol in ["ws", "wss"]:
|
|
if upgrader is None:
|
|
raise ValueError(f"WebSocket transport requires an upgrader")
|
|
return WebsocketTransport(
|
|
upgrader,
|
|
tls_client_config=kwargs.get("tls_client_config"),
|
|
tls_server_config=kwargs.get("tls_server_config"),
|
|
handshake_timeout=kwargs.get("handshake_timeout", 15.0)
|
|
)
|
|
elif protocol == "tcp":
|
|
return TCP()
|
|
else:
|
|
# Check if it's a custom registered transport
|
|
registry = get_transport_registry()
|
|
transport_class = registry.get_transport(protocol)
|
|
if transport_class:
|
|
transport = registry.create_transport(protocol, upgrader, **kwargs)
|
|
if transport is None:
|
|
raise ValueError(f"Failed to create transport for protocol: {protocol}")
|
|
return transport
|
|
else:
|
|
raise ValueError(f"Unsupported transport protocol: {protocol}")
|
|
|
|
__all__ = [
|
|
"TCP",
|
|
"WebsocketTransport",
|
|
"TransportRegistry",
|
|
"create_transport_for_multiaddr",
|
|
"create_transport",
|
|
"get_transport_registry",
|
|
"register_transport",
|
|
"get_supported_transport_protocols",
|
|
]
|