diff --git a/examples/chat/chat.py b/examples/chat/chat.py new file mode 100755 index 00000000..6fce80f4 --- /dev/null +++ b/examples/chat/chat.py @@ -0,0 +1,109 @@ +#!/bin/env python3 +import sys +from os.path import dirname, abspath +sys.path.append(dirname(dirname(dirname(abspath(__file__))))) + +import asyncio + +import click +from libp2p.libp2p import Libp2p +from network.multiaddr import MultiAddr + +# TODO: change once muxed_connection supports extracting protocol id from messages +PROTOCOL_ID = '/echo/1.0.0' + + +async def read_data(stream): + while True: + read_string = (await stream.read()).decode() + + if not read_string: + return + + if read_string != "\n": + # Green console colour: \x1b[32m + # Reset console colour: \x1b[0m + print("\x1b[32m%s\x1b[0m" % read_string) + + +async def write_data(stream): + loop = asyncio.get_event_loop() + + while True: + line = await loop.run_in_executor(None, sys.stdin.readline) + await stream.write(line.encode()) + + +async def run(port, destination): + + if not destination: + lib = Libp2p(transport_opt=["/ip4/127.0.0.1/tcp/%s/p2p/hostA" % port]) + host = await lib.new_node() + + async def stream_handler(stream): + asyncio.ensure_future(read_data(stream)) + asyncio.ensure_future(write_data(stream)) + + host.set_stream_handler(PROTOCOL_ID, stream_handler) + + port = None + for listener in host.network.listeners.values(): + for addr in listener.get_addrs(): + addr_dict = addr.to_options() + if addr_dict['transport'] == 'tcp': + port = addr_dict['port'] + break + + if not port: + raise RuntimeError("was not able to find the actual local port") + + print("Run './chat.py --port %s -d /ip4/127.0.0.1/tcp/%s/p2p/%s' on another console.\n" % (int(port)+1, port, host.get_id().pretty())) + print("You can replace 127.0.0.1 with public IP as well.") + print("\nWaiting for incoming connection\n\n") + + else: + lib = Libp2p(transport_opt=["/ip4/127.0.0.1/tcp/%s/p2p/hostB" % port]) + host = await lib.new_node() + + # TODO: improve multiaddr module to have proper function to do this + multiaddr = MultiAddr(destination) + ss = multiaddr.get_multiaddr_string().split('/') + peer_id = ss[-1] + addr = '/'.join(ss[:-2]) + + # Associate the peer with local ip address (see default parameters of Libp2p()) + host.get_peerstore().add_addr(peer_id, addr, 10) + + # Start a stream with the destination. + # Multiaddress of the destination peer is fetched from the peerstore using 'peerId'. + stream = await host.new_stream(peer_id, PROTOCOL_ID) + + asyncio.ensure_future(read_data(stream)) + asyncio.ensure_future(write_data(stream)) + + +@click.command() +@click.option('--port', help='source port number', default=8000) +@click.option('--destination', '-d', help="Destination multiaddr string") +@click.option('--help', is_flag=True, default=False, help='display help') +# @click.option('--debug', is_flag=True, default=False, help='Debug generates the same node ID on every execution') +def main(port, destination, help): + + if help: + print("This program demonstrates a simple p2p chat application using libp2p\n\n") + print("Usage: Run './chat -sp ' where can be any port number.") + print("Now run './chat -d ' where is multiaddress of previous listener host.") + return + + loop = asyncio.get_event_loop() + try: + asyncio.ensure_future(run(port, destination)) + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + loop.close() + + +if __name__ == '__main__': + main() diff --git a/muxer/mplex/muxed_connection.py b/muxer/mplex/muxed_connection.py index c2fd3435..0a43640c 100644 --- a/muxer/mplex/muxed_connection.py +++ b/muxer/mplex/muxed_connection.py @@ -83,8 +83,12 @@ class MuxedConn(IMuxedConn): # << by 3, then or with flag header = (stream_id << 3) | flag header = encode_uvarint(header) - data_length = encode_uvarint(len(data)) - _bytes = header + data_length + data + if data is None: + data_length = encode_uvarint(0) + _bytes = header + data_length + else: + data_length = encode_uvarint(len(data)) + _bytes = header + data_length + data return await self.write_to_stream(_bytes) diff --git a/muxer/mplex/muxed_stream.py b/muxer/mplex/muxed_stream.py index 1d6bd5fb..11e04f49 100644 --- a/muxer/mplex/muxed_stream.py +++ b/muxer/mplex/muxed_stream.py @@ -49,7 +49,7 @@ class MuxedStream(IMuxedStream): """ return await self.muxed_conn.send_message(self.get_flag("MESSAGE"), data, self.stream_id) - def close(self): + async def close(self): """ close stream :return: true if successful @@ -58,7 +58,7 @@ class MuxedStream(IMuxedStream): if self.local_closed and self.remote_closed: return True - self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) + await self.muxed_conn.send_message(self.get_flag("CLOSE"), None, self.stream_id) self.muxed_conn.streams.pop(self.stream_id) self.local_closed = True diff --git a/network/network_interface.py b/network/network_interface.py index 0a30473a..323e679b 100644 --- a/network/network_interface.py +++ b/network/network_interface.py @@ -3,6 +3,13 @@ from abc import ABC, abstractmethod class INetwork(ABC): + @abstractmethod + def get_peer_id(self): + """ + :return: the peer id + """ + pass + @abstractmethod def set_stream_handler(self, protocol_id, stream_handler): """ diff --git a/network/stream/net_stream.py b/network/stream/net_stream.py index 0377c32c..8cfb6356 100644 --- a/network/stream/net_stream.py +++ b/network/stream/net_stream.py @@ -34,10 +34,10 @@ class NetStream(INetStream): """ return await self.muxed_stream.write(data) - def close(self): + async def close(self): """ close stream :return: true if successful """ - self.muxed_stream.close() + await self.muxed_stream.close() return True diff --git a/network/swarm.py b/network/swarm.py index 9a5dc3b0..41ef9bc6 100644 --- a/network/swarm.py +++ b/network/swarm.py @@ -4,11 +4,13 @@ from .stream.net_stream import NetStream from .multiaddr import MultiAddr from .connection.raw_connection import RawConnection +from peer.id import ID class Swarm(INetwork): def __init__(self, my_peer_id, peerstore, upgrader): - self.my_peer_id = my_peer_id + self._my_peer_id = my_peer_id + self.id = ID(my_peer_id) self.peerstore = peerstore self.upgrader = upgrader self.connections = dict() @@ -16,6 +18,9 @@ class Swarm(INetwork): self.stream_handlers = dict() self.transport = None + def get_peer_id(self): + return self.id + def set_stream_handler(self, protocol_id, stream_handler): """ :param protocol_id: protocol id used on stream @@ -103,6 +108,7 @@ class Swarm(INetwork): try: # Success listener = self.transport.create_listener(conn_handler) + self.listeners[multiaddr_str] = listener await listener.listen(multiaddr) return True except IOError: diff --git a/peer/id.py b/peer/id.py new file mode 100644 index 00000000..9295d2b9 --- /dev/null +++ b/peer/id.py @@ -0,0 +1,17 @@ +import base58 + +class ID: + + def __init__(self, id_str): + self._id_str = id_str + + def pretty(self): + return base58.b58encode(self._id_str).decode() + + def __str__(self): + pid = self.pretty() + if len(pid) <= 10: + return "" % pid + return "" % (pid[:2], pid[len(pid)-6:]) + + __repr__ = __str__ \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 6a797138..4a2593e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ pylint pytest pycryptodome pytest-asyncio +click +base58 \ No newline at end of file