Commit e79f02c: hack

zachschuermann committed Dec 16, 2024 (1 parent: 13ce62a)
Showing 3 changed files with 73 additions and 60 deletions.
kernel/src/engine/default/parquet.rs: 4 changes (2 additions & 2 deletions)
@@ -292,8 +292,8 @@ impl FileOpener for ParquetOpener {

let stream = builder.with_batch_size(batch_size).build()?;

println!("START");
tokio::time::sleep(std::time::Duration::from_millis(10000)).await; // simulate IO delay
// println!("read IO");
// tokio::time::sleep(std::time::Duration::from_millis(10000)).await; // simulate IO delay

let stream = stream.map(move |rbr| {
// re-order each batch if needed
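Note: the change above disables a println!/sleep pair that was injected into the ParquetOpener to simulate IO latency. If a probe like this needs to stick around, one option is to gate the delay behind an environment variable instead of commenting it in and out. A minimal sketch, assuming a hypothetical SIMULATED_IO_DELAY_MS variable (not a real knob in delta-kernel-rs):

use std::time::Duration;

// Hypothetical helper, not part of delta-kernel-rs: sleep only when the
// SIMULATED_IO_DELAY_MS environment variable is set to a number of millis.
async fn maybe_simulate_io_delay() {
    if let Some(ms) = std::env::var("SIMULATED_IO_DELAY_MS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
    {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }
}

#[tokio::main]
async fn main() {
    // No-op unless the variable is set, so this can sit in the read path
    // during benchmarking without affecting normal runs.
    maybe_simulate_io_delay().await;
    println!("stream built");
}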
kernel/src/engine/default/parquet2.rs: 6 changes (3 additions & 3 deletions)
@@ -84,7 +84,7 @@ fn read_files(
// let semaphore = Arc::new(Semaphore::new(TASK_LIMIT));
let (tx, rx) = mpsc::channel();

println!("files len: {:?}", files.len());
// println!("files len: {:?}", files.len());

let file_opener: Arc<dyn FileOpener + Send + Sync> = Arc::from(file_opener);
let len = files.len();
@@ -97,12 +97,12 @@
let tx_clone = tx.clone();

let f = file_opener.clone();
println!("file: {:?}", file);
// println!("file: {:?}", file);
let handle = runtime.spawn(async move {
// read the file as stream
let mut stream = f.open(file.clone(), None).unwrap().await.unwrap();
// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
println!("HERE");
// println!("sleeping");
while let Some(result) = stream.next().await {
let _ = tx_clone.send(result);
}
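For context, read_files here spawns one tokio task per parquet file; each task opens the file as a stream and forwards every record batch into a std::sync::mpsc channel, and the commented-out Semaphore suggests a planned cap (TASK_LIMIT) on in-flight reads. A minimal, self-contained sketch of that spawn-and-fan-in pattern, with assumed names and the semaphore wired in:

use std::sync::{mpsc, Arc};
use tokio::sync::Semaphore;

const TASK_LIMIT: usize = 4; // mirrors the commented-out Semaphore above

fn main() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let (tx, rx) = mpsc::channel();
    let semaphore = Arc::new(Semaphore::new(TASK_LIMIT));

    for file in (0..8).map(|i| format!("file-{i}.parquet")) {
        let tx_clone = tx.clone();
        let semaphore = semaphore.clone();
        runtime.spawn(async move {
            // Hold a permit for the duration of the "read" so that at most
            // TASK_LIMIT reads are in flight at once.
            let _permit = semaphore.acquire().await.unwrap();
            // Stand-in for opening the file and draining its batch stream.
            let _ = tx_clone.send(format!("batch from {file}"));
        });
    }
    // Drop the original sender so `rx` disconnects once all tasks finish.
    drop(tx);

    for batch in rx {
        println!("{batch}");
    }
}

Dropping the original tx matters: the blocking receive loop only terminates once every sender clone held by a task has been dropped.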
kernel/src/scan/mod.rs: 123 changes (68 additions & 55 deletions)
@@ -471,63 +471,76 @@ impl Scan {
// Iterator<DeltaResult<Vec<ScanFile>>> to Iterator<DeltaResult<ScanFile>>
.flatten_ok();

- let result = scan_files_iter
-     .map(move |scan_file| -> DeltaResult<_> {
-         let scan_file = scan_file?;
-         let file_path = table_root.join(&scan_file.path)?;
-         let mut selection_vector = scan_file
-             .dv_info
-             .get_selection_vector(engine.as_ref(), &table_root)?;
-         let meta = FileMeta {
-             last_modified: 0,
-             size: scan_file.size as usize,
-             location: file_path,
-         };
+ let mut file_metas = Vec::new();
+ let mut partition_values_vec = Vec::new();
+ let mut selection_vectors = Vec::new();
+
+ for scan_file_res in scan_files_iter {
+     let scan_file = scan_file_res?;
+     let file_path = table_root.join(&scan_file.path)?;
+     let selection_vector = scan_file
+         .dv_info
+         .get_selection_vector(engine.as_ref(), &table_root)?;
+
+     let meta = FileMeta {
+         last_modified: 0,
+         size: scan_file.size as usize,
+         location: file_path,
+     };
+
+     file_metas.push(meta);
+     partition_values_vec.push(scan_file.partition_values);
+     selection_vectors.push(selection_vector);
+ }
+
+ // read all files at once
+ let read_result_iter = engine.get_parquet_handler().read_parquet_files(
+     &file_metas,
+     global_state.physical_schema.clone(),
+     physical_predicate.clone(),
+ )?;
+
+ let engine_clone = engine.clone();
+ let global_state_clone = global_state.clone();
+ let all_fields_clone = all_fields.clone();
+
+ let mut selection_vectors_iter = selection_vectors.into_iter();
+ let mut partition_values_iter = partition_values_vec.into_iter();
+
+ let result = read_result_iter
+     .enumerate()
+     .map(move |(i, read_result)| -> DeltaResult<_> {
+         let read_result = read_result?; // this is the parquet batch for the i-th file
+
+         // let partition_values = partition_values_iter
+         //     .nth(i)
+         //     .expect("Partition values length mismatch with read_result_iter");
+         // let mut selection_vector = selection_vectors_iter
+         //     .nth(i)
+         //     .expect("Selection vector length mismatch with read_result_iter");
+
+         let logical = transform_to_logical_internal(
+             engine_clone.as_ref(),
+             read_result,
+             &global_state_clone,
+             &HashMap::new(),
+             &all_fields_clone,
+             have_partition_cols,
+         );
+
+         // let len = logical.as_ref().map_or(0, |res| res.len());
+         // let mut sv = selection_vector.take();
+         // let rest = split_vector(sv.as_mut(), len, None);
+
+         let scan_result = ScanResult {
+             raw_data: logical,
+             raw_mask: None,
+         };

-         // WARNING: We validated the physical predicate against a schema that includes
-         // partition columns, but the read schema we use here does _NOT_ include partition
-         // columns. So we cannot safely assume that all column references are valid. See
-         // https://github.com/delta-io/delta-kernel-rs/issues/434 for more details.
-         let read_result_iter = engine.get_parquet_handler().read_parquet_files(
-             &[meta],
-             global_state.physical_schema.clone(),
-             physical_predicate.clone(),
-         )?;
-
-         // Arc clones
-         let engine = engine.clone();
-         let global_state = global_state.clone();
-         let all_fields = all_fields.clone();
-         Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
-             let read_result = read_result?;
-             // to transform the physical data into the correct logical form
-             let logical = transform_to_logical_internal(
-                 engine.as_ref(),
-                 read_result,
-                 &global_state,
-                 &scan_file.partition_values,
-                 &all_fields,
-                 have_partition_cols,
-             );
-             let len = logical.as_ref().map_or(0, |res| res.len());
-             // need to split the dv_mask. what's left in dv_mask covers this result, and rest
-             // will cover the following results. we `take()` out of `selection_vector` to avoid
-             // trying to return a captured variable. We're going to reassign `selection_vector`
-             // to `rest` in a moment anyway
-             let mut sv = selection_vector.take();
-             let rest = split_vector(sv.as_mut(), len, None);
-             let result = ScanResult {
-                 raw_data: logical,
-                 raw_mask: sv,
-             };
-             selection_vector = rest;
-             Ok(result)
-         }))
-     })
-     // Iterator<DeltaResult<Iterator<DeltaResult<ScanResult>>>> to Iterator<DeltaResult<DeltaResult<ScanResult>>>
-     .flatten_ok()
-     // Iterator<DeltaResult<DeltaResult<ScanResult>>> to Iterator<DeltaResult<ScanResult>>
-     .map(|x| x?);

+
+         Ok(scan_result)
+     });

Ok(result)
}
}
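The rewrite above buffers FileMeta values, partition values, and selection vectors into three parallel Vecs, issues a single read_parquet_files call over all files, and then tries to pair each read result back to its per-file state by index. The commented-out pairing uses Iterator::nth(i), which consumes i + 1 elements on every call, so inside an enumerated map it would skip entries; zipping the iterators keeps the pairing one-to-one, assuming the reader yields exactly one result per file. (If one file yields several batches, index-based pairing breaks down either way, which is what the old per-batch split_vector logic handled.) A minimal sketch with hypothetical stand-in types:

// Hypothetical stand-ins for the kernel's per-file state.
struct FileState {
    partition_values: Vec<(String, String)>,
    selection_vector: Option<Vec<bool>>,
}

fn main() {
    // Pretend these came back from a batched read, one result per file.
    let read_results = vec!["batch-0", "batch-1", "batch-2"];
    let states = vec![
        FileState { partition_values: vec![], selection_vector: Some(vec![true, false]) },
        FileState {
            partition_values: vec![("date".into(), "2024-12-16".into())],
            selection_vector: None,
        },
        FileState { partition_values: vec![], selection_vector: None },
    ];

    // `zip` advances both iterators in lockstep, so result i is always paired
    // with state i; no index bookkeeping and no `nth` pitfalls.
    for (batch, state) in read_results.into_iter().zip(states) {
        println!(
            "{batch}: {} partition value(s), mask = {:?}",
            state.partition_values.len(),
            state.selection_vector
        );
    }
}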
