diff --git a/coordinator/migrations/2024-02-29-142400_add_dlc_action/down.sql b/coordinator/migrations/2024-02-29-142400_add_dlc_action/down.sql new file mode 100644 index 000000000..7451f4e85 --- /dev/null +++ b/coordinator/migrations/2024-02-29-142400_add_dlc_action/down.sql @@ -0,0 +1,4 @@ + +ALTER TABLE dlc_protocols DROP COLUMN IF EXISTS "protocol_type"; + +DROP TYPE IF EXISTS "Protocol_Type_Type"; diff --git a/coordinator/migrations/2024-02-29-142400_add_dlc_action/up.sql b/coordinator/migrations/2024-02-29-142400_add_dlc_action/up.sql new file mode 100644 index 000000000..3eef96d4e --- /dev/null +++ b/coordinator/migrations/2024-02-29-142400_add_dlc_action/up.sql @@ -0,0 +1,4 @@ + +CREATE TYPE "Protocol_Type_Type" AS ENUM ('open', 'renew', 'settle', 'close', 'force-close', 'rollover'); + +ALTER TABLE dlc_protocols ADD COLUMN "protocol_type" "Protocol_Type_Type" NOT NULL DEFAULT 'open'; diff --git a/coordinator/src/db/custom_types.rs b/coordinator/src/db/custom_types.rs index 80b166130..138e1ebf6 100644 --- a/coordinator/src/db/custom_types.rs +++ b/coordinator/src/db/custom_types.rs @@ -1,6 +1,7 @@ use crate::db::channels::ChannelState; use crate::db::dlc_messages::MessageType; use crate::db::dlc_protocols::DlcProtocolState; +use crate::db::dlc_protocols::DlcProtocolType; use crate::db::polls::PollType; use crate::db::positions::ContractSymbol; use crate::db::positions::PositionState; @@ -11,6 +12,7 @@ use crate::schema::sql_types::MessageTypeType; use crate::schema::sql_types::PollTypeType; use crate::schema::sql_types::PositionStateType; use crate::schema::sql_types::ProtocolStateType; +use crate::schema::sql_types::ProtocolTypeType; use diesel::deserialize; use diesel::deserialize::FromSql; use diesel::pg::Pg; @@ -199,7 +201,35 @@ impl FromSql for DlcProtocolState { b"Pending" => Ok(DlcProtocolState::Pending), b"Success" => Ok(DlcProtocolState::Success), b"Failed" => Ok(DlcProtocolState::Failed), - _ => Err("Unrecognized enum variant for ContractTransactionType".into()), + _ => Err("Unrecognized enum variant for ProtocolStateType".into()), + } + } +} + +impl ToSql for DlcProtocolType { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { + match *self { + DlcProtocolType::Open => out.write_all(b"open")?, + DlcProtocolType::Settle => out.write_all(b"settle")?, + DlcProtocolType::Renew => out.write_all(b"renew")?, + DlcProtocolType::Rollover => out.write_all(b"rollover")?, + DlcProtocolType::Close => out.write_all(b"close")?, + DlcProtocolType::ForceClose => out.write_all(b"force-close")?, + } + Ok(IsNull::No) + } +} + +impl FromSql for DlcProtocolType { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"open" => Ok(DlcProtocolType::Open), + b"settle" => Ok(DlcProtocolType::Settle), + b"renew" => Ok(DlcProtocolType::Renew), + b"rollover" => Ok(DlcProtocolType::Rollover), + b"close" => Ok(DlcProtocolType::Close), + b"force-close" => Ok(DlcProtocolType::ForceClose), + _ => Err("Unrecognized enum variant for ProtocolTypeType".into()), } } } diff --git a/coordinator/src/db/dlc_protocols.rs b/coordinator/src/db/dlc_protocols.rs index c875ef73b..474bb7487 100644 --- a/coordinator/src/db/dlc_protocols.rs +++ b/coordinator/src/db/dlc_protocols.rs @@ -1,7 +1,9 @@ +use crate::db; use crate::dlc_protocol; use crate::dlc_protocol::ProtocolId; use crate::schema::dlc_protocols; use crate::schema::sql_types::ProtocolStateType; +use crate::schema::sql_types::ProtocolTypeType; use bitcoin::secp256k1::PublicKey; use diesel::query_builder::QueryId; use diesel::AsExpression; @@ -37,6 +39,26 @@ impl QueryId for ProtocolStateType { } } +#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression, Eq, Hash)] +#[diesel(sql_type = ProtocolTypeType)] +pub(crate) enum DlcProtocolType { + Open, + Renew, + Settle, + Close, + ForceClose, + Rollover, +} + +impl QueryId for ProtocolTypeType { + type QueryId = ProtocolTypeType; + const HAS_STATIC_QUERY_ID: bool = false; + + fn query_id() -> Option { + None + } +} + #[derive(Queryable, Debug)] #[diesel(table_name = protocols)] #[allow(dead_code)] // We have to allow dead code here because diesel needs the fields to be able to derive queryable. @@ -49,17 +71,52 @@ pub(crate) struct DlcProtocol { pub protocol_state: DlcProtocolState, pub trader_pubkey: String, pub timestamp: OffsetDateTime, + pub protocol_type: DlcProtocolType, } pub(crate) fn get_dlc_protocol( conn: &mut PgConnection, protocol_id: ProtocolId, ) -> QueryResult { - let contract_transaction: DlcProtocol = dlc_protocols::table + let dlc_protocol: DlcProtocol = dlc_protocols::table .filter(dlc_protocols::protocol_id.eq(protocol_id.to_uuid())) .first(conn)?; - Ok(dlc_protocol::DlcProtocol::from(contract_transaction)) + let protocol_type = match dlc_protocol.protocol_type { + DlcProtocolType::Open => { + let trade_params = db::trade_params::get(conn, protocol_id)?; + dlc_protocol::DlcProtocolType::Open { trade_params } + } + DlcProtocolType::Renew => { + let trade_params = db::trade_params::get(conn, protocol_id)?; + dlc_protocol::DlcProtocolType::Renew { trade_params } + } + DlcProtocolType::Settle => { + let trade_params = db::trade_params::get(conn, protocol_id)?; + dlc_protocol::DlcProtocolType::Settle { trade_params } + } + DlcProtocolType::Close => dlc_protocol::DlcProtocolType::Close { + trader: PublicKey::from_str(&dlc_protocol.trader_pubkey).expect("valid public key"), + }, + DlcProtocolType::ForceClose => dlc_protocol::DlcProtocolType::ForceClose { + trader: PublicKey::from_str(&dlc_protocol.trader_pubkey).expect("valid public key"), + }, + DlcProtocolType::Rollover => dlc_protocol::DlcProtocolType::Rollover { + trader: PublicKey::from_str(&dlc_protocol.trader_pubkey).expect("valid public key"), + }, + }; + + let protocol = dlc_protocol::DlcProtocol { + id: dlc_protocol.protocol_id.into(), + timestamp: dlc_protocol.timestamp, + channel_id: DlcChannelId::from_hex(&dlc_protocol.channel_id).expect("valid dlc channel id"), + contract_id: ContractId::from_hex(&dlc_protocol.contract_id).expect("valid contract id"), + trader: PublicKey::from_str(&dlc_protocol.trader_pubkey).expect("valid public key"), + protocol_state: dlc_protocol.protocol_state.into(), + protocol_type, + }; + + Ok(protocol) } pub(crate) fn set_dlc_protocol_state_to_failed( @@ -81,8 +138,8 @@ pub(crate) fn set_dlc_protocol_state_to_failed( pub(crate) fn set_dlc_protocol_state_to_success( conn: &mut PgConnection, protocol_id: ProtocolId, - contract_id: ContractId, - channel_id: DlcChannelId, + contract_id: &ContractId, + channel_id: &DlcChannelId, ) -> QueryResult<()> { let affected_rows = diesel::update(dlc_protocols::table) .filter(dlc_protocols::protocol_id.eq(protocol_id.to_uuid())) @@ -104,8 +161,9 @@ pub(crate) fn create( conn: &mut PgConnection, protocol_id: ProtocolId, previous_protocol_id: Option, - contract_id: ContractId, - channel_id: DlcChannelId, + contract_id: &ContractId, + channel_id: &DlcChannelId, + protocol_type: dlc_protocol::DlcProtocolType, trader: &PublicKey, ) -> QueryResult<()> { let affected_rows = diesel::insert_into(dlc_protocols::table) @@ -117,6 +175,7 @@ pub(crate) fn create( dlc_protocols::protocol_state.eq(DlcProtocolState::Pending), dlc_protocols::trader_pubkey.eq(trader.to_string()), dlc_protocols::timestamp.eq(OffsetDateTime::now_utc()), + dlc_protocols::protocol_type.eq(DlcProtocolType::from(protocol_type)), )) .execute(conn)?; @@ -127,19 +186,6 @@ pub(crate) fn create( Ok(()) } -impl From for dlc_protocol::DlcProtocol { - fn from(value: DlcProtocol) -> Self { - dlc_protocol::DlcProtocol { - id: value.protocol_id.into(), - timestamp: value.timestamp, - channel_id: DlcChannelId::from_hex(&value.channel_id).expect("valid dlc channel id"), - contract_id: ContractId::from_hex(&value.contract_id).expect("valid contract id"), - trader: PublicKey::from_str(&value.trader_pubkey).expect("valid public key"), - protocol_state: value.protocol_state.into(), - } - } -} - impl From for DlcProtocolState { fn from(value: dlc_protocol::DlcProtocolState) -> Self { match value { @@ -159,3 +205,16 @@ impl From for dlc_protocol::DlcProtocolState { } } } + +impl From for DlcProtocolType { + fn from(value: dlc_protocol::DlcProtocolType) -> Self { + match value { + dlc_protocol::DlcProtocolType::Open { .. } => DlcProtocolType::Open, + dlc_protocol::DlcProtocolType::Renew { .. } => DlcProtocolType::Renew, + dlc_protocol::DlcProtocolType::Settle { .. } => DlcProtocolType::Settle, + dlc_protocol::DlcProtocolType::Close { .. } => DlcProtocolType::Close, + dlc_protocol::DlcProtocolType::ForceClose { .. } => DlcProtocolType::ForceClose, + dlc_protocol::DlcProtocolType::Rollover { .. } => DlcProtocolType::Rollover, + } + } +} diff --git a/coordinator/src/db/positions.rs b/coordinator/src/db/positions.rs index 085cae41b..a323c4414 100644 --- a/coordinator/src/db/positions.rs +++ b/coordinator/src/db/positions.rs @@ -300,8 +300,8 @@ impl Position { conn: &mut PgConnection, trader_pubkey: String, temporary_contract_id: ContractId, - ) -> Result<()> { - let affected_rows = diesel::update(positions::table) + ) -> QueryResult { + diesel::update(positions::table) .filter(positions::trader_pubkey.eq(trader_pubkey)) .filter( positions::position_state @@ -313,11 +313,7 @@ impl Position { positions::temporary_contract_id.eq(hex::encode(temporary_contract_id)), positions::update_timestamp.eq(OffsetDateTime::now_utc()), )) - .execute(conn)?; - - ensure!(affected_rows > 0, "Could not set position to open"); - - Ok(()) + .execute(conn) } pub fn update_unrealized_pnl(conn: &mut PgConnection, id: i32, pnl: i64) -> Result<()> { diff --git a/coordinator/src/db/trade_params.rs b/coordinator/src/db/trade_params.rs index c6d7aeb54..bdb248b2d 100644 --- a/coordinator/src/db/trade_params.rs +++ b/coordinator/src/db/trade_params.rs @@ -9,7 +9,6 @@ use diesel::QueryDsl; use diesel::QueryResult; use diesel::Queryable; use diesel::RunQueryDsl; -use rust_decimal::prelude::ToPrimitive; use std::str::FromStr; use uuid::Uuid; @@ -29,21 +28,16 @@ pub(crate) struct TradeParams { pub(crate) fn insert( conn: &mut PgConnection, protocol_id: ProtocolId, - params: &commons::TradeParams, + params: &dlc_protocol::TradeParams, ) -> QueryResult<()> { - let average_price = params - .average_execution_price() - .to_f32() - .expect("to fit into f32"); - let affected_rows = diesel::insert_into(trade_params::table) .values(&( trade_params::protocol_id.eq(protocol_id.to_uuid()), trade_params::quantity.eq(params.quantity), trade_params::leverage.eq(params.leverage), - trade_params::trader_pubkey.eq(params.pubkey.to_string()), + trade_params::trader_pubkey.eq(params.trader.to_string()), trade_params::direction.eq(Direction::from(params.direction)), - trade_params::average_price.eq(average_price), + trade_params::average_price.eq(params.average_price), )) .execute(conn)?; diff --git a/coordinator/src/dlc_protocol.rs b/coordinator/src/dlc_protocol.rs index c5131dd56..7099c22d2 100644 --- a/coordinator/src/dlc_protocol.rs +++ b/coordinator/src/dlc_protocol.rs @@ -1,6 +1,7 @@ use crate::db; use crate::position::models::PositionState; use crate::trade::models::NewTrade; +use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; use diesel::r2d2::ConnectionManager; @@ -8,10 +9,12 @@ use diesel::r2d2::Pool; use diesel::result::Error::RollbackTransaction; use diesel::Connection; use diesel::PgConnection; +use diesel::QueryResult; use dlc_manager::ContractId; use dlc_manager::ReferenceId; use ln_dlc_node::node::rust_dlc_manager::DlcChannelId; use rust_decimal::prelude::FromPrimitive; +use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use std::fmt::Display; use std::fmt::Formatter; @@ -102,8 +105,10 @@ pub struct DlcProtocol { pub contract_id: ContractId, pub trader: PublicKey, pub protocol_state: DlcProtocolState, + pub protocol_type: DlcProtocolType, } +#[derive(Clone, Debug)] pub struct TradeParams { pub protocol_id: ProtocolId, pub trader: PublicKey, @@ -113,12 +118,57 @@ pub struct TradeParams { pub direction: Direction, } +impl From<(ProtocolId, &commons::TradeParams)> for TradeParams { + fn from((protocol_id, trade_params): (ProtocolId, &commons::TradeParams)) -> Self { + Self { + protocol_id, + trader: trade_params.pubkey, + quantity: trade_params.quantity, + leverage: trade_params.leverage, + average_price: trade_params + .average_execution_price() + .to_f32() + .expect("to fit into f32"), + direction: trade_params.direction, + } + } +} + pub enum DlcProtocolState { Pending, Success, Failed, } +#[derive(Clone, Debug)] +pub enum DlcProtocolType { + Open { trade_params: TradeParams }, + Renew { trade_params: TradeParams }, + Settle { trade_params: TradeParams }, + Close { trader: PublicKey }, + ForceClose { trader: PublicKey }, + Rollover { trader: PublicKey }, +} + +impl DlcProtocolType { + pub fn get_trader_pubkey(&self) -> &PublicKey { + match self { + DlcProtocolType::Open { + trade_params: TradeParams { trader, .. }, + } => trader, + DlcProtocolType::Renew { + trade_params: TradeParams { trader, .. }, + } => trader, + DlcProtocolType::Settle { + trade_params: TradeParams { trader, .. }, + } => trader, + DlcProtocolType::Close { trader } => trader, + DlcProtocolType::ForceClose { trader } => trader, + DlcProtocolType::Rollover { trader } => trader, + } + } +} + pub struct DlcProtocolExecutor { pool: Pool>, } @@ -136,9 +186,9 @@ impl DlcProtocolExecutor { &self, protocol_id: ProtocolId, previous_protocol_id: Option, - contract_id: ContractId, - channel_id: DlcChannelId, - trade_params: &commons::TradeParams, + contract_id: &ContractId, + channel_id: &DlcChannelId, + protocol_type: DlcProtocolType, ) -> Result<()> { let mut conn = self.pool.get()?; conn.transaction(|conn| { @@ -148,9 +198,18 @@ impl DlcProtocolExecutor { previous_protocol_id, contract_id, channel_id, - &trade_params.pubkey, + protocol_type.clone(), + protocol_type.get_trader_pubkey(), )?; - db::trade_params::insert(conn, protocol_id, trade_params)?; + + match protocol_type { + DlcProtocolType::Open { trade_params } + | DlcProtocolType::Renew { trade_params } + | DlcProtocolType::Settle { trade_params } => { + db::trade_params::insert(conn, protocol_id, &trade_params)?; + } + _ => {} + } diesel::result::QueryResult::Ok(()) })?; @@ -165,126 +224,244 @@ impl DlcProtocolExecutor { Ok(()) } - /// Completes the dlc protocol as successful and updates the 10101 meta data - /// accordingly in a single database transaction. - /// - Set dlc protocol to success - /// - If not closing: Updates the `[PostionState::Proposed`] position state to - /// `[PostionState::Open]` - /// - If closing: Calculates the pnl and sets the `[PositionState::Closing`] position state to - /// `[PositionState::Closed`] - /// - Creates and inserts the new trade + /// Finishes a dlc protocol by the corresponding dlc protocol type handling. pub fn finish_dlc_protocol( &self, protocol_id: ProtocolId, - closing: bool, - contract_id: ContractId, - channel_id: DlcChannelId, + trader_id: &PublicKey, + contract_id: Option, + channel_id: &DlcChannelId, ) -> Result<()> { let mut conn = self.pool.get()?; - conn.transaction(|conn| { - let trade_params: TradeParams = db::trade_params::get(conn, protocol_id)?; - - db::dlc_protocols::set_dlc_protocol_state_to_success( - conn, - protocol_id, - contract_id, - channel_id, - )?; + let dlc_protocol = db::dlc_protocols::get_dlc_protocol(conn, protocol_id)?; + + match dlc_protocol.protocol_type { + DlcProtocolType::Open { trade_params } + | DlcProtocolType::Renew { trade_params } => { + let contract_id = contract_id + .context("missing contract id") + .map_err(|_| RollbackTransaction)?; + self.finish_open_trade_dlc_protocol( + conn, + trade_params, + protocol_id, + &contract_id, + channel_id, + ) + } + DlcProtocolType::Settle { trade_params } => { + let settled_contract = &dlc_protocol.contract_id; - // TODO(holzeis): We are still updating the position based on the position state. This - // will change once we only have a single position per user and representing - // the position only as view on multiple trades. - let position = match closing { - false => db::positions::Position::update_proposed_position( - conn, - trade_params.trader.to_string(), - PositionState::Open, - ), - true => { - let position = match db::positions::Position::get_position_by_trader( + self.finish_close_trade_dlc_protocol( + conn, + trade_params, + protocol_id, + // If the contract got settled, we do not get a new contract id, hence we + // copy the contract id of the settled contract. + settled_contract, + channel_id, + ) + } + DlcProtocolType::Rollover { .. } => { + let contract_id = contract_id + .context("missing contract id") + .map_err(|_| RollbackTransaction)?; + self.finish_rollover_dlc_protocol( conn, - trade_params.trader, - vec![ - // The price doesn't matter here. - PositionState::Closing { closing_price: 0.0 }, - ], - )? { - Some(position) => position, - None => { - tracing::error!("No position in state Closing found."); - return Err(RollbackTransaction); - } - }; - - tracing::debug!( - ?position, - trader_id = %trade_params.trader, - "Finalize closing position", - ); - - let pnl = { - let (initial_margin_long, initial_margin_short) = - match trade_params.direction { - Direction::Long => { - (position.trader_margin, position.coordinator_margin) - } - Direction::Short => { - (position.coordinator_margin, position.trader_margin) - } - }; - - match calculate_pnl( - Decimal::from_f32(position.average_entry_price) - .expect("to fit into decimal"), - Decimal::from_f32(trade_params.average_price) - .expect("to fit into decimal"), - trade_params.quantity, - trade_params.direction, - initial_margin_long as u64, - initial_margin_short as u64, - ) { - Ok(pnl) => pnl, - Err(e) => { - tracing::error!("Failed to calculate pnl. Error: {e:#}"); - return Err(RollbackTransaction); - } - } - }; - - db::positions::Position::set_position_to_closed_with_pnl(conn, position.id, pnl) + trader_id, + protocol_id, + &contract_id, + channel_id, + ) + } + DlcProtocolType::Close { .. } | DlcProtocolType::ForceClose { .. } => { + debug_assert!(false, "Finishing unexpected dlc protocol types"); + Ok(()) } - }?; + } + })?; - let coordinator_margin = calculate_margin( - Decimal::try_from(trade_params.average_price).expect("to fit into decimal"), - trade_params.quantity, - crate::trade::coordinator_leverage_for_trade(&trade_params.trader) - .map_err(|_| RollbackTransaction)?, - ); - - // TODO(holzeis): Add optional pnl to trade. - // Instead of tracking pnl on the position we want to track pnl on the trade. e.g. Long - // -> Short or Short -> Long. - let new_trade = NewTrade { - position_id: position.id, - contract_symbol: position.contract_symbol, - trader_pubkey: trade_params.trader, - quantity: trade_params.quantity, - trader_leverage: trade_params.leverage, - coordinator_margin: coordinator_margin as i64, - trader_direction: trade_params.direction, - average_price: trade_params.average_price, - dlc_expiry_timestamp: None, + Ok(()) + } + + /// Completes the close trade dlc protocol as successful and updates the 10101 meta data + /// accordingly in a single database transaction. + /// - Set dlc protocol to success + /// - Calculates the pnl and sets the `[PositionState::Closing`] position state to + /// `[PositionState::Closed`] + /// - Creates and inserts the new trade + fn finish_close_trade_dlc_protocol( + &self, + conn: &mut PgConnection, + trade_params: TradeParams, + protocol_id: ProtocolId, + settled_contract: &ContractId, + channel_id: &DlcChannelId, + ) -> QueryResult<()> { + db::dlc_protocols::set_dlc_protocol_state_to_success( + conn, + protocol_id, + settled_contract, + channel_id, + )?; + + // TODO(holzeis): We are still updating the position based on the position state. This + // will change once we only have a single position per user and representing + // the position only as view on multiple trades. + let position = match db::positions::Position::get_position_by_trader( + conn, + trade_params.trader, + vec![ + // The price doesn't matter here. + PositionState::Closing { closing_price: 0.0 }, + ], + )? { + Some(position) => position, + None => { + tracing::error!("No position in state Closing found."); + return Err(RollbackTransaction); + } + }; + + tracing::debug!( + ?position, + trader_id = %trade_params.trader, + "Finalize closing position", + ); + + let pnl = { + let (initial_margin_long, initial_margin_short) = match trade_params.direction { + Direction::Long => (position.trader_margin, position.coordinator_margin), + Direction::Short => (position.coordinator_margin, position.trader_margin), }; - db::trades::insert(conn, new_trade)?; + match calculate_pnl( + Decimal::from_f32(position.average_entry_price).expect("to fit into decimal"), + Decimal::from_f32(trade_params.average_price).expect("to fit into decimal"), + trade_params.quantity, + trade_params.direction, + initial_margin_long as u64, + initial_margin_short as u64, + ) { + Ok(pnl) => pnl, + Err(e) => { + tracing::error!("Failed to calculate pnl. Error: {e:#}"); + return Err(RollbackTransaction); + } + } + }; + + db::positions::Position::set_position_to_closed_with_pnl(conn, position.id, pnl)?; + + let coordinator_margin = calculate_margin( + Decimal::try_from(trade_params.average_price).expect("to fit into decimal"), + trade_params.quantity, + crate::trade::coordinator_leverage_for_trade(&trade_params.trader) + .map_err(|_| RollbackTransaction)?, + ); + + // TODO(holzeis): Add optional pnl to trade. + // Instead of tracking pnl on the position we want to track pnl on the trade. e.g. Long + // -> Short or Short -> Long. + let new_trade = NewTrade { + position_id: position.id, + contract_symbol: position.contract_symbol, + trader_pubkey: trade_params.trader, + quantity: trade_params.quantity, + trader_leverage: trade_params.leverage, + coordinator_margin: coordinator_margin as i64, + trader_direction: trade_params.direction, + average_price: trade_params.average_price, + dlc_expiry_timestamp: None, + }; + + db::trades::insert(conn, new_trade)?; + + db::trade_params::delete(conn, protocol_id)?; - db::trade_params::delete(conn, protocol_id) - })?; + Ok(()) + } + + /// Completes the open trade dlc protocol as successful and updates the 10101 meta data + /// accordingly in a single database transaction. + /// - Set dlc protocol to success + /// - Updates the `[PostionState::Proposed`] position state to `[PostionState::Open]` + /// - Creates and inserts the new trade + fn finish_open_trade_dlc_protocol( + &self, + conn: &mut PgConnection, + trade_params: TradeParams, + protocol_id: ProtocolId, + contract_id: &ContractId, + channel_id: &DlcChannelId, + ) -> QueryResult<()> { + db::dlc_protocols::set_dlc_protocol_state_to_success( + conn, + protocol_id, + contract_id, + channel_id, + )?; + + // TODO(holzeis): We are still updating the position based on the position state. This + // will change once we only have a single position per user and representing + // the position only as view on multiple trades. + let position = db::positions::Position::update_proposed_position( + conn, + trade_params.trader.to_string(), + PositionState::Open, + )?; + + let coordinator_margin = calculate_margin( + Decimal::try_from(trade_params.average_price).expect("to fit into decimal"), + trade_params.quantity, + crate::trade::coordinator_leverage_for_trade(&trade_params.trader) + .map_err(|_| RollbackTransaction)?, + ); + + // TODO(holzeis): Add optional pnl to trade. + // Instead of tracking pnl on the position we want to track pnl on the trade. e.g. Long + // -> Short or Short -> Long. + let new_trade = NewTrade { + position_id: position.id, + contract_symbol: position.contract_symbol, + trader_pubkey: trade_params.trader, + quantity: trade_params.quantity, + trader_leverage: trade_params.leverage, + coordinator_margin: coordinator_margin as i64, + trader_direction: trade_params.direction, + average_price: trade_params.average_price, + dlc_expiry_timestamp: None, + }; + + db::trades::insert(conn, new_trade)?; + + db::trade_params::delete(conn, protocol_id)?; Ok(()) } + + /// Completes the rollover dlc protocol as successful and updates the 10101 meta data + /// accordingly in a single database transaction. + fn finish_rollover_dlc_protocol( + &self, + conn: &mut PgConnection, + trader: &PublicKey, + protocol_id: ProtocolId, + contract_id: &ContractId, + channel_id: &DlcChannelId, + ) -> QueryResult<()> { + tracing::debug!(%trader, %protocol_id, "Finalizing rollover"); + db::dlc_protocols::set_dlc_protocol_state_to_success( + conn, + protocol_id, + contract_id, + channel_id, + )?; + + db::positions::Position::set_position_to_open(conn, trader.to_string(), *contract_id)?; + Ok(()) + } } #[cfg(test)] diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 6cd515247..0dcbdfb35 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -6,7 +6,6 @@ use crate::node::storage::NodeStorage; use crate::position::models::PositionState; use crate::storage::CoordinatorTenTenOneStorage; use crate::trade::TradeExecutor; -use anyhow::bail; use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; @@ -270,22 +269,16 @@ impl Node { "DLC channel renew protocol was finalized" ); - if self.is_in_rollover(node_id)? { - self.finalize_rollover(channel_id)?; - } else { - let channel = self.inner.get_dlc_channel_by_id(channel_id)?; - let contract_id = - channel.get_contract_id().context("missing contract id")?; - - let contract_executor = - dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - contract_executor.finish_dlc_protocol( - protocol_id, - false, - contract_id, - channel.get_id(), - )?; - } + let channel = self.inner.get_dlc_channel_by_id(channel_id)?; + + let protocol_executor = + dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.finish_dlc_protocol( + protocol_id, + &node_id, + channel.get_contract_id(), + channel_id, + )?; } ChannelMessage::SettleFinalize(SettleFinalize { channel_id, @@ -316,40 +309,14 @@ impl Node { "DLC channel settle protocol was finalized" ); - let mut connection = self.pool.get()?; - let dlc_protocol = - db::dlc_protocols::get_dlc_protocol(&mut connection, protocol_id)?; - - let trader_id = dlc_protocol.trader.to_string(); - tracing::debug!(trader_id, ?protocol_id, "Finalize closing position",); - - let contract_id = dlc_protocol.contract_id; - - match self.inner.get_closed_contract(contract_id) { - Ok(Some(closed_contract)) => closed_contract, - Ok(None) => { - tracing::error!( - trader_id, - ?protocol_id, - "Can't close position as contract is not closed." - ); - bail!("Can't close position as contract is not closed."); - } - Err(e) => { - tracing::error!( - "Failed to get closed contract from DLC manager storage: {e:#}" - ); - bail!(e); - } - }; - - let contract_executor = + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - contract_executor.finish_dlc_protocol( + protocol_executor.finish_dlc_protocol( protocol_id, - true, - contract_id, - *channel_id, + &node_id, + // the settled signed channel does not have a contract + None, + channel_id, )?; } ChannelMessage::CollaborativeCloseOffer(close_offer) => { @@ -399,16 +366,14 @@ impl Node { ); let channel = self.inner.get_dlc_channel_by_id(&channel_id)?; - let contract_id = - channel.get_contract_id().context("missing contract id")?; - let contract_executor = + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - contract_executor.finish_dlc_protocol( + protocol_executor.finish_dlc_protocol( protocol_id, - false, - contract_id, - channel_id, + &node_id, + channel.get_contract_id(), + &channel_id, )?; } ChannelMessage::Reject(reject) => { diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index e3d50b99e..30cab2d3e 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,5 +1,7 @@ use crate::db; use crate::db::positions; +use crate::dlc_protocol; +use crate::dlc_protocol::DlcProtocolType; use crate::dlc_protocol::ProtocolId; use crate::message::NewUserMessage; use crate::message::OrderbookMessage; @@ -215,16 +217,34 @@ impl Node { ) -> Result<()> { let contract = self.inner.get_contract_by_dlc_channel_id(dlc_channel_id)?; let rollover = Rollover::new(contract, network)?; + let protocol_id = ProtocolId::new(); - tracing::debug!(node_id=%rollover.counterparty_pubkey, "Rollover dlc channel"); + tracing::debug!(node_id=%rollover.counterparty_pubkey, %protocol_id, "Rollover dlc channel"); let contract_input: ContractInput = rollover.clone().into(); - let protocol_id = ProtocolId::new(); - self.inner + let channel = self.inner.get_dlc_channel_by_id(dlc_channel_id)?; + let previous_id = match channel.get_reference_id() { + Some(reference_id) => Some(ProtocolId::try_from(reference_id)?), + None => None, + }; + + let contract_id = self + .inner .propose_dlc_channel_update(dlc_channel_id, contract_input, protocol_id.into()) .await?; + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.start_dlc_protocol( + protocol_id, + previous_id, + &contract_id, + dlc_channel_id, + DlcProtocolType::Rollover { + trader: rollover.counterparty_pubkey, + }, + )?; + // Sets the position state to rollover indicating that a rollover is in progress. let mut connection = self.pool.get()?; db::positions::Position::rollover_position( @@ -244,24 +264,6 @@ impl Node { Ok(position.is_some()) } - - /// Finalizes the rollover protocol with the app setting the position to open. - pub fn finalize_rollover(&self, dlc_channel_id: &DlcChannelId) -> Result<()> { - let contract = self.inner.get_contract_by_dlc_channel_id(dlc_channel_id)?; - let trader_id = contract.get_counter_party_id(); - tracing::debug!( - %trader_id, - dlc_channel_id = %hex::encode(dlc_channel_id), - "Finalizing rollover", - ); - - let mut connection = self.pool.get()?; - db::positions::Position::set_position_to_open( - &mut connection, - contract.get_counter_party_id().to_string(), - contract.get_temporary_id(), - ) - } } impl From for ContractInput { diff --git a/coordinator/src/schema.rs b/coordinator/src/schema.rs index b93246412..e5d15b3a5 100644 --- a/coordinator/src/schema.rs +++ b/coordinator/src/schema.rs @@ -52,6 +52,10 @@ pub mod sql_types { #[derive(diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "Protocol_State_Type"))] pub struct ProtocolStateType; + + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "Protocol_Type_Type"))] + pub struct ProtocolTypeType; } diesel::table! { @@ -120,6 +124,7 @@ diesel::table! { diesel::table! { use diesel::sql_types::*; use super::sql_types::ProtocolStateType; + use super::sql_types::ProtocolTypeType; dlc_protocols (id) { id -> Int4, @@ -130,6 +135,7 @@ diesel::table! { protocol_state -> ProtocolStateType, trader_pubkey -> Text, timestamp -> Timestamptz, + protocol_type -> ProtocolTypeType, } } diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index c96986d4c..9c57a0a66 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -2,6 +2,7 @@ use crate::compute_relative_contracts; use crate::db; use crate::decimal_from_f32; use crate::dlc_protocol; +use crate::dlc_protocol::DlcProtocolType; use crate::dlc_protocol::ProtocolId; use crate::message::OrderbookMessage; use crate::node::storage::NodeStorage; @@ -341,13 +342,15 @@ impl TradeExecutor { .await .context("Could not propose DLC channel")?; - let contract_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - contract_executor.start_dlc_protocol( + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.start_dlc_protocol( protocol_id, None, - temporary_contract_id, - temporary_channel_id, - trade_params, + &temporary_contract_id, + &temporary_channel_id, + DlcProtocolType::Open { + trade_params: (protocol_id, trade_params).into(), + }, )?; // After the DLC channel has been proposed the position can be created. This fixes @@ -511,13 +514,15 @@ impl TradeExecutor { .await .context("Could not propose DLC channel update")?; - let contract_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - contract_executor.start_dlc_protocol( + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.start_dlc_protocol( protocol_id, previous_id, - temporary_contract_id, - channel.get_id(), - trade_params, + &temporary_contract_id, + &channel.get_id(), + DlcProtocolType::Renew { + trade_params: (protocol_id, trade_params).into(), + }, )?; // TODO(holzeis): The position should only get created after the dlc protocol has finished @@ -627,13 +632,15 @@ impl TradeExecutor { ) .await?; - let contract_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); - contract_executor.start_dlc_protocol( + let protocol_executor = dlc_protocol::DlcProtocolExecutor::new(self.pool.clone()); + protocol_executor.start_dlc_protocol( protocol_id, previous_id, - contract_id, - channel.get_id(), - trade_params, + &contract_id, + &channel.get_id(), + DlcProtocolType::Settle { + trade_params: (protocol_id, trade_params).into(), + }, )?; db::positions::Position::set_open_position_to_closing( diff --git a/crates/ln-dlc-node/src/node/dlc_channel.rs b/crates/ln-dlc-node/src/node/dlc_channel.rs index 2db019519..ea0401ae6 100644 --- a/crates/ln-dlc-node/src/node/dlc_channel.rs +++ b/crates/ln-dlc-node/src/node/dlc_channel.rs @@ -18,7 +18,6 @@ use dlc_manager::channel::signed_channel::SignedChannel; use dlc_manager::channel::signed_channel::SignedChannelState; use dlc_manager::channel::Channel; use dlc_manager::contract::contract_input::ContractInput; -use dlc_manager::contract::ClosedContract; use dlc_manager::contract::Contract; use dlc_manager::contract::ContractDescriptor; use dlc_manager::ContractId; @@ -274,7 +273,7 @@ impl Result<[u8; 32]> { + ) -> Result { tracing::info!(channel_id = %hex::encode(dlc_channel_id), "Proposing a DLC channel update"); spawn_blocking({ let dlc_manager = self.dlc_manager.clone(); @@ -308,7 +307,9 @@ impl Result> { - let contract = self - .dlc_manager - .get_store() - .get_contracts()? - .into_iter() - .find_map(|contract| match contract { - Contract::Closed(closed_contract) - if closed_contract.temporary_contract_id == temporary_contract_id => - { - Some(closed_contract) - } - _ => None, - }); - - Ok(contract) - } - // Rollback the channel to the last "stable" state. Note, this is potentially risky to do as the // counterparty may still old signed transactions, that would allow them to punish us if we were // to publish an outdated transaction. diff --git a/crates/tests-e2e/src/coordinator.rs b/crates/tests-e2e/src/coordinator.rs index 923df9eed..460369f7b 100644 --- a/crates/tests-e2e/src/coordinator.rs +++ b/crates/tests-e2e/src/coordinator.rs @@ -122,7 +122,7 @@ pub enum ChannelState { Cancelled, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, PartialEq)] pub enum SignedChannelState { Established, SettledOffered, diff --git a/crates/tests-e2e/tests/e2e_close_position.rs b/crates/tests-e2e/tests/e2e_close_position.rs index 21cd2e42e..ab40a9bb2 100644 --- a/crates/tests-e2e/tests/e2e_close_position.rs +++ b/crates/tests-e2e/tests/e2e_close_position.rs @@ -6,6 +6,7 @@ use native::trade::order::api::NewOrder; use native::trade::order::api::OrderType; use native::trade::position::PositionState; use tests_e2e::app::submit_order; +use tests_e2e::coordinator::SignedChannelState; use tests_e2e::setup; use tests_e2e::setup::dummy_order; use tests_e2e::wait_until; @@ -89,6 +90,34 @@ async fn can_open_close_open_close_position() { .unwrap(); tracing::info!(%app_off_chain_balance, "Opened second position"); + // rolling over before closing the second position + tracing::info!("Rollover second position"); + let coordinator = test.coordinator; + let app_pubkey = api::get_node_id().0; + let dlc_channels = coordinator.get_dlc_channels().await.unwrap(); + let dlc_channel = dlc_channels + .into_iter() + .find(|chan| chan.counter_party == app_pubkey) + .unwrap(); + + coordinator + .rollover(&dlc_channel.dlc_channel_id.unwrap()) + .await + .unwrap(); + + wait_until!(test + .app + .rx + .position() + .map(|p| PositionState::Rollover == p.position_state) + .unwrap_or(false)); + wait_until!(test + .app + .rx + .position() + .map(|p| PositionState::Open == p.position_state) + .unwrap_or(false)); + tracing::info!("Closing second position"); let closing_order = NewOrder { @@ -100,6 +129,16 @@ async fn can_open_close_open_close_position() { wait_until!(test.app.rx.position_close().is_some()); + wait_until!({ + let dlc_channels = coordinator.get_dlc_channels().await.unwrap(); + let dlc_channel = dlc_channels + .into_iter() + .find(|chan| chan.counter_party == app_pubkey) + .unwrap(); + + Some(SignedChannelState::Settled) == dlc_channel.signed_channel_state + }); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; // - App off-chain balance is 1_235_000 sats (reserve + margin - 3_750 fee).