Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

storage: teach load generator sources to resume #21482

Merged
merged 1 commit into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,7 @@ generate_extracted_config!(
LoadGeneratorOption,
(TickInterval, Interval),
(ScaleFactor, f64),
(MaxCardinality, i64)
(MaxCardinality, u64)
);

pub(crate) fn load_generator_ast_to_generator(
Expand All @@ -1394,11 +1394,6 @@ pub(crate) fn load_generator_ast_to_generator(
let LoadGeneratorOptionExtracted {
max_cardinality, ..
} = options.to_vec().try_into()?;
if let Some(max_cardinality) = max_cardinality {
if max_cardinality < 0 {
sql_bail!("unsupported max cardinality {max_cardinality}");
}
}
LoadGenerator::Counter { max_cardinality }
}
mz_sql_parser::ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
Expand Down
4 changes: 1 addition & 3 deletions src/storage-client/src/types/sources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ message ProtoTestScriptSourceConnection {
}

message ProtoCounterLoadGenerator {
// Must be non-negative,
// but kept as int64 to make downstream logic simpler
optional int64 max_cardinality = 1;
optional uint64 max_cardinality = 1;
}

message ProtoTpchLoadGenerator {
Expand Down
15 changes: 4 additions & 11 deletions src/storage-client/src/types/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy};
use proptest_derive::Arbitrary;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::dataflow::operators::to_stream::Event;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};
Expand Down Expand Up @@ -2182,10 +2183,7 @@ pub enum LoadGenerator {
/// How many values will be emitted
/// before old ones are retracted, or `None` for
/// an append-only collection.
///
/// This is verified by the planner to be nonnegative. We encode it as
/// an `i64` to make the code in `Counter::by_seed` simpler.
max_cardinality: Option<i64>,
max_cardinality: Option<u64>,
},
Datums,
Marketing,
Expand Down Expand Up @@ -2474,13 +2472,8 @@ pub trait Generator {
&self,
now: NowFn,
seed: Option<u64>,
) -> Box<dyn Iterator<Item = (usize, GeneratorMessageType, Row, i64)>>;
}

#[derive(Clone, Copy, Debug)]
pub enum GeneratorMessageType {
InProgress,
Finalized,
resume_offset: MzOffset,
) -> Box<dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>>;
}

