mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
app{websocket): Refactor transport type annotations and improve event handling in QUIC connection
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@ -184,4 +184,4 @@ tests/interop/js_libp2p/js_node/src/node_modules/
|
||||
tests/interop/js_libp2p/js_node/src/package-lock.json
|
||||
|
||||
# Sphinx documentation build
|
||||
_build/
|
||||
_build/
|
||||
|
||||
@ -203,7 +203,7 @@ def new_swarm(
|
||||
|
||||
id_opt = generate_peer_id_from(key_pair)
|
||||
|
||||
transport: TCP | QUICTransport
|
||||
transport: TCP | QUICTransport | ITransport
|
||||
quic_transport_opt = connection_config if isinstance(connection_config, QUICTransportConfig) else None
|
||||
|
||||
if listen_addrs is None:
|
||||
@ -261,7 +261,6 @@ def new_swarm(
|
||||
)
|
||||
|
||||
# Create transport based on listen_addrs or default to TCP
|
||||
transport: ITransport
|
||||
if listen_addrs is None:
|
||||
transport = TCP()
|
||||
else:
|
||||
@ -274,7 +273,7 @@ def new_swarm(
|
||||
if addr.__contains__("tcp"):
|
||||
transport = TCP()
|
||||
elif addr.__contains__("quic"):
|
||||
raise ValueError("QUIC not yet supported")
|
||||
transport = QUICTransport(key_pair.private_key, config=quic_transport_opt)
|
||||
else:
|
||||
supported_protocols = get_supported_transport_protocols()
|
||||
raise ValueError(
|
||||
|
||||
@ -491,9 +491,8 @@ class Swarm(Service, INetworkService):
|
||||
logger.debug(f"Swarm.listen processing multiaddr: {maddr}")
|
||||
if str(maddr) in self.listeners:
|
||||
logger.debug(f"Swarm.listen: listener already exists for {maddr}")
|
||||
return True
|
||||
success_count += 1
|
||||
continue
|
||||
success_count += 1
|
||||
continue
|
||||
|
||||
async def conn_handler(
|
||||
read_write_closer: ReadWriteCloser, maddr: Multiaddr = maddr
|
||||
@ -557,6 +556,7 @@ class Swarm(Service, INetworkService):
|
||||
# I/O agnostic, we should change the API.
|
||||
if self.listener_nursery is None:
|
||||
raise SwarmException("swarm instance hasn't been run")
|
||||
assert self.listener_nursery is not None # For type checker
|
||||
logger.debug(f"Swarm.listen: calling listener.listen for {maddr}")
|
||||
await listener.listen(maddr, self.listener_nursery)
|
||||
logger.debug(f"Swarm.listen: listener.listen completed for {maddr}")
|
||||
|
||||
@ -8,7 +8,7 @@ from collections.abc import Awaitable, Callable
|
||||
import logging
|
||||
import socket
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any, Optional, cast
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
from aioquic.quic import events
|
||||
from aioquic.quic.connection import QuicConnection
|
||||
@ -871,9 +871,11 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
# Process events by type
|
||||
for event_type, event_list in events_by_type.items():
|
||||
if event_type == type(events.StreamDataReceived).__name__:
|
||||
await self._handle_stream_data_batch(
|
||||
cast(list[events.StreamDataReceived], event_list)
|
||||
)
|
||||
# Filter to only StreamDataReceived events
|
||||
stream_data_events = [
|
||||
e for e in event_list if isinstance(e, events.StreamDataReceived)
|
||||
]
|
||||
await self._handle_stream_data_batch(stream_data_events)
|
||||
else:
|
||||
# Process other events individually
|
||||
for event in event_list:
|
||||
|
||||
Reference in New Issue
Block a user