From 4eef14ee1bd2f7fee0c2f91b5541bffdf258e583 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 18 Jun 2024 07:19:33 -0400 Subject: [PATCH 01/15] Update `string-view` branch to arrow-rs main (#10966) * Pin to arrow main * Fix clippy with latest arrow * Uncomment test that needs new arrow-rs to work * Update datafusion-cli Cargo.lock * Update Cargo.lock * tapelo --- datafusion-cli/Cargo.lock | 246 +++++++++++++++++++++++++++++++++++--- 1 file changed, 227 insertions(+), 19 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index af5d358b5064..1f0547d4bda7 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1489,6 +1489,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "displaydoc" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -2051,14 +2062,134 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "idna" -version = "0.5.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "icu_normalizer", + "icu_properties", + "smallvec", + "utf8_iter", ] [[package]] @@ -2270,6 +2401,12 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" + [[package]] name = "lock_api" version = "0.4.12" @@ -3440,6 +3577,12 @@ dependencies = [ "syn 2.0.71", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -3636,6 +3779,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -3829,27 +3982,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "unicode-bidi" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" - [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" -[[package]] -name = "unicode-normalization" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" -dependencies = [ - "tinyvec", -] - [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -3891,6 +4029,18 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -4241,6 +4391,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "xmlparser" version = "0.13.6" @@ -4256,6 +4418,30 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yoke" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -4282,6 +4468,28 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +[[package]] +name = "zerovec" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "zstd" version = "0.12.4" From ed3da82f37c673c9e650dfc80b6709aa71a25f61 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 11 Jul 2024 23:44:08 -0400 Subject: [PATCH 02/15] merge --- Cargo.toml | 5 +- datafusion/physical-expr-common/Cargo.toml | 1 + .../src/binary_view_map.rs | 587 ++++++++++++++++++ datafusion/physical-expr-common/src/lib.rs | 1 + 4 files changed, 592 insertions(+), 2 deletions(-) create mode 100644 datafusion/physical-expr-common/src/binary_view_map.rs diff --git a/Cargo.toml b/Cargo.toml index b1f07aa531df..9509c41acf42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,8 +70,9 @@ arrow = { version = "52.1.0", features = [ arrow-array = { version = "52.1.0", default-features = false, features = [ "chrono-tz", ] } -arrow-buffer = { version = "52.1.0", default-features = false } -arrow-flight = { version = "52.1.0", features = [ +arrow-buffer = { version = "52.0.0", default-features = false } +arrow-data = { version = "52.0.0", default-features = false } +arrow-flight = { version = "52.0.0", features = [ "flight-sql-experimental", ] } arrow-ipc = { version = "52.1.0", default-features = false, features = [ diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 3ef2d5345533..bbb899d09fe0 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -38,6 +38,7 @@ path = "src/lib.rs" [dependencies] ahash = { workspace = true } arrow = { workspace = true } +arrow-data = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } hashbrown = { workspace = true } diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs new file mode 100644 index 000000000000..9422a05f42de --- /dev/null +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -0,0 +1,587 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from +//! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray. +use ahash::RandomState; +use arrow::array::cast::AsArray; +use arrow::array::{ + Array, ArrayRef, BinaryViewArray, BooleanBufferBuilder, GenericByteViewArray, + StringViewArray, +}; +use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; +use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; +use arrow_data::ByteView; +use datafusion_common::hash_utils::create_hashes; +use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt}; +use std::fmt::Debug; +use std::sync::Arc; + +/// Should the output be a String or Binary? +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OutputType { + /// `StringArray` or `LargeStringArray` + Utf8View, + /// `BinaryArray` or `LargeBinaryArray` + BinaryView, +} + +/// HashSet optimized for storing string or binary values that can produce that +/// the final set as a GenericStringArray with minimal copies. +#[derive(Debug)] +pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); + +impl ArrowBytesViewSet { + pub fn new(output_type: OutputType) -> Self { + Self(ArrowBytesViewMap::new(output_type)) + } + + /// Return the contents of this set and replace it with a new empty + /// set with the same output type + pub(super) fn take(&mut self) -> Self { + Self(self.0.take()) + } + + /// Inserts each value from `values` into the set + pub fn insert(&mut self, values: &ArrayRef) { + fn make_payload_fn(_value: Option<&[u8]>) {} + fn observe_payload_fn(_payload: ()) {} + self.0 + .insert_if_new(values, make_payload_fn, observe_payload_fn); + } + + /// Converts this set into a `StringArray`/`LargeStringArray` or + /// `BinaryArray`/`LargeBinaryArray` containing each distinct value that + /// was interned. This is done without copying the values. + pub fn into_state(self) -> ArrayRef { + self.0.into_state() + } + + /// Returns the total number of distinct values (including nulls) seen so far + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// returns the total number of distinct values (not including nulls) seen so far + pub fn non_null_len(&self) -> usize { + self.0.non_null_len() + } + + /// Return the total size, in bytes, of memory used to store the data in + /// this set, not including `self` + pub fn size(&self) -> usize { + self.0.size() + } +} + +/// Optimized map for storing Arrow "byte view" types (`StringView`, `BinaryView`) +/// values that can produce the set of keys on +/// output as `GenericBinaryViewArray` without copies. +/// +/// Equivalent to `HashSet` but with better performance for arrow +/// data. +/// +/// # Generic Arguments +/// +/// * `V`: payload type +/// +/// # Description +/// +/// This is a specialized HashMap with the following properties: +/// +/// 1. Optimized for storing and emitting Arrow byte types (e.g. +/// `StringViewArray` / `BinaryViewArray`) very efficiently by minimizing copying of +/// the string values themselves, both when inserting and when emitting the +/// final array. +/// +/// +/// 2. Retains the insertion order of entries in the final array. The values are +/// in the same order as they were inserted. +/// +/// Note this structure can be used as a `HashSet` by specifying the value type +/// as `()`, as is done by [`ArrowByteViewSet`]. +/// +/// This map is used by the special `COUNT DISTINCT` aggregate function to +/// store the distinct values, and by the `GROUP BY` operator to store +/// group values when they are a single string array. + +pub struct ArrowBytesViewMap +where + V: Debug + PartialEq + Eq + Clone + Copy + Default, +{ + /// Should the output be String or Binary? + output_type: OutputType, + /// Underlying hash set for each distinct value + map: hashbrown::raw::RawTable>, + /// Total size of the map in bytes + map_size: usize, + view_buffers: Vec, + views: Vec, + /// random state used to generate hashes + random_state: RandomState, + /// buffer that stores hash values (reused across batches to save allocations) + hashes_buffer: Vec, + /// `(payload, null_index)` for the 'null' value, if any + /// NOTE null_index is the logical index in the final array, not the index + /// in the buffer + null: Option<(V, usize)>, + + // the length of the input array. Used to determine if we want to gc the output array + // to avoid holding the view_buffers too long. + input_len: usize, +} + +/// The size, in number of entries, of the initial hash table +const INITIAL_MAP_CAPACITY: usize = 128; + +impl ArrowBytesViewMap +where + V: Debug + PartialEq + Eq + Clone + Copy + Default, +{ + pub fn new(output_type: OutputType) -> Self { + Self { + output_type, + map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY), + map_size: 0, + views: Vec::new(), + view_buffers: Vec::new(), + random_state: RandomState::new(), + hashes_buffer: vec![], + null: None, + input_len: 0, + } + } + + /// Return the contents of this map and replace it with a new empty map with + /// the same output type + pub fn take(&mut self) -> Self { + let mut new_self = Self::new(self.output_type); + std::mem::swap(self, &mut new_self); + new_self + } + + /// Inserts each value from `values` into the map, invoking `payload_fn` for + /// each value if *not* already present, deferring the allocation of the + /// payload until it is needed. + /// + /// Note that this is different than a normal map that would replace the + /// existing entry + /// + /// # Arguments: + /// + /// `values`: array whose values are inserted + /// + /// `make_payload_fn`: invoked for each value that is not already present + /// to create the payload, in order of the values in `values` + /// + /// `observe_payload_fn`: invoked once, for each value in `values`, that was + /// already present in the map, with corresponding payload value. + /// + /// # Returns + /// + /// The payload value for the entry, either the existing value or + /// the newly inserted value + /// + /// # Safety: + /// + /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked + /// with valid values from `values`, not for the `NULL` value. + pub fn insert_if_new( + &mut self, + values: &ArrayRef, + make_payload_fn: MP, + observe_payload_fn: OP, + ) where + MP: FnMut(Option<&[u8]>) -> V, + OP: FnMut(V), + { + // Sanity array type + match self.output_type { + OutputType::BinaryView => { + assert!(matches!(values.data_type(), DataType::BinaryView)); + self.insert_if_new_inner::( + values, + make_payload_fn, + observe_payload_fn, + ) + } + OutputType::Utf8View => { + assert!(matches!(values.data_type(), DataType::Utf8View)); + self.insert_if_new_inner::( + values, + make_payload_fn, + observe_payload_fn, + ) + } + }; + } + + /// Generic version of [`Self::insert_if_new`] that handles `ByteViewType` + /// (both StringView and BinaryView) + /// + /// Note this is the only function that is generic on [`ByteViewType`], which + /// avoids having to template the entire structure, making the code + /// simpler and understand and reducing code bloat due to duplication. + /// + /// See comments on `insert_if_new` for more details + fn insert_if_new_inner( + &mut self, + values: &ArrayRef, + mut make_payload_fn: MP, + mut observe_payload_fn: OP, + ) where + MP: FnMut(Option<&[u8]>) -> V, + OP: FnMut(V), + B: ByteViewType, + { + // step 1: compute hashes + let batch_hashes = &mut self.hashes_buffer; + batch_hashes.clear(); + batch_hashes.resize(values.len(), 0); + create_hashes(&[values.clone()], &self.random_state, batch_hashes) + // hash is supported for all types and create_hashes only + // returns errors for unsupported types + .unwrap(); + + // step 2: insert each value into the set, if not already present + let values = values.as_byte_view::(); + + self.input_len += values.len(); + + let buffer_offset = self.view_buffers.len(); + self.view_buffers.extend_from_slice(values.data_buffers()); + + // Ensure lengths are equivalent + assert_eq!(values.len(), batch_hashes.len()); + + for (view_idx, (value, &hash)) in + values.iter().zip(batch_hashes.iter()).enumerate() + { + // handle null value + let Some(value) = value else { + let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { + payload + } else { + let payload = make_payload_fn(None); + let null_index = self.views.len(); + self.views.push(0); + self.null = Some((payload, null_index)); + payload + }; + observe_payload_fn(payload); + continue; + }; + + // get the value as bytes + let value: &[u8] = value.as_ref(); + + let entry = self.map.get_mut(hash, |header| { + let v = unsafe { self.views.get_unchecked(header.view_idx) }; + + let len = *v as u32; + if len as usize != value.len() { + return false; + } + + // We should probably change arrow-rs to provide a + // GenericByteViewArray::value_from_view() method + let b = if len <= 12 { + unsafe { + GenericByteViewArray::::inline_value( + v, + len as usize, + ) + } + } else { + let view = ByteView::from(*v); + + let data = unsafe { + self.view_buffers.get_unchecked(view.buffer_index as usize) + }; + let offset = view.offset as usize; + unsafe { data.get_unchecked(offset..offset + len as usize) } + }; + b == value + }); + + let payload = if let Some(entry) = entry { + entry.payload + } else { + // no existing value, make a new one. + let payload = make_payload_fn(Some(value)); + let new_header = Entry { view_idx, payload }; + + let view = if value.len() <= 12 { + unsafe { *values.views().get_unchecked(view_idx) } + } else { + let v = unsafe { *values.views().get_unchecked(view_idx) }; + let mut v = ByteView::from(v); + v.buffer_index += buffer_offset as u32; + v.as_u128() + }; + self.views.push(view); + self.map + .insert_accounted(new_header, |_| hash, &mut self.map_size); + payload + }; + observe_payload_fn(payload); + } + } + + /// Converts this set into a `StringViewArray`, or `BinaryViewArray`, + /// containing each distinct value + /// that was inserted. This is done without copying the values. + /// + /// The values are guaranteed to be returned in the same order in which + /// they were first seen. + pub fn into_state(self) -> ArrayRef { + // Only make a `NullBuffer` if there was a null value + let nulls = self.null.map(|(_payload, null_index)| { + let num_values = self.views.len(); + single_null_buffer(num_values, null_index) + }); + + let len = self.views.len(); + let b = ScalarBuffer::new(Buffer::from_vec(self.views), 0, len); + match self.output_type { + OutputType::BinaryView => { + // SAFETY: the offsets were constructed correctly + let mut array = unsafe { + BinaryViewArray::new_unchecked(b, self.view_buffers, nulls) + }; + if array.len() < (self.input_len / 2) { + // arrow gc by default will deduplicate strings, it should not do that. + // todo: file a ticket to change it. + array = array.gc(); + } + Arc::new(array) + } + OutputType::Utf8View => { + // SAFETY: + // 1. the offsets were constructed safely + // + // 2. we asserted the input arrays were all the correct type and + // thus since all the values that went in were valid (e.g. utf8) + // so are all the values that come out + let mut array = unsafe { + StringViewArray::new_unchecked(b, self.view_buffers, nulls) + }; + if array.len() < (self.input_len / 2) { + array = array.gc(); + } + Arc::new(array) + } + } + } + + /// Total number of entries (including null, if present) + pub fn len(&self) -> usize { + self.non_null_len() + self.null.map(|_| 1).unwrap_or(0) + } + + /// Is the set empty? + pub fn is_empty(&self) -> bool { + self.map.is_empty() && self.null.is_none() + } + + /// Number of non null entries + pub fn non_null_len(&self) -> usize { + self.map.len() + } + + /// Return the total size, in bytes, of memory used to store the data in + /// this set, not including `self` + pub fn size(&self) -> usize { + // view buffers are from upstream string view, not technically from us. + self.map_size + self.views.allocated_size() + self.hashes_buffer.allocated_size() + } +} + +/// Returns a `NullBuffer` with a single null value at the given index +fn single_null_buffer(num_values: usize, null_index: usize) -> NullBuffer { + let mut bool_builder = BooleanBufferBuilder::new(num_values); + bool_builder.append_n(num_values, true); + bool_builder.set_bit(null_index, false); + NullBuffer::from(bool_builder.finish()) +} + +impl Debug for ArrowBytesViewMap +where + V: Debug + PartialEq + Eq + Clone + Copy + Default, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArrowBytesMap") + .field("map", &"") + .field("map_size", &self.map_size) + .field("views", &self.views) + .field("random_state", &self.random_state) + .field("hashes_buffer", &self.hashes_buffer) + .finish() + } +} + +/// Entry in the hash table -- see [`ArrowViewBytesMap`] for more details +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +struct Entry +where + V: Debug + PartialEq + Eq + Clone + Copy + Default, +{ + /// The idx into the views array + view_idx: usize, + + /// value stored by the entry + payload: V, +} + +#[cfg(test)] +mod tests { + use super::*; + + // asserts that the set contains the expected strings, in the same order + fn assert_set(set: ArrowBytesViewSet, expected: &[Option<&str>]) { + let strings = set.into_state(); + let strings = strings.as_string_view(); + let state = strings.into_iter().collect::>(); + assert_eq!(state, expected); + } + + #[test] + fn string_view_set_empty() { + let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); + let array: ArrayRef = Arc::new(StringViewArray::new_null(0)); + set.insert(&array); + assert_eq!(set.len(), 0); + assert_eq!(set.non_null_len(), 0); + assert_set(set, &[]); + } + + #[test] + fn string_view_set_one_null() { + let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); + let array: ArrayRef = Arc::new(StringViewArray::new_null(1)); + set.insert(&array); + assert_eq!(set.len(), 1); + assert_eq!(set.non_null_len(), 0); + assert_set(set, &[None]); + } + + #[test] + fn string_view_set_many_null() { + let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); + let array: ArrayRef = Arc::new(StringViewArray::new_null(11)); + set.insert(&array); + assert_eq!(set.len(), 1); + assert_eq!(set.non_null_len(), 0); + assert_set(set, &[None]); + } + + #[test] + fn test_string_view_set_basic() { + // basic test for mixed small and large string values + let values = GenericByteViewArray::from(vec![ + Some("a"), + Some("b"), + Some("CXCCCCCCCCAABB"), // 14 bytes + Some(""), + Some("cbcxx"), // 5 bytes + None, + Some("AAAAAAAA"), // 8 bytes + Some("BBBBBQBBBAAA"), // 12 bytes + Some("a"), + Some("cbcxx"), + Some("b"), + Some("cbcxx"), + Some(""), + None, + Some("BBBBBQBBBAAA"), + Some("BBBBBQBBBAAA"), + Some("AAAAAAAA"), + Some("CXCCCCCCCCAABB"), + ]); + + let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); + let array: ArrayRef = Arc::new(values); + set.insert(&array); + // values mut appear be in the order they were inserted + assert_set( + set, + &[ + Some("a"), + Some("b"), + Some("CXCCCCCCCCAABB"), + Some(""), + Some("cbcxx"), + None, + Some("AAAAAAAA"), + Some("BBBBBQBBBAAA"), + ], + ); + } + + #[test] + fn test_string_set_non_utf8() { + // basic test for mixed small and large string values + let values = GenericByteViewArray::from(vec![ + Some("a"), + Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"), + Some("🔥"), + Some("✨✨✨"), + Some("foobarbaz"), + Some("🔥"), + Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"), + ]); + + let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); + let array: ArrayRef = Arc::new(values); + set.insert(&array); + // strings mut appear be in the order they were inserted + assert_set( + set, + &[ + Some("a"), + Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"), + Some("🔥"), + Some("✨✨✨"), + Some("foobarbaz"), + ], + ); + } + + // Test use of binary output type + #[test] + fn test_binary_set() { + let v: Vec> = vec![ + Some(b"a"), + Some(b"CXCCCCCCCCCCCCC"), + None, + Some(b"CXCCCCCCCCCCCCC"), + ]; + let values: ArrayRef = Arc::new(BinaryViewArray::from(v)); + + let expected: Vec> = + vec![Some(b"a"), Some(b"CXCCCCCCCCCCCCC"), None]; + let expected: ArrayRef = Arc::new(GenericByteViewArray::from(expected)); + + let mut set = ArrowBytesViewSet::new(OutputType::BinaryView); + set.insert(&values); + assert_eq!(&set.into_state(), &expected); + } +} diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index 8d50e0b964e5..7d53ca685fc5 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -18,6 +18,7 @@ pub mod aggregate; pub mod binary_map; pub mod datum; +pub mod binary_view_map; pub mod expressions; pub mod physical_expr; pub mod sort_expr; From 7dc596aeb84d2486e6f374b21c90a02474bc8be5 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 11 Jul 2024 23:49:59 -0400 Subject: [PATCH 03/15] update cast --- datafusion/common/src/cast.rs | 5 +++++ datafusion/common/src/hash_utils.rs | 8 +++++--- datafusion/physical-expr-common/src/binary_view_map.rs | 6 ------ datafusion/physical-expr-common/src/lib.rs | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs index 0586fcf5e2ae..0f325d47cb62 100644 --- a/datafusion/common/src/cast.rs +++ b/datafusion/common/src/cast.rs @@ -127,6 +127,11 @@ pub fn as_generic_binary_array( Ok(downcast_value!(array, GenericBinaryArray, T)) } +// Downcast ArrayRef to BinaryViewArray +pub fn as_binary_view_array(array: &dyn Array) -> Result<&BinaryViewArray> { + Ok(downcast_value!(array, BinaryViewArray)) +} + // Downcast ArrayRef to GenericListArray pub fn as_generic_list_array( array: &dyn Array, diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index a1e94b55bf23..a0c13c86fc1d 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -28,9 +28,9 @@ use arrow_buffer::IntervalDayTime; use arrow_buffer::IntervalMonthDayNano; use crate::cast::{ - as_boolean_array, as_fixed_size_list_array, as_generic_binary_array, - as_large_list_array, as_list_array, as_primitive_array, as_string_array, - as_struct_array, + as_binary_view_array, as_boolean_array, as_fixed_size_list_array, + as_generic_binary_array, as_large_list_array, as_list_array, as_primitive_array, + as_string_array, as_string_view_array, as_struct_array, }; use crate::error::{Result, _internal_err}; @@ -371,9 +371,11 @@ pub fn create_hashes<'a>( DataType::Null => hash_null(random_state, hashes_buffer, rehash), DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash), DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash), + DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash), DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), DataType::Binary => hash_array(as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), + DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), DataType::LargeBinary => hash_array(as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), DataType::FixedSizeBinary(_) => { diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 9422a05f42de..c8bb4ea45825 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -50,12 +50,6 @@ impl ArrowBytesViewSet { Self(ArrowBytesViewMap::new(output_type)) } - /// Return the contents of this set and replace it with a new empty - /// set with the same output type - pub(super) fn take(&mut self) -> Self { - Self(self.0.take()) - } - /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { fn make_payload_fn(_value: Option<&[u8]>) {} diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index 7d53ca685fc5..f03eedd4cf65 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -17,8 +17,8 @@ pub mod aggregate; pub mod binary_map; -pub mod datum; pub mod binary_view_map; +pub mod datum; pub mod expressions; pub mod physical_expr; pub mod sort_expr; From b70bce4a5a55773339bf5fbbc4b532048543fa56 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 11 Jul 2024 23:59:38 -0400 Subject: [PATCH 04/15] consistent dep --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9509c41acf42..00b22e8806ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,9 +70,9 @@ arrow = { version = "52.1.0", features = [ arrow-array = { version = "52.1.0", default-features = false, features = [ "chrono-tz", ] } -arrow-buffer = { version = "52.0.0", default-features = false } -arrow-data = { version = "52.0.0", default-features = false } -arrow-flight = { version = "52.0.0", features = [ +arrow-buffer = { version = "52.1.0", default-features = false } +arrow-data = { version = "52.1.0", default-features = false } +arrow-flight = { version = "52.1.0", features = [ "flight-sql-experimental", ] } arrow-ipc = { version = "52.1.0", default-features = false, features = [ From edb2665df396728469ad2bafc5028e1dd9067ca3 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 12 Jul 2024 10:07:54 -0400 Subject: [PATCH 05/15] fix ci --- datafusion-cli/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 1f0547d4bda7..48d996e5b721 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1369,6 +1369,7 @@ version = "40.0.0" dependencies = [ "ahash", "arrow", + "arrow-data", "datafusion-common", "datafusion-expr", "hashbrown 0.14.5", From be858efab583867a8ffd809790abe98868cff0a3 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 12 Jul 2024 10:34:49 -0400 Subject: [PATCH 06/15] add more tests --- .../src/binary_view_map.rs | 172 +++++++++++++++++- 1 file changed, 171 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index c8bb4ea45825..d90134a78c75 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -321,7 +321,12 @@ where } else { // no existing value, make a new one. let payload = make_payload_fn(Some(value)); - let new_header = Entry { view_idx, payload }; + + let inner_view_idx = self.views.len(); + let new_header = Entry { + view_idx: inner_view_idx, + payload, + }; let view = if value.len() <= 12 { unsafe { *values.views().get_unchecked(view_idx) } @@ -447,6 +452,8 @@ where #[cfg(test)] mod tests { + use hashbrown::HashMap; + use super::*; // asserts that the set contains the expected strings, in the same order @@ -578,4 +585,167 @@ mod tests { set.insert(&values); assert_eq!(&set.into_state(), &expected); } + + // inserting strings into the set does not increase reported memory + #[test] + fn test_string_set_memory_usage() { + let strings1 = StringViewArray::from(vec![ + Some("a"), + Some("b"), + Some("CXCCCCCCCCCCC"), // 13 bytes + Some("AAAAAAAA"), // 8 bytes + Some("BBBBBQBBB"), // 9 bytes + ]); + let total_strings1_len = strings1 + .iter() + .map(|s| s.map(|s| s.len()).unwrap_or(0)) + .sum::(); + let values1: ArrayRef = Arc::new(StringViewArray::from(strings1)); + + // Much larger strings in strings2 + let strings2 = StringViewArray::from(vec![ + "FOO".repeat(1000), + "BAR larger than 12 bytes.".repeat(1000), + "more unique.".repeat(1000), + "more unique2.".repeat(1000), + "BAZ".repeat(3000), + ]); + let total_strings2_len = strings2 + .iter() + .map(|s| s.map(|s| s.len()).unwrap_or(0)) + .sum::(); + let values2: ArrayRef = Arc::new(StringViewArray::from(strings2)); + + let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); + let size_empty = set.size(); + + set.insert(&values1); + let size_after_values1 = set.size(); + assert!(size_empty < size_after_values1); + assert!( + size_after_values1 > total_strings1_len, + "expect {size_after_values1} to be more than {total_strings1_len}" + ); + assert!(size_after_values1 < total_strings1_len + total_strings2_len); + + // inserting the same strings should not affect the size + set.insert(&values1); + assert_eq!(set.size(), size_after_values1); + + // inserting the large strings should increase the reported size + set.insert(&values2); + let size_after_values2 = set.size(); + assert!(size_after_values2 > size_after_values1); + + // the consumed size should be less than the sum of the sizes of the strings + // bc the strings are deduplicated + assert!(size_after_values2 < total_strings1_len + total_strings2_len); + } + + #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)] + struct TestPayload { + // store the string value to check against input + index: usize, // store the index of the string (each new string gets the next sequential input) + } + + /// Wraps an [`ArrowBytesViewMap`], validating its invariants + struct TestMap { + map: ArrowBytesViewMap, + // stores distinct strings seen, in order + strings: Vec>, + // map strings to index in strings + indexes: HashMap, usize>, + } + + impl TestMap { + /// creates a map with TestPayloads for the given strings and then + /// validates the payloads + fn new() -> Self { + Self { + map: ArrowBytesViewMap::new(OutputType::Utf8View), + strings: vec![], + indexes: HashMap::new(), + } + } + + /// Inserts strings into the map + fn insert(&mut self, strings: &[Option<&str>]) { + let string_array = StringViewArray::from(strings.to_vec()); + let arr: ArrayRef = Arc::new(string_array); + + let mut next_index = self.indexes.len(); + let mut actual_new_strings = vec![]; + let mut actual_seen_indexes = vec![]; + // update self with new values, keeping track of newly added values + for str in strings { + let str = str.map(|s| s.to_string()); + let index = self.indexes.get(&str).cloned().unwrap_or_else(|| { + actual_new_strings.push(str.clone()); + let index = self.strings.len(); + self.strings.push(str.clone()); + self.indexes.insert(str, index); + index + }); + actual_seen_indexes.push(index); + } + + // insert the values into the map, recording what we did + let mut seen_new_strings = vec![]; + let mut seen_indexes = vec![]; + self.map.insert_if_new( + &arr, + |s| { + let value = s + .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string")); + let index = next_index; + next_index += 1; + seen_new_strings.push(value); + TestPayload { index } + }, + |payload| { + seen_indexes.push(payload.index); + }, + ); + + assert_eq!(actual_seen_indexes, seen_indexes); + assert_eq!(actual_new_strings, seen_new_strings); + } + + /// Call `self.map.into_array()` validating that the strings are in the same + /// order as they were inserted + fn into_array(self) -> ArrayRef { + let Self { + map, + strings, + indexes: _, + } = self; + + let arr = map.into_state(); + let expected: ArrayRef = Arc::new(StringViewArray::from(strings)); + assert_eq!(&arr, &expected); + arr + } + } + + #[test] + fn test_map() { + let input = vec![ + // Note mix of short/long strings + Some("A"), + Some("bcdefghijklmnop1234567"), + Some("X"), + Some("Y"), + None, + Some("qrstuvqxyzhjwya"), + Some("✨🔥"), + Some("🔥"), + Some("🔥🔥🔥🔥🔥🔥"), + ]; + + let mut test_map = TestMap::new(); + test_map.insert(&input); + test_map.insert(&input); // put it in twice + let expected_output: ArrayRef = Arc::new(StringViewArray::from(input)); + assert_eq!(&test_map.into_array(), &expected_output); + } } From 676b49b93151f5795439478c5de5a96883e8470a Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 12 Jul 2024 10:36:56 -0400 Subject: [PATCH 07/15] make doc happy --- datafusion/physical-expr-common/src/binary_view_map.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index d90134a78c75..c9b89d71145b 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from +//! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from //! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray. use ahash::RandomState; use arrow::array::cast::AsArray; @@ -111,7 +111,7 @@ impl ArrowBytesViewSet { /// in the same order as they were inserted. /// /// Note this structure can be used as a `HashSet` by specifying the value type -/// as `()`, as is done by [`ArrowByteViewSet`]. +/// as `()`, as is done by [`ArrowBytesViewSet`]. /// /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store @@ -437,7 +437,7 @@ where } } -/// Entry in the hash table -- see [`ArrowViewBytesMap`] for more details +/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] struct Entry where From 8bdfb1b8b980dc5a23792c2a7620a922d8af2b7c Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 12 Jul 2024 11:56:23 -0400 Subject: [PATCH 08/15] update new implementation --- .../src/binary_view_map.rs | 125 ++++-------------- 1 file changed, 27 insertions(+), 98 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index c9b89d71145b..c030f78e2edd 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -19,13 +19,8 @@ //! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray. use ahash::RandomState; use arrow::array::cast::AsArray; -use arrow::array::{ - Array, ArrayRef, BinaryViewArray, BooleanBufferBuilder, GenericByteViewArray, - StringViewArray, -}; -use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; +use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; -use arrow_data::ByteView; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt}; use std::fmt::Debug; @@ -127,8 +122,8 @@ where map: hashbrown::raw::RawTable>, /// Total size of the map in bytes map_size: usize, - view_buffers: Vec, - views: Vec, + + builder: GenericByteViewBuilder, /// random state used to generate hashes random_state: RandomState, /// buffer that stores hash values (reused across batches to save allocations) @@ -137,10 +132,6 @@ where /// NOTE null_index is the logical index in the final array, not the index /// in the buffer null: Option<(V, usize)>, - - // the length of the input array. Used to determine if we want to gc the output array - // to avoid holding the view_buffers too long. - input_len: usize, } /// The size, in number of entries, of the initial hash table @@ -155,12 +146,10 @@ where output_type, map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY), map_size: 0, - views: Vec::new(), - view_buffers: Vec::new(), + builder: GenericByteViewBuilder::new(), random_state: RandomState::new(), hashes_buffer: vec![], null: None, - input_len: 0, } } @@ -258,25 +247,18 @@ where // step 2: insert each value into the set, if not already present let values = values.as_byte_view::(); - self.input_len += values.len(); - - let buffer_offset = self.view_buffers.len(); - self.view_buffers.extend_from_slice(values.data_buffers()); - // Ensure lengths are equivalent assert_eq!(values.len(), batch_hashes.len()); - for (view_idx, (value, &hash)) in - values.iter().zip(batch_hashes.iter()).enumerate() - { + for (value, &hash) in values.iter().zip(batch_hashes.iter()) { // handle null value let Some(value) = value else { let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { payload } else { let payload = make_payload_fn(None); - let null_index = self.views.len(); - self.views.push(0); + let null_index = self.builder.len(); + self.builder.append_null(); self.null = Some((payload, null_index)); payload }; @@ -288,32 +270,13 @@ where let value: &[u8] = value.as_ref(); let entry = self.map.get_mut(hash, |header| { - let v = unsafe { self.views.get_unchecked(header.view_idx) }; + let v = self.builder.get_value(header.view_idx); - let len = *v as u32; - if len as usize != value.len() { + if v.len() != value.len() { return false; } - // We should probably change arrow-rs to provide a - // GenericByteViewArray::value_from_view() method - let b = if len <= 12 { - unsafe { - GenericByteViewArray::::inline_value( - v, - len as usize, - ) - } - } else { - let view = ByteView::from(*v); - - let data = unsafe { - self.view_buffers.get_unchecked(view.buffer_index as usize) - }; - let offset = view.offset as usize; - unsafe { data.get_unchecked(offset..offset + len as usize) } - }; - b == value + v == value }); let payload = if let Some(entry) = entry { @@ -322,21 +285,14 @@ where // no existing value, make a new one. let payload = make_payload_fn(Some(value)); - let inner_view_idx = self.views.len(); + let inner_view_idx = self.builder.len(); let new_header = Entry { view_idx: inner_view_idx, payload, }; - let view = if value.len() <= 12 { - unsafe { *values.views().get_unchecked(view_idx) } - } else { - let v = unsafe { *values.views().get_unchecked(view_idx) }; - let mut v = ByteView::from(v); - v.buffer_index += buffer_offset as u32; - v.as_u128() - }; - self.views.push(view); + self.builder.append_value(value); + self.map .insert_accounted(new_header, |_| hash, &mut self.map_size); payload @@ -352,40 +308,20 @@ where /// The values are guaranteed to be returned in the same order in which /// they were first seen. pub fn into_state(self) -> ArrayRef { - // Only make a `NullBuffer` if there was a null value - let nulls = self.null.map(|(_payload, null_index)| { - let num_values = self.views.len(); - single_null_buffer(num_values, null_index) - }); - - let len = self.views.len(); - let b = ScalarBuffer::new(Buffer::from_vec(self.views), 0, len); + let mut builder = self.builder; match self.output_type { OutputType::BinaryView => { - // SAFETY: the offsets were constructed correctly - let mut array = unsafe { - BinaryViewArray::new_unchecked(b, self.view_buffers, nulls) - }; - if array.len() < (self.input_len / 2) { - // arrow gc by default will deduplicate strings, it should not do that. - // todo: file a ticket to change it. - array = array.gc(); - } + let array = builder.finish(); + Arc::new(array) } OutputType::Utf8View => { // SAFETY: - // 1. the offsets were constructed safely - // - // 2. we asserted the input arrays were all the correct type and + // we asserted the input arrays were all the correct type and // thus since all the values that went in were valid (e.g. utf8) // so are all the values that come out - let mut array = unsafe { - StringViewArray::new_unchecked(b, self.view_buffers, nulls) - }; - if array.len() < (self.input_len / 2) { - array = array.gc(); - } + let array = builder.finish(); + let array = unsafe { array.to_string_view_unchecked() }; Arc::new(array) } } @@ -409,19 +345,12 @@ where /// Return the total size, in bytes, of memory used to store the data in /// this set, not including `self` pub fn size(&self) -> usize { - // view buffers are from upstream string view, not technically from us. - self.map_size + self.views.allocated_size() + self.hashes_buffer.allocated_size() + self.map_size + + self.builder.allocated_size() + + self.hashes_buffer.allocated_size() } } -/// Returns a `NullBuffer` with a single null value at the given index -fn single_null_buffer(num_values: usize, null_index: usize) -> NullBuffer { - let mut bool_builder = BooleanBufferBuilder::new(num_values); - bool_builder.append_n(num_values, true); - bool_builder.set_bit(null_index, false); - NullBuffer::from(bool_builder.finish()) -} - impl Debug for ArrowBytesViewMap where V: Debug + PartialEq + Eq + Clone + Copy + Default, @@ -430,7 +359,7 @@ where f.debug_struct("ArrowBytesMap") .field("map", &"") .field("map_size", &self.map_size) - .field("views", &self.views) + .field("view_builder", &self.builder) .field("random_state", &self.random_state) .field("hashes_buffer", &self.hashes_buffer) .finish() @@ -452,6 +381,7 @@ where #[cfg(test)] mod tests { + use arrow::array::{BinaryViewArray, GenericByteViewArray, StringViewArray}; use hashbrown::HashMap; use super::*; @@ -608,7 +538,7 @@ mod tests { "BAR larger than 12 bytes.".repeat(1000), "more unique.".repeat(1000), "more unique2.".repeat(1000), - "BAZ".repeat(3000), + "FOO".repeat(3000), ]); let total_strings2_len = strings2 .iter() @@ -631,15 +561,14 @@ mod tests { // inserting the same strings should not affect the size set.insert(&values1); assert_eq!(set.size(), size_after_values1); + assert_eq!(set.len(), 5); // inserting the large strings should increase the reported size set.insert(&values2); let size_after_values2 = set.size(); assert!(size_after_values2 > size_after_values1); - // the consumed size should be less than the sum of the sizes of the strings - // bc the strings are deduplicated - assert!(size_after_values2 < total_strings1_len + total_strings2_len); + assert_eq!(set.len(), 10); } #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)] From dc1a6695a3b1a3d808fb50817d111daf5a3db083 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 12 Jul 2024 12:09:13 -0400 Subject: [PATCH 09/15] fix bug --- datafusion/physical-expr-common/src/binary_view_map.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index c030f78e2edd..88824ec52a09 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -135,7 +135,7 @@ where } /// The size, in number of entries, of the initial hash table -const INITIAL_MAP_CAPACITY: usize = 128; +const INITIAL_MAP_CAPACITY: usize = 512; impl ArrowBytesViewMap where @@ -288,13 +288,14 @@ where let inner_view_idx = self.builder.len(); let new_header = Entry { view_idx: inner_view_idx, + hash, payload, }; self.builder.append_value(value); self.map - .insert_accounted(new_header, |_| hash, &mut self.map_size); + .insert_accounted(new_header, |h| h.hash, &mut self.map_size); payload }; observe_payload_fn(payload); @@ -375,6 +376,8 @@ where /// The idx into the views array view_idx: usize, + hash: u64, + /// value stored by the entry payload: V, } From b0f8726a96adb80f59c262028dce780da431d607 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 17 Jul 2024 09:31:42 -0400 Subject: [PATCH 10/15] avoid unused dep --- Cargo.toml | 1 - datafusion/physical-expr-common/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 00b22e8806ac..b1f07aa531df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,6 @@ arrow-array = { version = "52.1.0", default-features = false, features = [ "chrono-tz", ] } arrow-buffer = { version = "52.1.0", default-features = false } -arrow-data = { version = "52.1.0", default-features = false } arrow-flight = { version = "52.1.0", features = [ "flight-sql-experimental", ] } diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index bbb899d09fe0..3ef2d5345533 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -38,7 +38,6 @@ path = "src/lib.rs" [dependencies] ahash = { workspace = true } arrow = { workspace = true } -arrow-data = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } hashbrown = { workspace = true } From ca3d9b9e1b91150927ffa9b7a39f1d13b4df58e3 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 17 Jul 2024 09:33:48 -0400 Subject: [PATCH 11/15] update dep --- datafusion-cli/Cargo.lock | 247 +++----------------------------------- 1 file changed, 19 insertions(+), 228 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 48d996e5b721..af5d358b5064 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1369,7 +1369,6 @@ version = "40.0.0" dependencies = [ "ahash", "arrow", - "arrow-data", "datafusion-common", "datafusion-expr", "hashbrown 0.14.5", @@ -1490,17 +1489,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "displaydoc" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "doc-comment" version = "0.3.3" @@ -2063,134 +2051,14 @@ dependencies = [ "cc", ] -[[package]] -name = "icu_collections" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" -dependencies = [ - "displaydoc", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_locid" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" -dependencies = [ - "displaydoc", - "litemap", - "tinystr", - "writeable", - "zerovec", -] - -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" - -[[package]] -name = "icu_normalizer" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_normalizer_data", - "icu_properties", - "icu_provider", - "smallvec", - "utf16_iter", - "utf8_iter", - "write16", - "zerovec", -] - -[[package]] -name = "icu_normalizer_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" - -[[package]] -name = "icu_properties" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_locid_transform", - "icu_properties_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_properties_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" - -[[package]] -name = "icu_provider" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_provider_macros", - "stable_deref_trait", - "tinystr", - "writeable", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "idna" -version = "1.0.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ - "icu_normalizer", - "icu_properties", - "smallvec", - "utf8_iter", + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -2402,12 +2270,6 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" -[[package]] -name = "litemap" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" - [[package]] name = "lock_api" version = "0.4.12" @@ -3578,12 +3440,6 @@ dependencies = [ "syn 2.0.71", ] -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "static_assertions" version = "1.1.0" @@ -3780,16 +3636,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tinystr" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" -dependencies = [ - "displaydoc", - "zerovec", -] - [[package]] name = "tinyvec" version = "1.8.0" @@ -3983,12 +3829,27 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -4030,18 +3891,6 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - -[[package]] -name = "utf8_iter" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" - [[package]] name = "utf8parse" version = "0.2.2" @@ -4392,18 +4241,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - -[[package]] -name = "writeable" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" - [[package]] name = "xmlparser" version = "0.13.6" @@ -4419,30 +4256,6 @@ dependencies = [ "lzma-sys", ] -[[package]] -name = "yoke" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" -dependencies = [ - "serde", - "stable_deref_trait", - "yoke-derive", - "zerofrom", -] - -[[package]] -name = "yoke-derive" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", - "synstructure", -] - [[package]] name = "zerocopy" version = "0.7.35" @@ -4469,28 +4282,6 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -[[package]] -name = "zerovec" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c" -dependencies = [ - "yoke", - "zerofrom", - "zerovec-derive", -] - -[[package]] -name = "zerovec-derive" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "zstd" version = "0.12.4" From 8fffec571e3c480808ef242aec856e37541c1dda Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 17 Jul 2024 11:09:18 -0400 Subject: [PATCH 12/15] update --- datafusion/common/src/hash_utils.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index a0c13c86fc1d..12cb9645498c 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -360,8 +360,6 @@ pub fn create_hashes<'a>( random_state: &RandomState, hashes_buffer: &'a mut Vec, ) -> Result<&'a mut Vec> { - use crate::cast::{as_binary_view_array, as_string_view_array}; - for (i, col) in arrays.iter().enumerate() { let array = col.as_ref(); // combine hashes with `combine_hashes` for all columns besides the first @@ -373,11 +371,9 @@ pub fn create_hashes<'a>( DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash), DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash), - DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash), DataType::Binary => hash_array(as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), DataType::LargeBinary => hash_array(as_generic_binary_array::(array)?, random_state, hashes_buffer, rehash), - DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash), DataType::FixedSizeBinary(_) => { let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap(); hash_array(array, random_state, hashes_buffer, rehash) From d55567eb6b907bfa05cdf3f844ae06e51aaa185e Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 17 Jul 2024 11:20:36 -0400 Subject: [PATCH 13/15] fix cargo check --- datafusion/common/src/cast.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs index 0f325d47cb62..0586fcf5e2ae 100644 --- a/datafusion/common/src/cast.rs +++ b/datafusion/common/src/cast.rs @@ -127,11 +127,6 @@ pub fn as_generic_binary_array( Ok(downcast_value!(array, GenericBinaryArray, T)) } -// Downcast ArrayRef to BinaryViewArray -pub fn as_binary_view_array(array: &dyn Array) -> Result<&BinaryViewArray> { - Ok(downcast_value!(array, BinaryViewArray)) -} - // Downcast ArrayRef to GenericListArray pub fn as_generic_list_array( array: &dyn Array, From 96c0367df696892c8cfa6fc1d39b0b2e4680f701 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 19 Jul 2024 15:27:58 -0400 Subject: [PATCH 14/15] update doc --- datafusion/physical-expr-common/src/binary_view_map.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index 88824ec52a09..b0860f292205 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -16,7 +16,9 @@ // under the License. //! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from -//! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray. +//! `StringViewArray`/`BinaryViewArray`. +//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the +//! [`GenericByteViewBuilder`]. use ahash::RandomState; use arrow::array::cast::AsArray; use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder}; From 9bef491dcdde0b538146a2d204839978756f4ef8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 19 Jul 2024 15:52:55 -0400 Subject: [PATCH 15/15] pick up the comments change again --- .../src/binary_view_map.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index b0860f292205..3eeab4a5af02 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -31,14 +31,14 @@ use std::sync::Arc; /// Should the output be a String or Binary? #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum OutputType { - /// `StringArray` or `LargeStringArray` + /// `StringViewArray` Utf8View, - /// `BinaryArray` or `LargeBinaryArray` + /// `BinaryViewArray` BinaryView, } /// HashSet optimized for storing string or binary values that can produce that -/// the final set as a GenericStringArray with minimal copies. +/// the final set as a `GenericBinaryViewArray` with minimal copies. #[derive(Debug)] pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); @@ -55,9 +55,9 @@ impl ArrowBytesViewSet { .insert_if_new(values, make_payload_fn, observe_payload_fn); } - /// Converts this set into a `StringArray`/`LargeStringArray` or - /// `BinaryArray`/`LargeBinaryArray` containing each distinct value that - /// was interned. This is done without copying the values. + /// Converts this set into a `StringViewArray` or `BinaryViewArray` + /// containing each distinct value that was interned. + /// This is done without copying the values. pub fn into_state(self) -> ArrayRef { self.0.into_state() } @@ -103,7 +103,6 @@ impl ArrowBytesViewSet { /// the string values themselves, both when inserting and when emitting the /// final array. /// -/// /// 2. Retains the insertion order of entries in the final array. The values are /// in the same order as they were inserted. /// @@ -118,13 +117,14 @@ pub struct ArrowBytesViewMap where V: Debug + PartialEq + Eq + Clone + Copy + Default, { - /// Should the output be String or Binary? + /// Should the output be StringView or BinaryView? output_type: OutputType, /// Underlying hash set for each distinct value map: hashbrown::raw::RawTable>, /// Total size of the map in bytes map_size: usize, + /// Builder for output array builder: GenericByteViewBuilder, /// random state used to generate hashes random_state: RandomState, @@ -198,7 +198,7 @@ where MP: FnMut(Option<&[u8]>) -> V, OP: FnMut(V), { - // Sanity array type + // Sanity check array type match self.output_type { OutputType::BinaryView => { assert!(matches!(values.data_type(), DataType::BinaryView));