Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added tests for remaining cases and Binary/Text support
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 17, 2022
1 parent 16a3415 commit 5050ded
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 74 deletions.
113 changes: 101 additions & 12 deletions arrow-odbc-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg(test)]
use arrow2::array::Int32Array;
use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::odbc::api::{Connection, Cursor, Environment, Error as OdbcError};
use arrow2::io::odbc::{buffer_from_metadata, deserialize, infer_schema};
Expand All @@ -17,18 +18,111 @@ const MSSQL: &str =
"Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;";

/// Inserts the single value `(1)` into an `INT` column and expects it to be
/// read back as one chunk holding an `Int32Array` with the value `1`.
///
/// The table name is the last `:`-separated segment of `function_name!()`.
#[test]
fn int() -> Result<()> {
    let table_name = function_name!().rsplit_once(':').unwrap().1;
    let expected = vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])];

    test(expected, "INT", "(1)", table_name)
}

/// Inserts `1` and `NULL` into an `INT` column and expects a single chunk
/// whose only column is `Int32Array [Some(1), None]`.
#[test]
fn int_nullable() -> Result<()> {
    // The table is named after the last `:`-separated segment of the function path.
    let (_, table_name) = function_name!().rsplit_once(':').unwrap();
    let column: Box<dyn Array> = Box::new(Int32Array::from([Some(1), None]));

    test(vec![Chunk::new(vec![column])], "INT", "(1),(NULL)", table_name)
}

/// Inserts `(1)` into a `BIT` column and expects a single chunk whose only
/// column is a `BooleanArray` holding `true`.
#[test]
fn bool() -> Result<()> {
    // The table is named after the last `:`-separated segment of the function path.
    let (_, table_name) = function_name!().rsplit_once(':').unwrap();
    let column: Box<dyn Array> = Box::new(BooleanArray::from_slice([true]));

    test(vec![Chunk::new(vec![column])], "BIT", "(1)", table_name)
}

/// Inserts `1` and `NULL` into a `BIT` column and expects a single chunk
/// whose only column is `BooleanArray [Some(true), None]`.
#[test]
fn bool_nullable() -> Result<()> {
    // The table is named after the last `:`-separated segment of the function path.
    let (_, table_name) = function_name!().rsplit_once(':').unwrap();
    let column: Box<dyn Array> = Box::new(BooleanArray::from([Some(true), None]));

    test(vec![Chunk::new(vec![column])], "BIT", "(1),(NULL)", table_name)
}

/// Inserts the two bytes `ab` into a `VARBINARY(2)` column and expects a
/// single chunk whose only column is `BinaryArray<i32> [Some(b"ab")]`.
#[test]
fn binary() -> Result<()> {
    // The table is named after the last `:`-separated segment of the function path.
    let (_, table_name) = function_name!().rsplit_once(':').unwrap();
    let column: Box<dyn Array> = Box::new(BinaryArray::<i32>::from([Some(b"ab")]));
    let expected = vec![Chunk::new(vec![column])];

    let sql_type = "VARBINARY(2)";
    let rows = "(CAST('ab' AS VARBINARY(2)))";
    test(expected, sql_type, rows, table_name)
}

/// Inserts `ab`, `NULL` and `ac` into a `VARBINARY(2)` column and expects a
/// single chunk whose only column is
/// `BinaryArray<i32> [Some(b"ab"), None, Some(b"ac")]`.
#[test]
fn binary_nullable() -> Result<()> {
    // The table is named after the last `:`-separated segment of the function path.
    let (_, table_name) = function_name!().rsplit_once(':').unwrap();
    let column: Box<dyn Array> =
        Box::new(BinaryArray::<i32>::from([Some(b"ab"), None, Some(b"ac")]));
    let expected = vec![Chunk::new(vec![column])];

    let sql_type = "VARBINARY(2)";
    let rows = "(CAST('ab' AS VARBINARY(2))),(NULL),(CAST('ac' AS VARBINARY(2)))";
    test(expected, sql_type, rows, table_name)
}

/// Inserts `'ab'`, `NULL` and `'ac'` into a `VARCHAR(2)` column and expects a
/// single chunk whose only column is
/// `Utf8Array<i32> [Some("ab"), None, Some("ac")]`.
#[test]
fn utf8_nullable() -> Result<()> {
    // The table is named after the last `:`-separated segment of the function path.
    let (_, table_name) = function_name!().rsplit_once(':').unwrap();
    let column: Box<dyn Array> =
        Box::new(Utf8Array::<i32>::from([Some("ab"), None, Some("ac")]));

    test(
        vec![Chunk::new(vec![column])],
        "VARCHAR(2)",
        "('ab'),(NULL),('ac')",
        table_name,
    )
}

