mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2025-12-31 20:36:24 +00:00
fix: process packets received and send to quic
This commit is contained in:
@ -144,19 +144,14 @@ def main() -> None:
|
||||
type=int,
|
||||
help="provide a seed to the random number generator",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-log",
|
||||
"--loglevel",
|
||||
default="DEBUG",
|
||||
help="Provide logging level. Example --loglevel debug, default=warning",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(level=args.loglevel.upper())
|
||||
|
||||
try:
|
||||
trio.run(run, args.port, args.destination, args.seed)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@ -3,6 +3,7 @@ from collections.abc import (
|
||||
Callable,
|
||||
)
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from multiaddr import (
|
||||
Multiaddr,
|
||||
@ -56,6 +57,11 @@ from .exceptions import (
|
||||
SwarmException,
|
||||
)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
logger = logging.getLogger("libp2p.network.swarm")
|
||||
|
||||
|
||||
@ -245,6 +251,7 @@ class Swarm(Service, INetworkService):
|
||||
- Map multiaddr to listener
|
||||
"""
|
||||
# We need to wait until `self.listener_nursery` is created.
|
||||
logger.debug("SWARM LISTEN CALLED")
|
||||
await self.event_listener_nursery_created.wait()
|
||||
|
||||
success_count = 0
|
||||
|
||||
@ -5,6 +5,7 @@ Uses aioquic's sans-IO core with trio for async operations.
|
||||
|
||||
import logging
|
||||
import socket
|
||||
from sys import stdout
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
@ -34,10 +35,11 @@ if TYPE_CHECKING:
|
||||
from .security import QUICTLSConfigManager
|
||||
from .transport import QUICTransport
|
||||
|
||||
logging.root.handlers = []
|
||||
logging.basicConfig(
|
||||
level="DEBUG",
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
|
||||
handlers=[logging.StreamHandler(stdout)],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -252,18 +254,17 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
raise QUICConnectionError(f"Connection start failed: {e}") from e
|
||||
|
||||
async def _initiate_connection(self) -> None:
|
||||
"""Initiate client-side connection establishment."""
|
||||
"""Initiate client-side connection, reusing listener socket if available."""
|
||||
try:
|
||||
with QUICErrorContext("connection_initiation", "connection"):
|
||||
# Create UDP socket using trio
|
||||
self._socket = trio.socket.socket(
|
||||
family=socket.AF_INET, type=socket.SOCK_DGRAM
|
||||
)
|
||||
if not self._socket:
|
||||
logger.debug("Creating new socket for outbound connection")
|
||||
self._socket = trio.socket.socket(
|
||||
family=socket.AF_INET, type=socket.SOCK_DGRAM
|
||||
)
|
||||
|
||||
# Connect the socket to the remote address
|
||||
await self._socket.connect(self._remote_addr)
|
||||
await self._socket.bind(("0.0.0.0", 0))
|
||||
|
||||
# Start the connection establishment
|
||||
self._quic.connect(self._remote_addr, now=time.time())
|
||||
|
||||
# Send initial packet(s)
|
||||
@ -297,8 +298,10 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
|
||||
# Start background event processing
|
||||
if not self._background_tasks_started:
|
||||
print("STARTING BACKGROUND TASK")
|
||||
logger.debug("STARTING BACKGROUND TASK")
|
||||
await self._start_background_tasks()
|
||||
else:
|
||||
logger.debug("BACKGROUND TASK ALREADY STARTED")
|
||||
|
||||
# Wait for handshake completion with timeout
|
||||
with trio.move_on_after(
|
||||
@ -330,11 +333,14 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
|
||||
self._background_tasks_started = True
|
||||
|
||||
if self.__is_initiator: # Only for client connections
|
||||
self._nursery.start_soon(async_fn=self._client_packet_receiver)
|
||||
|
||||
# Start event processing task
|
||||
self._nursery.start_soon(async_fn=self._event_processing_loop)
|
||||
|
||||
# Start periodic tasks
|
||||
# self._nursery.start_soon(async_fn=self._periodic_maintenance)
|
||||
self._nursery.start_soon(async_fn=self._periodic_maintenance)
|
||||
|
||||
logger.debug("Started background tasks for QUIC connection")
|
||||
|
||||
@ -379,6 +385,40 @@ class QUICConnection(IRawConnection, IMuxedConn):
|
||||
except Exception as e:
|
||||
logger.error(f"Error in periodic maintenance: {e}")
|
||||
|
||||
async def _client_packet_receiver(self) -> None:
|
||||
"""Receive packets for client connections."""
|
||||
logger.debug("Starting client packet receiver")
|
||||
print("Started QUIC client packet receiver")
|
||||
|
||||
try:
|
||||
while not self._closed and self._socket:
|
||||
try:
|
||||
# Receive UDP packets
|
||||
data, addr = await self._socket.recvfrom(65536)
|
||||
print(f"Client received {len(data)} bytes from {addr}")
|
||||
|
||||
# Feed packet to QUIC connection
|
||||
self._quic.receive_datagram(data, addr, now=time.time())
|
||||
|
||||
# Process any events that result from the packet
|
||||
await self._process_quic_events()
|
||||
|
||||
# Send any response packets
|
||||
await self._transmit()
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
logger.debug("Client socket closed")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error receiving client packet: {e}")
|
||||
await trio.sleep(0.01)
|
||||
|
||||
except trio.Cancelled:
|
||||
logger.info("Client packet receiver cancelled")
|
||||
raise
|
||||
finally:
|
||||
logger.debug("Client packet receiver terminated")
|
||||
|
||||
# Security and identity methods
|
||||
|
||||
async def _verify_peer_identity_with_security(self) -> None:
|
||||
|
||||
@ -5,6 +5,7 @@ QUIC Listener
|
||||
import logging
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@ -35,8 +36,8 @@ if TYPE_CHECKING:
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -440,7 +440,8 @@ class QUICTLSConfigManager:
|
||||
"private_key": self.tls_config.private_key,
|
||||
"certificate_chain": [],
|
||||
"alpn_protocols": ["libp2p"],
|
||||
"verify_mode": True,
|
||||
"verify_mode": False,
|
||||
"check_hostname": False,
|
||||
}
|
||||
return config
|
||||
|
||||
@ -458,7 +459,8 @@ class QUICTLSConfigManager:
|
||||
"private_key": self.tls_config.private_key,
|
||||
"certificate_chain": [],
|
||||
"alpn_protocols": ["libp2p"],
|
||||
"verify_mode": True,
|
||||
"verify_mode": False,
|
||||
"check_hostname": False,
|
||||
}
|
||||
return config
|
||||
|
||||
|
||||
@ -8,6 +8,7 @@ Updated to include Module 5 security integration.
|
||||
from collections.abc import Iterable
|
||||
import copy
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from aioquic.quic.configuration import (
|
||||
QuicConfiguration,
|
||||
@ -15,6 +16,7 @@ from aioquic.quic.configuration import (
|
||||
from aioquic.quic.connection import (
|
||||
QuicConnection as NativeQUICConnection,
|
||||
)
|
||||
from aioquic.quic.logger import QuicLogger
|
||||
import multiaddr
|
||||
import trio
|
||||
|
||||
@ -62,8 +64,8 @@ QUIC_DRAFT29_PROTOCOL = QUICTransportConfig.PROTOCOL_QUIC_DRAFT29
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -290,6 +292,7 @@ class QUICTransport(ITransport):
|
||||
raise QUICDialError(f"Unsupported QUIC version: {quic_version}")
|
||||
|
||||
config.is_client = True
|
||||
config.quic_logger = QuicLogger()
|
||||
logger.debug(
|
||||
f"Dialing QUIC connection to {host}:{port} (version: {quic_version})"
|
||||
)
|
||||
@ -484,3 +487,10 @@ class QUICTransport(ITransport):
|
||||
|
||||
"""
|
||||
return self._security_manager
|
||||
|
||||
def get_listener_socket(self) -> trio.socket.SocketType | None:
|
||||
"""Get the socket from the first active listener."""
|
||||
for listener in self._listeners:
|
||||
if listener.is_listening() and listener._socket:
|
||||
return listener._socket
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user