mirror of
https://github.com/varun-r-mallya/py-libp2p.git
synced 2026-02-12 16:10:57 +00:00
Key-Book: added tests
This commit is contained in:
@ -6,10 +6,12 @@ from multiaddr import Multiaddr
|
|||||||
from libp2p.crypto.secp256k1 import (
|
from libp2p.crypto.secp256k1 import (
|
||||||
create_new_key_pair,
|
create_new_key_pair,
|
||||||
)
|
)
|
||||||
|
from libp2p.peer.id import ID
|
||||||
from libp2p.peer.peerdata import (
|
from libp2p.peer.peerdata import (
|
||||||
PeerData,
|
PeerData,
|
||||||
PeerDataError,
|
PeerDataError,
|
||||||
)
|
)
|
||||||
|
from libp2p.peer.peerstore import PeerStore
|
||||||
|
|
||||||
MOCK_ADDR = Multiaddr("/ip4/127.0.0.1/tcp/4001")
|
MOCK_ADDR = Multiaddr("/ip4/127.0.0.1/tcp/4001")
|
||||||
MOCK_KEYPAIR = create_new_key_pair()
|
MOCK_KEYPAIR = create_new_key_pair()
|
||||||
@ -60,6 +62,7 @@ def test_supports_protocols():
|
|||||||
assert supported == ["protocol1", "protocol2"]
|
assert supported == ["protocol1", "protocol2"]
|
||||||
|
|
||||||
|
|
||||||
|
# Test case for first supported protocol is found
|
||||||
def test_first_supported_protocol_found():
|
def test_first_supported_protocol_found():
|
||||||
peer_data = PeerData()
|
peer_data = PeerData()
|
||||||
peer_data.set_protocols(["protocolA", "protocolB"])
|
peer_data.set_protocols(["protocolA", "protocolB"])
|
||||||
@ -70,6 +73,7 @@ def test_first_supported_protocol_found():
|
|||||||
assert first == "protocolB"
|
assert first == "protocolB"
|
||||||
|
|
||||||
|
|
||||||
|
# Test case for first supported protocol not found
|
||||||
def test_first_supported_protocol_none():
|
def test_first_supported_protocol_none():
|
||||||
peer_data = PeerData()
|
peer_data = PeerData()
|
||||||
peer_data.set_protocols(["protocolX", "protocolY"])
|
peer_data.set_protocols(["protocolX", "protocolY"])
|
||||||
@ -80,6 +84,7 @@ def test_first_supported_protocol_none():
|
|||||||
assert first == "None supported"
|
assert first == "None supported"
|
||||||
|
|
||||||
|
|
||||||
|
# Test case for clearing protocol data
|
||||||
def test_clear_protocol_data():
|
def test_clear_protocol_data():
|
||||||
peer_data = PeerData()
|
peer_data = PeerData()
|
||||||
peer_data.set_protocols(["proto1", "proto2"])
|
peer_data.set_protocols(["proto1", "proto2"])
|
||||||
@ -159,6 +164,42 @@ def test_get_privkey_not_found():
|
|||||||
peer_data.get_privkey()
|
peer_data.get_privkey()
|
||||||
|
|
||||||
|
|
||||||
|
# Test case for returning all the peers with stored keys
|
||||||
|
def test_peer_with_keys():
|
||||||
|
peer_store = PeerStore()
|
||||||
|
peer_id_1 = ID(b"peer1")
|
||||||
|
peer_id_2 = ID(b"peer2")
|
||||||
|
|
||||||
|
peer_data_1 = PeerData()
|
||||||
|
peer_data_2 = PeerData()
|
||||||
|
|
||||||
|
peer_data_1.pubkey = MOCK_PUBKEY
|
||||||
|
peer_data_2.pubkey = None
|
||||||
|
|
||||||
|
peer_store.peer_data_map = {
|
||||||
|
peer_id_1: peer_data_1,
|
||||||
|
peer_id_2: peer_data_2,
|
||||||
|
}
|
||||||
|
|
||||||
|
assert peer_store.peer_with_keys() == [peer_id_1]
|
||||||
|
|
||||||
|
|
||||||
|
# Test case for clearing the key book
|
||||||
|
def test_clear_keydata():
|
||||||
|
peer_store = PeerStore()
|
||||||
|
peer_id = ID(b"peer123")
|
||||||
|
peer_data = PeerData()
|
||||||
|
|
||||||
|
peer_data.pubkey = MOCK_PUBKEY
|
||||||
|
peer_data.privkey = MOCK_PRIVKEY
|
||||||
|
peer_store.peer_data_map = {peer_id: peer_data}
|
||||||
|
|
||||||
|
peer_store.clear_keydata(peer_id)
|
||||||
|
|
||||||
|
assert peer_data.pubkey is None
|
||||||
|
assert peer_data.privkey is None
|
||||||
|
|
||||||
|
|
||||||
# Test case for recording latency for the first time
|
# Test case for recording latency for the first time
|
||||||
def test_record_latency_initial():
|
def test_record_latency_initial():
|
||||||
peer_data = PeerData()
|
peer_data = PeerData()
|
||||||
|
|||||||
Reference in New Issue
Block a user