Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Move protocol validation to TableConfiguration #665

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl From<Error> for KernelError {
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
Error::ChangeDataFeedUnsupported(_, _) => KernelError::ChangeDataFeedUnsupported,
Error::ChangeDataFeedIncompatibleSchema(_, _) => {
KernelError::ChangeDataFeedIncompatibleSchema
}
Expand Down
291 changes: 12 additions & 279 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
//! Provides parsing and manipulation of the various actions defined in the [Delta
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::str::FromStr;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use self::deletion_vector::DeletionVectorDescriptor;
use crate::actions::schemas::GetStructField;
use crate::schema::{SchemaRef, StructType};
use crate::table_features::{
ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
};
use crate::table_features::{ReaderFeatures, WriterFeatures};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error, RowVisitor as _};
use crate::{DeltaResult, EngineData, RowVisitor as _};
use visitors::{MetadataVisitor, ProtocolVisitor};

use delta_kernel_derive::Schema;
Expand Down Expand Up @@ -149,53 +143,37 @@ impl Metadata {
pub struct Protocol {
/// The minimum version of the Delta read protocol that a client must implement
/// in order to correctly read this table
min_reader_version: i32,
pub(crate) min_reader_version: i32,
/// The minimum version of the Delta write protocol that a client must implement
/// in order to correctly write this table
min_writer_version: i32,
pub(crate) min_writer_version: i32,
/// A collection of features that a client must implement in order to correctly
/// read this table (exist only when minReaderVersion is set to 3)
#[serde(skip_serializing_if = "Option::is_none")]
reader_features: Option<Vec<String>>,
pub(crate) reader_features: Option<Vec<String>>,
/// A collection of features that a client must implement in order to correctly
/// write this table (exist only when minWriterVersion is set to 7)
#[serde(skip_serializing_if = "Option::is_none")]
writer_features: Option<Vec<String>>,
pub(crate) writer_features: Option<Vec<String>>,
}

impl Protocol {
/// Try to create a new Protocol instance from reader/writer versions and table features. This
/// can fail if the protocol is invalid.
pub fn try_new(
pub fn new(
min_reader_version: i32,
min_writer_version: i32,
reader_features: Option<impl IntoIterator<Item = impl Into<String>>>,
writer_features: Option<impl IntoIterator<Item = impl Into<String>>>,
) -> DeltaResult<Self> {
if min_reader_version == 3 {
require!(
reader_features.is_some(),
Error::invalid_protocol(
"Reader features must be present when minimum reader version = 3"
)
);
}
if min_writer_version == 7 {
require!(
writer_features.is_some(),
Error::invalid_protocol(
"Writer features must be present when minimum writer version = 7"
)
);
}
) -> Self {
let reader_features = reader_features.map(|f| f.into_iter().map(Into::into).collect());
let writer_features = writer_features.map(|f| f.into_iter().map(Into::into).collect());
Ok(Protocol {
Protocol {
min_reader_version,
min_writer_version,
reader_features,
writer_features,
})
}
}

/// Create a new Protocol by visiting the EngineData and extracting the first protocol row into
Expand Down Expand Up @@ -237,92 +215,9 @@ impl Protocol {
self.writer_features()
.is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
}

/// Check if reading a table with this protocol is supported. That is: does the kernel support
/// the specified protocol reader version and all enabled reader features? If yes, returns unit
/// type, otherwise will return an error.
pub fn ensure_read_supported(&self) -> DeltaResult<()> {
match &self.reader_features {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if self.min_reader_version == 3 => {
ensure_supported_features(reader_features, &SUPPORTED_READER_FEATURES)
}
// if min_reader_version = 3 and no reader features => ERROR
// NOTE this is caught by the protocol parsing.
None if self.min_reader_version == 3 => Err(Error::internal_error(
"Reader features must be present when minimum reader version = 3",
)),
// if min_reader_version = 1,2 and there are no reader features => OK
None if self.min_reader_version == 1 || self.min_reader_version == 2 => Ok(()),
// if min_reader_version = 1,2 and there are reader features => ERROR
// NOTE this is caught by the protocol parsing.
Some(_) if self.min_reader_version == 1 || self.min_reader_version == 2 => {
Err(Error::internal_error(
"Reader features must not be present when minimum reader version = 1 or 2",
))
}
// any other min_reader_version is not supported
_ => Err(Error::Unsupported(format!(
"Unsupported minimum reader version {}",
self.min_reader_version
))),
}
}

/// Check if writing to a table with this protocol is supported. That is: does the kernel
/// support the specified protocol writer version and all enabled writer features?
pub fn ensure_write_supported(&self) -> DeltaResult<()> {
match &self.writer_features {
// if min_reader_version = 3 and min_writer_version = 7 and all writer features are
// supported => OK
Some(writer_features)
if self.min_reader_version == 3 && self.min_writer_version == 7 =>
{
ensure_supported_features(writer_features, &SUPPORTED_WRITER_FEATURES)
}
// otherwise not supported
_ => Err(Error::unsupported(
"Only tables with min reader version 3 and min writer version 7 with no table features are supported."
)),
}
}
}

// given unparsed `table_features`, parse and check if they are subset of `supported_features`
pub(crate) fn ensure_supported_features<T>(
table_features: &[String],
supported_features: &HashSet<T>,
) -> DeltaResult<()>
where
<T as FromStr>::Err: Display,
T: Debug + FromStr + Hash + Eq,
{
let error = |unsupported, unsupported_or_unknown| {
let supported = supported_features.iter().collect::<Vec<_>>();
let features_type = type_name::<T>()
.rsplit("::")
.next()
.unwrap_or("table features");
Error::Unsupported(format!(
"{} {} {:?}. Supported {} are {:?}",
unsupported_or_unknown, features_type, unsupported, features_type, supported
))
};
let parsed_features: HashSet<T> = table_features
.iter()
.map(|s| T::from_str(s).map_err(|_| error(vec![s.to_string()], "Unknown")))
.collect::<Result<_, Error>>()?;
parsed_features
.is_subset(supported_features)
.then_some(())
.ok_or_else(|| {
let unsupported = parsed_features
.difference(supported_features)
.map(|f| format!("{:?}", f))
.collect::<Vec<_>>();
error(unsupported, "Unsupported")
})
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
Expand Down Expand Up @@ -696,166 +591,4 @@ mod tests {
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_validate_protocol() {
let invalid_protocols = [
Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: None,
writer_features: Some(vec![]),
},
Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec![]),
writer_features: None,
},
Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: None,
writer_features: None,
},
];
for Protocol {
min_reader_version,
min_writer_version,
reader_features,
writer_features,
} in invalid_protocols
{
assert!(matches!(
Protocol::try_new(
min_reader_version,
min_writer_version,
reader_features,
writer_features
),
Err(Error::InvalidProtocol(_)),
));
}
}

