From 979d410e76b8ba51e8800a5d52023c21d709ebc5 Mon Sep 17 00:00:00 2001 From: Yuun Lim <38443641+yuunlimm@users.noreply.github.com> Date: Tue, 22 Oct 2024 15:52:15 -0700 Subject: [PATCH] update generate_outpu_file function to correctly strcuture output folder (#75) * update generate_outpu_file function to correctly strcuture output folder * update endoing_version in stream config * stub in ver 1 txn for chain id check --- .../testing-framework/src/mock_grpc.rs | 37 ++++++--------- .../testing-framework/src/sdk_test_context.rs | 45 +++++++++++-------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/aptos-indexer-processors-sdk/testing-framework/src/mock_grpc.rs b/aptos-indexer-processors-sdk/testing-framework/src/mock_grpc.rs index a609a6b..4d03026 100644 --- a/aptos-indexer-processors-sdk/testing-framework/src/mock_grpc.rs +++ b/aptos-indexer-processors-sdk/testing-framework/src/mock_grpc.rs @@ -28,38 +28,30 @@ impl RawData for MockGrpcServer { req: Request, ) -> Result, Status> { let request = req.into_inner(); - let starting_version = request.starting_version.unwrap(); + let starting_version = request.starting_version.unwrap_or(0); // Default to 0 if starting_version is not provided let transactions_count = request.transactions_count.unwrap_or(1); // Default to 1 if transactions_count is not provided - - // Collect transactions starting from `starting_version`, without any gaps, up to `transactions_count`. let mut collected_transactions = Vec::new(); - let mut current_version = starting_version; - // Step 1: Build a map of transactions keyed by version for quick access let mut transaction_map = HashMap::new(); for transaction_response in &self.transactions_response { for tx in &transaction_response.transactions { - transaction_map.insert(tx.version, tx); + transaction_map.insert(tx.version, tx.clone()); } } - // Step 2: Collect transactions in a consecutive sequence starting from `starting_version` - while collected_transactions.len() < transactions_count as usize { - if let Some(tx) = transaction_map.get(¤t_version) { - let mut cloned_tx = (*tx).clone(); - - // Ensure that the version is consecutive - cloned_tx.version = collected_transactions.len() as u64 + starting_version; - - collected_transactions.push(cloned_tx); // Collect the transaction with the adjusted version - current_version += 1; // Move to the next expected version - } else { - // If no transaction is found for the current version, stop looking for more - break; - } - } + let mut sorted_transactions: Vec<_> = transaction_map + .iter() + .filter(|(&version, _)| version >= starting_version) + .map(|(_, tx)| tx.clone()) + .collect(); + sorted_transactions.sort_by_key(|tx| tx.version); + + collected_transactions.extend( + sorted_transactions + .into_iter() + .take(transactions_count as usize), + ); - // Step 3: Build the response with the collected transactions (without gaps) let result = if !collected_transactions.is_empty() { TransactionsResponse { transactions: collected_transactions, @@ -72,7 +64,6 @@ impl RawData for MockGrpcServer { default_transaction_response }; - // Step 4: Create a stream and return the response let stream = futures::stream::iter(vec![Ok(result)]); Ok(Response::new(Box::pin(stream))) } diff --git a/aptos-indexer-processors-sdk/testing-framework/src/sdk_test_context.rs b/aptos-indexer-processors-sdk/testing-framework/src/sdk_test_context.rs index f494bb4..1739f7a 100644 --- a/aptos-indexer-processors-sdk/testing-framework/src/sdk_test_context.rs +++ b/aptos-indexer-processors-sdk/testing-framework/src/sdk_test_context.rs @@ -28,28 +28,33 @@ const SLEEP_DURATION: Duration = Duration::from_millis(250); impl SdkTestContext { pub async fn new(txn_bytes: &[&[u8]]) -> anyhow::Result { - let transaction_batches = txn_bytes + let mut transaction_batches = txn_bytes .iter() .enumerate() .map(|(idx, txn)| { - // Deserialize the transaction - let mut transaction = - serde_json::from_slice::(txn).map_err(|err| { - anyhow::anyhow!( - "Failed to parse transaction at index {}: {}", - idx, - format_serde_error(err) - ) - })?; - - // Update the transaction version to enforce ordering (txn1, txn2, txn3, ...) - // This ensures that the mock gRPC returns consecutive transaction versions. - transaction.version = idx as u64 + 1; + let transaction = serde_json::from_slice::(txn).map_err(|err| { + anyhow::anyhow!( + "Failed to parse transaction at index {}: {}", + idx, + format_serde_error(err) + ) + })?; Ok::(transaction) // Explicit type annotation }) .collect::, _>>()?; + let version_1_exists = transaction_batches.iter().any(|tx| tx.version == 1); + + // Append the dummy transaction with version 1 if it doesn't exist to pass chain_id_check + if !version_1_exists { + let dummy_transaction = Transaction { + version: 1, + ..Transaction::default() + }; + transaction_batches.push(dummy_transaction); + } + let mut context = SdkTestContext { transaction_batches, port: None, @@ -162,6 +167,7 @@ impl SdkTestContext { pub fn create_transaction_stream_config( &self, + starting_version: u64, txn_count: u64, ) -> TransactionStreamConfig { let data_service_address = format!( @@ -171,8 +177,8 @@ impl SdkTestContext { TransactionStreamConfig { indexer_grpc_data_service_address: Url::parse(&data_service_address) .expect("Could not parse database url"), - starting_version: Some(1), - request_ending_version: Some(txn_count), + starting_version: Some(starting_version), + request_ending_version: Some(starting_version + txn_count - 1), auth_token: "".to_string(), request_name_header: "sdk-testing".to_string(), indexer_grpc_http2_ping_interval_secs: 30, @@ -226,9 +232,10 @@ pub fn generate_output_file( let file_path = match custom_file_name { Some(custom_name) => { // If custom_file_name is present, build the file path using it - PathBuf::from(&output_dir).join(processor_name).join( - format!("{}.json", custom_name), // Including table_name in the format - ) + PathBuf::from(&output_dir) + .join(processor_name) + .join(custom_name) + .join(format!("{}.json", table_name)) }, None => { // Default case: use table_name and txn_version to construct file name