From a2b04ad44489737eadf7692e041154814544f359 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 30 Aug 2023 14:18:25 +0300 Subject: [PATCH] storage: teach load generator sources to resume The current load generator framework requires each implementation to always produce its differential history from the beginning of time. The framework relied on this behavior to be able to count up the number of transactions already emitted and provide a definite behavior. This is suboptimal however as many load generator implementations would rather skip forward to the resumption point without generating all this data for it to be immediately discarded. This PR teaches the load generator framework about resumption in the form of a `resume_offset` argument passed to anyone implementing the `Generator` trait. The expectation is that the source is obligated to produce all the updates that happen beyond `resume_offset` and may skip producing the updates that happen not beyond `resume_offset`. The latter is not an obligations and for sources that is beneficial for impementation reasons to reproduce their history they are free to do so. --- src/sql/src/plan/statement/ddl.rs | 7 +- src/storage-client/src/types/sources.proto | 4 +- src/storage-client/src/types/sources.rs | 15 +- src/storage/src/source/generator.rs | 63 ++-- src/storage/src/source/generator/auction.rs | 111 +++--- src/storage/src/source/generator/counter.rs | 81 ++--- src/storage/src/source/generator/datums.rs | 47 +-- src/storage/src/source/generator/marketing.rs | 214 ++++++------ src/storage/src/source/generator/tpch.rs | 315 ++++++++---------- test/testdrive/dataflow-cleanup.td | 8 +- test/testdrive/load-generator.td | 2 +- 11 files changed, 410 insertions(+), 457 deletions(-) diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 3f4ad2b034ede..45b801ddc3478 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -1375,7 +1375,7 @@ generate_extracted_config!( LoadGeneratorOption, (TickInterval, Interval), (ScaleFactor, f64), - (MaxCardinality, i64) + (MaxCardinality, u64) ); pub(crate) fn load_generator_ast_to_generator( @@ -1394,11 +1394,6 @@ pub(crate) fn load_generator_ast_to_generator( let LoadGeneratorOptionExtracted { max_cardinality, .. } = options.to_vec().try_into()?; - if let Some(max_cardinality) = max_cardinality { - if max_cardinality < 0 { - sql_bail!("unsupported max cardinality {max_cardinality}"); - } - } LoadGenerator::Counter { max_cardinality } } mz_sql_parser::ast::LoadGenerator::Marketing => LoadGenerator::Marketing, diff --git a/src/storage-client/src/types/sources.proto b/src/storage-client/src/types/sources.proto index f516fa5efbd6e..91844fc316cd6 100644 --- a/src/storage-client/src/types/sources.proto +++ b/src/storage-client/src/types/sources.proto @@ -222,9 +222,7 @@ message ProtoTestScriptSourceConnection { } message ProtoCounterLoadGenerator { - // Must be non-negative, - // but kept as int64 to make downstream logic simpler - optional int64 max_cardinality = 1; + optional uint64 max_cardinality = 1; } message ProtoTpchLoadGenerator { diff --git a/src/storage-client/src/types/sources.rs b/src/storage-client/src/types/sources.rs index 826de32f849d1..8aa41274e8c64 100644 --- a/src/storage-client/src/types/sources.rs +++ b/src/storage-client/src/types/sources.rs @@ -40,6 +40,7 @@ use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy}; use proptest_derive::Arbitrary; use prost::Message; use serde::{Deserialize, Serialize}; +use timely::dataflow::operators::to_stream::Event; use timely::order::{PartialOrder, TotalOrder}; use timely::progress::timestamp::Refines; use timely::progress::{PathSummary, Timestamp}; @@ -2182,10 +2183,7 @@ pub enum LoadGenerator { /// How many values will be emitted /// before old ones are retracted, or `None` for /// an append-only collection. - /// - /// This is verified by the planner to be nonnegative. We encode it as - /// an `i64` to make the code in `Counter::by_seed` simpler. - max_cardinality: Option, + max_cardinality: Option, }, Datums, Marketing, @@ -2474,13 +2472,8 @@ pub trait Generator { &self, now: NowFn, seed: Option, - ) -> Box>; -} - -#[derive(Clone, Copy, Debug)] -pub enum GeneratorMessageType { - InProgress, - Finalized, + resume_offset: MzOffset, + ) -> Box, (Row, i64)>)>>; } impl RustType for LoadGeneratorSourceConnection { diff --git a/src/storage/src/source/generator.rs b/src/storage/src/source/generator.rs index 13e6f6416f5dd..5103454e19dac 100644 --- a/src/storage/src/source/generator.rs +++ b/src/storage/src/source/generator.rs @@ -13,15 +13,14 @@ use std::rc::Rc; use std::time::Duration; use differential_dataflow::{AsCollection, Collection}; -use mz_ore::cast::CastFrom; use mz_ore::collections::CollectionExt; use mz_repr::{Diff, Row}; use mz_storage_client::types::connections::ConnectionContext; use mz_storage_client::types::sources::{ - Generator, GeneratorMessageType, LoadGenerator, LoadGeneratorSourceConnection, MzOffset, - SourceTimestamp, + Generator, LoadGenerator, LoadGeneratorSourceConnection, MzOffset, SourceTimestamp, }; use mz_timely_util::builder_async::OperatorBuilder as AsyncOperatorBuilder; +use timely::dataflow::operators::to_stream::Event; use timely::dataflow::operators::ToStream; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; @@ -111,42 +110,38 @@ impl SourceRender for LoadGeneratorSourceConnection { .map(MzOffset::decode_row), ); - let Some(mut offset) = resume_upper.into_option() else { - return; + let Some(resume_offset) = resume_upper.into_option() else { + return }; - cap.downgrade(&offset); - - let mut rows = as_generator(&self.load_generator, self.tick_micros) - .by_seed(mz_ore::now::SYSTEM_TIME.clone(), None); + cap.downgrade(&resume_offset); - // Skip forward to the requested offset. - let tx_count = usize::cast_from(offset.offset); - let txns = rows - .by_ref() - .filter(|(_, typ, _, _)| matches!(typ, GeneratorMessageType::Finalized)) - .take(tx_count) - .count(); - assert_eq!(txns, tx_count, "produced wrong number of transactions"); + let mut rows = as_generator(&self.load_generator, self.tick_micros).by_seed( + mz_ore::now::SYSTEM_TIME.clone(), + None, + resume_offset, + ); let tick = Duration::from_micros(self.tick_micros.unwrap_or(1_000_000)); - while let Some((output, typ, value, diff)) = rows.next() { - let message = ( - output, - Ok(SourceMessage { - upstream_time_millis: None, - key: (), - value, - headers: None, - }), - ); - - data_output.give(&cap, (message, offset, diff)).await; - - if matches!(typ, GeneratorMessageType::Finalized) { - offset += 1; - cap.downgrade(&offset); - tokio::time::sleep(tick).await; + while let Some((output, event)) = rows.next() { + match event { + Event::Message(offset, (value, diff)) => { + let message = ( + output, + Ok(SourceMessage { + upstream_time_millis: None, + key: (), + value, + headers: None, + }), + ); + data_output.give(&cap, (message, offset, diff)).await; + } + Event::Progress(Some(offset)) => { + cap.downgrade(&offset); + tokio::time::sleep(tick).await; + } + Event::Progress(None) => return, } } }); diff --git a/src/storage/src/source/generator/auction.rs b/src/storage/src/source/generator/auction.rs index cf7b68ecab6e6..d19500c6cf3a8 100644 --- a/src/storage/src/source/generator/auction.rs +++ b/src/storage/src/source/generator/auction.rs @@ -12,10 +12,11 @@ use std::iter; use mz_ore::now::{to_datetime, NowFn}; use mz_repr::{Datum, Row}; -use mz_storage_client::types::sources::{Generator, GeneratorMessageType}; +use mz_storage_client::types::sources::{Generator, MzOffset}; use rand::prelude::{Rng, SmallRng}; use rand::seq::SliceRandom; use rand::SeedableRng; +use timely::dataflow::operators::to_stream::Event; /// CREATE TABLE organizations /// ( @@ -74,7 +75,8 @@ impl Generator for Auction { &self, now: NowFn, seed: Option, - ) -> Box<(dyn Iterator)> { + _resume_offset: MzOffset, + ) -> Box<(dyn Iterator, (Row, i64)>)>)> { let mut rng = SmallRng::seed_from_u64(seed.unwrap_or_default()); let organizations = COMPANIES.iter().enumerate().map(|(offset, name)| { @@ -115,57 +117,64 @@ impl Generator for Auction { let mut pending: VecDeque<(usize, Row)> = organizations.chain(users).chain(accounts).collect(); - Box::new(iter::from_fn(move || { - { - if pending.is_empty() { - counter += 1; - let now = to_datetime(now()); - let mut auction = Row::with_capacity(4); - let mut packer = auction.packer(); - packer.push(Datum::Int64(counter)); // auction id - let max_seller_id = - i64::try_from(CELEBRETIES.len()).expect("demo entries less than i64::MAX"); - packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // seller - packer.push(Datum::String(AUCTIONS.choose(&mut rng).unwrap())); // item - packer.push(Datum::TimestampTz( - (now + chrono::Duration::seconds(10)) - .try_into() - .expect("timestamp must fit"), - )); // end time - pending.push_back((AUCTIONS_OUTPUT, auction)); - const MAX_BIDS: i64 = 10; - for i in 0..rng.gen_range(2..MAX_BIDS) { - let bid_id = Datum::Int64(counter * MAX_BIDS + i); - let bid = { - let mut bid = Row::with_capacity(5); - let mut packer = bid.packer(); - packer.push(bid_id); - packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // buyer - packer.push(Datum::Int64(counter)); // auction id - packer.push(Datum::Int32(rng.gen_range(1..100))); // amount - packer.push(Datum::TimestampTz( - (now + chrono::Duration::seconds(i)) - .try_into() - .expect("timestamp must fit"), - )); // bid time - bid - }; - pending.push_back((BIDS_OUTPUT, bid)); + let mut offset = 0; + Box::new( + iter::from_fn(move || { + { + if pending.is_empty() { + counter += 1; + let now = to_datetime(now()); + let mut auction = Row::with_capacity(4); + let mut packer = auction.packer(); + packer.push(Datum::Int64(counter)); // auction id + let max_seller_id = i64::try_from(CELEBRETIES.len()) + .expect("demo entries less than i64::MAX"); + packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // seller + packer.push(Datum::String(AUCTIONS.choose(&mut rng).unwrap())); // item + packer.push(Datum::TimestampTz( + (now + chrono::Duration::seconds(10)) + .try_into() + .expect("timestamp must fit"), + )); // end time + pending.push_back((AUCTIONS_OUTPUT, auction)); + const MAX_BIDS: i64 = 10; + for i in 0..rng.gen_range(2..MAX_BIDS) { + let bid_id = Datum::Int64(counter * MAX_BIDS + i); + let bid = { + let mut bid = Row::with_capacity(5); + let mut packer = bid.packer(); + packer.push(bid_id); + packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // buyer + packer.push(Datum::Int64(counter)); // auction id + packer.push(Datum::Int32(rng.gen_range(1..100))); // amount + packer.push(Datum::TimestampTz( + (now + chrono::Duration::seconds(i)) + .try_into() + .expect("timestamp must fit"), + )); // bid time + bid + }; + pending.push_back((BIDS_OUTPUT, bid)); + } } + // Pop from the front so auctions always appear before bids. + let pend = pending.pop_front(); + pend.map(|(output, row)| { + let msg = (output, Event::Message(MzOffset::from(offset), (row, 1))); + + // The first batch (orgs, users, accounts) is a single txn, all others (auctions and bids) are separate. + let progress = if counter != 0 || pending.is_empty() { + offset += 1; + Some((output, Event::Progress(Some(MzOffset::from(offset))))) + } else { + None + }; + std::iter::once(msg).chain(progress) + }) } - // Pop from the front so auctions always appear before bids. - let pend = pending.pop_front(); - pend.map(|(output, row)| { - // The first batch (orgs, users, accounts) is a single txn, all others (auctions and bids) are separate. - let typ = if counter != 0 || pending.is_empty() { - GeneratorMessageType::Finalized - } else { - GeneratorMessageType::InProgress - }; - (output, typ, row, 1) - }) - } - })) + }) + .flatten(), + ) } } diff --git a/src/storage/src/source/generator/counter.rs b/src/storage/src/source/generator/counter.rs index d8d87986a246e..22afb1940e051 100644 --- a/src/storage/src/source/generator/counter.rs +++ b/src/storage/src/source/generator/counter.rs @@ -7,11 +7,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::iter; - use mz_ore::now::NowFn; use mz_repr::{Datum, Row}; -use mz_storage_client::types::sources::{Generator, GeneratorMessageType}; +use mz_storage_client::types::sources::{Generator, MzOffset}; +use timely::dataflow::operators::to_stream::Event; pub struct Counter { /// How many values will be emitted before old ones are retracted, @@ -19,10 +18,7 @@ pub struct Counter { /// behavior is changed, /// `mz_storage_client::types::sources::LoadGenerator::is_monotonic` /// must be updated. - /// - /// This is verified by the planner to be nonnegative. We encode it as - /// an `i64` to make the code in `Counter::by_seed` simpler. - pub max_cardinality: Option, + pub max_cardinality: Option, } impl Generator for Counter { @@ -30,50 +26,39 @@ impl Generator for Counter { &self, _now: NowFn, _seed: Option, - ) -> Box> { - let mut counter = 0; + resume_offset: MzOffset, + ) -> Box<(dyn Iterator, (Row, i64)>)>)> { let max_cardinality = self.max_cardinality; + Box::new( - iter::repeat_with(move || { - let to_retract = match max_cardinality { - Some(max) => { - if max <= counter { - Some(counter - max + 1) - } else { - None + (resume_offset.offset..) + .map(move |offset| { + let retraction = match max_cardinality { + // At offset `max` we must start retracting the value of `offset - max`. For + // example if max_cardinality is 2 then the collection should contain: + // (1, 0, +1) + // (2, 1, +1) + // (1, 2, -1) <- Here offset becomes >= max and we retract the value that was + // (3, 2, +1) emitted at (offset - max), which equals (offset - max + 1). + // (2, 3, -1) + // (4, 3, +1) + Some(max) if offset >= max => { + let retracted_value = i64::try_from(offset - max + 1).unwrap(); + let row = Row::pack_slice(&[Datum::Int64(retracted_value)]); + Some((0, Event::Message(MzOffset::from(offset), (row, -1)))) } - } - None => None, - }; - counter += 1; - // NB: we could get rid of this allocation with - // judicious use of itertools::Either, if it were - // important to highly optimize this code path. - if let Some(to_retract) = to_retract { - vec![ - ( - 0, - GeneratorMessageType::InProgress, - Row::pack_slice(&[Datum::Int64(counter)]), - 1, - ), - ( - 0, - GeneratorMessageType::Finalized, - Row::pack_slice(&[Datum::Int64(to_retract)]), - -1, - ), - ] - } else { - vec![( - 0, - GeneratorMessageType::Finalized, - Row::pack_slice(&[Datum::Int64(counter)]), - 1, - )] - } - }) - .flatten(), + _ => None, + }; + + let inserted_value = i64::try_from(offset + 1).unwrap(); + let row = Row::pack_slice(&[Datum::Int64(inserted_value)]); + let insertion = [ + (0, Event::Message(MzOffset::from(offset), (row, 1))), + (0, Event::Progress(Some(MzOffset::from(offset + 1)))), + ]; + retraction.into_iter().chain(insertion) + }) + .flatten(), ) } } diff --git a/src/storage/src/source/generator/datums.rs b/src/storage/src/source/generator/datums.rs index 7ed805db5164b..ea2fa04012bd8 100644 --- a/src/storage/src/source/generator/datums.rs +++ b/src/storage/src/source/generator/datums.rs @@ -11,7 +11,8 @@ use std::iter; use mz_ore::now::NowFn; use mz_repr::{Datum, Row, ScalarType}; -use mz_storage_client::types::sources::{Generator, GeneratorMessageType}; +use mz_storage_client::types::sources::{Generator, MzOffset}; +use timely::dataflow::operators::to_stream::Event; pub struct Datums {} @@ -23,7 +24,8 @@ impl Generator for Datums { &self, _: NowFn, _seed: Option, - ) -> Box> { + _resume_offset: MzOffset, + ) -> Box<(dyn Iterator, (Row, i64)>)>)> { let typs = ScalarType::enumerate(); let mut datums: Vec> = typs .iter() @@ -45,22 +47,29 @@ impl Generator for Datums { .collect(), ); let mut idx = 0; - Box::new(iter::from_fn(move || { - if idx == len { - return None; - } - let mut row = Row::with_capacity(datums.len()); - let mut packer = row.packer(); - for d in &datums { - packer.push(d[idx]); - } - idx += 1; - let message = if idx == len { - GeneratorMessageType::Finalized - } else { - GeneratorMessageType::InProgress - }; - Some((0, message, row, 1)) - })) + let mut offset = 0; + Box::new( + iter::from_fn(move || { + if idx == len { + return None; + } + let mut row = Row::with_capacity(datums.len()); + let mut packer = row.packer(); + for d in &datums { + packer.push(d[idx]); + } + let msg = (0, Event::Message(MzOffset::from(offset), (row, 1))); + + idx += 1; + let progress = if idx == len { + offset += 1; + Some((0, Event::Progress(Some(MzOffset::from(offset))))) + } else { + None + }; + Some(std::iter::once(msg).chain(progress)) + }) + .flatten(), + ) } } diff --git a/src/storage/src/source/generator/marketing.rs b/src/storage/src/source/generator/marketing.rs index 5a926136b787e..dabb3bdc37791 100644 --- a/src/storage/src/source/generator/marketing.rs +++ b/src/storage/src/source/generator/marketing.rs @@ -14,8 +14,9 @@ use std::{ use mz_ore::now::to_datetime; use mz_repr::{Datum, Row}; -use mz_storage_client::types::sources::{Generator, GeneratorMessageType}; +use mz_storage_client::types::sources::{Generator, MzOffset}; use rand::{distributions::Standard, rngs::SmallRng, Rng, SeedableRng}; +use timely::dataflow::operators::to_stream::Event; const CUSTOMERS_OUTPUT: usize = 1; const IMPRESSIONS_OUTPUT: usize = 2; @@ -37,16 +38,8 @@ impl Generator for Marketing { &self, now: mz_ore::now::NowFn, seed: Option, - ) -> Box< - dyn Iterator< - Item = ( - usize, - mz_storage_client::types::sources::GeneratorMessageType, - mz_repr::Row, - i64, - ), - >, - > { + _resume_offset: MzOffset, + ) -> Box<(dyn Iterator, (Row, i64)>)>)> { let mut rng: SmallRng = SmallRng::seed_from_u64(seed.unwrap_or_default()); let mut counter = 0; @@ -67,136 +60,143 @@ impl Generator for Marketing { }) .collect(); - Box::new(iter::from_fn(move || { - if pending.is_empty() { - let mut impression = Row::with_capacity(4); - let mut packer = impression.packer(); + let mut offset = 0; + Box::new( + iter::from_fn(move || { + if pending.is_empty() { + let mut impression = Row::with_capacity(4); + let mut packer = impression.packer(); - let impression_id = counter; - counter += 1; - - packer.push(Datum::Int64(impression_id)); - packer.push(Datum::Int64( - rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(), - )); - packer.push(Datum::Int64(rng.gen_range(0..20i64))); - let impression_time = now(); - packer.push(Datum::TimestampTz( - to_datetime(impression_time) - .try_into() - .expect("timestamp must fit"), - )); - - pending.push((IMPRESSIONS_OUTPUT, impression, 1)); - - // 1 in 10 impressions have a click. Making us the - // most successful marketing organization in the world. - if rng.gen_range(0..10) == 1 { - let mut click = Row::with_capacity(2); - let mut packer = click.packer(); - - let click_time = impression_time + rng.gen_range(20000..40000); + let impression_id = counter; + counter += 1; packer.push(Datum::Int64(impression_id)); + packer.push(Datum::Int64( + rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(), + )); + packer.push(Datum::Int64(rng.gen_range(0..20i64))); + let impression_time = now(); packer.push(Datum::TimestampTz( - to_datetime(click_time) + to_datetime(impression_time) .try_into() .expect("timestamp must fit"), )); - future_updates.insert(click_time, (CLICK_OUTPUT, click, 1)); - } + pending.push((IMPRESSIONS_OUTPUT, impression, 1)); - let mut updates = future_updates.retrieve(now()); - pending.append(&mut updates); + // 1 in 10 impressions have a click. Making us the + // most successful marketing organization in the world. + if rng.gen_range(0..10) == 1 { + let mut click = Row::with_capacity(2); + let mut packer = click.packer(); - for _ in 0..rng.gen_range(1..2) { - let id = counter; - counter += 1; + let click_time = impression_time + rng.gen_range(20000..40000); - let mut lead = Lead { - id, - customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(), - created_at: now(), - converted_at: None, - conversion_amount: None, - }; + packer.push(Datum::Int64(impression_id)); + packer.push(Datum::TimestampTz( + to_datetime(click_time) + .try_into() + .expect("timestamp must fit"), + )); - pending.push((LEADS_OUTPUT, lead.to_row(), 1)); + future_updates.insert(click_time, (CLICK_OUTPUT, click, 1)); + } - // a highly scientific statistical model - // predicting the likelyhood of a conversion - let score = rng.sample::(Standard); - let label = score > 0.5f64; + let mut updates = future_updates.retrieve(now()); + pending.append(&mut updates); - let bucket = if lead.id % 10 <= 1 { - CONTROL - } else { - EXPERIMENT - }; + for _ in 0..rng.gen_range(1..2) { + let id = counter; + counter += 1; - let mut prediction = Row::with_capacity(4); - let mut packer = prediction.packer(); + let mut lead = Lead { + id, + customer_id: rng.gen_range(0..CUSTOMERS.len()).try_into().unwrap(), + created_at: now(), + converted_at: None, + conversion_amount: None, + }; - packer.push(Datum::Int64(lead.id)); - packer.push(Datum::String(bucket)); - packer.push(Datum::TimestampTz( - to_datetime(now()).try_into().expect("timestamp must fit"), - )); - packer.push(Datum::Float64(score.into())); + pending.push((LEADS_OUTPUT, lead.to_row(), 1)); - pending.push((CONVERSIONS_PREDICTIONS_OUTPUT, prediction, 1)); + // a highly scientific statistical model + // predicting the likelyhood of a conversion + let score = rng.sample::(Standard); + let label = score > 0.5f64; - let mut sent_coupon = false; - if !label && bucket == EXPERIMENT { - sent_coupon = true; - let amount = rng.gen_range(500..5000); + let bucket = if lead.id % 10 <= 1 { + CONTROL + } else { + EXPERIMENT + }; - let mut coupon = Row::with_capacity(4); - let mut packer = coupon.packer(); + let mut prediction = Row::with_capacity(4); + let mut packer = prediction.packer(); - let id = counter; - counter += 1; - packer.push(Datum::Int64(id)); packer.push(Datum::Int64(lead.id)); + packer.push(Datum::String(bucket)); packer.push(Datum::TimestampTz( to_datetime(now()).try_into().expect("timestamp must fit"), )); - packer.push(Datum::Int64(amount)); + packer.push(Datum::Float64(score.into())); - pending.push((COUPONS_OUTPUT, coupon, 1)); - } + pending.push((CONVERSIONS_PREDICTIONS_OUTPUT, prediction, 1)); - // Decide if a lead will convert. We assume our model is fairly - // accurate and correlates with conversions. We also assume - // that coupons make leads a little more liekly to convert. - let mut converted = rng.sample::(Standard) < score; - if sent_coupon && !converted { - converted = rng.sample::(Standard) < score; - } + let mut sent_coupon = false; + if !label && bucket == EXPERIMENT { + sent_coupon = true; + let amount = rng.gen_range(500..5000); + + let mut coupon = Row::with_capacity(4); + let mut packer = coupon.packer(); + + let id = counter; + counter += 1; + packer.push(Datum::Int64(id)); + packer.push(Datum::Int64(lead.id)); + packer.push(Datum::TimestampTz( + to_datetime(now()).try_into().expect("timestamp must fit"), + )); + packer.push(Datum::Int64(amount)); - if converted { - let converted_at = now() + rng.gen_range(1..30); + pending.push((COUPONS_OUTPUT, coupon, 1)); + } - future_updates.insert(converted_at, (LEADS_OUTPUT, lead.to_row(), -1)); + // Decide if a lead will convert. We assume our model is fairly + // accurate and correlates with conversions. We also assume + // that coupons make leads a little more liekly to convert. + let mut converted = rng.sample::(Standard) < score; + if sent_coupon && !converted { + converted = rng.sample::(Standard) < score; + } - lead.converted_at = Some(converted_at); - lead.conversion_amount = Some(rng.gen_range(1000..25000)); + if converted { + let converted_at = now() + rng.gen_range(1..30); - future_updates.insert(converted_at, (LEADS_OUTPUT, lead.to_row(), 1)); + future_updates.insert(converted_at, (LEADS_OUTPUT, lead.to_row(), -1)); + + lead.converted_at = Some(converted_at); + lead.conversion_amount = Some(rng.gen_range(1000..25000)); + + future_updates.insert(converted_at, (LEADS_OUTPUT, lead.to_row(), 1)); + } } } - } - pending.pop().map(|(output, row, diff)| { - let typ = if pending.is_empty() { - GeneratorMessageType::Finalized - } else { - GeneratorMessageType::InProgress - }; - (output, typ, row, diff) + pending.pop().map(|(output, row, diff)| { + let msg = (output, Event::Message(MzOffset::from(offset), (row, diff))); + + let progress = if pending.is_empty() { + offset += 1; + Some((output, Event::Progress(Some(MzOffset::from(offset))))) + } else { + None + }; + std::iter::once(msg).chain(progress) + }) }) - })) + .flatten(), + ) } } diff --git a/src/storage/src/source/generator/tpch.rs b/src/storage/src/source/generator/tpch.rs index 65f301ac21665..cbad5b6580814 100644 --- a/src/storage/src/source/generator/tpch.rs +++ b/src/storage/src/source/generator/tpch.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::VecDeque; use std::fmt::Display; use std::iter; use std::ops::RangeInclusive; @@ -18,12 +19,13 @@ use mz_ore::now::NowFn; use mz_repr::adt::date::Date; use mz_repr::adt::numeric::{self, DecimalLike, Numeric}; use mz_repr::{Datum, Row}; -use mz_storage_client::types::sources::{Generator, GeneratorMessageType}; +use mz_storage_client::types::sources::{Generator, MzOffset}; use once_cell::sync::Lazy; use rand::distributions::{Alphanumeric, DistString}; use rand::rngs::StdRng; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; +use timely::dataflow::operators::to_stream::Event; #[derive(Clone, Debug)] pub struct Tpch { @@ -49,7 +51,8 @@ impl Generator for Tpch { &self, _: NowFn, seed: Option, - ) -> Box> { + _resume_offset: MzOffset, + ) -> Box<(dyn Iterator, (Row, i64)>)>)> { let mut rng = StdRng::seed_from_u64(seed.unwrap_or_default()); let mut ctx = Context { tpch: self.clone(), @@ -74,197 +77,163 @@ impl Generator for Tpch { // Some rows need to generate other rows from their values; hold those // here. - let mut pending = Vec::new(); + let mut pending = VecDeque::new(); // All orders and their lineitems, so they can be retracted during // streaming. let mut active_orders = Vec::new(); + let mut offset = 0; Box::new(iter::from_fn(move || { - if let Some(pending) = pending.pop() { + if let Some(pending) = pending.pop_front() { return Some(pending); } - rows.next() - .map(|(output, key)| { - let key_usize = usize::try_from(key).expect("key known to be non-negative"); - let row = match output { - SUPPLIER_OUTPUT => { - let nation = rng.gen_range(0..count_nation); - Row::pack_slice(&[ - Datum::Int64(key), - Datum::String(&pad_nine("Supplier", key)), - Datum::String(&v_string(&mut rng, 10, 40)), // address - Datum::Int64(nation), - Datum::String(&phone(&mut rng, nation)), - Datum::Numeric(decimal( - &mut rng, - &mut ctx.cx, - -999_99, - 9_999_99, - 100, - )), // acctbal - // TODO: add customer complaints and recommends, see 4.2.3. - Datum::String(text_string( - &mut rng, - &ctx.text_string_source, - 25, - 100, - )), - ]) - } - PART_OUTPUT => { - let name: String = PARTNAMES - .choose_multiple(&mut rng, 5) - .cloned() - .collect::>() - .join(" "); - let m = rng.gen_range(1..=5); - let n = rng.gen_range(1..=5); - for _ in 1..=4 { - let suppkey = (key - + (rng.gen_range(0..=3) - * ((ctx.tpch.count_supplier / 4) - + (key - 1) / ctx.tpch.count_supplier))) - % ctx.tpch.count_supplier - + 1; - let row = Row::pack_slice(&[ - Datum::Int64(key), - Datum::Int64(suppkey), - Datum::Int32(rng.gen_range(1..=9_999)), // availqty - Datum::Numeric(decimal( - &mut rng, - &mut ctx.cx, - 1_00, - 1_000_00, - 100, - )), // supplycost - Datum::String(text_string( - &mut rng, - &ctx.text_string_source, - 49, - 198, - )), - ]); - pending.push(( - PARTSUPP_OUTPUT, - GeneratorMessageType::InProgress, - row, - 1, - )); - } - Row::pack_slice(&[ + if let Some((output, key)) = rows.next() { + let key_usize = usize::try_from(key).expect("key known to be non-negative"); + let row = match output { + SUPPLIER_OUTPUT => { + let nation = rng.gen_range(0..count_nation); + Row::pack_slice(&[ + Datum::Int64(key), + Datum::String(&pad_nine("Supplier", key)), + Datum::String(&v_string(&mut rng, 10, 40)), // address + Datum::Int64(nation), + Datum::String(&phone(&mut rng, nation)), + Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), // acctbal + // TODO: add customer complaints and recommends, see 4.2.3. + Datum::String(text_string(&mut rng, &ctx.text_string_source, 25, 100)), + ]) + } + PART_OUTPUT => { + let name: String = PARTNAMES + .choose_multiple(&mut rng, 5) + .cloned() + .collect::>() + .join(" "); + let m = rng.gen_range(1..=5); + let n = rng.gen_range(1..=5); + for _ in 1..=4 { + let suppkey = (key + + (rng.gen_range(0..=3) + * ((ctx.tpch.count_supplier / 4) + + (key - 1) / ctx.tpch.count_supplier))) + % ctx.tpch.count_supplier + + 1; + let row = Row::pack_slice(&[ Datum::Int64(key), - Datum::String(&name), - Datum::String(&format!("Manufacturer#{m}")), - Datum::String(&format!("Brand#{m}{n}")), - Datum::String(&syllables(&mut rng, TYPES)), - Datum::Int32(rng.gen_range(1..=50)), // size - Datum::String(&syllables(&mut rng, CONTAINERS)), - Datum::Numeric(partkey_retailprice(key)), + Datum::Int64(suppkey), + Datum::Int32(rng.gen_range(1..=9_999)), // availqty + Datum::Numeric(decimal(&mut rng, &mut ctx.cx, 1_00, 1_000_00, 100)), // supplycost Datum::String(text_string( &mut rng, &ctx.text_string_source, 49, 198, )), - ]) - } - CUSTOMER_OUTPUT => { - let nation = rng.gen_range(0..count_nation); - Row::pack_slice(&[ - Datum::Int64(key), - Datum::String(&pad_nine("Customer", key)), - Datum::String(&v_string(&mut rng, 10, 40)), // address - Datum::Int64(nation), - Datum::String(&phone(&mut rng, nation)), - Datum::Numeric(decimal( - &mut rng, - &mut ctx.cx, - -999_99, - 9_999_99, - 100, - )), // acctbal - Datum::String(SEGMENTS.choose(&mut rng).unwrap()), - Datum::String(text_string( - &mut rng, - &ctx.text_string_source, - 29, - 116, - )), - ]) - } - ORDERS_OUTPUT => { - let seed = rng.gen(); - let (order, lineitems) = ctx.order_row(seed, key); - for row in lineitems { - pending.push(( - LINEITEM_OUTPUT, - GeneratorMessageType::InProgress, - row, - 1, - )); - } - if !ctx.tpch.tick.is_zero() { - active_orders.push((key, seed)); - } - order - } - NATION_OUTPUT => { - let (name, region) = NATIONS[key_usize]; - Row::pack_slice(&[ - Datum::Int64(key), - Datum::String(name), - Datum::Int64(region), - Datum::String(text_string( - &mut rng, - &ctx.text_string_source, - 31, - 114, - )), - ]) + ]); + pending.push_back(( + PARTSUPP_OUTPUT, + Event::Message(MzOffset::from(offset), (row, 1)), + )); } - REGION_OUTPUT => Row::pack_slice(&[ + Row::pack_slice(&[ Datum::Int64(key), - Datum::String(REGIONS[key_usize]), - Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)), - ]), - _ => unreachable!("{output}"), - }; - let typ = if rows.peek().is_some() { - GeneratorMessageType::InProgress - } else { - GeneratorMessageType::Finalized - }; - (output, typ, row, 1) - }) - .or_else(|| { - if ctx.tpch.tick.is_zero() { - return None; + Datum::String(&name), + Datum::String(&format!("Manufacturer#{m}")), + Datum::String(&format!("Brand#{m}{n}")), + Datum::String(&syllables(&mut rng, TYPES)), + Datum::Int32(rng.gen_range(1..=50)), // size + Datum::String(&syllables(&mut rng, CONTAINERS)), + Datum::Numeric(partkey_retailprice(key)), + Datum::String(text_string(&mut rng, &ctx.text_string_source, 49, 198)), + ]) } - let idx = rng.gen_range(0..active_orders.len()); - let (key, old_seed) = active_orders.swap_remove(idx); - let (old_order, old_lineitems) = ctx.order_row(old_seed, key); - // Fill pending with old lineitem retractions, new lineitem - // additions, and finally the new order. Return the old - // order to start the batch. - for row in old_lineitems { - pending.push((LINEITEM_OUTPUT, GeneratorMessageType::InProgress, row, -1)); + CUSTOMER_OUTPUT => { + let nation = rng.gen_range(0..count_nation); + Row::pack_slice(&[ + Datum::Int64(key), + Datum::String(&pad_nine("Customer", key)), + Datum::String(&v_string(&mut rng, 10, 40)), // address + Datum::Int64(nation), + Datum::String(&phone(&mut rng, nation)), + Datum::Numeric(decimal(&mut rng, &mut ctx.cx, -999_99, 9_999_99, 100)), // acctbal + Datum::String(SEGMENTS.choose(&mut rng).unwrap()), + Datum::String(text_string(&mut rng, &ctx.text_string_source, 29, 116)), + ]) } - let new_seed = rng.gen(); - let (new_order, new_lineitems) = ctx.order_row(new_seed, key); - for row in new_lineitems { - pending.push((LINEITEM_OUTPUT, GeneratorMessageType::InProgress, row, 1)); + ORDERS_OUTPUT => { + let seed = rng.gen(); + let (order, lineitems) = ctx.order_row(seed, key); + for row in lineitems { + pending.push_back(( + LINEITEM_OUTPUT, + Event::Message(MzOffset::from(offset), (row, 1)), + )); + } + if !ctx.tpch.tick.is_zero() { + active_orders.push((key, seed)); + } + order } - pending.push((ORDERS_OUTPUT, GeneratorMessageType::Finalized, new_order, 1)); - active_orders.push((key, new_seed)); - - Some(( - ORDERS_OUTPUT, - GeneratorMessageType::InProgress, - old_order, - -1, - )) - }) + NATION_OUTPUT => { + let (name, region) = NATIONS[key_usize]; + Row::pack_slice(&[ + Datum::Int64(key), + Datum::String(name), + Datum::Int64(region), + Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 114)), + ]) + } + REGION_OUTPUT => Row::pack_slice(&[ + Datum::Int64(key), + Datum::String(REGIONS[key_usize]), + Datum::String(text_string(&mut rng, &ctx.text_string_source, 31, 115)), + ]), + _ => unreachable!("{output}"), + }; + + pending.push_back((output, Event::Message(MzOffset::from(offset), (row, 1)))); + if rows.peek().is_none() { + offset += 1; + pending.push_back((output, Event::Progress(Some(MzOffset::from(offset))))); + } + } else { + if ctx.tpch.tick.is_zero() { + return None; + } + let idx = rng.gen_range(0..active_orders.len()); + let (key, old_seed) = active_orders.swap_remove(idx); + let (old_order, old_lineitems) = ctx.order_row(old_seed, key); + // Fill pending with old lineitem retractions, new lineitem + // additions, and finally the new order. Return the old + // order to start the batch. + for row in old_lineitems { + pending.push_back(( + LINEITEM_OUTPUT, + Event::Message(MzOffset::from(offset), (row, -1)), + )); + } + let new_seed = rng.gen(); + let (new_order, new_lineitems) = ctx.order_row(new_seed, key); + for row in new_lineitems { + pending.push_back(( + LINEITEM_OUTPUT, + Event::Message(MzOffset::from(offset), (row, 1)), + )); + } + pending.push_back(( + ORDERS_OUTPUT, + Event::Message(MzOffset::from(offset), (old_order, -1)), + )); + pending.push_back(( + ORDERS_OUTPUT, + Event::Message(MzOffset::from(offset), (new_order, 1)), + )); + offset += 1; + pending.push_back((ORDERS_OUTPUT, Event::Progress(Some(MzOffset::from(offset))))); + active_orders.push((key, new_seed)); + } + pending.pop_front() })) } } diff --git a/test/testdrive/dataflow-cleanup.td b/test/testdrive/dataflow-cleanup.td index 9a6782debc96a..74acd5d4d3ef3 100644 --- a/test/testdrive/dataflow-cleanup.td +++ b/test/testdrive/dataflow-cleanup.td @@ -95,11 +95,11 @@ false true > SELECT mz_internal.mz_sleep(1) -> SELECT count(*) > 0 FROM mz_internal.mz_dataflow_operators -false +> SELECT count(*) FROM mz_internal.mz_dataflow_operators +0 > DROP MATERIALIZED VIEW q14 -> SELECT count(*) > 0 FROM mz_internal.mz_dataflow_operators -false +> SELECT count(*) FROM mz_internal.mz_dataflow_operators +0 # Clean up. > DROP CLUSTER test CASCADE diff --git a/test/testdrive/load-generator.td b/test/testdrive/load-generator.td index 0b1547ec84a46..c21bdfee332ff 100644 --- a/test/testdrive/load-generator.td +++ b/test/testdrive/load-generator.td @@ -67,7 +67,7 @@ materialize.public.accounts "CREATE SUBSOURCE \"materialize\".\"public\".\"accou # Check that negative max cardinalities are rejected ! CREATE SOURCE counter2 FROM LOAD GENERATOR COUNTER (MAX CARDINALITY -1) -contains: unsupported max cardinality +contains:invalid MAX CARDINALITY: invalid unsigned numeric value: invalid digit found in string > CREATE SOURCE counter3 FROM LOAD GENERATOR COUNTER (MAX CARDINALITY 0)