#[test]
fn test_v2_checkpoint_unsupported() {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([ReaderFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());

let protocol = Protocol::try_new(
4,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([ReaderFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
}

#[test]
fn test_ensure_read_supported() {
let protocol = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec![]),
writer_features: Some(vec![]),
};
assert!(protocol.ensure_read_supported().is_ok());

let empty_features: [String; 0] = [];
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some(&empty_features),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());

let protocol = Protocol::try_new(
3,
7,
Some(&empty_features),
Some([WriterFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([WriterFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());

let protocol = Protocol {
min_reader_version: 1,
min_writer_version: 7,
reader_features: None,
writer_features: None,
};
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol {
min_reader_version: 2,
min_writer_version: 7,
reader_features: None,
writer_features: None,
};
assert!(protocol.ensure_read_supported().is_ok());
}

#[test]
fn test_ensure_write_supported() {
let protocol = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec![]),
writer_features: Some(vec![]),
};
assert!(protocol.ensure_write_supported().is_ok());

let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::DeletionVectors]),
Some([WriterFeatures::DeletionVectors]),
)
.unwrap();
assert!(protocol.ensure_write_supported().is_err());
}

#[test]
fn test_ensure_supported_features() {
let supported_features = [
ReaderFeatures::ColumnMapping,
ReaderFeatures::DeletionVectors,
]
.into_iter()
.collect();
let table_features = vec![ReaderFeatures::ColumnMapping.to_string()];
ensure_supported_features(&table_features, &supported_features).unwrap();

// test unknown features
let table_features = vec![ReaderFeatures::ColumnMapping.to_string(), "idk".to_string()];
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
match error {
Error::Unsupported(e) if e ==
"Unknown ReaderFeatures [\"idk\"]. Supported ReaderFeatures are [ColumnMapping, DeletionVectors]"
=> {},
Error::Unsupported(e) if e ==
"Unknown ReaderFeatures [\"idk\"]. Supported ReaderFeatures are [DeletionVectors, ColumnMapping]"
=> {},
_ => panic!("Expected unsupported error"),
}
}
}
Loading
Loading