diff --git a/tests/core/pubsub/test_pubsub.py b/tests/core/pubsub/test_pubsub.py index e674dbc0..179f359c 100644 --- a/tests/core/pubsub/test_pubsub.py +++ b/tests/core/pubsub/test_pubsub.py @@ -8,6 +8,7 @@ from typing import ( from unittest.mock import patch import pytest +import multiaddr import trio from libp2p.custom_types import AsyncValidatorFn @@ -17,6 +18,7 @@ from libp2p.exceptions import ( from libp2p.network.stream.exceptions import ( StreamEOF, ) +from libp2p.peer.envelope import Envelope from libp2p.peer.id import ( ID, ) @@ -87,6 +89,45 @@ async def test_re_unsubscribe(): assert TESTING_TOPIC not in pubsubs_fsub[0].topic_ids +@pytest.mark.trio +async def test_reissue_when_listen_addrs_change(): + async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: + await connect(pubsubs_fsub[0].host, pubsubs_fsub[1].host) + await pubsubs_fsub[0].subscribe(TESTING_TOPIC) + # Yield to let 0 notify 1 + await trio.sleep(1) + assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] + + # Check whether signed-records were transfered properly in the subscribe call + envelope_b_sub = ( + pubsubs_fsub[1] + .host.get_peerstore() + .get_peer_record(pubsubs_fsub[0].host.get_id()) + ) + assert isinstance(envelope_b_sub, Envelope) + + # Simulate pubsubs_fsub[1].host listen addrs changing (different port) + new_addr = multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/123") + + # Patch just for the duration we force A to unsubscribe + with patch.object(pubsubs_fsub[0].host, "get_addrs", return_value=[new_addr]): + # Unsubscribe from A's side so that a new_record is issued + await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC) + await trio.sleep(1) + + # B should be holding A's new record with bumped seq + envelope_b_unsub = ( + pubsubs_fsub[1] + .host.get_peerstore() + .get_peer_record(pubsubs_fsub[0].host.get_id()) + ) + assert isinstance(envelope_b_unsub, Envelope) + + # This proves that a freshly signed record was issued rather than + # the latest-cached-one creating one. + assert envelope_b_sub.record().seq < envelope_b_unsub.record().seq + + @pytest.mark.trio async def test_peers_subscribe(): async with PubsubFactory.create_batch_with_floodsub(2) as pubsubs_fsub: @@ -95,11 +136,31 @@ async def test_peers_subscribe(): # Yield to let 0 notify 1 await trio.sleep(1) assert pubsubs_fsub[0].my_id in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] + + # Check whether signed-records were transfered properly in the subscribe call + envelope_b_sub = ( + pubsubs_fsub[1] + .host.get_peerstore() + .get_peer_record(pubsubs_fsub[0].host.get_id()) + ) + assert isinstance(envelope_b_sub, Envelope) + await pubsubs_fsub[0].unsubscribe(TESTING_TOPIC) # Yield to let 0 notify 1 await trio.sleep(1) assert pubsubs_fsub[0].my_id not in pubsubs_fsub[1].peer_topics[TESTING_TOPIC] + envelope_b_unsub = ( + pubsubs_fsub[1] + .host.get_peerstore() + .get_peer_record(pubsubs_fsub[0].host.get_id()) + ) + assert isinstance(envelope_b_unsub, Envelope) + + # This proves that the latest-cached-record was re-issued rather than + # freshly creating one. + assert envelope_b_sub.record().seq == envelope_b_unsub.record().seq + @pytest.mark.trio async def test_get_hello_packet():