Put common code used for testing code into datafusion/test_utils
alamb committed Oct 26, 2022
1 parent 3940e36 commit 34708b6
Showing 10 changed files with 267 additions and 224 deletions.
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -46,4 +46,5 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
snmalloc-rs = { version = "0.3", optional = true }
structopt = { version = "0.3", default-features = false }
+test-utils = { path = "../test-utils/" }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
231 changes: 14 additions & 217 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,12 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-use arrow::array::{
-    Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
-    UInt16Builder,
-};
-use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
-use arrow::record_batch::RecordBatch;
+use arrow::datatypes::SchemaRef;
use arrow::util::pretty;
use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
@@ -41,14 +36,12 @@ use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
-use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
use std::fs::File;
-use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use structopt::StructOpt;
+use test_utils::AccessLogGenerator;

#[cfg(feature = "snmalloc")]
#[global_allocator]
@@ -96,13 +89,14 @@ async fn main() -> Result<()> {

let path = opt.path.join("logs.parquet");

-    let (object_store_url, object_meta) =
+    let (schema, object_store_url, object_meta) =
gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;

run_benchmarks(
&mut ctx,
-        object_store_url.clone(),
-        object_meta.clone(),
+        schema,
+        object_store_url,
+        object_meta,
opt.iterations,
opt.debug,
)
@@ -120,6 +114,7 @@ struct ParquetScanOptions {

async fn run_benchmarks(
ctx: &mut SessionContext,
+    schema: SchemaRef,
object_store_url: ObjectStoreUrl,
object_meta: ObjectMeta,
iterations: usize,
@@ -179,6 +174,7 @@ async fn run_benchmarks(
let start = Instant::now();
let rows = exec_scan(
ctx,
+            schema.clone(),
object_store_url.clone(),
object_meta.clone(),
filter_expr.clone(),
@@ -201,14 +197,13 @@

async fn exec_scan(
ctx: &SessionContext,
+    schema: SchemaRef,
object_store_url: ObjectStoreUrl,
object_meta: ObjectMeta,
filter: Expr,
scan_options: ParquetScanOptions,
debug: bool,
) -> Result<usize> {
-    let schema = BatchBuilder::schema();
-
let ParquetScanOptions {
pushdown_filters,
reorder_filters,
@@ -263,8 +258,8 @@ fn gen_data(
scale_factor: f32,
page_size: Option<usize>,
row_group_size: Option<usize>,
-) -> Result<(ObjectStoreUrl, ObjectMeta)> {
-    let generator = Generator::new();
+) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+    let generator = AccessLogGenerator::new();

let file = File::create(&path).unwrap();

@@ -280,9 +275,9 @@
props_builder = props_builder.set_max_row_group_size(s);
}

+    let schema = generator.schema();
let mut writer =
-        ArrowWriter::try_new(file, generator.schema.clone(), Some(props_builder.build()))
-            .unwrap();
+        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();

let mut num_rows = 0;

@@ -311,203 +306,5 @@ fn gen_data(
size,
};

-    Ok((object_store_url, object_meta))
-}
-
-#[derive(Default)]
-struct BatchBuilder {
-    service: StringDictionaryBuilder<Int32Type>,
-    host: StringDictionaryBuilder<Int32Type>,
-    pod: StringDictionaryBuilder<Int32Type>,
-    container: StringDictionaryBuilder<Int32Type>,
-    image: StringDictionaryBuilder<Int32Type>,
-    time: TimestampNanosecondBuilder,
-    client_addr: StringBuilder,
-    request_duration: Int32Builder,
-    request_user_agent: StringBuilder,
-    request_method: StringBuilder,
-    request_host: StringBuilder,
-    request_bytes: Int32Builder,
-    response_bytes: Int32Builder,
-    response_status: UInt16Builder,
-}
-
-impl BatchBuilder {
-    fn schema() -> SchemaRef {
-        let utf8_dict =
-            || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
-
-        Arc::new(Schema::new(vec![
-            Field::new("service", utf8_dict(), true),
-            Field::new("host", utf8_dict(), false),
-            Field::new("pod", utf8_dict(), false),
-            Field::new("container", utf8_dict(), false),
-            Field::new("image", utf8_dict(), false),
-            Field::new(
-                "time",
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                false,
-            ),
-            Field::new("client_addr", DataType::Utf8, true),
-            Field::new("request_duration_ns", DataType::Int32, false),
-            Field::new("request_user_agent", DataType::Utf8, true),
-            Field::new("request_method", DataType::Utf8, true),
-            Field::new("request_host", DataType::Utf8, true),
-            Field::new("request_bytes", DataType::Int32, true),
-            Field::new("response_bytes", DataType::Int32, true),
-            Field::new("response_status", DataType::UInt16, false),
-        ]))
-    }
-
-    fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
-        let num_pods = rng.gen_range(1..15);
-        let pods = generate_sorted_strings(rng, num_pods, 30..40);
-        for pod in pods {
-            for container_idx in 0..rng.gen_range(1..3) {
-                let container = format!("{}_container_{}", service, container_idx);
-                let image = format!(
-                    "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9",
-                    container
-                );
-
-                let num_entries = rng.gen_range(1024..8192);
-                for i in 0..num_entries {
-                    let time = i as i64 * 1024;
-                    self.append_row(rng, host, &pod, service, &container, &image, time);
-                }
-            }
-        }
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    fn append_row(
-        &mut self,
-        rng: &mut StdRng,
-        host: &str,
-        pod: &str,
-        service: &str,
-        container: &str,
-        image: &str,
-        time: i64,
-    ) {
-        let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
-        let status = &[200, 204, 400, 503, 403];
-
-        self.service.append(service).unwrap();
-        self.host.append(host).unwrap();
-        self.pod.append(pod).unwrap();
-        self.container.append(container).unwrap();
-        self.image.append(image).unwrap();
-        self.time.append_value(time);
-
-        self.client_addr.append_value(format!(
-            "{}.{}.{}.{}",
-            rng.gen::<u8>(),
-            rng.gen::<u8>(),
-            rng.gen::<u8>(),
-            rng.gen::<u8>()
-        ));
-        self.request_duration.append_value(rng.gen());
-        self.request_user_agent
-            .append_value(random_string(rng, 20..100));
-        self.request_method
-            .append_value(methods[rng.gen_range(0..methods.len())]);
-        self.request_host
-            .append_value(format!("https://{}.mydomain.com", service));
-
-        self.request_bytes
-            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
-        self.response_bytes
-            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
-        self.response_status
-            .append_value(status[rng.gen_range(0..status.len())]);
-    }
-
-    fn finish(mut self, schema: SchemaRef) -> RecordBatch {
-        RecordBatch::try_new(
-            schema,
-            vec![
-                Arc::new(self.service.finish()),
-                Arc::new(self.host.finish()),
-                Arc::new(self.pod.finish()),
-                Arc::new(self.container.finish()),
-                Arc::new(self.image.finish()),
-                Arc::new(self.time.finish()),
-                Arc::new(self.client_addr.finish()),
-                Arc::new(self.request_duration.finish()),
-                Arc::new(self.request_user_agent.finish()),
-                Arc::new(self.request_method.finish()),
-                Arc::new(self.request_host.finish()),
-                Arc::new(self.request_bytes.finish()),
-                Arc::new(self.response_bytes.finish()),
-                Arc::new(self.response_status.finish()),
-            ],
-        )
-        .unwrap()
-    }
-}
-
-fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
-    let len = rng.gen_range(len_range);
-    (0..len)
-        .map(|_| rng.gen_range(b'a'..=b'z') as char)
-        .collect::<String>()
-}
-
-fn generate_sorted_strings(
-    rng: &mut StdRng,
-    count: usize,
-    str_len: Range<usize>,
-) -> Vec<String> {
-    let mut strings: Vec<_> = (0..count)
-        .map(|_| random_string(rng, str_len.clone()))
-        .collect();
-
-    strings.sort_unstable();
-    strings
-}
-
-/// Generates sorted RecordBatch with an access log style schema for a single host
-#[derive(Debug)]
-struct Generator {
-    schema: SchemaRef,
-    rng: StdRng,
-    host_idx: usize,
-}
-
-impl Generator {
-    fn new() -> Self {
-        let seed = [
-            1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0,
-            0, 0, 0, 5, 0, 0, 0, 0, 0,
-        ];
-
-        Self {
-            schema: BatchBuilder::schema(),
-            host_idx: 0,
-            rng: StdRng::from_seed(seed),
-        }
-    }
-}
-
-impl Iterator for Generator {
-    type Item = RecordBatch;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let mut builder = BatchBuilder::default();
-
-        let host = format!(
-            "i-{:016x}.ec2.internal",
-            self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928
-        );
-        self.host_idx += 1;
-
-        for service in &["frontend", "backend", "database", "cache"] {
-            if self.rng.gen_bool(0.5) {
-                continue;
-            }
-            builder.append(&mut self.rng, &host, service);
-        }
-        Some(builder.finish(Arc::clone(&self.schema)))
-    }
-}
+    Ok((schema, object_store_url, object_meta))
+}
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -105,8 +105,8 @@ csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
-fuzz-utils = { path = "fuzz-utils" }
rstest = "0.15.0"
+test-utils = { path = "../../test-utils" }

[[bench]]
harness = false
2 changes: 1 addition & 1 deletion datafusion/core/tests/join_fuzz.rs
@@ -31,7 +31,7 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion_expr::JoinType;

use datafusion::prelude::{SessionConfig, SessionContext};
-use fuzz_utils::add_empty_batches;
+use test_utils::add_empty_batches;

#[tokio::test]
async fn test_inner_join_1k() {
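add_empty_batches is unchanged apart from its new home. A sketch of how these fuzz tests use it, assuming the fuzz-utils signature carried over (batches in, batches with randomly interleaved zero-row batches out); with_empties is a hypothetical wrapper.

use arrow::record_batch::RecordBatch;
use rand::{rngs::StdRng, SeedableRng};
use test_utils::add_empty_batches;

fn with_empties(batches: Vec<RecordBatch>) -> Vec<RecordBatch> {
    // A fixed seed keeps the fuzz input reproducible across runs.
    let mut rng = StdRng::seed_from_u64(42);
    // Empty batches exercise operators against zero-row edge cases.
    add_empty_batches(batches, &mut rng)
}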
2 changes: 1 addition & 1 deletion datafusion/core/tests/merge_fuzz.rs
@@ -30,8 +30,8 @@ use datafusion::physical_plan::{
sorts::sort_preserving_merge::SortPreservingMergeExec,
};
use datafusion::prelude::{SessionConfig, SessionContext};
-use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::{prelude::StdRng, Rng, SeedableRng};
+use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};

#[tokio::test]
async fn test_merge_2() {
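The merge and sort fuzz tests compare results through the two vector helpers imported above. A sketch of that check, assuming the helpers keep their fuzz-utils shape (single-Int32-column batches in, Vec<Option<i32>> out); assert_merged_sorted is a hypothetical wrapper, not part of the commit.

use arrow::record_batch::RecordBatch;
use test_utils::{batches_to_vec, partitions_to_sorted_vec};

fn assert_merged_sorted(input_partitions: &[Vec<RecordBatch>], output: &[RecordBatch]) {
    // Expected: every input value across all partitions, globally sorted.
    let expected = partitions_to_sorted_vec(input_partitions);
    // Actual: the merged output flattened in emission order.
    let actual = batches_to_vec(output);
    assert_eq!(expected, actual, "merge output is not sorted");
}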
2 changes: 1 addition & 1 deletion datafusion/core/tests/order_spill_fuzz.rs
@@ -29,10 +29,10 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
-use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;
+use test_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/order.rs
@@ -16,7 +16,7 @@
// under the License.

use super::*;
-use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec};

#[tokio::test]
async fn test_sort_unprojected_col() -> Result<()> {
datafusion/core/fuzz-utils/Cargo.toml → test-utils/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.

[package]
name = "fuzz-utils"
name = "test-utils"
version = "0.1.0"
edition = "2021"
