diff --git a/swap/src/network/swap_setup/bob.rs b/swap/src/network/swap_setup/bob.rs index 6b905495d..38566dde1 100644 --- a/swap/src/network/swap_setup/bob.rs +++ b/swap/src/network/swap_setup/bob.rs @@ -1,17 +1,14 @@ -use crate::network::swap_setup::{ - protocol, read_cbor_message, write_cbor_message, BlockchainNetwork, SpotPriceError, - SpotPriceRequest, SpotPriceResponse, -}; +use crate::network::swap_setup::{protocol, BlockchainNetwork, SpotPriceError, SpotPriceResponse}; use crate::protocol::bob::{State0, State2}; use crate::protocol::{Message1, Message3}; use crate::{bitcoin, cli, env, monero}; use anyhow::Result; use futures::future::{BoxFuture, OptionFuture}; -use futures::{AsyncWriteExt, FutureExt}; -use libp2p::core::connection::ConnectionId; +use futures::FutureExt; use libp2p::core::upgrade; use libp2p::swarm::{ - ConnectionDenied, ConnectionHandler, FromSwarm, KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm + ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, + NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p::{Multiaddr, PeerId}; use std::collections::VecDeque; @@ -19,7 +16,8 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; use uuid::Uuid; -use void::Void; + +use super::{read_cbor_message, write_cbor_message, SpotPriceRequest}; #[allow(missing_debug_implementations)] pub struct Behaviour { @@ -52,7 +50,7 @@ impl From for cli::OutEvent { impl NetworkBehaviour for Behaviour { type ConnectionHandler = Handler; - type ToSwarm = ToSwarm; + type ToSwarm = Completed; fn handle_established_inbound_connection( &mut self, @@ -76,26 +74,17 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: FromSwarm) { match event { - FromSwarm::ConnectionEstablished(_) => {}, - FromSwarm::ConnectionClosed(_) => {}, - _ => {}, + FromSwarm::ConnectionEstablished(_) => {} + FromSwarm::ConnectionClosed(_) => {} + _ => {} } } - /*fn on_connection_handler_event( + fn on_connection_handler_event( &mut self, peer_id: PeerId, - _connection_id: ConnectionId, - event: ::OutEvent, - ) { - self.completed_swaps.push_back((peer_id, event)); - }*/ - - fn on_connection_handler_event( - &mut self, - peer_id: PeerId, - _connection_id: libp2p::swarm::ConnectionId, - event: THandlerOutEvent, + _connection_id: libp2p::swarm::ConnectionId, + event: THandlerOutEvent, ) { self.completed_swaps.push_back((peer_id, event)); } @@ -119,7 +108,7 @@ pub struct Handler { timeout: Duration, new_swaps: VecDeque, bitcoin_wallet: Arc, - keep_alive: KeepAlive, + keep_alive: bool, } impl Handler { @@ -130,7 +119,7 @@ impl Handler { timeout: Duration::from_secs(120), new_swaps: VecDeque::default(), bitcoin_wallet, - keep_alive: KeepAlive::Yes, + keep_alive: true, } } } @@ -147,10 +136,9 @@ pub struct NewSwap { #[derive(Debug)] pub struct Completed(Result); -impl ProtocolsHandler for Handler { - type InEvent = NewSwap; - type OutEvent = Completed; - type Error = Void; +impl ConnectionHandler for Handler { + type FromBehaviour = NewSwap; + type ToBehaviour = Completed; type InboundProtocol = upgrade::DeniedUpgrade; type OutboundProtocol = protocol::SwapSetup; type InboundOpenInfo = (); @@ -160,114 +148,125 @@ impl ProtocolsHandler for Handler { SubstreamProtocol::new(upgrade::DeniedUpgrade, ()) } - fn inject_fully_negotiated_inbound(&mut self, _: Void, _: Self::InboundOpenInfo) { - unreachable!("Bob does not support inbound substreams") - } - - fn inject_fully_negotiated_outbound( - &mut self, - mut substream: NegotiatedSubstream, - info: Self::OutboundOpenInfo, - ) { - let bitcoin_wallet = self.bitcoin_wallet.clone(); - let env_config = self.env_config; - - let protocol = tokio::time::timeout(self.timeout, async move { - write_cbor_message( - &mut substream, - SpotPriceRequest { - btc: info.btc, - blockchain_network: BlockchainNetwork { - bitcoin: env_config.bitcoin_network, - monero: env_config.monero_network, - }, - }, - ) - .await?; - - let xmr = Result::from(read_cbor_message::(&mut substream).await?)?; - - let state0 = State0::new( - info.swap_id, - &mut rand::thread_rng(), - info.btc, - xmr, - env_config.bitcoin_cancel_timelock, - env_config.bitcoin_punish_timelock, - info.bitcoin_refund_address, - env_config.monero_finality_confirmations, - info.tx_refund_fee, - info.tx_cancel_fee, - ); - - write_cbor_message(&mut substream, state0.next_message()).await?; - let message1 = read_cbor_message::(&mut substream).await?; - let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?; - - write_cbor_message(&mut substream, state1.next_message()).await?; - let message3 = read_cbor_message::(&mut substream).await?; - let state2 = state1.receive(message3)?; - - write_cbor_message(&mut substream, state2.next_message()).await?; - - substream.flush().await?; - substream.close().await?; - - Ok(state2) - }); - - let max_seconds = self.timeout.as_secs(); - self.outbound_stream = OptionFuture::from(Some( - async move { - protocol.await.map_err(|_| Error::Timeout { - seconds: max_seconds, - })? - } - .boxed(), - )); - } - - fn inject_event(&mut self, new_swap: Self::InEvent) { + fn on_behaviour_event(&mut self, new_swap: Self::FromBehaviour) { self.new_swaps.push_back(new_swap); } - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - _: ProtocolsHandlerUpgrErr, - ) { - } - - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { self.keep_alive } - #[allow(clippy::type_complexity)] fn poll( &mut self, cx: &mut Context<'_>, ) -> Poll< - ProtocolsHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, + ConnectionHandlerEvent, > { if let Some(new_swap) = self.new_swaps.pop_front() { - self.keep_alive = KeepAlive::Yes; - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.keep_alive = true; + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(protocol::new(), new_swap), }); } - if let Some(result) = futures::ready!(self.outbound_stream.poll_unpin(cx)) { - self.outbound_stream = OptionFuture::from(None); - return Poll::Ready(ProtocolsHandlerEvent::Custom(Completed(result))); + if let Some(outbound_stream) = self.outbound_stream.as_mut() { + if let Poll::Ready(result) = outbound_stream.poll_unpin(cx) { + self.outbound_stream = None; + self.keep_alive = false; // Set to false after completing the stream + return Poll::Ready(ConnectionHandlerEvent::Custom(Completed(result))); + } } Poll::Pending } + + fn on_connection_event( + &mut self, + event: libp2p::swarm::handler::ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedInbound(_, _) => { + unreachable!("Bob does not support inbound substreams") + } + libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound(outbound) => { + let mut substream = outbound.protocol; + let info = outbound.info; + + let bitcoin_wallet = self.bitcoin_wallet.clone(); + let env_config = self.env_config; + + let bitcoin_wallet = self.bitcoin_wallet.clone(); + let env_config = self.env_config; + + let protocol = tokio::time::timeout(self.timeout, async move { + write_cbor_message( + &mut substream, + SpotPriceRequest { + btc: info.btc, + blockchain_network: BlockchainNetwork { + bitcoin: env_config.bitcoin_network, + monero: env_config.monero_network, + }, + }, + ) + .await?; + + let xmr = Result::from( + read_cbor_message::(&mut substream).await?, + )?; + + let state0 = State0::new( + info.swap_id, + &mut rand::thread_rng(), + info.btc, + xmr, + env_config.bitcoin_cancel_timelock, + env_config.bitcoin_punish_timelock, + info.bitcoin_refund_address, + env_config.monero_finality_confirmations, + info.tx_refund_fee, + info.tx_cancel_fee, + ); + + write_cbor_message(&mut substream, state0.next_message()).await?; + let message1 = read_cbor_message::(&mut substream).await?; + let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?; + + write_cbor_message(&mut substream, state1.next_message()).await?; + let message3 = read_cbor_message::(&mut substream).await?; + let state2 = state1.receive(message3)?; + + write_cbor_message(&mut substream, state2.next_message()).await?; + + substream.flush().await?; + substream.close().await?; + + Ok(state2) + }); + + let max_seconds = self.timeout.as_secs(); + self.outbound_stream = Some(Box::pin( + async move { + protocol.await.map_err(|_| Error::Timeout { + seconds: max_seconds, + })? + } + .boxed(), + )); + self.keep_alive = true; // Ensure the connection stays alive while processing + } + libp2p::swarm::handler::ConnectionEvent::DialUpgradeError(dial_upgrade_err) => { + // Handle dial upgrade error if needed + self.keep_alive = false; // Consider setting to false on error + } + _ => {} + } + } } impl From for Result {