diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 90dfba0b1958..7105a6033693 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -46,4 +46,5 @@ serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.78" snmalloc-rs = { version = "0.3", optional = true } structopt = { version = "0.3", default-features = false } +test-utils = { path = "../test-utils/" } tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] } diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index f77cbc8fd680..3efa86f27e94 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -15,12 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{ - Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, - UInt16Builder, -}; -use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit}; -use arrow::record_batch::RecordBatch; +use arrow::datatypes::SchemaRef; use arrow::util::pretty; use datafusion::common::{Result, ToDFSchema}; use datafusion::config::{ @@ -41,14 +36,12 @@ use object_store::path::Path; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; use std::fs::File; -use std::ops::Range; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; use structopt::StructOpt; +use test_utils::AccessLogGenerator; #[cfg(feature = "snmalloc")] #[global_allocator] @@ -96,13 +89,14 @@ async fn main() -> Result<()> { let path = opt.path.join("logs.parquet"); - let (object_store_url, object_meta) = + let (schema, object_store_url, object_meta) = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?; run_benchmarks( &mut ctx, - object_store_url.clone(), - object_meta.clone(), + schema, + object_store_url, + object_meta, opt.iterations, opt.debug, ) @@ -120,6 +114,7 @@ struct ParquetScanOptions { async fn run_benchmarks( ctx: &mut SessionContext, + schema: SchemaRef, object_store_url: ObjectStoreUrl, object_meta: ObjectMeta, iterations: usize, @@ -179,6 +174,7 @@ async fn run_benchmarks( let start = Instant::now(); let rows = exec_scan( ctx, + schema.clone(), object_store_url.clone(), object_meta.clone(), filter_expr.clone(), @@ -201,14 +197,13 @@ async fn run_benchmarks( async fn exec_scan( ctx: &SessionContext, + schema: SchemaRef, object_store_url: ObjectStoreUrl, object_meta: ObjectMeta, filter: Expr, scan_options: ParquetScanOptions, debug: bool, ) -> Result { - let schema = BatchBuilder::schema(); - let ParquetScanOptions { pushdown_filters, reorder_filters, @@ -263,8 +258,8 @@ fn gen_data( scale_factor: f32, page_size: Option, row_group_size: Option, -) -> Result<(ObjectStoreUrl, ObjectMeta)> { - let generator = Generator::new(); +) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> { + let generator = AccessLogGenerator::new(); let file = File::create(&path).unwrap(); @@ -280,9 +275,9 @@ fn gen_data( props_builder = props_builder.set_max_row_group_size(s); } + let schema = generator.schema(); let mut writer = - ArrowWriter::try_new(file, generator.schema.clone(), Some(props_builder.build())) - .unwrap(); + ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap(); let mut num_rows = 0; @@ -311,203 +306,5 @@ fn gen_data( size, }; - Ok((object_store_url, object_meta)) -} - -#[derive(Default)] -struct BatchBuilder { - service: StringDictionaryBuilder, - host: StringDictionaryBuilder, - pod: StringDictionaryBuilder, - container: StringDictionaryBuilder, - image: StringDictionaryBuilder, - time: TimestampNanosecondBuilder, - client_addr: StringBuilder, - request_duration: Int32Builder, - request_user_agent: StringBuilder, - request_method: StringBuilder, - request_host: StringBuilder, - request_bytes: Int32Builder, - response_bytes: Int32Builder, - response_status: UInt16Builder, -} - -impl BatchBuilder { - fn schema() -> SchemaRef { - let utf8_dict = - || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - - Arc::new(Schema::new(vec![ - Field::new("service", utf8_dict(), true), - Field::new("host", utf8_dict(), false), - Field::new("pod", utf8_dict(), false), - Field::new("container", utf8_dict(), false), - Field::new("image", utf8_dict(), false), - Field::new( - "time", - DataType::Timestamp(TimeUnit::Nanosecond, None), - false, - ), - Field::new("client_addr", DataType::Utf8, true), - Field::new("request_duration_ns", DataType::Int32, false), - Field::new("request_user_agent", DataType::Utf8, true), - Field::new("request_method", DataType::Utf8, true), - Field::new("request_host", DataType::Utf8, true), - Field::new("request_bytes", DataType::Int32, true), - Field::new("response_bytes", DataType::Int32, true), - Field::new("response_status", DataType::UInt16, false), - ])) - } - - fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) { - let num_pods = rng.gen_range(1..15); - let pods = generate_sorted_strings(rng, num_pods, 30..40); - for pod in pods { - for container_idx in 0..rng.gen_range(1..3) { - let container = format!("{}_container_{}", service, container_idx); - let image = format!( - "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9", - container - ); - - let num_entries = rng.gen_range(1024..8192); - for i in 0..num_entries { - let time = i as i64 * 1024; - self.append_row(rng, host, &pod, service, &container, &image, time); - } - } - } - } - - #[allow(clippy::too_many_arguments)] - fn append_row( - &mut self, - rng: &mut StdRng, - host: &str, - pod: &str, - service: &str, - container: &str, - image: &str, - time: i64, - ) { - let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"]; - let status = &[200, 204, 400, 503, 403]; - - self.service.append(service).unwrap(); - self.host.append(host).unwrap(); - self.pod.append(pod).unwrap(); - self.container.append(container).unwrap(); - self.image.append(image).unwrap(); - self.time.append_value(time); - - self.client_addr.append_value(format!( - "{}.{}.{}.{}", - rng.gen::(), - rng.gen::(), - rng.gen::(), - rng.gen::() - )); - self.request_duration.append_value(rng.gen()); - self.request_user_agent - .append_value(random_string(rng, 20..100)); - self.request_method - .append_value(methods[rng.gen_range(0..methods.len())]); - self.request_host - .append_value(format!("https://{}.mydomain.com", service)); - - self.request_bytes - .append_option(rng.gen_bool(0.9).then(|| rng.gen())); - self.response_bytes - .append_option(rng.gen_bool(0.9).then(|| rng.gen())); - self.response_status - .append_value(status[rng.gen_range(0..status.len())]); - } - - fn finish(mut self, schema: SchemaRef) -> RecordBatch { - RecordBatch::try_new( - schema, - vec![ - Arc::new(self.service.finish()), - Arc::new(self.host.finish()), - Arc::new(self.pod.finish()), - Arc::new(self.container.finish()), - Arc::new(self.image.finish()), - Arc::new(self.time.finish()), - Arc::new(self.client_addr.finish()), - Arc::new(self.request_duration.finish()), - Arc::new(self.request_user_agent.finish()), - Arc::new(self.request_method.finish()), - Arc::new(self.request_host.finish()), - Arc::new(self.request_bytes.finish()), - Arc::new(self.response_bytes.finish()), - Arc::new(self.response_status.finish()), - ], - ) - .unwrap() - } -} - -fn random_string(rng: &mut StdRng, len_range: Range) -> String { - let len = rng.gen_range(len_range); - (0..len) - .map(|_| rng.gen_range(b'a'..=b'z') as char) - .collect::() -} - -fn generate_sorted_strings( - rng: &mut StdRng, - count: usize, - str_len: Range, -) -> Vec { - let mut strings: Vec<_> = (0..count) - .map(|_| random_string(rng, str_len.clone())) - .collect(); - - strings.sort_unstable(); - strings -} - -/// Generates sorted RecordBatch with an access log style schema for a single host -#[derive(Debug)] -struct Generator { - schema: SchemaRef, - rng: StdRng, - host_idx: usize, -} - -impl Generator { - fn new() -> Self { - let seed = [ - 1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0, - 0, 0, 0, 5, 0, 0, 0, 0, 0, - ]; - - Self { - schema: BatchBuilder::schema(), - host_idx: 0, - rng: StdRng::from_seed(seed), - } - } -} - -impl Iterator for Generator { - type Item = RecordBatch; - - fn next(&mut self) -> Option { - let mut builder = BatchBuilder::default(); - - let host = format!( - "i-{:016x}.ec2.internal", - self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928 - ); - self.host_idx += 1; - - for service in &["frontend", "backend", "database", "cache"] { - if self.rng.gen_bool(0.5) { - continue; - } - builder.append(&mut self.rng, &host, service); - } - Some(builder.finish(Arc::clone(&self.schema))) - } + Ok((schema, object_store_url, object_meta)) } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 4fef2f07a095..f5fbf997412d 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -105,8 +105,8 @@ csv = "1.1.6" ctor = "0.1.22" doc-comment = "0.3" env_logger = "0.9" -fuzz-utils = { path = "fuzz-utils" } rstest = "0.15.0" +test-utils = { path = "../../test-utils" } [[bench]] harness = false diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs index c5111a0750bf..8d4f31af5173 100644 --- a/datafusion/core/tests/join_fuzz.rs +++ b/datafusion/core/tests/join_fuzz.rs @@ -31,7 +31,7 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion_expr::JoinType; use datafusion::prelude::{SessionConfig, SessionContext}; -use fuzz_utils::add_empty_batches; +use test_utils::add_empty_batches; #[tokio::test] async fn test_inner_join_1k() { diff --git a/datafusion/core/tests/merge_fuzz.rs b/datafusion/core/tests/merge_fuzz.rs index 31ccc679cef2..2280cdeb6029 100644 --- a/datafusion/core/tests/merge_fuzz.rs +++ b/datafusion/core/tests/merge_fuzz.rs @@ -30,8 +30,8 @@ use datafusion::physical_plan::{ sorts::sort_preserving_merge::SortPreservingMergeExec, }; use datafusion::prelude::{SessionConfig, SessionContext}; -use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; use rand::{prelude::StdRng, Rng, SeedableRng}; +use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; #[tokio::test] async fn test_merge_2() { diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs index faaa2ae0bb3d..ad39630afd8c 100644 --- a/datafusion/core/tests/order_spill_fuzz.rs +++ b/datafusion/core/tests/order_spill_fuzz.rs @@ -29,10 +29,10 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; -use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; use rand::prelude::StdRng; use rand::{Rng, SeedableRng}; use std::sync::Arc; +use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; #[tokio::test] #[cfg_attr(tarpaulin, ignore)] diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs index 625d70fa7b1b..e6c88e0a16d8 100644 --- a/datafusion/core/tests/sql/order.rs +++ b/datafusion/core/tests/sql/order.rs @@ -16,7 +16,7 @@ // under the License. use super::*; -use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec}; +use test_utils::{batches_to_vec, partitions_to_sorted_vec}; #[tokio::test] async fn test_sort_unprojected_col() -> Result<()> { diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/test-utils/Cargo.toml similarity index 98% rename from datafusion/core/fuzz-utils/Cargo.toml rename to test-utils/Cargo.toml index ba876d4ffed0..85d1c0233880 100644 --- a/datafusion/core/fuzz-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] -name = "fuzz-utils" +name = "test-utils" version = "0.1.0" edition = "2021" diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs new file mode 100644 index 000000000000..a77728eea23c --- /dev/null +++ b/test-utils/src/data_gen.rs @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; +use std::sync::Arc; + +use arrow::array::{ + Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, + UInt16Builder, +}; +use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit}; +use arrow::record_batch::RecordBatch; +use rand::prelude::StdRng; +use rand::{Rng, SeedableRng}; + +#[derive(Default)] +struct BatchBuilder { + service: StringDictionaryBuilder, + host: StringDictionaryBuilder, + pod: StringDictionaryBuilder, + container: StringDictionaryBuilder, + image: StringDictionaryBuilder, + time: TimestampNanosecondBuilder, + client_addr: StringBuilder, + request_duration: Int32Builder, + request_user_agent: StringBuilder, + request_method: StringBuilder, + request_host: StringBuilder, + request_bytes: Int32Builder, + response_bytes: Int32Builder, + response_status: UInt16Builder, +} + +impl BatchBuilder { + fn schema() -> SchemaRef { + let utf8_dict = + || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + Arc::new(Schema::new(vec![ + Field::new("service", utf8_dict(), true), + Field::new("host", utf8_dict(), false), + Field::new("pod", utf8_dict(), false), + Field::new("container", utf8_dict(), false), + Field::new("image", utf8_dict(), false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("client_addr", DataType::Utf8, true), + Field::new("request_duration_ns", DataType::Int32, false), + Field::new("request_user_agent", DataType::Utf8, true), + Field::new("request_method", DataType::Utf8, true), + Field::new("request_host", DataType::Utf8, true), + Field::new("request_bytes", DataType::Int32, true), + Field::new("response_bytes", DataType::Int32, true), + Field::new("response_status", DataType::UInt16, false), + ])) + } + + fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) { + let num_pods = rng.gen_range(1..15); + let pods = generate_sorted_strings(rng, num_pods, 30..40); + for pod in pods { + for container_idx in 0..rng.gen_range(1..3) { + let container = format!("{}_container_{}", service, container_idx); + let image = format!( + "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9", + container + ); + + let num_entries = rng.gen_range(1024..8192); + for i in 0..num_entries { + let time = i as i64 * 1024; + self.append_row(rng, host, &pod, service, &container, &image, time); + } + } + } + } + + #[allow(clippy::too_many_arguments)] + fn append_row( + &mut self, + rng: &mut StdRng, + host: &str, + pod: &str, + service: &str, + container: &str, + image: &str, + time: i64, + ) { + let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"]; + let status = &[200, 204, 400, 503, 403]; + + self.service.append(service).unwrap(); + self.host.append(host).unwrap(); + self.pod.append(pod).unwrap(); + self.container.append(container).unwrap(); + self.image.append(image).unwrap(); + self.time.append_value(time); + + self.client_addr.append_value(format!( + "{}.{}.{}.{}", + rng.gen::(), + rng.gen::(), + rng.gen::(), + rng.gen::() + )); + self.request_duration.append_value(rng.gen()); + self.request_user_agent + .append_value(random_string(rng, 20..100)); + self.request_method + .append_value(methods[rng.gen_range(0..methods.len())]); + self.request_host + .append_value(format!("https://{}.mydomain.com", service)); + + self.request_bytes + .append_option(rng.gen_bool(0.9).then(|| rng.gen())); + self.response_bytes + .append_option(rng.gen_bool(0.9).then(|| rng.gen())); + self.response_status + .append_value(status[rng.gen_range(0..status.len())]); + } + + fn finish(mut self, schema: SchemaRef) -> RecordBatch { + RecordBatch::try_new( + schema, + vec![ + Arc::new(self.service.finish()), + Arc::new(self.host.finish()), + Arc::new(self.pod.finish()), + Arc::new(self.container.finish()), + Arc::new(self.image.finish()), + Arc::new(self.time.finish()), + Arc::new(self.client_addr.finish()), + Arc::new(self.request_duration.finish()), + Arc::new(self.request_user_agent.finish()), + Arc::new(self.request_method.finish()), + Arc::new(self.request_host.finish()), + Arc::new(self.request_bytes.finish()), + Arc::new(self.response_bytes.finish()), + Arc::new(self.response_status.finish()), + ], + ) + .unwrap() + } +} + +fn random_string(rng: &mut StdRng, len_range: Range) -> String { + let len = rng.gen_range(len_range); + (0..len) + .map(|_| rng.gen_range(b'a'..=b'z') as char) + .collect::() +} + +fn generate_sorted_strings( + rng: &mut StdRng, + count: usize, + str_len: Range, +) -> Vec { + let mut strings: Vec<_> = (0..count) + .map(|_| random_string(rng, str_len.clone())) + .collect(); + + strings.sort_unstable(); + strings +} + +/// Iterator that generates sorted, [`RecordBatch`]es with randomly generated data with +/// an access log style schema for tracing or monitoring type +/// usecases. +/// +/// This is useful for writing tests queries on such data +#[derive(Debug)] +pub struct AccessLogGenerator { + schema: SchemaRef, + rng: StdRng, + host_idx: usize, +} + +impl Default for AccessLogGenerator { + fn default() -> Self { + Self::new() + } +} + +impl AccessLogGenerator { + pub fn new() -> Self { + let seed = [ + 1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0, + 0, 0, 0, 5, 0, 0, 0, 0, 0, + ]; + + Self { + schema: BatchBuilder::schema(), + host_idx: 0, + rng: StdRng::from_seed(seed), + } + } + + /// Return the schema of the [`RecordBatch`]es created + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Iterator for AccessLogGenerator { + type Item = RecordBatch; + + fn next(&mut self) -> Option { + let mut builder = BatchBuilder::default(); + + let host = format!( + "i-{:016x}.ec2.internal", + self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928 + ); + self.host_idx += 1; + + for service in &["frontend", "backend", "database", "cache"] { + if self.rng.gen_bool(0.5) { + continue; + } + builder.append(&mut self.rng, &host, service); + } + Some(builder.finish(Arc::clone(&self.schema))) + } +} diff --git a/datafusion/core/fuzz-utils/src/lib.rs b/test-utils/src/lib.rs similarity index 96% rename from datafusion/core/fuzz-utils/src/lib.rs rename to test-utils/src/lib.rs index 920a9bc8d2f1..7f0e5ef0770d 100644 --- a/datafusion/core/fuzz-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. -//! Common utils for fuzz tests +//! Common functions used for testing use arrow::{array::Int32Array, record_batch::RecordBatch}; use rand::prelude::StdRng; use rand::Rng; +mod data_gen; + +pub use data_gen::AccessLogGenerator; + pub use env_logger; /// Extracts the i32 values from the set of batches and returns them as a single Vec