From e1423de110c31f711e1657285b7fd658b01ec5af Mon Sep 17 00:00:00 2001 From: alnoki <43892045+alnoki@users.noreply.github.com> Date: Sat, 7 Oct 2023 12:28:18 -0700 Subject: [PATCH 1/3] [ECO-624] Fix async dedupe aggregator logic --- ...a19aba42e1e7812129ce7b86faeb1c528ab3.json} | 4 +- ...07cfe1eb4b38d21ee243ec75bb1521c7e244e.json | 17 --- ...b14a33b1c8f435666f5f5f86d97acb311db42.json | 17 +++ src/rust/aggregator/src/data/user_history.rs | 125 +++++++++--------- 4 files changed, 82 insertions(+), 81 deletions(-) rename src/rust/.sqlx/{query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json => query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json} (51%) delete mode 100644 src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json create mode 100644 src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json diff --git a/src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json b/src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json similarity index 51% rename from src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json rename to src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json index 1465a6f47..d0479c837 100644 --- a/src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json +++ b/src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET remaining_size = $1, last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3;\n ", + "query": "\n UPDATE aggregator.user_history\n SET\n last_updated_at = $4,\n order_status = 'open',\n remaining_size = $1\n WHERE order_id = $2 AND market_id = $3;\n ", "describe": { "columns": [], "parameters": { @@ -13,5 +13,5 @@ }, "nullable": [] }, - "hash": "27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4" + "hash": "078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3" } diff --git a/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json b/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json deleted file mode 100644 index b799a84b6..000000000 --- a/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE remaining_size - $1\n WHEN 0 THEN CASE order_status\n WHEN 'cancelled' THEN order_status\n ELSE 'closed'\n END\n ELSE CASE order_type\n WHEN 'swap' THEN 'closed'\n ELSE order_status\n END\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Numeric", - "Numeric", - "Timestamptz" - ] - }, - "nullable": [] - }, - "hash": "2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e" -} diff --git a/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json b/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json new file mode 100644 index 000000000..efe61400b --- /dev/null +++ b/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE order_status\n WHEN 'cancelled' THEN order_status\n ELSE CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Numeric", + "Numeric", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42" +} diff --git a/src/rust/aggregator/src/data/user_history.rs b/src/rust/aggregator/src/data/user_history.rs index b3bef33b3..1fe22989f 100644 --- a/src/rust/aggregator/src/data/user_history.rs +++ b/src/rust/aggregator/src/data/user_history.rs @@ -280,54 +280,35 @@ impl Data for UserHistory { .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; mark_as_aggregated(&mut transaction, &x.txn_version, &x.event_idx).await?; } + // Step through fill and change events in total order. let mut fill_index = 0; let mut change_index = 0; for _ in 0..(fill_events.len() + change_events.len()) { - match (fill_events.get(fill_index), change_events.get(change_index)) { - (Some(fill), Some(change)) => { - if fill.txn_version < change.txn_version - || (fill.txn_version == change.txn_version - && fill.event_idx < change.event_idx) - { - if fill.maker_address == fill.emit_address { - aggregate_fill( - &mut transaction, - &fill.size, - &fill.maker_order_id, - &fill.market_id, - &fill.time, - ) - .await?; + let (fill_event_to_aggregate, change_event_to_aggregate) = + match (fill_events.get(fill_index), change_events.get(change_index)) { + (Some(fill), Some(change)) => { + if fill.txn_version < change.txn_version + || (fill.txn_version == change.txn_version + && fill.event_idx < change.event_idx) + { + (Some(fill), None) + } else { + (None, Some(change)) } - mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx) - .await?; - fill_index = fill_index + 1; - } else { - aggregate_change( - &mut transaction, - &change.new_size, - &change.order_id, - &change.market_id, - &change.time, - &fill.txn_version, - &fill.event_idx, - ) - .await?; - mark_as_aggregated( - &mut transaction, - &change.txn_version, - &change.event_idx, - ) - .await?; - change_index = change_index + 1; } - } + (Some(fill), None) => (Some(fill), None), + (None, Some(change)) => (None, Some(change)), + (None, None) => unreachable!(), + }; + match (fill_event_to_aggregate, change_event_to_aggregate) { (Some(fill), None) => { + // Dedupe if needed by only aggregating events emitted to maker handle. if fill.maker_address == fill.emit_address { - aggregate_fill( + aggregate_fill_for_maker_and_taker( &mut transaction, &fill.size, &fill.maker_order_id, + &fill.taker_order_id, &fill.market_id, &fill.time, ) @@ -335,7 +316,7 @@ impl Data for UserHistory { } mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx) .await?; - fill_index = fill_index + 1; + fill_index += 1; } (None, Some(change)) => { aggregate_change( @@ -350,9 +331,9 @@ impl Data for UserHistory { .await?; mark_as_aggregated(&mut transaction, &change.txn_version, &change.event_idx) .await?; - change_index = change_index + 1; + change_index += 1; } - (None, None) => unreachable!(), + _ => unreachable!(), }; } for x in &cancel_events { @@ -379,36 +360,52 @@ impl Data for UserHistory { } } -async fn aggregate_fill<'a>( +async fn aggregate_fill_for_maker_and_taker<'a>( tx: &mut Transaction<'a, Postgres>, size: &BigDecimal, maker_order_id: &BigDecimal, + taker_order_id: &BigDecimal, market_id: &BigDecimal, time: &DateTime, ) -> DataAggregationResult { + aggregate_fill(tx, size, maker_order_id, market_id, time).await?; + aggregate_fill(tx, size, taker_order_id, market_id, time).await?; + Ok(()) +} + +async fn aggregate_fill<'a>( + tx: &mut Transaction<'a, Postgres>, + size: &BigDecimal, + order_id: &BigDecimal, + market_id: &BigDecimal, + time: &DateTime, +) -> DataAggregationResult { + // To protect against unexpected asynchronous behavior, only update order status to closed upon + // remaining size hitting 0 if the order is not marked cancelled. Note that the cancel event and + // change event aggregators should enforce that orders are respectively marked cancelled and + // open whenever they are called, such that events can be aggregated out of order. This logic + // applies for limit orders that post only, take only, and take then post, as well as market + // orders and swaps. sqlx::query!( r#" - UPDATE aggregator.user_history - SET - remaining_size = remaining_size - $1, - total_filled = total_filled + $1, - order_status = CASE remaining_size - $1 - WHEN 0 THEN CASE order_status - WHEN 'cancelled' THEN order_status - ELSE 'closed' - END - ELSE CASE order_type - WHEN 'swap' THEN 'closed' - ELSE order_status - END - END, - last_updated_at = $4 - WHERE order_id = $2 AND market_id = $3 + UPDATE aggregator.user_history + SET + remaining_size = remaining_size - $1, + total_filled = total_filled + $1, + order_status = CASE order_status + WHEN 'cancelled' THEN order_status + ELSE CASE remaining_size - $1 + WHEN 0 THEN 'closed' + ELSE order_status + END + END, + last_updated_at = $4 + WHERE order_id = $2 AND market_id = $3 "#, size, - maker_order_id, + order_id, market_id, - time, + time ) .execute(tx as &mut PgConnection) .await @@ -441,7 +438,7 @@ async fn aggregate_change<'a>( .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; let (order_type, original_size): (OrderType, BigDecimal) = (record.order_type, record.remaining_size); - // If its a limit order and needs reordering + // If it's a limit order and needs reordering if matches!(order_type, OrderType::Limit) && &original_size < new_size { let txn = txn_version .to_bigint() @@ -470,10 +467,14 @@ async fn aggregate_change<'a>( .await .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; } + // Enforce that order status is set to open after a size change to guard against async issues sqlx::query!( r#" UPDATE aggregator.user_history - SET remaining_size = $1, last_updated_at = $4 + SET + last_updated_at = $4, + order_status = 'open', + remaining_size = $1 WHERE order_id = $2 AND market_id = $3; "#, new_size, From 0eb778a087432b6207cb39272d63561178c87acd Mon Sep 17 00:00:00 2001 From: alnoki <43892045+alnoki@users.noreply.github.com> Date: Sun, 8 Oct 2023 09:24:54 -0700 Subject: [PATCH 2/3] Update guard conditions for effectively serial --- ...57c3959f401fb7884a37ebbaf76cab817328.json} | 4 ++-- ...ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json} | 4 ++-- src/rust/aggregator/src/data/user_history.rs | 20 ++++++------------- 3 files changed, 10 insertions(+), 18 deletions(-) rename src/rust/.sqlx/{query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json => query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json} (50%) rename src/rust/.sqlx/{query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json => query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json} (63%) diff --git a/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json b/src/rust/.sqlx/query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json similarity index 50% rename from src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json rename to src/rust/.sqlx/query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json index efe61400b..2e4784545 100644 --- a/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json +++ b/src/rust/.sqlx/query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE order_status\n WHEN 'cancelled' THEN order_status\n ELSE CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", + "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", "describe": { "columns": [], "parameters": { @@ -13,5 +13,5 @@ }, "nullable": [] }, - "hash": "9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42" + "hash": "a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328" } diff --git a/src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json b/src/rust/.sqlx/query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json similarity index 63% rename from src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json rename to src/rust/.sqlx/query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json index d0479c837..c949401f5 100644 --- a/src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json +++ b/src/rust/.sqlx/query-cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET\n last_updated_at = $4,\n order_status = 'open',\n remaining_size = $1\n WHERE order_id = $2 AND market_id = $3;\n ", + "query": "\n UPDATE aggregator.user_history\n SET\n last_updated_at = $4,\n remaining_size = $1\n WHERE order_id = $2 AND market_id = $3;\n ", "describe": { "columns": [], "parameters": { @@ -13,5 +13,5 @@ }, "nullable": [] }, - "hash": "078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3" + "hash": "cc5b2550b2cc5845a564e916ee99ce23aa68f7a6b421ca0e4e7d2d84db7a6fc3" } diff --git a/src/rust/aggregator/src/data/user_history.rs b/src/rust/aggregator/src/data/user_history.rs index 1fe22989f..6c2207fc5 100644 --- a/src/rust/aggregator/src/data/user_history.rs +++ b/src/rust/aggregator/src/data/user_history.rs @@ -53,6 +53,9 @@ impl Data for UserHistory { Some(std::time::Duration::from_secs(5)) } + /// All database interactions are handled in a single atomic transaction. Processor insertions + /// are also handled in a single atomic transaction for each batch of transactions, such that + /// user history aggregation logic is effectively serialized across historical chain state. async fn process_and_save_internal(&mut self) -> DataAggregationResult { let mut transaction = self .pool @@ -380,24 +383,15 @@ async fn aggregate_fill<'a>( market_id: &BigDecimal, time: &DateTime, ) -> DataAggregationResult { - // To protect against unexpected asynchronous behavior, only update order status to closed upon - // remaining size hitting 0 if the order is not marked cancelled. Note that the cancel event and - // change event aggregators should enforce that orders are respectively marked cancelled and - // open whenever they are called, such that events can be aggregated out of order. This logic - // applies for limit orders that post only, take only, and take then post, as well as market - // orders and swaps. sqlx::query!( r#" UPDATE aggregator.user_history SET remaining_size = remaining_size - $1, total_filled = total_filled + $1, - order_status = CASE order_status - WHEN 'cancelled' THEN order_status - ELSE CASE remaining_size - $1 - WHEN 0 THEN 'closed' - ELSE order_status - END + order_status = CASE remaining_size - $1 + WHEN 0 THEN 'closed' + ELSE order_status END, last_updated_at = $4 WHERE order_id = $2 AND market_id = $3 @@ -467,13 +461,11 @@ async fn aggregate_change<'a>( .await .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; } - // Enforce that order status is set to open after a size change to guard against async issues sqlx::query!( r#" UPDATE aggregator.user_history SET last_updated_at = $4, - order_status = 'open', remaining_size = $1 WHERE order_id = $2 AND market_id = $3; "#, From 54ad20fce39a0f6dac064f472c8ccf9ed55a4382 Mon Sep 17 00:00:00 2001 From: alnoki <43892045+alnoki@users.noreply.github.com> Date: Sun, 8 Oct 2023 09:39:04 -0700 Subject: [PATCH 3/3] Add market order/swap closed logic --- ...b9c508e039285194b97b6d96e961dd7edc2557fbd2.json} | 4 ++-- src/rust/aggregator/src/data/user_history.rs | 13 ++++++++++--- src/rust/dependencies/aptos-indexer-processors | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) rename src/rust/.sqlx/{query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json => query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json} (50%) diff --git a/src/rust/.sqlx/query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json b/src/rust/.sqlx/query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json similarity index 50% rename from src/rust/.sqlx/query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json rename to src/rust/.sqlx/query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json index 2e4784545..010e2db41 100644 --- a/src/rust/.sqlx/query-a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328.json +++ b/src/rust/.sqlx/query-b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", + "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE order_type\n WHEN 'limit' THEN CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END\n ELSE 'closed'\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", "describe": { "columns": [], "parameters": { @@ -13,5 +13,5 @@ }, "nullable": [] }, - "hash": "a19af3d32ebfdfd481d8a334981d57c3959f401fb7884a37ebbaf76cab817328" + "hash": "b17a5be1a3797d2bc5bfedb9c508e039285194b97b6d96e961dd7edc2557fbd2" } diff --git a/src/rust/aggregator/src/data/user_history.rs b/src/rust/aggregator/src/data/user_history.rs index 6c2207fc5..7f515b346 100644 --- a/src/rust/aggregator/src/data/user_history.rs +++ b/src/rust/aggregator/src/data/user_history.rs @@ -383,15 +383,22 @@ async fn aggregate_fill<'a>( market_id: &BigDecimal, time: &DateTime, ) -> DataAggregationResult { + // Only limit orders can remain open after a transaction during which they are filled against, + // so flag market orders and swaps as closed by default: if they end up being cancelled instead + // of closed, the cancel event emitted during the same transaction (aggregated after fills) will + // clean up the order status to cancelled. sqlx::query!( r#" UPDATE aggregator.user_history SET remaining_size = remaining_size - $1, total_filled = total_filled + $1, - order_status = CASE remaining_size - $1 - WHEN 0 THEN 'closed' - ELSE order_status + order_status = CASE order_type + WHEN 'limit' THEN CASE remaining_size - $1 + WHEN 0 THEN 'closed' + ELSE order_status + END + ELSE 'closed' END, last_updated_at = $4 WHERE order_id = $2 AND market_id = $3 diff --git a/src/rust/dependencies/aptos-indexer-processors b/src/rust/dependencies/aptos-indexer-processors index 1a68026c4..c9dd14def 160000 --- a/src/rust/dependencies/aptos-indexer-processors +++ b/src/rust/dependencies/aptos-indexer-processors @@ -1 +1 @@ -Subproject commit 1a68026c49c7b6f9965132850ce4ea289e5f0794 +Subproject commit c9dd14def07cc26ae349028188bfea7131c55d50