Skip to content

Commit

Permalink
feat!: standardize file format names & stop wrapping Footer contents …
Browse files Browse the repository at this point in the history
…in messages (#1275)
  • Loading branch information
lwwmanning authored Nov 13, 2024
1 parent 5e07dc9 commit 9c486c4
Show file tree
Hide file tree
Showing 43 changed files with 981 additions and 982 deletions.
8 changes: 3 additions & 5 deletions bench-vortex/benches/compress_noci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use vortex::dtype::field::Field;
use vortex::error::VortexResult;
use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::serde::layouts::{
LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
};
use vortex::serde::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};

use crate::tokio_runtime::TOKIO_RUNTIME;
Expand Down Expand Up @@ -108,7 +106,7 @@ fn vortex_compress_write(
buf: &mut Vec<u8>,
) -> VortexResult<u64> {
async fn async_write(array: &Array, cursor: &mut Cursor<&mut Vec<u8>>) -> VortexResult<()> {
let mut writer = LayoutWriter::new(cursor);
let mut writer = VortexFileWriter::new(cursor);

writer = writer.write_array_columns(array.clone()).await?;
writer.finalize().await?;
Expand All @@ -125,7 +123,7 @@ fn vortex_compress_write(

fn vortex_decompress_read(runtime: &Runtime, buf: Arc<Vec<u8>>) -> VortexResult<ArrayRef> {
async fn async_read(buf: Arc<Vec<u8>>) -> VortexResult<Array> {
let builder: LayoutBatchStreamBuilder<_> = LayoutBatchStreamBuilder::new(
let builder: VortexReadBuilder<_> = VortexReadBuilder::new(
buf,
LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
Expand Down
10 changes: 4 additions & 6 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::serde::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt, VortexWrite};
use vortex::serde::layouts::{
LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
};
use vortex::{Array, IntoArray, IntoCanonical};

pub const BATCH_SIZE: usize = 65_536;
Expand All @@ -46,7 +44,7 @@ pub struct VortexFooter {
pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
let file = tokio::fs::File::open(path).await.unwrap();

LayoutBatchStreamBuilder::new(
VortexReadBuilder::new(
file,
LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
Expand All @@ -65,7 +63,7 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
) -> VortexResult<()> {
let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;

LayoutWriter::new(write)
VortexFileWriter::new(write)
.write_array_columns(chunked)
.await?
.finalize()
Expand Down Expand Up @@ -114,7 +112,7 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
reader: T,
indices: &[u64],
) -> VortexResult<Array> {
LayoutBatchStreamBuilder::new(
VortexReadBuilder::new(
reader,
LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
Expand Down
6 changes: 3 additions & 3 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::dtype::DType;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::serde::layouts::LayoutWriter;
use vortex::serde::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
Expand Down Expand Up @@ -231,7 +231,7 @@ async fn register_vortex_file(
create_dir_all(&vortex_dir)?;
let output_file = &vortex_dir
.join(file.file_name().unwrap())
.with_extension("vxf");
.with_extension(VORTEX_FILE_EXTENSION);
let vtx_file = idempotent_async(output_file, |vtx_file| async move {
let record_batches = session
.read_csv(
Expand Down Expand Up @@ -303,7 +303,7 @@ async fn register_vortex_file(
.open(&vtx_file)
.await?;

let mut writer = LayoutWriter::new(f);
let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;

Expand Down
61 changes: 35 additions & 26 deletions docs/file_format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,42 @@ Layouts
^^^^^^^

Vortex arrays have the same binary representation in-memory, on-disk, and over-the-wire; however,
all the rows of all the columns are not necessarily contiguously laid out. Vortex has three kinds of
*layouts* which recursively compose: the *flat layout*, the *column layout*, and the *chunked
layout*.
all the rows of all the columns are not necessarily contiguously laid out. Vortex currently has
three kinds of *layouts* which recursively compose: the *flat layout*, the *columnar layout*, and
the *chunked layout*.

The flat layout is a contiguous sequence of bytes. Any Vortex array encoding can be serialized into
the flat layout.
The flat layout is a contiguous sequence of bytes. *Any* Vortex array encoding (including
struct-typed arrays) can be serialized into the flat layout.

The column layout lays out each column of a struct-typed array as a separate sequence of bytes. Each
column may or may not recursively use a chunked layout. Column layouts permit readers to push-down
The columnar layout lays out each column of a struct-typed array as a separate sequence of bytes. Each
column may or may not recursively use a chunked layout. Columnar layouts permit readers to push-down
column projections.

The chunked layout lays out an array as a sequence of row chunks. Each chunk may have a different
size. A chunked layout permits reader to push-down row filters based on statistics which we describe
later. Note that, if the laid out array is a struct array, each column uses the same chunk
size. This is equivalent to Parquet's row groups.
size. A chunked layout permits reader to push-down row filters based on statistics and/or row offsets.
Note that if the laid out array is a struct array, each column will use the same chunk size. This is
equivalent to Parquet's row groups.

A few examples of concrete layouts:

1. Chunked of struct of chunked of flat: essentially a Parquet layout with row groups in which each
column's values are contiguously stored in pages.
2. Struct of chunked of flat: eliminates row groups, retaining only pages.
3. Struct of flat: prevents row filter push-down because each array is, to the layout, an opaque
sequence of bytes.
1. Chunked of columnar of chunked of flat: essentially a Parquet layout with row groups in which each
column's values are contiguously stored in pages. Note that in this case, the pages within each
"row group" may be of different sizes / do not have to be aligned.
2. Columnar of chunked of flat: eliminates row groups, retaining only pages.
3. Columnar of flat: prevents row filter push-down because each column is an opaque sequence of bytes.

The chunked layout stores, per chunk, metadata necessary for effective row filtering such as
sortedness, constancy, the minimum value, the maximum value, and the number of null rows. Readers
consult these metadata tables to avoid reading chunks without relevant data.
The chunked layout has an optional child that corresponds to a Vortex `StructArray` of per-chunk
statistics (sometimes referred to as a "statistics table"), which contains metadata necessary for
effective row filtering such as sortedness, the minimum value, the maximum value, and the number of
null rows. Other statistics (e.g., sortedness) are stored inline with the data.

The current writer implementation writes all such "metadata" IPC messages after writing all of the
"data" IPC messages (allowing us to maximize the probability that metadata pruning can proceed
after the first read from disk / object storage). The location of the metadata messages is encoded
in the layout, which is then serialized just before the very end of the file.

One implication of this is that the precise location of the metadata is not itself part of the file
format specification. Instead, it is fully described by the layout.

.. card::

Expand All @@ -52,25 +61,25 @@ consult these metadata tables to avoid reading chunks without relevant data.

+++

The Vortex file format has five sections: data, statistics, schema, footer, and postscript. The
postscript describes the locating of the schema and layout which in turn describe how to
interpret the data and metadata. The schema describes the logical type. The metadata contains
information necessary for row filtering.
The Vortex file format has two top-level sections:

1. Data (typically array IPC messages, followed by statistics, though that's a writer implementation detail),
2. Footer, which contains the schema (i.e., the logical type), the layout, a postscript (containing offsets), and an 8-byte end-of-file marker.

.. _included-codecs:

Encodings
^^^^^^^^^

- Most of the Arrow encodings.
- Chunked, a sequence of arrays.
- Constant, a value and a length.
- Sparse, a value plus a pair of arrays representing exceptions: an array of indices and of values.
- Chunked: a sequence of arrays.
- Constant: a value and a length.
- Sparse: a default value, plus a pair of arrays representing exceptions: an array of indices and of values.
- FastLanes Frame-of-Reference, BitPacking, and Delta.
- Fast Static Symbol Table (FSST).
- Adapative Lossless Floating Point (ALP).
- ALP Real Double (ALP-RD).
- ByteBool, one byte per Boolean value.
- ByteBool: one byte per Boolean value.
- ZigZag.

Specification
Expand Down
28 changes: 14 additions & 14 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use tokio::fs::File;
use vortex::arrow::infer_schema;
use vortex::dtype::field::Field;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::error::{vortex_err, VortexResult};
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt};
use vortex::serde::layouts::{
LayoutBatchStream, LayoutBatchStreamBuilder, LayoutContext, LayoutDescriptorReader,
LayoutDeserializer, Projection, RowFilter, VortexRecordBatchReader,
use vortex::serde::file::{
read_initial_bytes, LayoutContext, LayoutDeserializer, Projection, RowFilter,
VortexFileArrayStream, VortexReadBuilder, VortexRecordBatchReader,
};
use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt};
use vortex::Array;

use crate::expr::PyExpr;
Expand All @@ -29,8 +29,8 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
projection: Projection,
row_filter: Option<RowFilter>,
indices: Option<Array>,
) -> VortexResult<LayoutBatchStream<T>> {
let mut builder = LayoutBatchStreamBuilder::new(
) -> VortexResult<VortexFileArrayStream<T>> {
let mut builder = VortexReadBuilder::new(
reader,
LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
Expand Down Expand Up @@ -63,13 +63,13 @@ pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
}

pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> VortexResult<DType> {
LayoutDescriptorReader::new(LayoutDeserializer::new(
ALL_ENCODINGS_CONTEXT.clone(),
LayoutContext::default().into(),
))
.read_footer(&reader, reader.size().await)
.await?
.dtype()
let initial_read = read_initial_bytes(&reader, reader.size().await).await?;
DType::try_from(
initial_read
.fb_schema()?
.dtype()
.ok_or_else(|| vortex_err!("Failed to fetch dtype from initial read"))?,
)
}

fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
Expand Down
4 changes: 2 additions & 2 deletions pyvortex/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pyo3::pyfunction;
use pyo3::types::PyString;
use tokio::fs::File;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::serde::layouts::LayoutWriter;
use vortex::serde::file::VortexFileWriter;
use vortex::Array;

use crate::dataset::{ObjectStoreUrlDataset, TokioFileDataset};
Expand Down Expand Up @@ -225,7 +225,7 @@ pub fn write_path(
) -> PyResult<()> {
async fn run(array: &Array, fname: &str) -> PyResult<()> {
let file = File::create(Path::new(fname)).await?;
let mut writer = LayoutWriter::new(file);
let mut writer = VortexFileWriter::new(file);

writer = writer.write_array_columns(array.clone()).await?;
writer.finalize().await?;
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/examples/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_array::validity::Validity;
use vortex_array::{Context, IntoArray};
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::persistent::provider::VortexFileTableProvider;
use vortex_serde::layouts::LayoutWriter;
use vortex_serde::file::VortexFileWriter;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -47,7 +47,7 @@ async fn main() -> anyhow::Result<()> {
.open(&filepath)
.await?;

let writer = LayoutWriter::new(f);
let writer = VortexFileWriter::new(f);
let writer = writer.write_array_columns(st.into_array()).await?;
writer.finalize().await?;

Expand Down
8 changes: 4 additions & 4 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use futures::{FutureExt as _, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use vortex_array::Context;
use vortex_expr::datafusion::convert_expr_to_vortex;
use vortex_serde::io::ObjectStoreReadAt;
use vortex_serde::layouts::{
LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, Projection, RowFilter,
use vortex_serde::file::{
LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder,
};
use vortex_serde::io::ObjectStoreReadAt;

pub struct VortexFileOpener {
pub ctx: Arc<Context>,
Expand All @@ -27,7 +27,7 @@ impl FileOpener for VortexFileOpener {
let read_at =
ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());

let mut builder = LayoutBatchStreamBuilder::new(
let mut builder = VortexReadBuilder::new(
read_at,
LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
);
Expand Down
39 changes: 31 additions & 8 deletions vortex-flatbuffers/flatbuffers/vortex-serde/footer.fbs
Original file line number Diff line number Diff line change
@@ -1,25 +1,48 @@
/// A `Buffer` is a simple container for the `begin` and `end` byte offsets within the file.
/// These offsets are absolute (i.e., relative to the start of the file).
struct Buffer {
begin: uint64;
end: uint64;
}

/// A `Layout` is a recursive data structure that describes the physical layout of the data in a Vortex file.
/// As a starting, concrete example, the first three Layout encodings are defined as:
///
/// 1. encoding == 1, `Flat` -> one buffer, zero child Layouts
/// 2. encoding == 2, `Chunked` -> zero buffers, one or more child Layouts (used for chunks of rows)
/// 3. encoding == 3, `Columnar` -> zero buffers, one or more child Layouts (used for columns of structs)
///
/// The `row_count` represents the number of rows represented by this Layout. This is very useful for
/// pruning the Layout tree based on row filters.
///
/// The `metadata` field is fully opaque at this layer, and allows the Layout implementation corresponding to
/// `encoding` to embed additional information that may be useful for the reader. For example, the `ChunkedLayout`
/// uses the first byte of the `metadata` array as a boolean to indicate whether the first child Layout represents
/// the statistics table for the other chunks.
table Layout {
encoding: uint16;
buffers: [Buffer];
children: [Layout];
length: uint64;
metadata: [ubyte];
}

table Footer {
layout: Layout;
row_count: uint64;
metadata: [ubyte];
}

/// The `Postscript` is guaranteed by the file format to never exceed 65528 bytes (i.e., u16::MAX - 8 bytes)
/// in length, and is immediately followed by an 8-byte `EndOfFile` struct.
///
/// The `EndOfFile` struct cannot change size without breaking backwards compatibility. It is not written/read
/// using flatbuffers, but the equivalent flatbuffer definition would be:
///
/// struct EndOfFile {
/// version: uint16;
/// footer_length: uint16;
/// magic: [uint8; 4]; // "VTXF"
/// }
///
table Postscript {
schema_offset: uint64;
footer_offset: uint64;
layout_offset: uint64;
}

root_type Layout;
root_type Postscript;
root_type Footer;
Loading

0 comments on commit 9c486c4

Please sign in to comment.