Skip to content

Commit

Permalink
Remove old log segment
Browse files: browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Nov 6, 2024
1 parent 5edf4db commit 0b8463a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 102 deletions.
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use url::Url;

use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar};
use crate::expressions::{Expression, ExpressionRef, Scalar};
use crate::features::ColumnMappingMode;
use crate::scan::state::{DvInfo, Stats};
use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
Expand Down
103 changes: 2 additions & 101 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
//!
use std::cmp::Ordering;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use url::Url;
Expand All @@ -19,106 +20,6 @@ use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileSystemClient, Version};

const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";

/// Groups the commit files and checkpoint files that make up one segment of a table's log,
/// together with the URL of the log root.
///
/// Visibility is `pub` when the `developer-visibility` feature is enabled and `pub(crate)`
/// otherwise.
#[derive(Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct LogSegment {
    /// URL of the log root this segment belongs to.
    log_root: Url,
    /// Commit files in the log segment, sorted in reverse order.
    pub(crate) commit_files: Vec<FileMeta>,
    /// Checkpoint files in the log segment.
    pub(crate) checkpoint_files: Vec<FileMeta>,
}

impl LogSegment {
/// Read a stream of log data from this log segment.
///
/// The log files will be read from most recent to oldest.
/// The boolean flags indicates whether the data was read from
/// a commit file (true) or a checkpoint file (false).
///
/// `read_schema` is the schema to read the log files with. This can be used
/// to project the log files to a subset of the columns.
///
/// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the
/// query's predicate, but rather a predicate for filtering log files themselves.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn replay(
&self,
engine: &dyn Engine,
commit_read_schema: SchemaRef,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let json_client = engine.get_json_handler();
let commit_stream = json_client
.read_json_files(
&self.commit_files,
commit_read_schema,
meta_predicate.clone(),
)?
.map_ok(|batch| (batch, true));

let parquet_client = engine.get_parquet_handler();
let checkpoint_stream = parquet_client
.read_parquet_files(
&self.checkpoint_files,
checkpoint_read_schema,
meta_predicate,
)?
.map_ok(|batch| (batch, false));

let batches = commit_stream.chain(checkpoint_stream);

Ok(batches)
}

fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<Option<(Metadata, Protocol)>> {
let data_batches = self.replay_for_metadata(engine)?;
let mut metadata_opt: Option<Metadata> = None;
let mut protocol_opt: Option<Protocol> = None;
for batch in data_batches {
let (batch, _) = batch?;
if metadata_opt.is_none() {
metadata_opt = crate::actions::Metadata::try_new_from_data(batch.as_ref())?;
}
if protocol_opt.is_none() {
protocol_opt = crate::actions::Protocol::try_new_from_data(batch.as_ref())?;
}
if metadata_opt.is_some() && protocol_opt.is_some() {
// we've found both, we can stop
break;
}
}
match (metadata_opt, protocol_opt) {
(Some(m), Some(p)) => Ok(Some((m, p))),
(None, Some(_)) => Err(Error::MissingMetadata),
(Some(_), None) => Err(Error::MissingProtocol),
_ => Err(Error::MissingMetadataAndProtocol),
}
}

// Factored out to facilitate testing
fn replay_for_metadata(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
// filter out log files that do not contain metadata or protocol information
use Expression as Expr;
static META_PREDICATE: LazyLock<Option<ExpressionRef>> = LazyLock::new(|| {
Some(Arc::new(Expr::or(
Expr::column([METADATA_NAME, "id"]).is_not_null(),
Expr::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(),
)))
});
// read the same protocol and metadata schema for both commits and checkpoints
self.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
}
}

// TODO expose methods for accessing the files of a table (with file pruning).
/// In-memory representation of a specific snapshot of a Delta table. While a `DeltaTable` exists
/// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they
Expand Down

0 comments on commit 0b8463a

Please sign in to comment.