Skip to content

Commit

Permalink
refactor(iceberg): move scan from batch to connector (#20567)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Feb 24, 2025
1 parent e1e6f6d commit 15d8ef6
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/batch/executors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
scopeguard = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
Expand All @@ -47,7 +46,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
] }
tokio-postgres = "0.7"
tokio-stream = { workspace = true }
tracing = "0.1"
uuid = { version = "1", features = ["v4"] }

Expand All @@ -61,6 +59,7 @@ risingwave_expr_impl = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
tempfile = "3"
tikv-jemallocator = { workspace = true }
tokio-stream = { workspace = true }

[[bench]]
name = "filter"
Expand Down
73 changes: 16 additions & 57 deletions src/batch/executors/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergProperties, IcebergSplit};
use risingwave_connector::source::iceberg::{
IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_expr::expr::LiteralExpression;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -92,7 +90,6 @@ impl IcebergScanExecutor {
async fn do_execute(mut self: Box<Self>) {
let table = self.iceberg_config.load_table().await?;
let data_types = self.schema.data_types();
let table_name = table.identifier().name().to_owned();

let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
Expand All @@ -110,58 +107,20 @@ impl IcebergScanExecutor {
}
};

let mut read_bytes = 0;
let _metrics_report_guard = scopeguard::guard(
(read_bytes, table_name, self.metrics.clone()),
|(read_bytes, table_name, metrics)| {
if let Some(metrics) = metrics {
metrics
.iceberg_scan_metrics()
.iceberg_read_bytes
.with_guarded_label_values(&[&table_name])
.inc_by(read_bytes as _);
}
},
);
for data_file_scan_task in data_file_scan_tasks {
let data_file_path = data_file_scan_task.data_file_path.clone();
let data_sequence_number = data_file_scan_task.sequence_number;

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
.build();
let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

let mut record_batch_stream =
reader.read(Box::pin(file_scan_stream)).await?.enumerate();

while let Some((index, record_batch)) = record_batch_stream.next().await {
let record_batch = record_batch?;

// iceberg_t1_source
let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
if self.need_seq_num {
let (mut columns, visibility) = chunk.into_parts();
columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
vec![data_sequence_number; visibility.len()],
))));
chunk = DataChunk::from_parts(columns.into(), visibility)
};
if self.need_file_path_and_pos {
let (mut columns, visibility) = chunk.into_parts();
columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
vec![data_file_path.as_str(); visibility.len()],
))));
let index_start = (index * self.batch_size) as i64;
columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
(index_start..(index_start + visibility.len() as i64))
.collect::<Vec<i64>>(),
))));
chunk = DataChunk::from_parts(columns.into(), visibility)
}
#[for_await]
for chunk in scan_task_to_chunk(
table.clone(),
data_file_scan_task,
IcebergScanOpts {
batch_size: self.batch_size,
need_seq_num: self.need_seq_num,
need_file_path_and_pos: self.need_file_path_and_pos,
},
self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
) {
let chunk = chunk?;
assert_eq!(chunk.data_types(), data_types);
read_bytes += chunk.estimated_heap_size() as u64;
yield chunk;
}
}
Expand Down
33 changes: 4 additions & 29 deletions src/batch/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use prometheus::{
Histogram, IntGauge, Registry, histogram_opts, register_histogram_with_registry,
register_int_counter_with_registry,
};
use risingwave_common::metrics::{LabelGuardedIntCounterVec, TrAdderGauge};
use risingwave_common::metrics::TrAdderGauge;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_connector::source::iceberg::IcebergScanMetrics;

/// Metrics for batch executor.
/// Currently, it contains:
Expand Down Expand Up @@ -93,8 +94,8 @@ impl BatchMetricsInner {
&self.batch_manager_metrics
}

pub fn iceberg_scan_metrics(&self) -> &IcebergScanMetrics {
&self.iceberg_scan_metrics
pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
self.iceberg_scan_metrics.clone()
}

pub fn for_test() -> BatchMetrics {
Expand Down Expand Up @@ -182,29 +183,3 @@ impl BatchSpillMetrics {
Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone())
}
}

#[derive(Clone)]
pub struct IcebergScanMetrics {
pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
}

impl IcebergScanMetrics {
fn new(registry: &Registry) -> Self {
let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
"iceberg_read_bytes",
"Total size of iceberg read requests",
&["table_name"],
registry
)
.unwrap();

Self { iceberg_read_bytes }
}

pub fn for_test() -> Arc<Self> {
Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
}
}

pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
5 changes: 2 additions & 3 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::sync::Arc;
use risingwave_common::config::{BatchConfig, MetricLevel};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::worker_util::WorkerNodeId;
use risingwave_connector::source::iceberg::IcebergScanMetrics;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_storage::StateStoreImpl;

use crate::monitor::{
BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics, IcebergScanMetrics,
};
use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics};
use crate::task::BatchManager;

/// The global environment for task execution.
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::time::Duration;

use risingwave_batch::monitor::{
GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
GLOBAL_ICEBERG_SCAN_METRICS,
};
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::spill::spill_op::SpillOp;
Expand All @@ -40,6 +39,7 @@ use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::WorkerType;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ rustls-native-certs = "0.8"
rustls-pemfile = "2"
rustls-pki-types = "1"
rw_futures_util = { workspace = true }
scopeguard = "1"
sea-schema = { version = "0.16", default-features = false, features = [
"discovery",
"sqlx-postgres",
Expand All @@ -137,7 +138,6 @@ strum = "0.26"
strum_macros = "0.26"
tempfile = "3"
thiserror = "1"

thiserror-ext = { workspace = true }
# To easiy get the type_name and impl IntoSql for rust_decimal, we fork the crate.
# Another reason is that we are planning to refactor their IntoSql trait to allow specifying the type to convert.
Expand Down
46 changes: 46 additions & 0 deletions src/connector/src/source/iceberg/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, LazyLock};

use prometheus::Registry;
use risingwave_common::metrics::LabelGuardedIntCounterVec;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::register_guarded_int_counter_vec_with_registry;

#[derive(Clone)]
pub struct IcebergScanMetrics {
pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
}

impl IcebergScanMetrics {
fn new(registry: &Registry) -> Self {
let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
"iceberg_read_bytes",
"Total size of iceberg read requests",
&["table_name"],
registry
)
.unwrap();

Self { iceberg_read_bytes }
}

pub fn for_test() -> Arc<Self> {
Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
}
}

pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
Loading

0 comments on commit 15d8ef6

Please sign in to comment.