diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 12afa0511..310258c30 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -9,7 +9,7 @@ use url::Url; use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor}; use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; -use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar}; +use crate::expressions::{Expression, ExpressionRef, Scalar}; use crate::features::ColumnMappingMode; use crate::scan::state::{DvInfo, Stats}; use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType}; diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index ec757d48a..4240e49df 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -3,8 +3,9 @@ //! use std::cmp::Ordering; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; use url::Url; @@ -19,106 +20,6 @@ use crate::utils::require; use crate::{DeltaResult, Engine, Error, FileSystemClient, Version}; const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; - -#[derive(Debug)] -#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] -struct LogSegment { - log_root: Url, - /// Reverse order sorted commit files in the log segment - pub(crate) commit_files: Vec, - /// checkpoint files in the log segment. - pub(crate) checkpoint_files: Vec, -} - -impl LogSegment { - /// Read a stream of log data from this log segment. - /// - /// The log files will be read from most recent to oldest. - /// The boolean flags indicates whether the data was read from - /// a commit file (true) or a checkpoint file (false). - /// - /// `read_schema` is the schema to read the log files with. This can be used - /// to project the log files to a subset of the columns. - /// - /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the - /// query's predicate, but rather a predicate for filtering log files themselves. - #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] - #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] - fn replay( - &self, - engine: &dyn Engine, - commit_read_schema: SchemaRef, - checkpoint_read_schema: SchemaRef, - meta_predicate: Option, - ) -> DeltaResult, bool)>> + Send> { - let json_client = engine.get_json_handler(); - let commit_stream = json_client - .read_json_files( - &self.commit_files, - commit_read_schema, - meta_predicate.clone(), - )? - .map_ok(|batch| (batch, true)); - - let parquet_client = engine.get_parquet_handler(); - let checkpoint_stream = parquet_client - .read_parquet_files( - &self.checkpoint_files, - checkpoint_read_schema, - meta_predicate, - )? - .map_ok(|batch| (batch, false)); - - let batches = commit_stream.chain(checkpoint_stream); - - Ok(batches) - } - - fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult> { - let data_batches = self.replay_for_metadata(engine)?; - let mut metadata_opt: Option = None; - let mut protocol_opt: Option = None; - for batch in data_batches { - let (batch, _) = batch?; - if metadata_opt.is_none() { - metadata_opt = crate::actions::Metadata::try_new_from_data(batch.as_ref())?; - } - if protocol_opt.is_none() { - protocol_opt = crate::actions::Protocol::try_new_from_data(batch.as_ref())?; - } - if metadata_opt.is_some() && protocol_opt.is_some() { - // we've found both, we can stop - break; - } - } - match (metadata_opt, protocol_opt) { - (Some(m), Some(p)) => Ok(Some((m, p))), - (None, Some(_)) => Err(Error::MissingMetadata), - (Some(_), None) => Err(Error::MissingProtocol), - _ => Err(Error::MissingMetadataAndProtocol), - } - } - - // Factored out to facilitate testing - fn replay_for_metadata( - &self, - engine: &dyn Engine, - ) -> DeltaResult, bool)>> + Send> { - let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?; - // filter out log files that do not contain metadata or protocol information - use Expression as Expr; - static META_PREDICATE: LazyLock> = LazyLock::new(|| { - Some(Arc::new(Expr::or( - Expr::column([METADATA_NAME, "id"]).is_not_null(), - Expr::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(), - ))) - }); - // read the same protocol and metadata schema for both commits and checkpoints - self.replay(engine, schema.clone(), schema, META_PREDICATE.clone()) - } -} - // TODO expose methods for accessing the files of a table (with file pruning). /// In-memory representation of a specific snapshot of a Delta table. While a `DeltaTable` exists /// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they