/// Shared driver for the per-type round-trip tests.
///
/// Connects using the `MSSQL` connection string, sets up an empty table
/// `table_name` whose single declared column has SQL type `type_`, inserts
/// the row tuples given in `insert`, reads column `a` back through [`read`]
/// and asserts that the resulting chunks equal `expected`.
///
/// * `expected` - the chunks the query must produce.
/// * `type_` - SQL type of the column, e.g. `"INT"`.
/// * `insert` - the text placed after `VALUES`, e.g. `"(1),(NULL)"`.
/// * `table_name` - name of the table to (re)create and query.
///
/// # Errors
/// Propagates any error returned by [`read`].
///
/// # Panics
/// Panics if connecting, setting up the table or inserting fails, or if the
/// chunks read back differ from `expected`.
fn test(
    expected: Vec<Chunk<Box<dyn Array>>>,
    type_: &str,
    insert: &str,
    table_name: &str,
) -> Result<()> {
    let connection = ENV.connect_with_connection_string(MSSQL).unwrap();
    // Use the caller-supplied column type and rows; hard-coding `INT` / `(1)`
    // here would make every typed test exercise the same integer column.
    setup_empty_table(&connection, table_name, &[type_]).unwrap();
    connection
        .execute(&format!("INSERT INTO {table_name} (a) VALUES {insert}"), ())
        .unwrap();

    // When
    let query = format!("SELECT a FROM {table_name} ORDER BY id");
    let (_fields, chunks) = read(&connection, &query)?;

    assert_eq!(chunks, expected);
    Ok(())
}

fn read(
connection: &Connection<'_>,
query: &str,
) -> Result<(Vec<Field>, Vec<Chunk<Box<dyn Array>>>)> {
let mut a = connection.prepare(query).unwrap();
let fields = infer_schema(&a)?;

let max_batch_size = 100;
Expand All @@ -49,12 +143,7 @@ fn test() -> Result<()> {
chunks.push(Chunk::new(arrays));
}

assert_eq!(
chunks,
vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])]
);

Ok(())
Ok((fields, chunks))
}

/// Creates the table and assures it is empty. Columns are named a,b,c, etc.
Expand Down
169 changes: 107 additions & 62 deletions src/io/odbc/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use odbc_api::Bit;

use crate::array::{Array, BooleanArray, PrimitiveArray};
use crate::array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::types::NativeType;

