From 957b496891f6f1faba609077fc3003a8fa687d56 Mon Sep 17 00:00:00 2001 From: Larry Liu Date: Tue, 16 May 2023 18:57:34 -0700 Subject: [PATCH] add a timeout send for data service. --- Cargo.lock | 2 +- crates/aptos-protos/Cargo.toml | 2 +- .../proto/aptos/indexer/v1/raw_data.proto | 4 + .../aptos-protos/src/pb/aptos.indexer.v1.rs | 180 ++++++++++-------- .../src/pb/aptos.indexer.v1.serde.rs | 20 ++ .../indexer-grpc-data-service/src/service.rs | 66 ++++--- 6 files changed, 161 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef19554a4d27a..7f31272185be2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2418,7 +2418,7 @@ dependencies = [ [[package]] name = "aptos-protos" -version = "0.2.0" +version = "0.2.1" dependencies = [ "pbjson", "prost", diff --git a/crates/aptos-protos/Cargo.toml b/crates/aptos-protos/Cargo.toml index 5969585a531a1..2ca5fcb77d6d2 100644 --- a/crates/aptos-protos/Cargo.toml +++ b/crates/aptos-protos/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aptos-protos" -version = "0.2.0" +version = "0.2.1" authors = ["Aptos Labs "] description = "Aptos Protobuf Definitions" repository = "https://github.com/aptos-labs/aptos-core" diff --git a/crates/aptos-protos/proto/aptos/indexer/v1/raw_data.proto b/crates/aptos-protos/proto/aptos/indexer/v1/raw_data.proto index 527e7002cbbed..2303531db1d7f 100644 --- a/crates/aptos-protos/proto/aptos/indexer/v1/raw_data.proto +++ b/crates/aptos-protos/proto/aptos/indexer/v1/raw_data.proto @@ -14,6 +14,10 @@ message GetTransactionsRequest { // Optional; number of transactions to return in current stream. // If not present, return an infinite stream of transactions. optional uint64 transactions_count = 2 [jstype = JS_STRING]; + + // Optional; number of transactions in each `TransactionsResponse` for current stream. + // If not present, default to 1000. If larger than 1000, request will be rejected. + optional uint64 batch_size = 3; } // TransactionsResponse is a batch of transactions. diff --git a/crates/aptos-protos/src/pb/aptos.indexer.v1.rs b/crates/aptos-protos/src/pb/aptos.indexer.v1.rs index 3e47a7028f635..d9754b71fb78e 100644 --- a/crates/aptos-protos/src/pb/aptos.indexer.v1.rs +++ b/crates/aptos-protos/src/pb/aptos.indexer.v1.rs @@ -10,6 +10,10 @@ pub struct GetTransactionsRequest { /// If not present, return an infinite stream of transactions. #[prost(uint64, optional, tag="2")] pub transactions_count: ::core::option::Option, + /// Optional; batch size of transactions to return in current stream. + /// If not present, return transactions with size up to 1000. + #[prost(uint64, optional, tag="3")] + pub batch_size: ::core::option::Option, } /// TransactionsResponse is a batch of transactions. #[derive(Clone, PartialEq, ::prost::Message)] @@ -23,13 +27,13 @@ pub struct TransactionsResponse { } /// Encoded file descriptor set for the `aptos.indexer.v1` package pub const FILE_DESCRIPTOR_SET: &[u8] = &[ - 0x0a, 0xb6, 0x0b, 0x0a, 0x1f, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x69, 0x6e, 0x64, 0x65, 0x78, + 0x0a, 0xaf, 0x0d, 0x0a, 0x1f, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x61, 0x77, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x1a, 0x2f, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, 0x31, 0x2f, 0x76, 0x31, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa8, 0x01, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x54, + 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdb, 0x01, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x10, 0x73, 0x74, 0x61, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x0f, @@ -37,84 +41,100 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = &[ 0x01, 0x01, 0x12, 0x32, 0x0a, 0x12, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x48, 0x01, 0x52, 0x11, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x43, 0x6f, - 0x75, 0x6e, 0x74, 0x88, 0x01, 0x01, 0x42, 0x13, 0x0a, 0x11, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, - 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x15, 0x0a, 0x13, 0x5f, - 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x63, 0x6f, 0x75, - 0x6e, 0x74, 0x22, 0x93, 0x01, 0x0a, 0x14, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a, 0x0c, 0x74, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x2a, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, 0x31, 0x2e, 0x76, - 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x74, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, 0x0a, 0x08, 0x63, - 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, - 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, - 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x32, 0x70, 0x0a, 0x07, 0x52, 0x61, 0x77, 0x44, - 0x61, 0x74, 0x61, 0x12, 0x65, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x28, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x69, - 0x6e, 0x64, 0x65, 0x78, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x26, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x65, 0x72, - 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x4a, 0x94, 0x07, 0x0a, 0x06, 0x12, - 0x04, 0x03, 0x00, 0x1e, 0x01, 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, 0x00, 0x12, 0x32, - 0x44, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, 0xa9, 0x20, 0x41, - 0x70, 0x74, 0x6f, 0x73, 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x0a, - 0x20, 0x53, 0x50, 0x44, 0x58, 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, 0x2d, 0x49, 0x64, - 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, - 0x2d, 0x32, 0x2e, 0x30, 0x0a, 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, 0x00, 0x19, 0x0a, - 0x09, 0x0a, 0x02, 0x03, 0x00, 0x12, 0x03, 0x07, 0x00, 0x39, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x00, - 0x12, 0x04, 0x09, 0x00, 0x10, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x00, 0x01, 0x12, 0x03, 0x09, - 0x08, 0x1e, 0x0a, 0x39, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, 0x03, 0x0b, 0x02, 0x27, 0x1a, - 0x2c, 0x20, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x73, 0x74, 0x61, 0x72, - 0x74, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x63, 0x75, 0x72, - 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, - 0x05, 0x04, 0x00, 0x02, 0x00, 0x04, 0x12, 0x03, 0x0b, 0x02, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x00, 0x02, 0x00, 0x05, 0x12, 0x03, 0x0b, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, - 0x00, 0x01, 0x12, 0x03, 0x0b, 0x12, 0x22, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x03, - 0x12, 0x03, 0x0b, 0x25, 0x26, 0x0a, 0x88, 0x01, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x01, 0x12, 0x03, - 0x0f, 0x02, 0x29, 0x1a, 0x7b, 0x20, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x3b, 0x20, - 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x20, 0x74, 0x6f, 0x20, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, - 0x20, 0x69, 0x6e, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x2e, 0x0a, 0x20, 0x49, 0x66, 0x20, 0x6e, 0x6f, 0x74, 0x20, 0x70, 0x72, 0x65, 0x73, - 0x65, 0x6e, 0x74, 0x2c, 0x20, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x20, 0x61, 0x6e, 0x20, 0x69, - 0x6e, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x65, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x20, 0x6f, - 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x0a, - 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x04, 0x12, 0x03, 0x0f, 0x02, 0x0a, 0x0a, 0x0c, - 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x05, 0x12, 0x03, 0x0f, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, - 0x04, 0x00, 0x02, 0x01, 0x01, 0x12, 0x03, 0x0f, 0x12, 0x24, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, - 0x02, 0x01, 0x03, 0x12, 0x03, 0x0f, 0x27, 0x28, 0x0a, 0x3e, 0x0a, 0x02, 0x04, 0x01, 0x12, 0x04, - 0x13, 0x00, 0x19, 0x01, 0x1a, 0x32, 0x20, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x20, 0x69, 0x73, 0x20, 0x61, - 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x0a, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, - 0x03, 0x13, 0x08, 0x1c, 0x0a, 0x2b, 0x0a, 0x04, 0x04, 0x01, 0x02, 0x00, 0x12, 0x03, 0x15, 0x04, - 0x49, 0x1a, 0x1e, 0x20, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x74, 0x72, - 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x20, 0x64, 0x61, 0x74, 0x61, 0x2e, - 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x04, 0x12, 0x03, 0x15, 0x04, 0x0c, 0x0a, - 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x06, 0x12, 0x03, 0x15, 0x0d, 0x36, 0x0a, 0x0c, 0x0a, - 0x05, 0x04, 0x01, 0x02, 0x00, 0x01, 0x12, 0x03, 0x15, 0x37, 0x43, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x01, 0x02, 0x00, 0x03, 0x12, 0x03, 0x15, 0x47, 0x48, 0x0a, 0x22, 0x0a, 0x04, 0x04, 0x01, 0x02, - 0x01, 0x12, 0x03, 0x18, 0x04, 0x21, 0x1a, 0x15, 0x20, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, - 0x64, 0x3b, 0x20, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x20, 0x69, 0x64, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, - 0x05, 0x04, 0x01, 0x02, 0x01, 0x04, 0x12, 0x03, 0x18, 0x04, 0x0c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x01, 0x02, 0x01, 0x05, 0x12, 0x03, 0x18, 0x0d, 0x13, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, - 0x01, 0x01, 0x12, 0x03, 0x18, 0x14, 0x1c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x03, - 0x12, 0x03, 0x18, 0x1f, 0x20, 0x0a, 0x0a, 0x0a, 0x02, 0x06, 0x00, 0x12, 0x04, 0x1b, 0x00, 0x1e, - 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x06, 0x00, 0x01, 0x12, 0x03, 0x1b, 0x08, 0x0f, 0x0a, 0x7a, 0x0a, - 0x04, 0x06, 0x00, 0x02, 0x00, 0x12, 0x03, 0x1d, 0x04, 0x56, 0x1a, 0x6d, 0x20, 0x47, 0x65, 0x74, - 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x20, 0x62, 0x61, - 0x74, 0x63, 0x68, 0x20, 0x77, 0x69, 0x74, 0x68, 0x6f, 0x75, 0x74, 0x20, 0x61, 0x6e, 0x79, 0x20, - 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x20, 0x66, 0x72, 0x6f, 0x6d, 0x20, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, - 0x61, 0x6e, 0x64, 0x20, 0x65, 0x6e, 0x64, 0x20, 0x69, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x20, 0x69, 0x73, 0x20, - 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x74, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, - 0x00, 0x01, 0x12, 0x03, 0x1d, 0x08, 0x17, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, - 0x12, 0x03, 0x1d, 0x18, 0x2e, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x06, 0x12, 0x03, - 0x1d, 0x39, 0x3f, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x1d, 0x40, - 0x54, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x75, 0x6e, 0x74, 0x88, 0x01, 0x01, 0x12, 0x22, 0x0a, 0x0a, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, + 0x73, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x48, 0x02, 0x52, 0x09, 0x62, 0x61, + 0x74, 0x63, 0x68, 0x53, 0x69, 0x7a, 0x65, 0x88, 0x01, 0x01, 0x42, 0x13, 0x0a, 0x11, 0x5f, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, + 0x15, 0x0a, 0x13, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x62, 0x61, 0x74, 0x63, 0x68, + 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x22, 0x93, 0x01, 0x0a, 0x14, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, + 0x0a, 0x0c, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, + 0x31, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x0c, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1e, + 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x48, 0x00, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0b, + 0x0a, 0x09, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x32, 0x70, 0x0a, 0x07, 0x52, + 0x61, 0x77, 0x44, 0x61, 0x74, 0x61, 0x12, 0x65, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x28, 0x2e, 0x61, 0x70, 0x74, 0x6f, + 0x73, 0x2e, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x69, 0x6e, 0x64, 0x65, + 0x78, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x4a, 0xda, 0x08, + 0x0a, 0x06, 0x12, 0x04, 0x03, 0x00, 0x22, 0x01, 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, + 0x00, 0x12, 0x32, 0x44, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, + 0xa9, 0x20, 0x41, 0x70, 0x74, 0x6f, 0x73, 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x0a, 0x20, 0x53, 0x50, 0x44, 0x58, 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, + 0x2d, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, + 0x63, 0x68, 0x65, 0x2d, 0x32, 0x2e, 0x30, 0x0a, 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, + 0x00, 0x19, 0x0a, 0x09, 0x0a, 0x02, 0x03, 0x00, 0x12, 0x03, 0x07, 0x00, 0x39, 0x0a, 0x0a, 0x0a, + 0x02, 0x04, 0x00, 0x12, 0x04, 0x09, 0x00, 0x14, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x00, 0x01, + 0x12, 0x03, 0x09, 0x08, 0x1e, 0x0a, 0x39, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, 0x03, 0x0b, + 0x02, 0x27, 0x1a, 0x2c, 0x20, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, + 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x0a, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x04, 0x12, 0x03, 0x0b, 0x02, 0x0a, 0x0a, 0x0c, + 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x05, 0x12, 0x03, 0x0b, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, + 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x0b, 0x12, 0x22, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, + 0x02, 0x00, 0x03, 0x12, 0x03, 0x0b, 0x25, 0x26, 0x0a, 0x88, 0x01, 0x0a, 0x04, 0x04, 0x00, 0x02, + 0x01, 0x12, 0x03, 0x0f, 0x02, 0x29, 0x1a, 0x7b, 0x20, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x61, + 0x6c, 0x3b, 0x20, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x20, 0x74, 0x6f, 0x20, 0x72, 0x65, 0x74, + 0x75, 0x72, 0x6e, 0x20, 0x69, 0x6e, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x0a, 0x20, 0x49, 0x66, 0x20, 0x6e, 0x6f, 0x74, 0x20, 0x70, + 0x72, 0x65, 0x73, 0x65, 0x6e, 0x74, 0x2c, 0x20, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x20, 0x61, + 0x6e, 0x20, 0x69, 0x6e, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x65, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x04, 0x12, 0x03, 0x0f, 0x02, + 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x05, 0x12, 0x03, 0x0f, 0x0b, 0x11, 0x0a, + 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x01, 0x12, 0x03, 0x0f, 0x12, 0x24, 0x0a, 0x0c, 0x0a, + 0x05, 0x04, 0x00, 0x02, 0x01, 0x03, 0x12, 0x03, 0x0f, 0x27, 0x28, 0x0a, 0x8b, 0x01, 0x0a, 0x04, + 0x04, 0x00, 0x02, 0x02, 0x12, 0x03, 0x13, 0x02, 0x21, 0x1a, 0x7e, 0x20, 0x4f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x61, 0x6c, 0x3b, 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x20, 0x73, 0x69, 0x7a, 0x65, + 0x20, 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x20, 0x74, 0x6f, 0x20, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x20, 0x69, 0x6e, 0x20, 0x63, 0x75, + 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x0a, 0x20, 0x49, + 0x66, 0x20, 0x6e, 0x6f, 0x74, 0x20, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x74, 0x2c, 0x20, 0x72, + 0x65, 0x74, 0x75, 0x72, 0x6e, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x73, 0x69, 0x7a, 0x65, 0x20, 0x75, 0x70, 0x20, + 0x74, 0x6f, 0x20, 0x31, 0x30, 0x30, 0x30, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, + 0x02, 0x04, 0x12, 0x03, 0x13, 0x02, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x02, 0x05, + 0x12, 0x03, 0x13, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x02, 0x01, 0x12, 0x03, + 0x13, 0x12, 0x1c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x02, 0x03, 0x12, 0x03, 0x13, 0x1f, + 0x20, 0x0a, 0x3e, 0x0a, 0x02, 0x04, 0x01, 0x12, 0x04, 0x17, 0x00, 0x1d, 0x01, 0x1a, 0x32, 0x20, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x20, 0x69, 0x73, 0x20, 0x61, 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x20, + 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, + 0x0a, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, 0x03, 0x17, 0x08, 0x1c, 0x0a, 0x2b, 0x0a, + 0x04, 0x04, 0x01, 0x02, 0x00, 0x12, 0x03, 0x19, 0x04, 0x49, 0x1a, 0x1e, 0x20, 0x52, 0x65, 0x71, + 0x75, 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x20, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, + 0x02, 0x00, 0x04, 0x12, 0x03, 0x19, 0x04, 0x0c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, + 0x06, 0x12, 0x03, 0x19, 0x0d, 0x36, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x01, 0x12, + 0x03, 0x19, 0x37, 0x43, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x03, 0x12, 0x03, 0x19, + 0x47, 0x48, 0x0a, 0x22, 0x0a, 0x04, 0x04, 0x01, 0x02, 0x01, 0x12, 0x03, 0x1c, 0x04, 0x21, 0x1a, + 0x15, 0x20, 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x20, 0x69, 0x64, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x04, 0x12, + 0x03, 0x1c, 0x04, 0x0c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x05, 0x12, 0x03, 0x1c, + 0x0d, 0x13, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x01, 0x12, 0x03, 0x1c, 0x14, 0x1c, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x03, 0x12, 0x03, 0x1c, 0x1f, 0x20, 0x0a, 0x0a, + 0x0a, 0x02, 0x06, 0x00, 0x12, 0x04, 0x1f, 0x00, 0x22, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x06, 0x00, + 0x01, 0x12, 0x03, 0x1f, 0x08, 0x0f, 0x0a, 0x7a, 0x0a, 0x04, 0x06, 0x00, 0x02, 0x00, 0x12, 0x03, + 0x21, 0x04, 0x56, 0x1a, 0x6d, 0x20, 0x47, 0x65, 0x74, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x20, 0x77, 0x69, 0x74, + 0x68, 0x6f, 0x75, 0x74, 0x20, 0x61, 0x6e, 0x79, 0x20, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x69, + 0x6e, 0x67, 0x20, 0x66, 0x72, 0x6f, 0x6d, 0x20, 0x73, 0x74, 0x61, 0x72, 0x74, 0x69, 0x6e, 0x67, + 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x61, 0x6e, 0x64, 0x20, 0x65, 0x6e, 0x64, + 0x20, 0x69, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, + 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x20, 0x69, 0x73, 0x20, 0x70, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x74, + 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x21, 0x08, 0x17, + 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x21, 0x18, 0x2e, 0x0a, 0x0c, + 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x06, 0x12, 0x03, 0x21, 0x39, 0x3f, 0x0a, 0x0c, 0x0a, 0x05, + 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x21, 0x40, 0x54, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, ]; include!("aptos.indexer.v1.serde.rs"); include!("aptos.indexer.v1.tonic.rs"); diff --git a/crates/aptos-protos/src/pb/aptos.indexer.v1.serde.rs b/crates/aptos-protos/src/pb/aptos.indexer.v1.serde.rs index bab9b34866731..a2a36aa92fe07 100644 --- a/crates/aptos-protos/src/pb/aptos.indexer.v1.serde.rs +++ b/crates/aptos-protos/src/pb/aptos.indexer.v1.serde.rs @@ -15,6 +15,9 @@ impl serde::Serialize for GetTransactionsRequest { if self.transactions_count.is_some() { len += 1; } + if self.batch_size.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("aptos.indexer.v1.GetTransactionsRequest", len)?; if let Some(v) = self.starting_version.as_ref() { struct_ser.serialize_field("startingVersion", ToString::to_string(&v).as_str())?; @@ -22,6 +25,9 @@ impl serde::Serialize for GetTransactionsRequest { if let Some(v) = self.transactions_count.as_ref() { struct_ser.serialize_field("transactionsCount", ToString::to_string(&v).as_str())?; } + if let Some(v) = self.batch_size.as_ref() { + struct_ser.serialize_field("batchSize", ToString::to_string(&v).as_str())?; + } struct_ser.end() } } @@ -36,12 +42,15 @@ impl<'de> serde::Deserialize<'de> for GetTransactionsRequest { "startingVersion", "transactions_count", "transactionsCount", + "batch_size", + "batchSize", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { StartingVersion, TransactionsCount, + BatchSize, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -65,6 +74,7 @@ impl<'de> serde::Deserialize<'de> for GetTransactionsRequest { match value { "startingVersion" | "starting_version" => Ok(GeneratedField::StartingVersion), "transactionsCount" | "transactions_count" => Ok(GeneratedField::TransactionsCount), + "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -86,6 +96,7 @@ impl<'de> serde::Deserialize<'de> for GetTransactionsRequest { { let mut starting_version__ = None; let mut transactions_count__ = None; + let mut batch_size__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::StartingVersion => { @@ -104,11 +115,20 @@ impl<'de> serde::Deserialize<'de> for GetTransactionsRequest { map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::BatchSize => { + if batch_size__.is_some() { + return Err(serde::de::Error::duplicate_field("batchSize")); + } + batch_size__ = + map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } } } Ok(GetTransactionsRequest { starting_version: starting_version__, transactions_count: transactions_count__, + batch_size: batch_size__, }) } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 7551f9ab3801b..27560e5c87460 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -23,7 +23,7 @@ use prost::Message; use serde::{Deserialize, Serialize}; use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::sync::{ - mpsc::{channel, error::TrySendError}, + mpsc::{channel, error::SendTimeoutError}, watch::channel as watch_channel, }; use tokio_stream::wrappers::ReceiverStream; @@ -48,13 +48,14 @@ const AHEAD_OF_CACHE_RETRY_SLEEP_DURATION_MS: u64 = 50; // TODO(larry): fix all errors treated as transient errors. const TRANSIENT_DATA_ERROR_RETRY_SLEEP_DURATION_MS: u64 = 1000; -// TODO(larry): replace this with a exponential backoff. -// The server will not fetch more data from the cache and file store until the channel is not full. -const RESPONSE_CHANNEL_FULL_BACKOFF_DURATION_MS: u64 = 1000; // Up to MAX_RESPONSE_CHANNEL_SIZE response can be buffered in the channel. If the channel is full, // the server will not fetch more data from the cache and file store until the channel is not full. const MAX_RESPONSE_CHANNEL_SIZE: usize = 40; +// The server will retry to send the response to the client and give up after RESPONSE_CHANNEL_SEND_TIMEOUT. +// This is to prevent the server from being occupied by a slow client. +const RESPONSE_CHANNEL_SEND_TIMEOUT: Duration = Duration::from_secs(120); + pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, @@ -104,11 +105,14 @@ impl RawData for RawDataServerWrapper { Ok(request_metadata) => request_metadata, _ => return Result::Err(Status::aborted("Invalid request token")), }; + let request = req.into_inner(); + + let transactions_count = request.transactions_count; // Response channel to stream the data to the client. let (tx, rx) = channel(MAX_RESPONSE_CHANNEL_SIZE); - let mut current_version = match req.into_inner().starting_version { - Some(version) => version, + let mut current_version = match &request.starting_version { + Some(version) => *version, None => { return Result::Err(Status::aborted("Starting version is not set")); }, @@ -140,6 +144,7 @@ impl RawData for RawDataServerWrapper { let request_metadata_clone = request_metadata.clone(); tokio::spawn( async move { + let mut transactions_count = transactions_count; let request_metadata = request_metadata_clone; let conn = match redis_client.get_async_connection().await { Ok(conn) => conn, @@ -147,11 +152,8 @@ impl RawData for RawDataServerWrapper { ERROR_COUNT .with_label_values(&["redis_connection_failed"]) .inc(); - tx.send(Err(Status::unavailable( - "[Indexer Data] Cannot connect to Redis; please retry.", - ))) - .await - .unwrap(); + // Connection will be dropped anyway, so we ignore the error here. + let _result = tx.send_timeout(Err(Status::unavailable("[Indexer Data] Cannot connect to Redis; please retry.")), RESPONSE_CHANNEL_SEND_TIMEOUT).await; error!( error = e.to_string(), "[Indexer Data] Failed to get redis connection." @@ -168,11 +170,8 @@ impl RawData for RawDataServerWrapper { ERROR_COUNT .with_label_values(&["redis_get_chain_id_failed"]) .inc(); - tx.send(Err(Status::unavailable( - "[Indexer Data] Cannot get the chain id; please retry.", - ))) - .await - .unwrap(); + // Connection will be dropped anyway, so we ignore the error here. + let _result = tx.send_timeout(Err(Status::unavailable("[Indexer Data] Cannot get the chain id; please retry.")), RESPONSE_CHANNEL_SEND_TIMEOUT).await; error!( error = e.to_string(), "[Indexer Data] Failed to get chain id." @@ -191,7 +190,7 @@ impl RawData for RawDataServerWrapper { loop { // 1. Fetch data from cache and file store. - let transaction_data = match data_fetch( + let mut transaction_data = match data_fetch( current_version, &mut cache_operator, file_store_operator.as_ref(), @@ -216,6 +215,18 @@ impl RawData for RawDataServerWrapper { continue; }, }; + if let Some(count) = transactions_count { + if count == 0 { + // End the data stream. + break; + } else if (count as usize) < transaction_data.len() { + // Trim the data to the requested size. + transaction_data.truncate(count as usize); + transactions_count = Some(0); + } else { + transactions_count = Some(count - transaction_data.len() as u64); + } + }; // 2. Push the data to the response channel, i.e. stream the data to the client. let resp_item = get_transactions_response_builder(transaction_data, chain_id as u32); @@ -229,7 +240,7 @@ impl RawData for RawDataServerWrapper { .timestamp .as_ref() .map(time_diff_since_pb_timestamp_in_secs); - match tx.try_send(Result::::Ok(resp_item)) { + match tx.send_timeout(Result::::Ok(resp_item), RESPONSE_CHANNEL_SEND_TIMEOUT).await { Ok(_) => { PROCESSED_BATCH_SIZE .with_label_values(&[ @@ -261,18 +272,11 @@ impl RawData for RawDataServerWrapper { .observe(data_latency_in_secs); } }, - Err(TrySendError::Full(_)) => { - warn!("[Indexer Data] Receiver is full; retrying."); - tokio::time::sleep(Duration::from_millis( - RESPONSE_CHANNEL_FULL_BACKOFF_DURATION_MS, - )) - .await; - continue; + Err(SendTimeoutError::Timeout(_)) => { + warn!("[Indexer Data] Receiver is full; exiting."); + break; }, - Err(TrySendError::Closed(_)) => { - ERROR_COUNT - .with_label_values(&["response_channel_closed"]) - .inc(); + Err(SendTimeoutError::Closed(_)) => { warn!("[Indexer Data] Receiver is closed; exiting."); break; }, @@ -282,8 +286,8 @@ impl RawData for RawDataServerWrapper { current_version = end_of_batch_version + 1; if watch_sender.send(current_version).is_err() { error!( - "[Indexer Data] Failed to send the current version to the watch channel." - ); + "[Indexer Data] Failed to send the current version to the watch channel." + ); break; } info!(