Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improved test fixtures #2749

Merged
merged 2 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ pub(crate) fn files_matching_predicate<'a>(
if let Some(Some(predicate)) =
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
//let expr = logical_expr_to_physical_expr(predicate, snapshot.arrow_schema()?.as_ref());
let expr = SessionContext::new()
.create_physical_expr(predicate, &snapshot.arrow_schema()?.to_dfschema()?)?;
let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?;
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,9 @@ impl DeltaTableError {
);
Self::NotATable(msg)
}

/// Create a [Generic](DeltaTableError::Generic) error with the given message.
pub fn generic(msg: impl ToString) -> Self {
Self::Generic(msg.to_string())
}
}
15 changes: 3 additions & 12 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
// use std::io::{Cursor, Read};
// use std::sync::Arc;

// use roaring::RoaringTreemap;
use crate::DeltaConfigKey;
use maplit::hashset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use url::Url;

use super::schema::StructType;
use crate::kernel::{error::Error, DeltaResult};
use crate::DeltaConfigKey;

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
/// Defines a file format used in table
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Format {
/// Name of the encoding for files in this table
pub provider: String,
Expand Down Expand Up @@ -1126,15 +1123,9 @@ pub(crate) mod serde_path {
#[cfg(test)]
mod tests {
use std::path::PathBuf;
// use std::sync::Arc;

// use object_store::local::LocalFileSystem;

use crate::kernel::PrimitiveType;

use super::*;
// use crate::client::filesystem::ObjectStoreFileSystemClient;
// use crate::executor::tokio::TokioBackgroundExecutor;
use crate::kernel::PrimitiveType;

fn dv_relateive() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
Expand Down
53 changes: 53 additions & 0 deletions crates/core/src/kernel/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use delta_kernel::{
schema::StructField,
};
use object_store::path::Path;
#[cfg(test)]
use serde_json::Value;
use std::cmp::Ordering;
use urlencoding::encode;

Expand All @@ -21,6 +23,9 @@ pub trait ScalarExt: Sized {
fn serialize_encoded(&self) -> String;
/// Create a [`Scalar`] from an arrow array row
fn from_array(arr: &dyn Array, index: usize) -> Option<Self>;
/// Serialize as serde_json::Value
#[cfg(test)]
fn to_json(&self) -> serde_json::Value;
}

impl ScalarExt for Scalar {
Expand Down Expand Up @@ -218,6 +223,54 @@ impl ScalarExt for Scalar {
| Null => None,
}
}

/// Serializes this scalar as a serde_json::Value.
///
/// Numbers map to JSON numbers, timestamps/dates/decimals/binary map to
/// formatted strings, and `Null` maps to JSON null. `Struct` is not supported.
#[cfg(test)]
fn to_json(&self) -> serde_json::Value {
    match self {
        Self::String(s) => Value::String(s.to_owned()),
        Self::Byte(b) => Value::Number(serde_json::Number::from(*b)),
        Self::Short(s) => Value::Number(serde_json::Number::from(*s)),
        Self::Integer(i) => Value::Number(serde_json::Number::from(*i)),
        Self::Long(l) => Value::Number(serde_json::Number::from(*l)),
        // NOTE(review): `from_f64` returns None for NaN/infinity, so these two
        // arms panic on non-finite floats — acceptable for test-only code, but
        // worth confirming no fixture produces such values.
        Self::Float(f) => Value::Number(serde_json::Number::from_f64(*f as f64).unwrap()),
        Self::Double(d) => Value::Number(serde_json::Number::from_f64(*d).unwrap()),
        Self::Boolean(b) => Value::Bool(*b),
        Self::TimestampNtz(ts) | Self::Timestamp(ts) => {
            // Microsecond-precision timestamps, rendered without a timezone suffix.
            let ts = Utc.timestamp_micros(*ts).single().unwrap();
            Value::String(ts.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
        }
        Self::Date(days) => {
            // Days since the Unix epoch, rendered as a calendar date.
            let date = DateTime::from_timestamp(*days as i64 * 24 * 3600, 0).unwrap();
            Value::String(date.format("%Y-%m-%d").to_string())
        }
        Self::Decimal(value, _, scale) => match scale.cmp(&0) {
            // Zero scale: the raw integer value is the decimal.
            Ordering::Equal => Value::String(value.to_string()),
            Ordering::Greater => {
                // Split into integral and fractional digits. Taking the
                // absolute value of the remainder fixes negative decimals:
                // the previous code rendered -123 (scale 2) as "-1.-23"
                // because Rust's `%` keeps the dividend's sign.
                let scalar_multiple = 10_i128.pow(*scale as u32);
                let integral = value / scalar_multiple;
                let fractional = (value % scalar_multiple).unsigned_abs();
                // Truncating division drops the sign when |value| is smaller
                // than 10^scale (e.g. -23 at scale 2 gives integral 0), so
                // re-add the minus sign explicitly in that case.
                let sign = if *value < 0 && integral == 0 { "-" } else { "" };
                Value::String(format!(
                    "{}{}.{:0>scale$}",
                    sign,
                    integral,
                    fractional,
                    scale = *scale as usize
                ))
            }
            Ordering::Less => {
                // Negative scale means the integer is scaled up: append
                // |scale| zeros. The previous `0..*scale` range was empty for
                // a negative scale and appended nothing.
                let mut s = value.to_string();
                for _ in 0..scale.unsigned_abs() {
                    s.push('0');
                }
                Value::String(s)
            }
        },
        Self::Binary(val) => Value::String(create_escaped_binary_string(val.as_slice())),
        Self::Null(_) => Value::Null,
        Self::Struct(_) => unimplemented!(),
    }
}
}

fn create_escaped_binary_string(data: &[u8]) -> String {
Expand Down
95 changes: 94 additions & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,31 @@ impl Snapshot {
),
]))
}

