diff --git a/Cargo.lock b/Cargo.lock
index e0ae92397a970..04aa3ac80e577 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10490,7 +10490,6 @@ dependencies = [
  "risingwave_rpc_client",
  "risingwave_storage",
  "rw_futures_util",
- "scopeguard",
  "tempfile",
  "thiserror-ext",
  "tikv-jemallocator",
@@ -11015,6 +11014,7 @@ dependencies = [
  "rustls-pemfile 2.2.0",
  "rustls-pki-types",
  "rw_futures_util",
+ "scopeguard",
  "sea-schema",
  "serde",
  "serde_derive",
diff --git a/src/batch/executors/Cargo.toml b/src/batch/executors/Cargo.toml
index 47483f5d55235..f085f931e587b 100644
--- a/src/batch/executors/Cargo.toml
+++ b/src/batch/executors/Cargo.toml
@@ -35,7 +35,6 @@ risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 risingwave_storage = { workspace = true }
 rw_futures_util = { workspace = true }
-scopeguard = "1"
 thiserror-ext = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio", features = [
     "rt",
@@ -47,7 +46,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
     "fs",
 ] }
 tokio-postgres = "0.7"
-tokio-stream = { workspace = true }
 tracing = "0.1"
 uuid = { version = "1", features = ["v4"] }

@@ -61,6 +59,7 @@ risingwave_expr_impl = { workspace = true }
 risingwave_hummock_sdk = { workspace = true }
 tempfile = "3"
 tikv-jemallocator = { workspace = true }
+tokio-stream = { workspace = true }

 [[bench]]
 name = "filter"
diff --git a/src/batch/executors/src/executor/iceberg_scan.rs b/src/batch/executors/src/executor/iceberg_scan.rs
index 384e992a50d3f..a52a9d69a9024 100644
--- a/src/batch/executors/src/executor/iceberg_scan.rs
+++ b/src/batch/executors/src/executor/iceberg_scan.rs
@@ -12,20 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::sync::Arc;
-
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use itertools::Itertools;
-use risingwave_common::array::arrow::IcebergArrowConvert;
-use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
+use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{
     Field, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME, Schema,
 };
 use risingwave_common::types::{DataType, ScalarImpl};
-use risingwave_common_estimate_size::EstimateSize;
 use risingwave_connector::WithOptionsSecResolved;
-use risingwave_connector::source::iceberg::{IcebergFileScanTask, IcebergProperties, IcebergSplit};
+use risingwave_connector::source::iceberg::{
+    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
+};
 use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
 use risingwave_expr::expr::LiteralExpression;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -92,7 +90,6 @@ impl IcebergScanExecutor {
     async fn do_execute(mut self: Box<Self>) {
         let table = self.iceberg_config.load_table().await?;
         let data_types = self.schema.data_types();
-        let table_name = table.identifier().name().to_owned();

         let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
             Some(IcebergFileScanTask::Data(data_file_scan_tasks)) => data_file_scan_tasks,
@@ -110,58 +107,20 @@
             }
         };

-        let mut read_bytes = 0;
-        let _metrics_report_guard = scopeguard::guard(
-            (read_bytes, table_name, self.metrics.clone()),
-            |(read_bytes, table_name, metrics)| {
-                if let Some(metrics) = metrics {
-                    metrics
-                        .iceberg_scan_metrics()
-                        .iceberg_read_bytes
-                        .with_guarded_label_values(&[&table_name])
-                        .inc_by(read_bytes as _);
-                }
-            },
-        );
-
         for data_file_scan_task in data_file_scan_tasks {
-            let data_file_path = data_file_scan_task.data_file_path.clone();
-            let data_sequence_number = data_file_scan_task.sequence_number;
-
-            let reader = table
-                .reader_builder()
-                .with_batch_size(self.batch_size)
-                .build();
-            let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
-
-            let mut record_batch_stream =
-                reader.read(Box::pin(file_scan_stream)).await?.enumerate();
-
-            while let Some((index, record_batch)) = record_batch_stream.next().await {
-                let record_batch = record_batch?;
-
-                // iceberg_t1_source
-                let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-                if self.need_seq_num {
-                    let (mut columns, visibility) = chunk.into_parts();
-                    columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
-                        vec![data_sequence_number; visibility.len()],
-                    ))));
-                    chunk = DataChunk::from_parts(columns.into(), visibility)
-                };
-                if self.need_file_path_and_pos {
-                    let (mut columns, visibility) = chunk.into_parts();
-                    columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
-                        vec![data_file_path.as_str(); visibility.len()],
-                    ))));
-                    let index_start = (index * self.batch_size) as i64;
-                    columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
-                        (index_start..(index_start + visibility.len() as i64))
-                            .collect::<Vec<i64>>(),
-                    ))));
-                    chunk = DataChunk::from_parts(columns.into(), visibility)
-                }
+            #[for_await]
+            for chunk in scan_task_to_chunk(
+                table.clone(),
+                data_file_scan_task,
+                IcebergScanOpts {
+                    batch_size: self.batch_size,
+                    need_seq_num: self.need_seq_num,
+                    need_file_path_and_pos: self.need_file_path_and_pos,
+                },
+                self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
+            ) {
+                let chunk = chunk?;
                 assert_eq!(chunk.data_types(), data_types);
-                read_bytes += chunk.estimated_heap_size() as u64;
                 yield chunk;
             }
         }
diff --git a/src/batch/src/monitor/stats.rs b/src/batch/src/monitor/stats.rs
index e9db97b0c91a9..528f907c4cb19 100644
--- a/src/batch/src/monitor/stats.rs
+++ b/src/batch/src/monitor/stats.rs
@@ -19,8 +19,9 @@ use prometheus::{
     Histogram, IntGauge, Registry, histogram_opts, register_histogram_with_registry,
     register_int_counter_with_registry,
 };
-use risingwave_common::metrics::{LabelGuardedIntCounterVec, TrAdderGauge};
+use risingwave_common::metrics::TrAdderGauge;
 use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
+use risingwave_connector::source::iceberg::IcebergScanMetrics;

 /// Metrics for batch executor.
 /// Currently, it contains:
@@ -93,8 +94,8 @@ impl BatchMetricsInner {
         &self.batch_manager_metrics
     }

-    pub fn iceberg_scan_metrics(&self) -> &IcebergScanMetrics {
-        &self.iceberg_scan_metrics
+    pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
+        self.iceberg_scan_metrics.clone()
     }

     pub fn for_test() -> BatchMetrics {
@@ -182,29 +183,3 @@ impl BatchSpillMetrics {
         Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone())
     }
 }
-
-#[derive(Clone)]
-pub struct IcebergScanMetrics {
-    pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
-}
-
-impl IcebergScanMetrics {
-    fn new(registry: &Registry) -> Self {
-        let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
-            "iceberg_read_bytes",
-            "Total size of iceberg read requests",
-            &["table_name"],
-            registry
-        )
-        .unwrap();
-
-        Self { iceberg_read_bytes }
-    }
-
-    pub fn for_test() -> Arc<Self> {
-        Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
-    }
-}
-
-pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
-    LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
diff --git a/src/batch/src/task/env.rs b/src/batch/src/task/env.rs
index cf13192d42ce7..c4d6f7483b2ec 100644
--- a/src/batch/src/task/env.rs
+++ b/src/batch/src/task/env.rs
@@ -17,14 +17,13 @@
 use risingwave_common::config::{BatchConfig, MetricLevel};
 use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::worker_util::WorkerNodeId;
+use risingwave_connector::source::iceberg::IcebergScanMetrics;
 use risingwave_connector::source::monitor::SourceMetrics;
 use risingwave_dml::dml_manager::DmlManagerRef;
 use risingwave_rpc_client::ComputeClientPoolRef;
 use risingwave_storage::StateStoreImpl;

-use crate::monitor::{
-    BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics, IcebergScanMetrics,
-};
+use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics};
 use crate::task::BatchManager;

 /// The global environment for task execution.
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index 5af3f3b1910f9..be125378253a2 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -18,7 +18,6 @@ use std::time::Duration;

 use risingwave_batch::monitor::{
     GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
-    GLOBAL_ICEBERG_SCAN_METRICS,
 };
 use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
 use risingwave_batch::spill::spill_op::SpillOp;
@@ -40,6 +39,7 @@ use risingwave_common::util::tokio_util::sync::CancellationToken;
 use risingwave_common::{GIT_SHA, RW_VERSION};
 use risingwave_common_heap_profiling::HeapProfiler;
 use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
+use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
 use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
 use risingwave_dml::dml_manager::DmlManager;
 use risingwave_pb::common::WorkerType;
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 3c192795ae6a8..ccea9309035fc 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -121,6 +121,7 @@ rustls-native-certs = "0.8"
 rustls-pemfile = "2"
 rustls-pki-types = "1"
 rw_futures_util = { workspace = true }
+scopeguard = "1"
 sea-schema = { version = "0.16", default-features = false, features = [
     "discovery",
     "sqlx-postgres",
@@ -137,7 +138,6 @@ strum = "0.26"
 strum_macros = "0.26"
 tempfile = "3"
 thiserror = "1"
-
 thiserror-ext = { workspace = true }
 # To easiy get the type_name and impl IntoSql for rust_decimal, we fork the crate.
 # Another reason is that we are planning to refactor their IntoSql trait to allow specifying the type to convert.
diff --git a/src/connector/src/source/iceberg/metrics.rs b/src/connector/src/source/iceberg/metrics.rs
new file mode 100644
index 0000000000000..9f1e1b87bb4cd
--- /dev/null
+++ b/src/connector/src/source/iceberg/metrics.rs
@@ -0,0 +1,46 @@
+// Copyright 2025 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::{Arc, LazyLock};
+
+use prometheus::Registry;
+use risingwave_common::metrics::LabelGuardedIntCounterVec;
+use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
+use risingwave_common::register_guarded_int_counter_vec_with_registry;
+
+#[derive(Clone)]
+pub struct IcebergScanMetrics {
+    pub iceberg_read_bytes: LabelGuardedIntCounterVec<1>,
+}
+
+impl IcebergScanMetrics {
+    fn new(registry: &Registry) -> Self {
+        let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
+            "iceberg_read_bytes",
+            "Total size of iceberg read requests",
+            &["table_name"],
+            registry
+        )
+        .unwrap();
+
+        Self { iceberg_read_bytes }
+    }
+
+    pub fn for_test() -> Arc<Self> {
+        Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
+    }
+}
+
+pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
+    LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 928cf56a4a353..808d2b544a323 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -14,12 +14,14 @@

 pub mod parquet_file_handler;

+mod metrics;
 use std::collections::HashMap;
 use std::sync::Arc;

 use anyhow::anyhow;
 use async_trait::async_trait;
-use futures_async_stream::for_await;
+use futures::StreamExt;
+use futures_async_stream::{for_await, try_stream};
 use iceberg::Catalog;
 use iceberg::expr::Predicate as IcebergPredicate;
 use iceberg::scan::FileScanTask;
@@ -27,6 +29,8 @@ use iceberg::spec::{DataContentType, ManifestList};
 use iceberg::table::Table;
 use itertools::Itertools;
 pub use parquet_file_handler::*;
+use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
 use risingwave_common::bail;
 use risingwave_common::catalog::{
     ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
@@ -34,10 +38,11 @@
 };
 use risingwave_common::types::JsonbVal;
 use risingwave_common::util::iter_util::ZipEqFast;
+use risingwave_common_estimate_size::EstimateSize;
 use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
 use serde::{Deserialize, Serialize};
-use tokio_stream::StreamExt;

+pub use self::metrics::{GLOBAL_ICEBERG_SCAN_METRICS, IcebergScanMetrics};
 use crate::connector_common::IcebergCommon;
 use crate::error::{ConnectorError, ConnectorResult};
 use crate::parser::ParserConfig;
@@ -165,6 +170,7 @@ impl IcebergFileScanTask {
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct IcebergSplit {
     pub split_id: i64,
+    // TODO: remove this field. It seems not used.
     pub snapshot_id: i64,
     pub task: IcebergFileScanTask,
 }
@@ -439,6 +445,7 @@
         Ok(vec![split])
     }

+    /// List all files in the snapshot to check if there are deletes.
     pub async fn all_delete_parameters(
         table: &Table,
         snapshot_id: i64,
@@ -510,6 +517,70 @@
     }
 }

+pub struct IcebergScanOpts {
+    pub batch_size: usize,
+    pub need_seq_num: bool,
+    pub need_file_path_and_pos: bool,
+}
+
+#[try_stream(ok = DataChunk, error = ConnectorError)]
+pub async fn scan_task_to_chunk(
+    table: Table,
+    data_file_scan_task: FileScanTask,
+    IcebergScanOpts {
+        batch_size,
+        need_seq_num,
+        need_file_path_and_pos,
+    }: IcebergScanOpts,
+    metrics: Option<Arc<IcebergScanMetrics>>,
+) {
+    let table_name = table.identifier().name().to_owned();
+
+    let mut read_bytes = scopeguard::guard(0, |read_bytes| {
+        if let Some(metrics) = metrics {
+            metrics
+                .iceberg_read_bytes
+                .with_guarded_label_values(&[&table_name])
+                .inc_by(read_bytes as _);
+        }
+    });
+
+    let data_file_path = data_file_scan_task.data_file_path.clone();
+    let data_sequence_number = data_file_scan_task.sequence_number;
+
+    let reader = table.reader_builder().with_batch_size(batch_size).build();
+    let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
+
+    // FIXME: what if the start position is not 0? The logic for index seems not correct.
+    let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();
+
+    while let Some((index, record_batch)) = record_batch_stream.next().await {
+        let record_batch = record_batch?;
+
+        let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+        if need_seq_num {
+            let (mut columns, visibility) = chunk.into_parts();
+            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
+                vec![data_sequence_number; visibility.len()],
+            ))));
+            chunk = DataChunk::from_parts(columns.into(), visibility)
+        };
+        if need_file_path_and_pos {
+            let (mut columns, visibility) = chunk.into_parts();
+            columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
+                vec![data_file_path.as_str(); visibility.len()],
+            ))));
+            let index_start = (index * batch_size) as i64;
+            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
+                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
+            ))));
+            chunk = DataChunk::from_parts(columns.into(), visibility)
+        }
+        *read_bytes += chunk.estimated_heap_size() as u64;
+        yield chunk;
+    }
+}
+
 #[derive(Debug)]
 pub struct IcebergFileReader {}
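Since this diff makes `scan_task_to_chunk` a public API of the connector crate, here is a minimal sketch of how a caller outside the batch executor could consume it. The `table`/`task` arguments and the literal batch size are placeholders for illustration, not code from this patch; the import paths are inferred from the diff's own `use` statements, and the pin-and-poll pattern is the standard way to drive a `#[try_stream]`-generated stream.

```rust
use futures::{StreamExt, pin_mut};
use iceberg::scan::FileScanTask;
use iceberg::table::Table;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk};

// Hypothetical consumer: counts visible rows produced by one scan task.
async fn consume_task(table: Table, task: FileScanTask) -> ConnectorResult<u64> {
    let stream = scan_task_to_chunk(
        table,
        task,
        IcebergScanOpts {
            batch_size: 1024, // placeholder value
            need_seq_num: false,
            need_file_path_and_pos: false,
        },
        None, // pass Some(metrics) to have read bytes flushed when the stream drops
    );
    pin_mut!(stream); // the stream is !Unpin; pin it on the stack before polling
    let mut rows = 0;
    while let Some(chunk) = stream.next().await {
        rows += chunk?.cardinality() as u64;
    }
    Ok(rows)
}
```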
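The deferred metric flush in `scan_task_to_chunk` relies on `scopeguard::guard`, which wraps a value and runs a closure on it when the guard is dropped, so the byte count is reported even if the caller stops polling the stream early. A standalone sketch of that pattern (the values and `println!` are illustrative stand-ins for the `inc_by` call):

```rust
fn main() {
    // Wrap an accumulator; the closure receives the final value on drop.
    let mut read_bytes = scopeguard::guard(0u64, |total| {
        println!("flushing {total} bytes to the counter");
    });
    for size in [100u64, 200, 300] {
        *read_bytes += size; // ScopeGuard derefs to the wrapped value
    }
} // guard drops here and the closure reports 600
```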