Handling Peer Messages
As we can see below, the only thing the Peer::create_peer
method does is initializing the Peer
and running its read_loop
method.
// Path: floresta-wire/src/p2p_wire/peer.rs
pub async fn create_peer<W: AsyncWrite + Unpin + Send + Sync + 'static>(
// ...
id: u32,
mempool: Arc<Mutex<Mempool>>,
node_tx: UnboundedSender<NodeNotification>,
node_requests: UnboundedReceiver<NodeRequest>,
address_id: usize,
kind: ConnectionKind,
actor_receiver: UnboundedReceiver<ReaderMessage>,
writer: WriteTransport<W>,
our_user_agent: String,
cancellation_sender: tokio::sync::oneshot::Sender<()>,
) {
let peer = Peer {
// Initializing the many Peer fields :P
address_id,
blocks_only: false,
current_best_block: -1,
id,
mempool,
last_ping: None,
last_message: Instant::now(),
node_tx,
services: ServiceFlags::NONE,
messages: 0,
start_time: Instant::now(),
user_agent: "".into(),
state: State::None,
send_headers: false,
node_requests,
kind,
wants_addrv2: false,
shutdown: false,
actor_receiver, // Add the receiver for messages from TcpStreamActor
writer,
our_user_agent,
cancellation_sender,
};
spawn(peer.read_loop());
}
This read_loop
method will in turn call a peer_loop_inner
method:
// Path: floresta-wire/src/p2p_wire/peer.rs
pub async fn read_loop(mut self) -> Result<()> {
let err = self.peer_loop_inner().await;
// Check any errors returned by the loop, shutdown the stream writer
// and send cancellation signal to close the stream reader (actor task)
// ...
if err.is_err() {
debug!("Peer {} connection loop closed: {err:?}", self.id);
}
self.send_to_node(PeerMessages::Disconnected(self.address_id))
.await;
// force the stream to shutdown to prevent leaking resources
if let Err(shutdown_err) = self.writer.shutdown().await {
debug!(
"Failed to shutdown writer for Peer {}: {shutdown_err:?}",
self.id
);
}
if let Err(cancellation_err) = self.cancellation_sender.send(()) {
debug!(
"Failed to propagate cancellation signal for Peer {}: {cancellation_err:?}",
self.id
);
}
if let Err(err) = err {
debug!("Peer {} connection loop closed: {err:?}", self.id);
}
Ok(())
}
The Peer Loop
The peer_loop_inner
method is the main loop execution that handles all communication between the Peer
component, the actual peer over the network, and the UtreexoNode
. It sends P2P messages to the peer, processes requests from the node, and manages responses from the peer.
- Initial Handshake and Main Loop: At the start, the method sends a version message to the peer using
peer_utils::build_version_message
, which initiates the handshake. Then the method enters an asynchronous loop where it handles node requests, peer messages, and ensures the peer connection remains healthy.
// Path: floresta-wire/src/p2p_wire/peer.rs
async fn peer_loop_inner(&mut self) -> Result<()> {
// send a version
let version = peer_utils::build_version_message(self.our_user_agent.clone());
self.write(version).await?;
self.state = State::SentVersion(Instant::now());
loop {
futures::select! {
request = tokio::time::timeout(Duration::from_secs(10), self.node_requests.recv()).fuse() => {
match request {
Ok(None) => {
return Err(PeerError::Channel);
},
Ok(Some(request)) => {
self.handle_node_request(request).await?;
},
Err(_) => {
// Timeout, do nothing
}
}
},
message = self.actor_receiver.recv().fuse() => {
match message {
None => {
return Err(PeerError::Channel);
}
Some(ReaderMessage::Error(e)) => {
return Err(e);
}
Some(ReaderMessage::Block(block)) => {
self.send_to_node(PeerMessages::Block(block)).await;
}
Some(ReaderMessage::Message(msg)) => {
self.handle_peer_message(msg).await?;
}
}
}
}
// ...
if self.shutdown {
return Ok(());
}
// If we send a ping and our peer doesn't respond in time, disconnect
if let Some(when) = self.last_ping {
if when.elapsed().as_secs() > PING_TIMEOUT {
return Err(PeerError::Timeout);
}
}
// Send a ping to check if this peer is still good
let last_message = self.last_message.elapsed().as_secs();
if last_message > SEND_PING_TIMEOUT {
if self.last_ping.is_some() {
continue;
}
let nonce = rand::random();
self.last_ping = Some(Instant::now());
self.write(NetworkMessage::Ping(nonce)).await?;
}
// divide the number of messages by the number of seconds we've been connected,
// if it's more than 10 msg/sec, this peer is sending us too many messages, and we should
// disconnect.
let msg_sec = self
.messages
.checked_div(Instant::now().duration_since(self.start_time).as_secs())
.unwrap_or(0);
if msg_sec > 10 {
error!(
"Peer {} is sending us too many messages, disconnecting",
self.id
);
return Err(PeerError::TooManyMessages);
}
if let State::SentVersion(when) = self.state {
if Instant::now().duration_since(when) > Duration::from_secs(10) {
return Err(PeerError::UnexpectedMessage);
}
}
}
}
-
Handling Node Requests: The method uses a
futures::select!
block to listen for requests fromUtreexoNode
viaself.node_requests
, with a 10-second timeout for each operation.- If a request is received, it is passed to the
handle_node_request
method for processing. - If the channel is closed (
Ok(None)
), the method exits with aPeerError::Channel
. - If the timeout expires without receiving a request, the method simply does nothing, allowing the loop to continue.
- If a request is received, it is passed to the
-
Processing Peer Messages: Simultaneously, the loop listens for messages from the TCP actor via
self.actor_receiver
. Depending on the type of message received:- Error: If an error is reported (closed channel or
ReaderMessage::Error
), the loop exits with the error. - Block Message: If a block is received, it is forwarded to
UtreexoNode
usingsend_to_node
. - Generic Message: Other peer messages are processed by the
handle_peer_message
method.
- Error: If an error is reported (closed channel or
// Path: floresta-wire/src/p2p_wire/peer.rs
async fn peer_loop_inner(&mut self) -> Result<()> {
// send a version
let version = peer_utils::build_version_message(self.our_user_agent.clone());
self.write(version).await?;
self.state = State::SentVersion(Instant::now());
loop {
futures::select! {
request = tokio::time::timeout(Duration::from_secs(10), self.node_requests.recv()).fuse() => {
match request {
Ok(None) => {
return Err(PeerError::Channel);
},
Ok(Some(request)) => {
self.handle_node_request(request).await?;
},
Err(_) => {
// Timeout, do nothing
}
}
},
message = self.actor_receiver.recv().fuse() => {
match message {
None => {
return Err(PeerError::Channel);
}
Some(ReaderMessage::Error(e)) => {
return Err(e);
}
Some(ReaderMessage::Block(block)) => {
self.send_to_node(PeerMessages::Block(block)).await;
}
Some(ReaderMessage::Message(msg)) => {
self.handle_peer_message(msg).await?;
}
}
}
}
// ...
if self.shutdown {
return Ok(());
}
// If we send a ping and our peer doesn't respond in time, disconnect
if let Some(when) = self.last_ping {
if when.elapsed().as_secs() > PING_TIMEOUT {
return Err(PeerError::Timeout);
}
}
// Send a ping to check if this peer is still good
let last_message = self.last_message.elapsed().as_secs();
if last_message > SEND_PING_TIMEOUT {
if self.last_ping.is_some() {
continue;
}
let nonce = rand::random();
self.last_ping = Some(Instant::now());
self.write(NetworkMessage::Ping(nonce)).await?;
}
// ...
// divide the number of messages by the number of seconds we've been connected,
// if it's more than 10 msg/sec, this peer is sending us too many messages, and we should
// disconnect.
let msg_sec = self
.messages
.checked_div(Instant::now().duration_since(self.start_time).as_secs())
.unwrap_or(0);
if msg_sec > 10 {
error!(
"Peer {} is sending us too many messages, disconnecting",
self.id
);
return Err(PeerError::TooManyMessages);
}
if let State::SentVersion(when) = self.state {
if Instant::now().duration_since(when) > Duration::from_secs(10) {
return Err(PeerError::UnexpectedMessage);
}
}
}
}
-
Shutdown Check: The loop continually checks if the
shutdown
flag is set. If it is, the loop exits gracefully. -
Ping Management: To maintain the connection, the method sends periodic
NetworkMessage::Ping
s. If the peer fails to respond within a timeout (PING_TIMEOUT
), the connection is terminated. Additionally, if no messages have been exchanged for a period (SEND_PING_TIMEOUT
), a new ping is sent, and the timestamp is updated.
Currently, we disconnect if a peer doesn't respond to a ping within 30 seconds, and we send a ping 60 seconds after the last message.
// Path: floresta-wire/src/p2p_wire/peer.rs const PING_TIMEOUT: u64 = 30; const SEND_PING_TIMEOUT: u64 = 60;
// Path: floresta-wire/src/p2p_wire/peer.rs
async fn peer_loop_inner(&mut self) -> Result<()> {
// send a version
let version = peer_utils::build_version_message(self.our_user_agent.clone());
self.write(version).await?;
self.state = State::SentVersion(Instant::now());
loop {
futures::select! {
request = tokio::time::timeout(Duration::from_secs(10), self.node_requests.recv()).fuse() => {
match request {
Ok(None) => {
return Err(PeerError::Channel);
},
Ok(Some(request)) => {
self.handle_node_request(request).await?;
},
Err(_) => {
// Timeout, do nothing
}
}
},
message = self.actor_receiver.recv().fuse() => {
match message {
None => {
return Err(PeerError::Channel);
}
Some(ReaderMessage::Error(e)) => {
return Err(e);
}
Some(ReaderMessage::Block(block)) => {
self.send_to_node(PeerMessages::Block(block)).await;
}
Some(ReaderMessage::Message(msg)) => {
self.handle_peer_message(msg).await?;
}
}
}
}
if self.shutdown {
return Ok(());
}
// If we send a ping and our peer doesn't respond in time, disconnect
if let Some(when) = self.last_ping {
if when.elapsed().as_secs() > PING_TIMEOUT {
return Err(PeerError::Timeout);
}
}
// Send a ping to check if this peer is still good
let last_message = self.last_message.elapsed().as_secs();
if last_message > SEND_PING_TIMEOUT {
if self.last_ping.is_some() {
continue;
}
let nonce = rand::random();
self.last_ping = Some(Instant::now());
self.write(NetworkMessage::Ping(nonce)).await?;
}
// ...
// divide the number of messages by the number of seconds we've been connected,
// if it's more than 10 msg/sec, this peer is sending us too many messages, and we should
// disconnect.
let msg_sec = self
.messages
.checked_div(Instant::now().duration_since(self.start_time).as_secs())
.unwrap_or(0);
if msg_sec > 10 {
error!(
"Peer {} is sending us too many messages, disconnecting",
self.id
);
return Err(PeerError::TooManyMessages);
}
if let State::SentVersion(when) = self.state {
if Instant::now().duration_since(when) > Duration::from_secs(10) {
return Err(PeerError::UnexpectedMessage);
}
}
}
}
-
Rate Limiting: The method calculates the rate of messages received from the peer. If the peer sends more than 10 messages per second on average, it is deemed misbehaving, and the connection is closed.
-
Handshake Timeout: If the peer does not respond to the version message within 10 seconds, the loop exits with an error, as the expected handshake flow was not completed.
Handshake Process
In this Peer
execution loop we have also seen a State
type, stored in the Peer.state
field. This represents the state of the handshake with the peer:
// Path: floresta-wire/src/p2p_wire/peer.rs
enum State {
None,
SentVersion(Instant),
SentVerack,
Connected,
}
None
is the initial state when the Peer
is created, but shortly after that it will be updated with SentVersion
, when we initiate the handshake by sending our NetworkMessage::Version
.
If the peer is responsive, we will hear back from her within the next 10 seconds, via her NetworkMessage::Version
, which will be handled by the handle_peer_message
(that we saw in the third step). This method will internally save data from the peer, send her a NetworkMessage::Verack
(i.e. the acknowledgment of her message), and update the state to SentVerack
.
Finally, when we receive the NetworkMessage::Verack
from the peer, we can switch to the Connected
state, and communicate the new peer data with UtreexoNode
.
Node Communication Lifecycle
Once connected to a peer, UtreexoNode
can send requests and receive responses.
-
It interacts with a specific peer through
NodeCommon.peers
and usesLocalPeerView.channel
to send requests. -
Peer
receives the request message and handles it viahandle_node_request
(that we saw in the second step). This method will perform the write operation. -
When the peer responds with a message, it is received via the message
actor_receiver
and handled by thehandle_peer_message
method, which likely passes new data back toUtreexoNode
, continuing the communication loop.