Skip to content

Commit

Permalink
update generate_outpu_file function to correctly strcuture output fol…
Browse files Browse the repository at this point in the history
…der (#75)

* update generate_outpu_file function to correctly strcuture output folder

* update endoing_version in stream config

* stub in ver 1 txn for chain id check
  • Loading branch information
yuunlimm authored Oct 22, 2024
1 parent bd7b619 commit 979d410
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 42 deletions.
37 changes: 14 additions & 23 deletions aptos-indexer-processors-sdk/testing-framework/src/mock_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,30 @@ impl RawData for MockGrpcServer {
req: Request<GetTransactionsRequest>,
) -> Result<Response<Self::GetTransactionsStream>, Status> {
let request = req.into_inner();
let starting_version = request.starting_version.unwrap();
let starting_version = request.starting_version.unwrap_or(0); // Default to 0 if starting_version is not provided
let transactions_count = request.transactions_count.unwrap_or(1); // Default to 1 if transactions_count is not provided

// Collect transactions starting from `starting_version`, without any gaps, up to `transactions_count`.
let mut collected_transactions = Vec::new();
let mut current_version = starting_version;

// Step 1: Build a map of transactions keyed by version for quick access
let mut transaction_map = HashMap::new();
for transaction_response in &self.transactions_response {
for tx in &transaction_response.transactions {
transaction_map.insert(tx.version, tx);
transaction_map.insert(tx.version, tx.clone());
}
}

// Step 2: Collect transactions in a consecutive sequence starting from `starting_version`
while collected_transactions.len() < transactions_count as usize {
if let Some(tx) = transaction_map.get(&current_version) {
let mut cloned_tx = (*tx).clone();

// Ensure that the version is consecutive
cloned_tx.version = collected_transactions.len() as u64 + starting_version;

collected_transactions.push(cloned_tx); // Collect the transaction with the adjusted version
current_version += 1; // Move to the next expected version
} else {
// If no transaction is found for the current version, stop looking for more
break;
}
}
let mut sorted_transactions: Vec<_> = transaction_map
.iter()
.filter(|(&version, _)| version >= starting_version)
.map(|(_, tx)| tx.clone())
.collect();
sorted_transactions.sort_by_key(|tx| tx.version);

collected_transactions.extend(
sorted_transactions
.into_iter()
.take(transactions_count as usize),
);

// Step 3: Build the response with the collected transactions (without gaps)
let result = if !collected_transactions.is_empty() {
TransactionsResponse {
transactions: collected_transactions,
Expand All @@ -72,7 +64,6 @@ impl RawData for MockGrpcServer {
default_transaction_response
};

// Step 4: Create a stream and return the response
let stream = futures::stream::iter(vec![Ok(result)]);
Ok(Response::new(Box::pin(stream)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,33 @@ const SLEEP_DURATION: Duration = Duration::from_millis(250);

impl SdkTestContext {
pub async fn new(txn_bytes: &[&[u8]]) -> anyhow::Result<Self> {
let transaction_batches = txn_bytes
let mut transaction_batches = txn_bytes
.iter()
.enumerate()
.map(|(idx, txn)| {
// Deserialize the transaction
let mut transaction =
serde_json::from_slice::<Transaction>(txn).map_err(|err| {
anyhow::anyhow!(
"Failed to parse transaction at index {}: {}",
idx,
format_serde_error(err)
)
})?;

// Update the transaction version to enforce ordering (txn1, txn2, txn3, ...)
// This ensures that the mock gRPC returns consecutive transaction versions.
transaction.version = idx as u64 + 1;
let transaction = serde_json::from_slice::<Transaction>(txn).map_err(|err| {
anyhow::anyhow!(
"Failed to parse transaction at index {}: {}",
idx,
format_serde_error(err)
)
})?;

Ok::<Transaction, anyhow::Error>(transaction) // Explicit type annotation
})
.collect::<Result<Vec<Transaction>, _>>()?;

let version_1_exists = transaction_batches.iter().any(|tx| tx.version == 1);

// Append the dummy transaction with version 1 if it doesn't exist to pass chain_id_check
if !version_1_exists {
let dummy_transaction = Transaction {
version: 1,
..Transaction::default()
};
transaction_batches.push(dummy_transaction);
}

let mut context = SdkTestContext {
transaction_batches,
port: None,
Expand Down Expand Up @@ -162,6 +167,7 @@ impl SdkTestContext {

pub fn create_transaction_stream_config(
&self,
starting_version: u64,
txn_count: u64,
) -> TransactionStreamConfig {
let data_service_address = format!(
Expand All @@ -171,8 +177,8 @@ impl SdkTestContext {
TransactionStreamConfig {
indexer_grpc_data_service_address: Url::parse(&data_service_address)
.expect("Could not parse database url"),
starting_version: Some(1),
request_ending_version: Some(txn_count),
starting_version: Some(starting_version),
request_ending_version: Some(starting_version + txn_count - 1),
auth_token: "".to_string(),
request_name_header: "sdk-testing".to_string(),
indexer_grpc_http2_ping_interval_secs: 30,
Expand Down Expand Up @@ -226,9 +232,10 @@ pub fn generate_output_file(
let file_path = match custom_file_name {
Some(custom_name) => {
// If custom_file_name is present, build the file path using it
PathBuf::from(&output_dir).join(processor_name).join(
format!("{}.json", custom_name), // Including table_name in the format
)
PathBuf::from(&output_dir)
.join(processor_name)
.join(custom_name)
.join(format!("{}.json", table_name))
},
None => {
// Default case: use table_name and txn_version to construct file name
Expand Down

0 comments on commit 979d410

Please sign in to comment.