Peer-to-Peer Networking
In the previous chapter, we learned how UtreexoNode
opens connections, although we didn't dive into the low-level networking details. We mentioned that each peer connection is handled by the Peer
type, keeping the peer networking logic separate from UtreexoNode
.
In this chapter, we will explore the details of Peer
operations, beginning with the low-level logic for opening connections (i.e. Peer
creation).
Peer Creation
Recall that in the open_connection
method on UtreexoNode
we call either UtreexoNode::open_proxy_connection
or UtreexoNode::open_non_proxy_connection
, depending on the self.socks5
proxy option. It's within these two functions that the Peer
is created. Let's first learn how the direct TCP connection is opened!
The open_non_proxy_connection
function will first retrieve the peer's network address and port from the provided LocalAddress
. Then it will attempt to establish a TCP connection using transport::connect
(implemented in the transport
module, which handles both the v1 and v2 transport protocols).
If successful, we get the transport reader and writer, which are of type ReadTransport
and WriteTransport
, defined in p2p_wire/transport.rs. Respectively, these two types wrap a tokio
ReadHalf<TcpStream>
, for receiving data, and a WriteHalf<TcpStream>
, for sending data to the peer.
It then sets up an 'actor', that is, an independent component that reads incoming messages and communicates them to the 'actor receiver'. The actor is effectively a transport reader wrapper.
// Path: floresta-wire/src/p2p_wire/node.rs
pub(crate) async fn open_non_proxy_connection(
kind: ConnectionKind,
peer_id: usize,
address: LocalAddress,
requests_rx: UnboundedReceiver<NodeRequest>,
peer_id_count: u32,
mempool: Arc<Mutex<Mempool>>,
network: bitcoin::Network,
node_tx: UnboundedSender<NodeNotification>,
user_agent: String,
allow_v1_fallback: bool,
) -> Result<(), WireError> {
let address = (address.get_net_address(), address.get_port());
let (transport_reader, transport_writer) =
transport::connect(address, network, allow_v1_fallback).await?;
let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_actors(transport_reader);
tokio::spawn(async move {
tokio::select! {
_ = cancellation_receiver => {}
_ = actor.run() => {}
}
});
// Use create_peer function instead of manually creating the peer
Peer::<WriteHalf>::create_peer(
peer_id_count,
mempool,
node_tx.clone(),
requests_rx,
peer_id,
kind,
actor_receiver,
transport_writer,
user_agent,
cancellation_sender,
)
.await;
Ok(())
}
This actor is obtained via the create_actors
function, implemented in p2p_wire/peer.rs, and is of type MessageActor
. The actor is spawned as a separate asynchronous task, ensuring it runs independently to handle incoming data.
Very importantly, the actor for a peer must be closed when the connection finalizes, and this is why we have an additional one-time-use channel, used by the Peer
type to send a cancellation signal (i.e. "Peer connection is closed, so we don't need to listen to the peer anymore"). The tokio::select
macro ensures that the async actor task is dropped whenever a cancellation signal is received from Peer
.
Finally, the Peer
instance is created using the Peer::create_peer
function. The communication channels (internal and over the P2P network) that the Peer
uses are:
- The node sender (
node_tx
): to send messages toUtreexoNode
. - The requests receiver (
requests_rx
): to receive requests fromUtreexoNode
that will be sent to the peer. - The
actor_receiver
: to receive peer messages. - The
transport_writer
: to send messages to the peer. - The
cancellation_sender
: to close the reader actor task.
By the end of this function, a fully initialized Peer
is ready to manage communication with the connected peer via TCP (writing side) and via MessageActor
(reading side), as well as communicating with UtreexoNode
.
Proxy Connection
The open_proxy_connection
is pretty much the same, except we get the transport reader and writer from the proxy connection instead, handled by transport::connect_proxy
.
// Path: floresta-wire/src/p2p_wire/node.rs
pub(crate) async fn open_proxy_connection(
proxy: SocketAddr,
// ...
kind: ConnectionKind,
mempool: Arc<Mutex<Mempool>>,
network: bitcoin::Network,
node_tx: UnboundedSender<NodeNotification>,
peer_id: usize,
address: LocalAddress,
requests_rx: UnboundedReceiver<NodeRequest>,
peer_id_count: u32,
user_agent: String,
allow_v1_fallback: bool,
) -> Result<(), WireError> {
let (transport_reader, transport_writer) =
transport::connect_proxy(proxy, address, network, allow_v1_fallback).await?;
let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_actors(transport_reader);
tokio::spawn(async move {
tokio::select! {
_ = cancellation_receiver => {}
_ = actor.run() => {}
}
});
Peer::<WriteHalf>::create_peer(
// Same as before
peer_id_count,
mempool,
node_tx,
requests_rx,
peer_id,
kind,
actor_receiver,
transport_writer,
user_agent,
cancellation_sender,
)
.await;
Ok(())
}
Recap of Channels
Let's do a brief recap of the channels we have opened for internal node message passing:
-
Node Channel (
Peer
->UtreexoNode
)Peer
sends vianode_tx
UtreexoNode
receives viaNodeCommon.node_rx
-
Requests Channel (
UtreexoNode
->Peer
)UtreexoNode
sends via eachLocalPeerView.channel
, stored inNodeCommon.peers
Peer
receives via itsrequests_rx
-
Message Actor Channel (
MessageActor
->Peer
)MessageActor
sends viaactor_sender
Peer
receives viaactor_receiver
-
Cancellation Signal Channel (
Peer
->UtreexoNode
)Peer
sends the signal viacancellation_sender
at the end of the connectionUtreexoNode
receives it viacancellation_receiver
UtreexoNode
sends requests via the Request Channel to the Peer
component (which then forwards them to the peer via TCP), Peer
receives the result or other peer messages via the Actor Channel, and then it notifies UtreexoNode
via the Node Channel. When the peer connection is closed, Peer
uses the Cancellation Signal Channel to allow the TCP actor listening to the peer to be closed as well.
Next, we'll explore how messages are read and sent in the P2P network!