[GRPC] Enable data service ZSTD and update crate that uses old tonic (#…
CapCap authored Jun 15, 2024
1 parent 69d241b commit 2c3de68
Showing 8 changed files with 122 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
@@ -17,7 +17,7 @@ x = "run --package aptos-cargo-cli --bin aptos-cargo-cli --"
[build]
rustflags = ["--cfg", "tokio_unstable", "-C", "force-frame-pointers=yes", "-C", "force-unwind-tables=yes"]

# TODO(grao): Figure out whether we should enable other cpu features, and whether we should use a different way to configure them rather than list every single one here.
# TODO(grao): Figure out whether we should enable othaer cpu features, and whether we should use a different way to configure them rather than list every single one here.
[target.x86_64-unknown-linux-gnu]
rustflags = ["--cfg", "tokio_unstable", "-C", "link-arg=-fuse-ld=lld", "-C", "force-frame-pointers=yes", "-C", "force-unwind-tables=yes", "-C", "target-feature=+sse4.2"]

99 changes: 73 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -517,7 +517,7 @@ codespan = "0.11.1"
codespan-reporting = "0.11.1"
colored = "2.0.0"
concurrent-queue = "2.2.0"
console-subscriber = "0.1.8"
console-subscriber = "0.3.0"
const_format = "0.2.26"
core_affinity = "0.8.1"
coset = "0.3"
@@ -864,3 +864,4 @@ debug = true
serde-reflection = { git = "https://github.com/aptos-labs/serde-reflection", rev = "73b6bbf748334b71ff6d7d09d06a29e3062ca075" }
merlin = { git = "https://github.com/aptos-labs/merlin" }
x25519-dalek = { git = "https://github.com/aptos-labs/x25519-dalek", branch = "zeroize_v1" }
tonic = { git = "https://github.com/aptos-labs/tonic.git", rev = "0da1ba8b1751d6e19eb55be24cccf9ae933c666e" }
20 changes: 15 additions & 5 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
@@ -137,18 +137,25 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
.register_encoded_file_descriptor_set(TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET)
.build()
.map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?;
.map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);

let cache_storage_format: StorageFormat = if self.enable_cache_compression {
StorageFormat::Lz4CompressedProto
} else {
StorageFormat::Base64UncompressedProto
};

println!(
">>>> Starting Redis connection: {:?}",
&self.redis_read_replica_address.0
);
let redis_conn = redis::Client::open(self.redis_read_replica_address.0.clone())?
.get_tokio_connection_manager()
.await?;

println!(">>>> Redis connection established");
// InMemoryCache.
let in_memory_cache =
aptos_indexer_grpc_utils::in_memory_cache::InMemoryCache::new_with_redis_connection(
@@ -157,6 +164,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
cache_storage_format,
)
.await?;
println!(">>>> InMemoryCache established");
// Add authentication interceptor.
let server = RawDataServerWrapper::new(
self.redis_read_replica_address.clone(),
@@ -170,9 +178,11 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
Arc::new(in_memory_cache),
)?;
let svc = aptos_protos::indexer::v1::raw_data_server::RawDataServer::new(server)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd);
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
println!(">>>> Starting gRPC server: {:?}", &svc);

let svc_clone = svc.clone();
let reflection_service_clone = reflection_service.clone();

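For reference, a minimal sketch of the compression setup this hunk applies, assuming tonic is built with its gzip and zstd codec features and using the aptos_protos generated RawDataServer and the RawDataServerWrapper type from the code above; the function name is illustrative only, not part of the crate.

use aptos_protos::indexer::v1::raw_data_server::RawDataServer;
use tonic::codec::CompressionEncoding;

// Sketch: prefer zstd for responses, but keep accepting both zstd and gzip
// requests so gzip-only clients are not broken.
fn compressed_service(server: RawDataServerWrapper) -> RawDataServer<RawDataServerWrapper> {
    RawDataServer::new(server)
        .send_compressed(CompressionEncoding::Zstd)
        .accept_compressed(CompressionEncoding::Zstd)
        .accept_compressed(CompressionEncoding::Gzip)
}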
25 changes: 22 additions & 3 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
@@ -82,6 +82,25 @@ pub struct RawDataServerWrapper {
in_memory_cache: Arc<InMemoryCache>,
}

// Exclude in_memory_cache from the Debug output.
impl std::fmt::Debug for RawDataServerWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RawDataServerWrapper")
.field("redis_client", &"Arc<redis::Client>")
.field("file_store_config", &self.file_store_config)
.field(
"data_service_response_channel_size",
&self.data_service_response_channel_size,
)
.field(
"sender_addresses_to_ignore",
&self.sender_addresses_to_ignore,
)
.field("cache_storage_format", &self.cache_storage_format)
.finish()
}
}

impl RawDataServerWrapper {
pub fn new(
redis_address: RedisUrl,
@@ -597,14 +616,14 @@ fn ensure_sequential_transactions(mut batches: Vec<Vec<Transaction>>) -> Vec<Tra
// If this batch is fully contained within the previous batch, skip it
if prev_start <= start_version && prev_end >= end_version {
NUM_MULTI_FETCH_OVERLAPPED_VERSIONS
.with_label_values(&[SERVICE_TYPE, &"full"])
.with_label_values(&[SERVICE_TYPE, "full"])
.inc_by(end_version - start_version);
continue;
}
// If this batch overlaps with the previous batch, combine them
if prev_end >= start_version {
NUM_MULTI_FETCH_OVERLAPPED_VERSIONS
.with_label_values(&[SERVICE_TYPE, &"partial"])
.with_label_values(&[SERVICE_TYPE, "partial"])
.inc_by(prev_end - start_version + 1);
tracing::debug!(
batch_first_version = first_version,
@@ -622,7 +641,7 @@ fn ensure_sequential_transactions(mut batches: Vec<Vec<Transaction>>) -> Vec<Tra
// Otherwise there is a gap
if prev_end + 1 != start_version {
NUM_MULTI_FETCH_OVERLAPPED_VERSIONS
.with_label_values(&[SERVICE_TYPE, &"gap"])
.with_label_values(&[SERVICE_TYPE, "gap"])
.inc_by(prev_end - start_version + 1);

tracing::error!(
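The label changes above ("full", "partial", "gap") sit inside the batch-overlap handling of ensure_sequential_transactions. As a reading aid, here is a self-contained sketch of that classification on bare version ranges; the standalone function and names are illustrative, not the crate's API.

// Illustrative only: how the next batch's version range relates to the
// previous batch, mirroring the metric labels used above.
fn classify_batch(prev: (u64, u64), next: (u64, u64)) -> &'static str {
    let (prev_start, prev_end) = prev;
    let (start_version, end_version) = next;
    if prev_start <= start_version && prev_end >= end_version {
        "full" // fully contained in the previous batch; it can be skipped
    } else if prev_end >= start_version {
        "partial" // overlapping prefix was fetched twice
    } else if prev_end + 1 != start_version {
        "gap" // versions are missing between the two batches
    } else {
        "sequential" // starts exactly where the previous batch ended
    }
}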
12 changes: 6 additions & 6 deletions ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs
@@ -93,16 +93,16 @@ pub fn bootstrap(
let router = match use_data_service_interface {
false => {
let svc = FullnodeDataServer::new(server)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd);
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
tonic_server.add_service(svc)
},
true => {
let svc = RawDataServer::new(localnet_data_server)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd);
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
tonic_server.add_service(svc)
},
};
@@ -18,7 +18,7 @@ const IN_MEMORY_CACHE_GC_INTERVAL_MS: u64 = 100;
// Max cache entry TTL: 30 seconds.
// const MAX_IN_MEMORY_CACHE_ENTRY_TTL: u64 = 30;
// Warm-up cache entries. Pre-fetch the cache entries to warm up the cache.
pub const WARM_UP_CACHE_ENTRIES: u64 = 20_000;
pub const WARM_UP_CACHE_ENTRIES: u64 = 100;
pub const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500;

/// Configuration for when we want to explicitly declare how large the cache should be.
@@ -85,6 +85,7 @@ struct CacheMetadata {
}

/// InMemoryCache is a simple in-memory cache that stores the protobuf Transaction.
#[derive(Debug)]
pub struct InMemoryCache {
/// Cache maps the cache key to the deserialized Transaction.
cache: Arc<DashMap<u64, Arc<Transaction>>>,
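The constant change above shrinks the warm-up window from 20,000 to 100 entries. Assuming warm-up pulls from Redis in MAX_REDIS_FETCH_BATCH_SIZE chunks (an assumption based on the constant names; the warm-up code itself is not in this diff), that cuts the warm-up from roughly 40 Redis round trips to a single one. A hypothetical sketch of that relationship:

// Hypothetical illustration, not taken from the crate: split the warm-up
// range [latest - WARM_UP_CACHE_ENTRIES, latest) into Redis fetch batches.
const WARM_UP_CACHE_ENTRIES: u64 = 100;
const MAX_REDIS_FETCH_BATCH_SIZE: usize = 500;

fn warm_up_batches(latest_version: u64) -> Vec<(u64, u64)> {
    let start = latest_version.saturating_sub(WARM_UP_CACHE_ENTRIES);
    (start..latest_version)
        .step_by(MAX_REDIS_FETCH_BATCH_SIZE)
        .map(|batch_start| {
            let batch_end = (batch_start + MAX_REDIS_FETCH_BATCH_SIZE as u64).min(latest_version);
            (batch_start, batch_end) // half-open range [batch_start, batch_end)
        })
        .collect()
}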
2 changes: 1 addition & 1 deletion ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs
@@ -41,7 +41,7 @@ pub async fn create_grpc_client(address: Url) -> GrpcClientType {
Ok(client
.max_decoding_message_size(usize::MAX)
.max_encoding_message_size(usize::MAX)
.send_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd))
},
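For symmetry with the server-side hunks, a sketch of what the updated client configuration amounts to, assuming the aptos_protos generated RawDataClient stands behind GrpcClientType (the exact client type is not shown in this hunk) and tonic is built with the zstd and gzip features; anyhow is used only to keep the sketch short.

use aptos_protos::indexer::v1::raw_data_client::RawDataClient;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

// Sketch: requests go out zstd-compressed; responses may come back as
// either gzip or zstd.
async fn connect_compressed(address: String) -> anyhow::Result<RawDataClient<Channel>> {
    let channel = Channel::from_shared(address)?.connect().await?;
    Ok(RawDataClient::new(channel)
        .max_decoding_message_size(usize::MAX)
        .max_encoding_message_size(usize::MAX)
        .send_compressed(CompressionEncoding::Zstd)
        .accept_compressed(CompressionEncoding::Gzip)
        .accept_compressed(CompressionEncoding::Zstd))
}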