impl RustType<ProtoLoadGeneratorSourceConnection> for LoadGeneratorSourceConnection {
Expand Down
63 changes: 29 additions & 34 deletions src/storage/src/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ use std::rc::Rc;
use std::time::Duration;

use differential_dataflow::{AsCollection, Collection};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_repr::{Diff, Row};
use mz_storage_client::types::connections::ConnectionContext;
use mz_storage_client::types::sources::{
Generator, GeneratorMessageType, LoadGenerator, LoadGeneratorSourceConnection, MzOffset,
SourceTimestamp,
Generator, LoadGenerator, LoadGeneratorSourceConnection, MzOffset, SourceTimestamp,
};
use mz_timely_util::builder_async::OperatorBuilder as AsyncOperatorBuilder;
use timely::dataflow::operators::to_stream::Event;
use timely::dataflow::operators::ToStream;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
Expand Down Expand Up @@ -111,42 +110,38 @@ impl SourceRender for LoadGeneratorSourceConnection {
.map(MzOffset::decode_row),
);

let Some(mut offset) = resume_upper.into_option() else {
return;
let Some(resume_offset) = resume_upper.into_option() else {
return
};
cap.downgrade(&offset);

let mut rows = as_generator(&self.load_generator, self.tick_micros)
.by_seed(mz_ore::now::SYSTEM_TIME.clone(), None);
cap.downgrade(&resume_offset);

// Skip forward to the requested offset.
let tx_count = usize::cast_from(offset.offset);
let txns = rows
.by_ref()
.filter(|(_, typ, _, _)| matches!(typ, GeneratorMessageType::Finalized))
.take(tx_count)
.count();
assert_eq!(txns, tx_count, "produced wrong number of transactions");
let mut rows = as_generator(&self.load_generator, self.tick_micros).by_seed(
mz_ore::now::SYSTEM_TIME.clone(),
None,
resume_offset,
);

let tick = Duration::from_micros(self.tick_micros.unwrap_or(1_000_000));

while let Some((output, typ, value, diff)) = rows.next() {
let message = (
output,
Ok(SourceMessage {
upstream_time_millis: None,
key: (),
value,
headers: None,
}),
);

data_output.give(&cap, (message, offset, diff)).await;

if matches!(typ, GeneratorMessageType::Finalized) {
offset += 1;
cap.downgrade(&offset);
tokio::time::sleep(tick).await;
while let Some((output, event)) = rows.next() {
match event {
Event::Message(offset, (value, diff)) => {
let message = (
output,
Ok(SourceMessage {
upstream_time_millis: None,
key: (),
value,
headers: None,
}),
);
data_output.give(&cap, (message, offset, diff)).await;
}
Event::Progress(Some(offset)) => {
cap.downgrade(&offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auction and other generators that skip inspecting the resume upper are going to panic here on restart

tokio::time::sleep(tick).await;
}
Event::Progress(None) => return,
}
}
});
Expand Down
111 changes: 60 additions & 51 deletions src/storage/src/source/generator/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use std::iter;

use mz_ore::now::{to_datetime, NowFn};
use mz_repr::{Datum, Row};
use mz_storage_client::types::sources::{Generator, GeneratorMessageType};
use mz_storage_client::types::sources::{Generator, MzOffset};
use rand::prelude::{Rng, SmallRng};
use rand::seq::SliceRandom;
use rand::SeedableRng;
use timely::dataflow::operators::to_stream::Event;

/// CREATE TABLE organizations
/// (
Expand Down Expand Up @@ -74,7 +75,8 @@ impl Generator for Auction {
&self,
now: NowFn,
seed: Option<u64>,
) -> Box<(dyn Iterator<Item = (usize, GeneratorMessageType, Row, i64)>)> {
_resume_offset: MzOffset,
) -> Box<(dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>)> {
let mut rng = SmallRng::seed_from_u64(seed.unwrap_or_default());

let organizations = COMPANIES.iter().enumerate().map(|(offset, name)| {
Expand Down Expand Up @@ -115,57 +117,64 @@ impl Generator for Auction {
let mut pending: VecDeque<(usize, Row)> =
organizations.chain(users).chain(accounts).collect();

Box::new(iter::from_fn(move || {
{
if pending.is_empty() {
counter += 1;
let now = to_datetime(now());
let mut auction = Row::with_capacity(4);
let mut packer = auction.packer();
packer.push(Datum::Int64(counter)); // auction id
let max_seller_id =
i64::try_from(CELEBRETIES.len()).expect("demo entries less than i64::MAX");
packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // seller
packer.push(Datum::String(AUCTIONS.choose(&mut rng).unwrap())); // item
packer.push(Datum::TimestampTz(
(now + chrono::Duration::seconds(10))
.try_into()
.expect("timestamp must fit"),
)); // end time
pending.push_back((AUCTIONS_OUTPUT, auction));
const MAX_BIDS: i64 = 10;
for i in 0..rng.gen_range(2..MAX_BIDS) {
let bid_id = Datum::Int64(counter * MAX_BIDS + i);
let bid = {
let mut bid = Row::with_capacity(5);
let mut packer = bid.packer();
packer.push(bid_id);
packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // buyer
packer.push(Datum::Int64(counter)); // auction id
packer.push(Datum::Int32(rng.gen_range(1..100))); // amount
packer.push(Datum::TimestampTz(
(now + chrono::Duration::seconds(i))
.try_into()
.expect("timestamp must fit"),
)); // bid time
bid
};
pending.push_back((BIDS_OUTPUT, bid));
let mut offset = 0;
Box::new(
iter::from_fn(move || {
{
if pending.is_empty() {
counter += 1;
let now = to_datetime(now());
let mut auction = Row::with_capacity(4);
let mut packer = auction.packer();
packer.push(Datum::Int64(counter)); // auction id
let max_seller_id = i64::try_from(CELEBRETIES.len())
.expect("demo entries less than i64::MAX");
packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // seller
packer.push(Datum::String(AUCTIONS.choose(&mut rng).unwrap())); // item
packer.push(Datum::TimestampTz(
(now + chrono::Duration::seconds(10))
.try_into()
.expect("timestamp must fit"),
)); // end time
pending.push_back((AUCTIONS_OUTPUT, auction));
const MAX_BIDS: i64 = 10;
for i in 0..rng.gen_range(2..MAX_BIDS) {
let bid_id = Datum::Int64(counter * MAX_BIDS + i);
let bid = {
let mut bid = Row::with_capacity(5);
let mut packer = bid.packer();
packer.push(bid_id);
packer.push(Datum::Int64(rng.gen_range(1..=max_seller_id))); // buyer
packer.push(Datum::Int64(counter)); // auction id
packer.push(Datum::Int32(rng.gen_range(1..100))); // amount
packer.push(Datum::TimestampTz(
(now + chrono::Duration::seconds(i))
.try_into()
.expect("timestamp must fit"),
)); // bid time
bid
};
pending.push_back((BIDS_OUTPUT, bid));
}
}
// Pop from the front so auctions always appear before bids.
let pend = pending.pop_front();
pend.map(|(output, row)| {
let msg = (output, Event::Message(MzOffset::from(offset), (row, 1)));

// The first batch (orgs, users, accounts) is a single txn, all others (auctions and bids) are separate.
let progress = if counter != 0 || pending.is_empty() {
offset += 1;
Some((output, Event::Progress(Some(MzOffset::from(offset)))))
} else {
None
};
std::iter::once(msg).chain(progress)
})
}
// Pop from the front so auctions always appear before bids.
let pend = pending.pop_front();
pend.map(|(output, row)| {
// The first batch (orgs, users, accounts) is a single txn, all others (auctions and bids) are separate.
let typ = if counter != 0 || pending.is_empty() {
GeneratorMessageType::Finalized
} else {
GeneratorMessageType::InProgress
};
(output, typ, row, 1)
})
}
}))
})
.flatten(),
)
}
}

Expand Down
81 changes: 33 additions & 48 deletions src/storage/src/source/generator/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,58 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::iter;

use mz_ore::now::NowFn;
use mz_repr::{Datum, Row};
use mz_storage_client::types::sources::{Generator, GeneratorMessageType};
use mz_storage_client::types::sources::{Generator, MzOffset};
use timely::dataflow::operators::to_stream::Event;

pub struct Counter {
/// How many values will be emitted before old ones are retracted,
/// or `None` for an append-only collection. (If this retraction
/// behavior is changed,
/// `mz_storage_client::types::sources::LoadGenerator::is_monotonic`
/// must be updated.
///
/// This is verified by the planner to be nonnegative. We encode it as
/// an `i64` to make the code in `Counter::by_seed` simpler.
pub max_cardinality: Option<i64>,
pub max_cardinality: Option<u64>,
}

impl Generator for Counter {
fn by_seed(
&self,
_now: NowFn,
_seed: Option<u64>,
) -> Box<dyn Iterator<Item = (usize, GeneratorMessageType, Row, i64)>> {
let mut counter = 0;
resume_offset: MzOffset,
) -> Box<(dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>)> {
let max_cardinality = self.max_cardinality;

Box::new(
iter::repeat_with(move || {
let to_retract = match max_cardinality {
Some(max) => {
if max <= counter {
Some(counter - max + 1)
} else {
None
(resume_offset.offset..)
.map(move |offset| {
let retraction = match max_cardinality {
// At offset `max` we must start retracting the value of `offset - max`. For
// example if max_cardinality is 2 then the collection should contain:
// (1, 0, +1)
// (2, 1, +1)
// (1, 2, -1) <- Here offset becomes >= max and we retract the value that was
// (3, 2, +1) emitted at (offset - max), which equals (offset - max + 1).
// (2, 3, -1)
// (4, 3, +1)
Some(max) if offset >= max => {
let retracted_value = i64::try_from(offset - max + 1).unwrap();
let row = Row::pack_slice(&[Datum::Int64(retracted_value)]);
Some((0, Event::Message(MzOffset::from(offset), (row, -1))))
}
}
None => None,
};
counter += 1;
// NB: we could get rid of this allocation with
// judicious use of itertools::Either, if it were
// important to highly optimize this code path.
if let Some(to_retract) = to_retract {
vec![
(
0,
GeneratorMessageType::InProgress,
Row::pack_slice(&[Datum::Int64(counter)]),
1,
),
(
0,
GeneratorMessageType::Finalized,
Row::pack_slice(&[Datum::Int64(to_retract)]),
-1,
),
]
} else {
vec![(
0,
GeneratorMessageType::Finalized,
Row::pack_slice(&[Datum::Int64(counter)]),
1,
)]
}
})
.flatten(),
_ => None,
};

let inserted_value = i64::try_from(offset + 1).unwrap();
let row = Row::pack_slice(&[Datum::Int64(inserted_value)]);
let insertion = [
(0, Event::Message(MzOffset::from(offset), (row, 1))),
(0, Event::Progress(Some(MzOffset::from(offset + 1)))),
];
retraction.into_iter().chain(insertion)
})
.flatten(),
)
}
}
Loading