diff --git a/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json b/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json deleted file mode 100644 index b799a84b6..000000000 --- a/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE remaining_size - $1\n WHEN 0 THEN CASE order_status\n WHEN 'cancelled' THEN order_status\n ELSE 'closed'\n END\n ELSE CASE order_type\n WHEN 'swap' THEN 'closed'\n ELSE order_status\n END\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Numeric", - "Numeric", - "Timestamptz" - ] - }, - "nullable": [] - }, - "hash": "2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e" -} diff --git a/src/rust/.sqlx/query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json b/src/rust/.sqlx/query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json new file mode 100644 index 000000000..010e2db41 --- /dev/null +++ b/src/rust/.sqlx/query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE order_type\n WHEN 'limit' THEN CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END\n ELSE 'closed'\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Numeric", + "Numeric", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2" +} diff --git a/src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json b/src/rust/.sqlx/query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json similarity index 55% rename from src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json rename to src/rust/.sqlx/query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json index 1465a6f47..c949401f5 100644 --- a/src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json +++ b/src/rust/.sqlx/query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET remaining_size = $1, last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3;\n ", + "query": "\n UPDATE aggregator.user_history\n SET\n last_updated_at = $4,\n remaining_size = $1\n WHERE order_id = $2 AND market_id = $3;\n ", "describe": { "columns": [], "parameters": { @@ -13,5 +13,5 @@ }, "nullable": [] }, - "hash": "27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4" + "hash": "cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3" } diff --git a/src/rust/aggregator/src/data/user_history.rs b/src/rust/aggregator/src/data/user_history.rs index a63d154a2..9eee6118e 100644 --- a/src/rust/aggregator/src/data/user_history.rs +++ b/src/rust/aggregator/src/data/user_history.rs @@ -53,6 +53,9 @@ impl Data for UserHistory { Some(std::time::Duration::from_secs(5)) } + /// All database interactions are handled in a single atomic transaction. Processor insertions + /// are also handled in a single atomic transaction for each batch of transactions, such that + /// user history aggregation logic is effectively serialized across historical chain state. async fn process_and_save_internal(&mut self) -> DataAggregationResult { let mut transaction = self .pool @@ -283,54 +286,35 @@ impl Data for UserHistory { .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; mark_as_aggregated(&mut transaction, &x.txn_version, &x.event_idx).await?; } + // Step through fill and change events in total order. let mut fill_index = 0; let mut change_index = 0; for _ in 0..(fill_events.len() + change_events.len()) { - match (fill_events.get(fill_index), change_events.get(change_index)) { - (Some(fill), Some(change)) => { - if fill.txn_version < change.txn_version - || (fill.txn_version == change.txn_version - && fill.event_idx < change.event_idx) - { - if fill.maker_address == fill.emit_address { - aggregate_fill( - &mut transaction, - &fill.size, - &fill.maker_order_id, - &fill.market_id, - &fill.time, - ) - .await?; + let (fill_event_to_aggregate, change_event_to_aggregate) = + match (fill_events.get(fill_index), change_events.get(change_index)) { + (Some(fill), Some(change)) => { + if fill.txn_version < change.txn_version + || (fill.txn_version == change.txn_version + && fill.event_idx < change.event_idx) + { + (Some(fill), None) + } else { + (None, Some(change)) } - mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx) - .await?; - fill_index = fill_index + 1; - } else { - aggregate_change( - &mut transaction, - &change.new_size, - &change.order_id, - &change.market_id, - &change.time, - &fill.txn_version, - &fill.event_idx, - ) - .await?; - mark_as_aggregated( - &mut transaction, - &change.txn_version, - &change.event_idx, - ) - .await?; - change_index = change_index + 1; } - } + (Some(fill), None) => (Some(fill), None), + (None, Some(change)) => (None, Some(change)), + (None, None) => unreachable!(), + }; + match (fill_event_to_aggregate, change_event_to_aggregate) { (Some(fill), None) => { + // Dedupe if needed by only aggregating events emitted to maker handle. if fill.maker_address == fill.emit_address { - aggregate_fill( + aggregate_fill_for_maker_and_taker( &mut transaction, &fill.size, &fill.maker_order_id, + &fill.taker_order_id, &fill.market_id, &fill.time, ) @@ -338,7 +322,7 @@ impl Data for UserHistory { } mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx) .await?; - fill_index = fill_index + 1; + fill_index += 1; } (None, Some(change)) => { aggregate_change( @@ -353,9 +337,9 @@ impl Data for UserHistory { .await?; mark_as_aggregated(&mut transaction, &change.txn_version, &change.event_idx) .await?; - change_index = change_index + 1; + change_index += 1; } - (None, None) => unreachable!(), + _ => unreachable!(), }; } for x in &cancel_events { @@ -382,36 +366,50 @@ impl Data for UserHistory { } } -async fn aggregate_fill<'a>( +async fn aggregate_fill_for_maker_and_taker<'a>( tx: &mut Transaction<'a, Postgres>, size: &BigDecimal, maker_order_id: &BigDecimal, + taker_order_id: &BigDecimal, market_id: &BigDecimal, time: &DateTime, ) -> DataAggregationResult { + aggregate_fill(tx, size, maker_order_id, market_id, time).await?; + aggregate_fill(tx, size, taker_order_id, market_id, time).await?; + Ok(()) +} + +async fn aggregate_fill<'a>( + tx: &mut Transaction<'a, Postgres>, + size: &BigDecimal, + order_id: &BigDecimal, + market_id: &BigDecimal, + time: &DateTime, +) -> DataAggregationResult { + // Only limit orders can remain open after a transaction during which they are filled against, + // so flag market orders and swaps as closed by default: if they end up being cancelled instead + // of closed, the cancel event emitted during the same transaction (aggregated after fills) will + // clean up the order status to cancelled. sqlx::query!( r#" - UPDATE aggregator.user_history - SET - remaining_size = remaining_size - $1, - total_filled = total_filled + $1, - order_status = CASE remaining_size - $1 - WHEN 0 THEN CASE order_status - WHEN 'cancelled' THEN order_status - ELSE 'closed' - END - ELSE CASE order_type - WHEN 'swap' THEN 'closed' - ELSE order_status - END - END, - last_updated_at = $4 - WHERE order_id = $2 AND market_id = $3 + UPDATE aggregator.user_history + SET + remaining_size = remaining_size - $1, + total_filled = total_filled + $1, + order_status = CASE order_type + WHEN 'limit' THEN CASE remaining_size - $1 + WHEN 0 THEN 'closed' + ELSE order_status + END + ELSE 'closed' + END, + last_updated_at = $4 + WHERE order_id = $2 AND market_id = $3 "#, size, - maker_order_id, + order_id, market_id, - time, + time ) .execute(tx as &mut PgConnection) .await @@ -444,7 +442,7 @@ async fn aggregate_change<'a>( .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; let (order_type, original_size): (OrderType, BigDecimal) = (record.order_type, record.remaining_size); - // If its a limit order and needs reordering + // If it's a limit order and needs reordering if matches!(order_type, OrderType::Limit) && &original_size < new_size { let txn = txn_version .to_bigint() @@ -476,7 +474,9 @@ async fn aggregate_change<'a>( sqlx::query!( r#" UPDATE aggregator.user_history - SET remaining_size = $1, last_updated_at = $4 + SET + last_updated_at = $4, + remaining_size = $1 WHERE order_id = $2 AND market_id = $3; "#, new_size, diff --git a/src/rust/dependencies/aptos-indexer-processors b/src/rust/dependencies/aptos-indexer-processors index 1a68026c4..c9dd14def 160000 --- a/src/rust/dependencies/aptos-indexer-processors +++ b/src/rust/dependencies/aptos-indexer-processors @@ -1 +1 @@ -Subproject commit 1a68026c49c7b6f9965132850ce4ea289e5f0794 +Subproject commit c9dd14def07cc26ae349028188bfea7131c55d50