Skip to content

Commit

Permalink
[FIX] (WIP) casting of arrays from daft to arrow with unsigned
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Nov 21, 2024
1 parent 074f460 commit 4159ac9
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{collections::HashMap, future::ready};

use common_daft_config::DaftExecutionConfig;
use daft_core::{series::Series, utils::arrow::cast_array_from_daft_if_needed};
use daft_schema::{dtype::DataType, field::Field, schema::Schema};
use daft_table::Table;
use futures::stream;
use spark_connect::{ExecutePlanResponse, Relation};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -49,7 +52,40 @@ impl Session {
let tables = result.get_tables()?;

// For every result table: rebuild it from its (possibly cast) arrow arrays, generate a
// response from the rebuilt table, and push that response through `tx`.
for table in tables.as_slice() {
// NOTE(review): this text comes from a diff view whose +/- markers were stripped. The next
// line appears to be the single removed line of the commit; it is superseded by the
// `gen_response(&new_table)` call at the bottom of the loop. Confirm against the real file.
let response = context.gen_response(table)?;
// Per-column accumulators, pre-sized to the number of columns in the table.
let mut arrow_arrays = Vec::with_capacity(table.num_columns());
let mut column_names = Vec::with_capacity(table.num_columns());
let mut field_types = Vec::with_capacity(table.num_columns());

for i in 0..table.num_columns() {
let s = table.get_column_by_index(i)?;
let arrow_array = s.to_arrow();
// Run the boxed array through the daft -> arrow cast helper. Its exact behaviour is not
// visible here; presumably it returns the array unchanged when no cast is needed — TODO confirm.
let arrow_array =
cast_array_from_daft_if_needed(arrow_array.to_boxed());

// TODO(correctness): logical types are probably lost here, because the daft type is
// re-derived from the data type of the arrow array produced above.
let daft_data_type = DataType::from(arrow_array.data_type());

// Record the field under the column's name with the type observed after the cast step.
field_types.push(Field::new(s.name(), daft_data_type));
column_names.push(s.name().to_string());
arrow_arrays.push(arrow_array);
}

// Build a new schema from the post-cast field types; a construction error is propagated by `?`.
let new_schema = Schema::new(field_types)?;

// Pair each array with its column name and convert it back into a `Series`;
// a failed conversion is propagated by `?`.
let series = arrow_arrays
.into_iter()
.zip(column_names)
.map(|(array, name)| Series::try_from((name.as_str(), array)))
.try_collect()?;

// Assemble the replacement table, passing the original table's `len()` as its size.
let new_table = Table::new_with_size(new_schema, series, table.len())?;

let response = context.gen_response(&new_table)?;
// NOTE(review): `unwrap` panics if `blocking_send` does not succeed (presumably when the
// receiving side is gone) — consider handling this instead of panicking.
tx.blocking_send(Ok(response)).unwrap();
}
}
Expand Down

0 comments on commit 4159ac9

Please sign in to comment.