Expand All @@ -11,96 +12,140 @@ use super::api::buffers::AnyColumnView;
/// This is CPU-bounded
pub fn deserialize(column: AnyColumnView, data_type: DataType) -> Box<dyn Array> {
match column {
AnyColumnView::Text(_) => todo!(),
AnyColumnView::Text(slice) => Box::new(utf8(
data_type,
slice.values(),
slice.lengths(),
slice.max_len(),
)) as _,
AnyColumnView::WText(_) => todo!(),
AnyColumnView::Binary(_) => todo!(),
AnyColumnView::Binary(slice) => Box::new(binary(
data_type,
slice.values(),
slice.lengths(),
slice.max_len(),
)) as _,
AnyColumnView::Date(_) => todo!(),
AnyColumnView::Time(_) => todo!(),
AnyColumnView::Timestamp(_) => todo!(),
AnyColumnView::F64(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::F32(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::I8(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::I16(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::I32(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::I64(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::U8(values) => Box::new(deserialize_p(data_type, values)) as _,
AnyColumnView::Bit(values) => Box::new(deserialize_bool(data_type, values)) as _,
AnyColumnView::F64(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::F32(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::I8(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::I16(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::I32(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::I64(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::U8(values) => Box::new(p(data_type, values)) as _,
AnyColumnView::Bit(values) => Box::new(bool(data_type, values)) as _,
AnyColumnView::NullableDate(_) => todo!(),
AnyColumnView::NullableTime(_) => todo!(),
AnyColumnView::NullableTimestamp(_) => todo!(),
AnyColumnView::NullableF64(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableF32(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableI8(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableI16(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableI32(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableI64(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableU8(slice) => Box::new(deserialize_p_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableBit(slice) => Box::new(deserialize_bool_optional(
data_type,
slice.values(),
slice.indicators(),
)) as _,
AnyColumnView::NullableF64(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableF32(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableI8(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableI16(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableI32(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableI64(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableU8(slice) => {
Box::new(p_optional(data_type, slice.values(), slice.indicators())) as _
}
AnyColumnView::NullableBit(slice) => {
Box::new(bool_optional(data_type, slice.values(), slice.indicators())) as _
}
}
}

fn deserialize_bitmap(values: &[isize]) -> Option<Bitmap> {
fn bitmap(values: &[isize]) -> Option<Bitmap> {
MutableBitmap::from_trusted_len_iter(values.iter().map(|x| *x != -1)).into()
}

fn deserialize_p<T: NativeType>(data_type: DataType, values: &[T]) -> PrimitiveArray<T> {
fn p<T: NativeType>(data_type: DataType, values: &[T]) -> PrimitiveArray<T> {
PrimitiveArray::from_data(data_type, values.to_vec().into(), None)
}

fn deserialize_p_optional<T: NativeType>(
fn p_optional<T: NativeType>(
data_type: DataType,
values: &[T],
indicators: &[isize],
) -> PrimitiveArray<T> {
let validity = deserialize_bitmap(indicators);
let validity = bitmap(indicators);
PrimitiveArray::from_data(data_type, values.to_vec().into(), validity)
}

fn deserialize_bool(data_type: DataType, values: &[Bit]) -> BooleanArray {
fn bool(data_type: DataType, values: &[Bit]) -> BooleanArray {
let values = values.iter().map(|x| x.as_bool());
let values = Bitmap::from_trusted_len_iter(values);
BooleanArray::from_data(data_type, values, None)
}

fn deserialize_bool_optional(
data_type: DataType,
values: &[Bit],
indicators: &[isize],
) -> BooleanArray {
let validity = deserialize_bitmap(indicators);
fn bool_optional(data_type: DataType, values: &[Bit], indicators: &[isize]) -> BooleanArray {
let validity = bitmap(indicators);
let values = values.iter().map(|x| x.as_bool());
let values = Bitmap::from_trusted_len_iter(values);
BooleanArray::from_data(data_type, values, validity)
}

/// Gathers the variable-length values of a fixed-stride ODBC column buffer
/// into contiguous `(offsets, values, validity)` buffers.
///
/// * `slice` - the raw column buffer; row `i` starts at byte
///   `i * (max_length + null_terminator)`.
/// * `lengths` - one indicator per row. `-1` marks the row as NULL; any other
///   non-positive indicator yields an empty (but valid) value; a positive
///   indicator is the value's length in bytes.
/// * `max_length` - the maximum number of payload bytes stored per row.
/// * `null_terminator` - number of extra bytes reserved per row after the
///   payload (`1` for text, `0` for binary at the call sites in this file).
///
/// Returns `lengths.len() + 1` offsets starting at `0`, the concatenated
/// payload bytes, and the validity bitmap.
///
/// # Panics
/// Panics if a row's bytes fall outside `slice`, or if the total payload does
/// not fit in `i32` offsets.
fn binary_generic(
    slice: &[u8],
    lengths: &[isize],
    max_length: usize,
    null_terminator: usize,
) -> (Buffer<i32>, Buffer<u8>, Option<Bitmap>) {
    let stride = max_length + null_terminator;
    let mut validity = MutableBitmap::with_capacity(lengths.len());

    let mut offsets = Vec::with_capacity(lengths.len() + 1);
    offsets.push(0i32);
    let mut total = 0usize;
    offsets.extend(lengths.iter().map(|&indicator| {
        validity.push(indicator != -1);
        // A row never stores more than `max_length` payload bytes. An
        // indicator above that presumably reports the untruncated length of a
        // value that did not fit, so clamp it rather than read into the next
        // row — TODO confirm against the ODBC driver in use.
        let len = if indicator > 0 {
            (indicator as usize).min(max_length)
        } else {
            0
        };
        total += len;
        total as i32
    }));
    // Offsets are monotonically non-decreasing, so checking the final one
    // guarantees none of the `as i32` conversions above wrapped.
    assert!(
        total <= i32::MAX as usize,
        "ODBC batch payload does not fit in i32 offsets"
    );

    let mut values = Vec::<u8>::with_capacity(total);
    for (row, window) in offsets.windows(2).enumerate() {
        let len = (window[1] - window[0]) as usize;
        let start = row * stride;
        // Slice indexing is the bound check: it panics if the row's bytes are
        // not fully contained in `slice`.
        values.extend_from_slice(&slice[start..start + len]);
    }

    (offsets.into(), values.into(), validity.into())
}

/// Builds a `BinaryArray<i32>` of `data_type` from an ODBC binary column
/// buffer.
///
/// Delegates the gathering to [`binary_generic`] with a null-terminator width
/// of `0`, i.e. each row occupies exactly `max_length` bytes of `slice`.
fn binary(
    data_type: DataType,
    slice: &[u8],
    lengths: &[isize],
    max_length: usize,
) -> BinaryArray<i32> {
    let (row_offsets, bytes, nulls) = binary_generic(slice, lengths, max_length, 0);

    // Per the original author's note, the O(N) check done by `from_data` is
    // not strictly needed for buffers produced by `binary_generic`.
    BinaryArray::from_data(data_type, row_offsets, bytes, nulls)
}

/// Builds a `Utf8Array<i32>` of `data_type` from an ODBC text column buffer.
///
/// Delegates the gathering to [`binary_generic`] with a null-terminator width
/// of `1`, i.e. each row occupies `max_length + 1` bytes of `slice`.
fn utf8(data_type: DataType, slice: &[u8], lengths: &[isize], max_length: usize) -> Utf8Array<i32> {
    let (row_offsets, bytes, nulls) = binary_generic(slice, lengths, max_length, 1);

    // Per the original author's note, the O(N) check done by `from_data` is
    // required here because the bytes must be validated as UTF-8.
    Utf8Array::from_data(data_type, row_offsets, bytes, nulls)
}

0 comments on commit 5050ded

Please sign in to comment.