Rename Array -> ArrayData (#1316)
:)
gatesn authored Nov 15, 2024
1 parent 7585509 commit c31ae9d
Showing 216 changed files with 1,124 additions and 1,052 deletions.
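The rename is mechanical: the type-erased `Array` struct becomes `ArrayData`, and the `IntoArray` conversion trait becomes `IntoArrayData`; the conversion method itself is still spelled `into_array()`, as the hunks below show. A minimal before/after sketch of caller code, in Rust (the helper function is hypothetical; `PrimitiveArray` and the import paths are inferred from this diff, so treat the exact paths as assumptions):

// Caller code before this commit:
//     use vortex::{Array, IntoArray};
//     fn ints(values: Vec<i32>) -> Array {
//         PrimitiveArray::from(values).into_array()
//     }

// The same caller code after this commit; only the names change:
use vortex::array::PrimitiveArray;
use vortex::{ArrayData, IntoArrayData};

// Hypothetical helper: build a type-erased vortex array from a Vec<i32>.
fn ints(values: Vec<i32>) -> ArrayData {
    PrimitiveArray::from(values).into_array()
}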
2 changes: 1 addition & 1 deletion bench-vortex/benches/bytes_at.rs
@@ -13,7 +13,7 @@ use vortex::dtype::{DType, Nullability};
 use vortex::ipc::stream_reader::StreamArrayReader;
 use vortex::ipc::stream_writer::StreamArrayWriter;
 use vortex::validity::Validity;
-use vortex::{Context, IntoArray, IntoCanonical};
+use vortex::{Context, IntoArrayData, IntoCanonical};

 fn array_data_fixture() -> VarBinArray {
     VarBinArray::try_new(
16 changes: 8 additions & 8 deletions bench-vortex/benches/compress_noci.rs
@@ -33,7 +33,7 @@ use vortex::error::VortexResult;
 use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
-use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
+use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoCanonical};

 use crate::tokio_runtime::TOKIO_RUNTIME;

@@ -100,7 +100,7 @@ fn parquet_decompress_read(buf: bytes::Bytes) -> usize {
     nbytes
 }

-fn parquet_compressed_written_size(array: &Array, compression: Compression) -> usize {
+fn parquet_compressed_written_size(array: &ArrayData, compression: Compression) -> usize {
     let chunked = ChunkedArray::try_from(array).unwrap();
     let (batches, schema) = chunked_to_vec_record_batch(chunked);
     parquet_compress_write(batches, schema, compression, &mut Vec::new())
@@ -109,10 +109,10 @@ fn parquet_compressed_written_size(array: &Array, compression: Compression) -> usize {
 fn vortex_compress_write(
     runtime: &Runtime,
     compressor: &SamplingCompressor<'_>,
-    array: &Array,
+    array: &ArrayData,
     buf: &mut Vec<u8>,
 ) -> VortexResult<u64> {
-    async fn async_write(array: &Array, cursor: &mut Cursor<&mut Vec<u8>>) -> VortexResult<()> {
+    async fn async_write(array: &ArrayData, cursor: &mut Cursor<&mut Vec<u8>>) -> VortexResult<()> {
         let mut writer = VortexFileWriter::new(cursor);

         writer = writer.write_array_columns(array.clone()).await?;
@@ -129,7 +129,7 @@ fn vortex_compress_write(
 }

 fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<ArrayRef> {
-    async fn async_read(buf: Buffer) -> VortexResult<Array> {
+    async fn async_read(buf: Buffer) -> VortexResult<ArrayData> {
         let builder: VortexReadBuilder<_> = VortexReadBuilder::new(
             buf,
             LayoutDeserializer::new(
@@ -140,7 +140,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<ArrayRef> {

         let stream = builder.build().await?;
         let dtype = stream.dtype().clone();
-        let vecs: Vec<Array> = stream.try_collect().await?;
+        let vecs: Vec<ArrayData> = stream.try_collect().await?;

         ChunkedArray::try_new(vecs, dtype).map(|e| e.into())
     }
@@ -154,7 +154,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<ArrayRef> {
 fn vortex_compressed_written_size(
     runtime: &Runtime,
     compressor: &SamplingCompressor<'_>,
-    array: &Array,
+    array: &ArrayData,
 ) -> VortexResult<u64> {
     vortex_compress_write(runtime, compressor, array, &mut Vec::new())
 }
@@ -168,7 +168,7 @@ fn benchmark_compress<F, U>(
     bench_name: &str,
 ) where
     F: Fn() -> U,
-    U: AsRef<Array>,
+    U: AsRef<ArrayData>,
 {
     // if no logging is enabled, enable it
     if !LOG_INITIALIZED.swap(true, Ordering::SeqCst) {
2 changes: 1 addition & 1 deletion bench-vortex/benches/compressor_throughput.rs
@@ -24,7 +24,7 @@ use vortex::sampling_compressor::compressors::zigzag::ZigZagCompressor;
 use vortex::sampling_compressor::compressors::CompressorRef;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::validity::Validity;
-use vortex::{IntoArray as _, IntoCanonical};
+use vortex::{IntoArrayData as _, IntoCanonical};

 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
4 changes: 2 additions & 2 deletions bench-vortex/benches/datafusion.rs
@@ -22,7 +22,7 @@ use vortex::sampling_compressor::compressors::dict::DictCompressor;
 use vortex::sampling_compressor::compressors::r#for::FoRCompressor;
 use vortex::sampling_compressor::compressors::CompressorRef;
 use vortex::sampling_compressor::SamplingCompressor;
-use vortex::{Array, Context};
+use vortex::{ArrayData, Context};
 use vortex_datafusion::memory::{VortexMemTable, VortexMemTableOptions};

 pub static CTX: LazyLock<Context> = LazyLock::new(|| {
@@ -79,7 +79,7 @@ fn toy_dataset_arrow() -> RecordBatch {
     .unwrap()
 }

-fn toy_dataset_vortex(compress: bool) -> Array {
+fn toy_dataset_vortex(compress: bool) -> ArrayData {
     let uncompressed = toy_dataset_arrow().try_into().unwrap();

     if !compress {
14 changes: 7 additions & 7 deletions bench-vortex/src/bin/notimplemented.rs
@@ -24,7 +24,7 @@ use vortex::runend_bool::RunEndBoolArray;
 use vortex::scalar::ScalarValue;
 use vortex::validity::Validity;
 use vortex::zigzag::ZigZagArray;
-use vortex::{Array, IntoArray};
+use vortex::{ArrayData, IntoArrayData};

 const OPERATORS: [Operator; 6] = [
     Operator::Lte,
@@ -35,7 +35,7 @@ const OPERATORS: [Operator; 6] = [
     Operator::NotEq,
 ];

-fn fsst_array() -> Array {
+fn fsst_array() -> ArrayData {
     let input_array = varbin_array();
     let compressor = fsst_train_compressor(&input_array).unwrap();

@@ -44,7 +44,7 @@ fn fsst_array() -> Array {
         .into_array()
 }

-fn varbin_array() -> Array {
+fn varbin_array() -> ArrayData {
     let mut input_array = VarBinBuilder::<i32>::with_capacity(3);
     input_array.push_value(b"The Greeks never said that the limit could not be overstepped");
     input_array.push_value(
@@ -56,7 +56,7 @@ fn varbin_array() -> Array {
     )
         .into_array()

-fn varbinview_array() -> Array {
+fn varbinview_array() -> ArrayData {
     VarBinViewArray::from_iter_str(vec![
         "The Greeks never said that the limit could not be overstepped",
         "They said it existed and that whoever dared to exceed it was mercilessly struck down",
@@ -65,7 +65,7 @@ fn varbinview_array() -> Array {
     .into_array()
 }

-fn enc_impls() -> Vec<Array> {
+fn enc_impls() -> Vec<ArrayData> {
     vec![
         ALPArray::try_new(
             PrimitiveArray::from(vec![1]).into_array(),
@@ -175,7 +175,7 @@ fn bool_to_cell(val: bool) -> Cell {
     Cell::new(if val { "✓" } else { "𐄂" }).style_spec(style)
 }

-fn compute_funcs(encodings: &[Array]) {
+fn compute_funcs(encodings: &[ArrayData]) {
     let mut table = Table::new();
     table.add_row(Row::new(
         vec![
@@ -214,7 +214,7 @@ fn compute_funcs(encodings: &[Array]) {
     table.printstd();
 }

-fn compare_funcs(encodings: &[Array]) {
+fn compare_funcs(encodings: &[ArrayData]) {
     for arr in encodings {
         println!("\nArray {} compare functions", arr.encoding().id().as_ref());
         let mut table = Table::new();
6 changes: 3 additions & 3 deletions bench-vortex/src/data_downloads.rs
@@ -15,7 +15,7 @@ use vortex::dtype::DType;
 use vortex::error::{VortexError, VortexResult};
 use vortex::io::TokioAdapter;
 use vortex::ipc::stream_writer::StreamArrayWriter;
-use vortex::{Array, IntoArray};
+use vortex::{ArrayData, IntoArrayData};

 use crate::idempotent;
 use crate::reader::BATCH_SIZE;
@@ -46,7 +46,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf {
     let array = ChunkedArray::try_new(
         reader
             .into_iter()
-            .map(|batch_result| Array::try_from(batch_result.unwrap()).unwrap())
+            .map(|batch_result| ArrayData::try_from(batch_result.unwrap()).unwrap())
             .collect(),
         dtype,
     )
@@ -92,7 +92,7 @@ pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> PathBuf {

 pub trait BenchmarkDataset {
     fn as_uncompressed(&self);
-    fn to_vortex_array(&self) -> VortexResult<Array>;
+    fn to_vortex_array(&self) -> VortexResult<ArrayData>;
     fn compress_to_vortex(&self) -> VortexResult<()>;
     fn write_as_parquet(&self);
     fn write_as_vortex(&self) -> impl Future<Output = ()>;
14 changes: 7 additions & 7 deletions bench-vortex/src/lib.rs
@@ -17,7 +17,7 @@ use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::fastlanes::DeltaEncoding;
 use vortex::sampling_compressor::SamplingCompressor;
-use vortex::{Array, Context, IntoArray};
+use vortex::{ArrayData, Context, IntoArrayData};

 use crate::data_downloads::FileType;
 use crate::reader::BATCH_SIZE;
@@ -124,7 +124,7 @@ pub fn setup_logger(level: LevelFilter) {
         .unwrap();
 }

-pub fn fetch_taxi_data() -> Array {
+pub fn fetch_taxi_data() -> ArrayData {
     let file = File::open(taxi_data_parquet()).unwrap();
     let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
     let reader = builder.with_batch_size(BATCH_SIZE).build().unwrap();
@@ -134,7 +134,7 @@ pub fn fetch_taxi_data() -> Array {
         reader
             .into_iter()
             .map(|batch_result| batch_result.unwrap())
-            .map(Array::try_from)
+            .map(ArrayData::try_from)
             .map(Result::unwrap)
             .collect_vec(),
         DType::from_arrow(schema),
@@ -143,7 +143,7 @@ pub fn fetch_taxi_data() -> Array {
     .into_array()
 }

-pub fn compress_taxi_data() -> Array {
+pub fn compress_taxi_data() -> ArrayData {
     CompressionStrategy::compress(&SamplingCompressor::default(), &fetch_taxi_data()).unwrap()
 }

@@ -201,7 +201,7 @@ mod test {
     use vortex::arrow::FromArrowArray;
     use vortex::compress::CompressionStrategy;
     use vortex::sampling_compressor::SamplingCompressor;
-    use vortex::{Array, IntoCanonical};
+    use vortex::{ArrayData, IntoCanonical};

     use crate::taxi_data::taxi_data_parquet;
     use crate::{compress_taxi_data, setup_logger};
@@ -223,7 +223,7 @@ mod test {
         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
             let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
-            let vortex_array = Array::from_arrow(arrow_array.clone(), false);
+            let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);
             let vortex_as_arrow = vortex_array.into_canonical().unwrap().into_arrow().unwrap();
             assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
         }
@@ -242,7 +242,7 @@ mod test {
         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
             let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
-            let vortex_array = Array::from_arrow(arrow_array.clone(), false);
+            let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);

             let compressed = compressor.compress(&vortex_array).unwrap();
             let compressed_as_arrow = compressed.into_canonical().unwrap().into_arrow().unwrap();
4 changes: 2 additions & 2 deletions bench-vortex/src/public_bi_data.rs
@@ -13,7 +13,7 @@ use tokio::fs::File;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::error::VortexResult;
-use vortex::{Array, ArrayDType, IntoArray};
+use vortex::{ArrayDType, ArrayData, IntoArrayData};

 use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset, FileType};
 use crate::public_bi_data::PBIDataset::*;
@@ -555,7 +555,7 @@ impl BenchmarkDataset for BenchmarkDatasets {
         }
     }

-    fn to_vortex_array(&self) -> VortexResult<Array> {
+    fn to_vortex_array(&self) -> VortexResult<ArrayData> {
         self.write_as_parquet();

         let arrays = self
18 changes: 9 additions & 9 deletions bench-vortex/src/reader.rs
@@ -32,7 +32,7 @@ use vortex::file::{
 };
 use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
-use vortex::{Array, IntoArray, IntoCanonical};
+use vortex::{ArrayData, IntoArrayData, IntoCanonical};

 static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
     LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
@@ -46,7 +46,7 @@ pub struct VortexFooter {
     pub dtype_range: Range<u64>,
 }

-pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
+pub async fn open_vortex(path: &Path) -> VortexResult<ArrayData> {
     let file = TokioFile::open(path).unwrap();

     VortexReadBuilder::new(
@@ -85,12 +85,12 @@ pub fn read_parquet_to_vortex<P: AsRef<Path>>(parquet_path: P) -> VortexResult<ChunkedArray> {
     let dtype = DType::from_arrow(reader.schema());
     let chunks = reader
         .map(|batch_result| batch_result.unwrap())
-        .map(Array::try_from)
+        .map(ArrayData::try_from)
         .collect::<VortexResult<Vec<_>>>()?;
     ChunkedArray::try_new(chunks, dtype)
 }

-pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<Array> {
+pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ArrayData> {
     let chunked = read_parquet_to_vortex(parquet_path)?;
     CompressionStrategy::compress(&SamplingCompressor::default(), &chunked.into_array())
 }
@@ -117,33 +117,33 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
 async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
     reader: T,
     indices: &[u64],
-) -> VortexResult<Array> {
+) -> VortexResult<ArrayData> {
     VortexReadBuilder::new(
         reader,
         LayoutDeserializer::new(
             ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     )
-    .with_indices(Array::from(indices.to_vec()))
+    .with_indices(ArrayData::from(indices.to_vec()))
     .build()
     .await?
     .read_all()
     .await
     // For equivalence.... we decompress to make sure we're not cheating too much.
     .and_then(IntoCanonical::into_canonical)
-    .map(Array::from)
+    .map(ArrayData::from)
 }

 pub async fn take_vortex_object_store(
     fs: Arc<dyn ObjectStore>,
     path: object_store::path::Path,
     indices: &[u64],
-) -> VortexResult<Array> {
+) -> VortexResult<ArrayData> {
     take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await
 }

-pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
+pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<ArrayData> {
     take_vortex(TokioFile::open(path)?, indices).await
 }