/// Get the partition values schema of the snapshot.
///
/// Returns `Ok(None)` when the table is not partitioned. Otherwise returns a
/// [`StructType`] containing, in order, the fields of each partition column,
/// looked up in `table_schema` (or in the snapshot's own schema when `None`).
///
/// # Errors
///
/// Returns a [`DeltaTableError::Generic`] error if any partition column named
/// in the metadata is missing from the schema.
pub fn partitions_schema(
    &self,
    table_schema: Option<&StructType>,
) -> DeltaResult<Option<StructType>> {
    if self.metadata().partition_columns.is_empty() {
        return Ok(None);
    }
    let schema = table_schema.unwrap_or_else(|| self.schema());
    let fields = self
        .metadata
        .partition_columns
        .iter()
        .map(|col| {
            // `.cloned()` instead of `.map(|field| field.clone())` (clippy::map_clone).
            schema.field(col).cloned().ok_or_else(|| {
                DeltaTableError::Generic(format!(
                    "Partition column {} not found in schema",
                    col
                ))
            })
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(Some(StructType::new(fields)))
}
}

/// A snapshot of a Delta table that has been eagerly loaded into memory.
Expand All @@ -369,7 +394,7 @@ pub struct EagerSnapshot {

// NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because
// we do not yet enforce a consistent schema across all batches we read from the log.
files: Vec<RecordBatch>,
pub(crate) files: Vec<RecordBatch>,
}

impl EagerSnapshot {
Expand Down Expand Up @@ -752,6 +777,7 @@ mod tests {
use super::*;
use crate::kernel::Remove;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::test_utils::ActionFactory;

#[tokio::test]
async fn test_snapshots() -> TestResult {
Expand Down Expand Up @@ -956,4 +982,71 @@ mod tests {

Ok(())
}

/// `partitions_schema` yields the partition columns' fields for a partitioned
/// table and `None` for an unpartitioned one.
#[test]
fn test_partition_schema() {
    let table_schema = StructType::new(vec![
        StructField::new("id", DataType::LONG, true),
        StructField::new("name", DataType::STRING, true),
        StructField::new("date", DataType::DATE, true),
    ]);

    // Case 1: a table partitioned by "date".
    let partition_cols = vec!["date".to_string()];
    let metadata = ActionFactory::metadata(&table_schema, Some(&partition_cols), None);
    let protocol = ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>);

    let actions = vec![
        Action::Protocol(protocol.clone()),
        Action::Metadata(metadata.clone()),
    ];
    let operation = DeltaOperation::Write {
        mode: SaveMode::Append,
        partition_by: Some(partition_cols),
        predicate: None,
    };
    let commit = CommitData::new(actions, operation, HashMap::new(), vec![]);
    let (log_segment, _) = LogSegment::new_test(vec![&commit]).unwrap();

    let snapshot = Snapshot {
        log_segment: log_segment.clone(),
        protocol: protocol.clone(),
        metadata,
        schema: table_schema.clone(),
        table_url: "table".to_string(),
        config: Default::default(),
    };

    let expected = StructType::new(vec![StructField::new("date", DataType::DATE, true)]);
    assert_eq!(snapshot.partitions_schema(None).unwrap(), Some(expected));

    // Case 2: the same table with no partition columns.
    let metadata = ActionFactory::metadata(&table_schema, None::<Vec<&str>>, None);
    let actions = vec![
        Action::Protocol(protocol.clone()),
        Action::Metadata(metadata.clone()),
    ];
    let operation = DeltaOperation::Write {
        mode: SaveMode::Append,
        partition_by: None,
        predicate: None,
    };
    let commit = CommitData::new(actions, operation, HashMap::new(), vec![]);
    let (log_segment, _) = LogSegment::new_test(vec![&commit]).unwrap();

    let snapshot = Snapshot {
        log_segment,
        protocol,
        metadata,
        schema: table_schema,
        table_url: "table".to_string(),
        config: Default::default(),
    };

    assert_eq!(snapshot.partitions_schema(None).unwrap(), None);
}
}
3 changes: 3 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub mod schema;
pub mod storage;
pub mod table;

#[cfg(test)]
pub mod test_utils;

#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
pub mod writer;
Expand Down
Loading
Loading