Skip to content

Commit

Permalink
[kv store] add metrics (#21385)
Browse files Browse the repository at this point in the history
## Description 

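Adds Prometheus metrics (success, not-found, and error counters plus latency and batch-size histograms) for Bigtable get and scan operations in the KV store client.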


---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
phoenix-o authored Mar 10, 2025
1 parent 9c16c2e commit 80e680b
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 5 deletions.
4 changes: 4 additions & 0 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ async fn main() -> Result<()> {
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
"ingestion".to_string(),
&registry,
)
.await?;
bigtable_store = Some(BigTableProgressStore::new(bigtable_client));
Expand Down Expand Up @@ -173,6 +175,8 @@ async fn main() -> Result<()> {
kv_config.instance_id,
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
"ingestion".to_string(),
&registry,
)
.await?;
let worker_pool = WorkerPool::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-jsonrpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Context {
let pg_loader = Arc::new(pg_reader.as_data_loader());

let kv_loader = if let Some(config) = config.bigtable.clone() {
let bigtable_reader = BigtableReader::new(config.instance_id).await?;
let bigtable_reader = BigtableReader::new(config.instance_id, registry).await?;
KvLoader::new_with_bigtable(Arc::new(bigtable_reader.as_data_loader()))
} else {
KvLoader::new_with_pg(pg_loader.clone())
Expand Down
15 changes: 11 additions & 4 deletions crates/sui-indexer-alt-jsonrpc/src/data/bigtable_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use async_graphql::dataloader::DataLoader;
use prometheus::Registry;
use sui_kvstore::BigTableClient;

use crate::data::error::Error;
Expand All @@ -13,15 +14,21 @@ use crate::data::error::Error;
pub struct BigtableReader(pub(crate) BigTableClient);

impl BigtableReader {
pub(crate) async fn new(instance_id: String) -> Result<Self, Error> {
pub(crate) async fn new(instance_id: String, registry: &Registry) -> Result<Self, Error> {
if std::env::var("GOOGLE_APPLICATION_CREDENTIALS").is_err() {
return Err(Error::BigtableCreate(anyhow::anyhow!(
"Environment variable GOOGLE_APPLICATION_CREDENTIALS is not set"
)));
}
let client = BigTableClient::new_remote(instance_id, true, None)
.await
.map_err(Error::BigtableCreate)?;
let client = BigTableClient::new_remote(
instance_id,
true,
None,
"indexer-alt-jsonrpc".to_string(),
registry,
)
.await
.map_err(Error::BigtableCreate)?;
Ok(Self(client))
}

Expand Down
89 changes: 89 additions & 0 deletions crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::bigtable::metrics::KvMetrics;
use crate::bigtable::proto::bigtable::v2::bigtable_client::BigtableClient as BigtableInternalClient;
use crate::bigtable::proto::bigtable::v2::mutate_rows_request::Entry;
use crate::bigtable::proto::bigtable::v2::mutation::SetCell;
Expand All @@ -14,11 +15,13 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use gcp_auth::{Token, TokenProvider};
use http::{HeaderValue, Request, Response};
use prometheus::Registry;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::Instant;
use sui_types::base_types::{ObjectID, TransactionDigest};
use sui_types::digests::CheckpointDigest;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,6 +65,8 @@ struct AuthChannel {
/// Bigtable client handle: wraps the internal Bigtable client together with the table path
/// prefix and the optional request metrics.
pub struct BigTableClient {
/// Prefix prepended to a table name to form the full table path sent in requests
/// (the local constructor uses `projects/emulator/instances/<instance_id>/tables/`).
table_prefix: String,
/// Underlying Bigtable client, running over the authenticated channel.
client: BigtableInternalClient<AuthChannel>,
/// Value reported as the `client` label on the metrics this client records;
/// the local constructor sets it to "local".
client_name: String,
/// Request metrics. `None` for the local constructor (nothing is recorded);
/// `Some` for clients built with `new_remote`, which registers them in the given registry.
metrics: Option<Arc<KvMetrics>>,
}

#[async_trait]
Expand Down Expand Up @@ -281,13 +286,17 @@ impl BigTableClient {
Ok(Self {
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_id),
client: BigtableInternalClient::new(auth_channel),
client_name: "local".to_string(),
metrics: None,
})
}

pub async fn new_remote(
instance_id: String,
is_read_only: bool,
timeout: Option<Duration>,
client_name: String,
registry: &Registry,
) -> Result<Self> {
let policy = if is_read_only {
"https://www.googleapis.com/auth/bigtable.data.readonly"
Expand Down Expand Up @@ -319,6 +328,8 @@ impl BigTableClient {
Ok(Self {
table_prefix,
client: BigtableInternalClient::new(auth_channel),
client_name,
metrics: Some(KvMetrics::new(registry)),
})
}

Expand Down Expand Up @@ -423,6 +434,54 @@ impl BigTableClient {
&mut self,
table_name: &str,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Vec<(Bytes, Bytes)>>> {
    // Metrics-recording wrapper around `multi_get_internal`. The lookup itself is forwarded
    // unchanged and its result is returned as-is; when metrics are configured, outcome
    // counters, batch size and latency are recorded under the (client name, table name) labels.

    // Capture the start instant up front and read `elapsed()` only after the request has been
    // awaited, so the measurement covers the whole round trip.
    let start = Instant::now();
    let num_keys_requested = keys.len();
    let result = self.multi_get_internal(table_name, keys).await;
    let elapsed_ms = start.elapsed().as_millis() as f64;
    let labels = [&self.client_name, table_name];
    match &self.metrics {
        // No metrics configured (e.g. local client): pass the result straight through.
        None => result,
        Some(metrics) => match result {
            Err(e) => {
                metrics.kv_get_errors.with_label_values(&labels).inc();
                Err(e)
            }
            Ok(result) => {
                metrics
                    .kv_get_batch_size
                    .with_label_values(&labels)
                    .observe(num_keys_requested as f64);
                // Fewer rows than requested keys means some keys were not found.
                if num_keys_requested > result.len() {
                    metrics
                        .kv_get_not_found
                        .with_label_values(&labels)
                        .inc_by((num_keys_requested - result.len()) as u64);
                }
                metrics
                    .kv_get_success
                    .with_label_values(&labels)
                    .inc_by(result.len() as u64);
                metrics
                    .kv_get_latency_ms
                    .with_label_values(&labels)
                    .observe(elapsed_ms);
                // Guard against division by zero for an empty key batch.
                if num_keys_requested > 0 {
                    metrics
                        .kv_get_latency_ms_per_key
                        .with_label_values(&labels)
                        .observe(elapsed_ms / num_keys_requested as f64);
                }
                Ok(result)
            }
        },
    }
}

pub async fn multi_get_internal(
&mut self,
table_name: &str,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Vec<(Bytes, Bytes)>>> {
let request = ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
Expand All @@ -444,6 +503,36 @@ impl BigTableClient {
&mut self,
table_name: &str,
upper_limit: Bytes,
) -> Result<Vec<(Bytes, Vec<(Bytes, Bytes)>)>> {
    // Metrics-recording wrapper around `reversed_scan_internal`. The scan is forwarded
    // unchanged and its result is returned as-is; when metrics are configured, outcome
    // counters and latency are recorded under the (client name, table name) labels.

    // Capture the start instant up front and read `elapsed()` only after the scan has been
    // awaited, so the measurement covers the whole round trip.
    let start = Instant::now();
    let result = self.reversed_scan_internal(table_name, upper_limit).await;
    let elapsed_ms = start.elapsed().as_millis() as f64;
    let labels = [&self.client_name, table_name];
    match &self.metrics {
        Some(metrics) => match result {
            Ok(result) => {
                metrics.kv_scan_success.with_label_values(&labels).inc();
                // An empty result still counts as a successful scan, but is tracked separately.
                if result.is_empty() {
                    metrics.kv_scan_not_found.with_label_values(&labels).inc();
                }
                metrics
                    .kv_scan_latency_ms
                    .with_label_values(&labels)
                    .observe(elapsed_ms);
                Ok(result)
            }
            Err(e) => {
                metrics.kv_scan_error.with_label_values(&labels).inc();
                Err(e)
            }
        },
        // No metrics configured (e.g. local client): pass the result straight through.
        None => result,
    }
}

async fn reversed_scan_internal(
&mut self,
table_name: &str,
upper_limit: Bytes,
) -> Result<Vec<(Bytes, Vec<(Bytes, Bytes)>)>> {
let range = RowRange {
start_key: None,
Expand Down
110 changes: 110 additions & 0 deletions crates/sui-kvstore/src/bigtable/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramVec,
IntCounterVec, Registry,
};
use std::sync::Arc;

/// Prometheus metrics for kv store reads. Every metric is labelled by `client` and `table`.
pub(crate) struct KvMetrics {
/// Number of successful fetches from the kv store.
pub kv_get_success: IntCounterVec,
/// Number of fetches from the kv store that returned not found.
pub kv_get_not_found: IntCounterVec,
/// Number of fetches from the kv store that returned an error.
pub kv_get_errors: IntCounterVec,
/// Latency of fetches from the kv store (milliseconds, per the metric name).
pub kv_get_latency_ms: HistogramVec,
/// Number of keys fetched per batch from the kv store.
pub kv_get_batch_size: HistogramVec,
/// Latency of fetches from the kv store divided by the number of keys in the batch.
pub kv_get_latency_ms_per_key: HistogramVec,
/// Number of successful scans from the kv store.
pub kv_scan_success: IntCounterVec,
/// Number of scans from the kv store that returned no rows.
pub kv_scan_not_found: IntCounterVec,
/// Number of scans from the kv store that returned an error.
pub kv_scan_error: IntCounterVec,
/// Latency of scans from the kv store (milliseconds, per the metric name).
pub kv_scan_latency_ms: HistogramVec,
}

impl KvMetrics {
pub(crate) fn new(registry: &Registry) -> Arc<Self> {
Arc::new(Self {
kv_get_success: register_int_counter_vec_with_registry!(
"kv_get_success",
"Number of successful fetches from kv store",
&["client", "table"],
registry,
)
.unwrap(),
kv_get_not_found: register_int_counter_vec_with_registry!(
"kv_get_not_found",
"Number of fetches from kv store that returned not found",
&["client", "table"],
registry,
)
.unwrap(),
kv_get_errors: register_int_counter_vec_with_registry!(
"kv_get_errors",
"Number of fetches from kv store that returned an error",
&["client", "table"],
registry,
)
.unwrap(),
kv_get_latency_ms: register_histogram_vec_with_registry!(
"kv_get_latency_ms",
"Latency of fetches from kv store",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 24)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
kv_get_batch_size: register_histogram_vec_with_registry!(
"kv_get_batch_size",
"Number of keys fetched per batch from kv store",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 20)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
kv_get_latency_ms_per_key: register_histogram_vec_with_registry!(
"kv_get_latency_ms_per_key",
"Latency of fetches from kv store per key",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 24)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
kv_scan_success: register_int_counter_vec_with_registry!(
"kv_scan_success",
"Number of successful scans from kv store",
&["client", "table"],
registry,
)
.unwrap(),
kv_scan_not_found: register_int_counter_vec_with_registry!(
"kv_scan_not_found",
"Number of fetches from kv store that returned not found",
&["client", "table"],
registry,
)
.unwrap(),
kv_scan_error: register_int_counter_vec_with_registry!(
"kv_scan_error",
"Number of scans from kv store that returned an error",
&["client", "table"],
registry,
)
.unwrap(),
kv_scan_latency_ms: register_histogram_vec_with_registry!(
"kv_scan_latency_ms",
"Latency of scans from kv store",
&["client", "table"],
prometheus::exponential_buckets(1.0, 1.6, 24)
.unwrap()
.to_vec(),
registry,
)
.unwrap(),
})
}
}
1 change: 1 addition & 0 deletions crates/sui-kvstore/src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod client;
mod metrics;
pub(crate) mod progress_store;
mod proto;
pub(crate) mod worker;

0 comments on commit 80e680b

Please sign in to comment.