Skip to content

Commit

Permalink
storage: only messages should drive generator capability
Browse files Browse the repository at this point in the history
Fixes a bug introduced in MaterializeInc#21842 and detected by @guswynn here
https://github.com/MaterializeInc/materialize/pull/21482/files#r1310695260

Fixes #21496

Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed Sep 4, 2023
1 parent 03b45a9 commit f9c8e3f
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/storage/src/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ impl SourceRender for LoadGeneratorSourceConnection {
let Some(resume_offset) = resume_upper.into_option() else {
return;
};
cap.downgrade(&resume_offset);

let mut rows = as_generator(&self.load_generator, self.tick_micros).by_seed(
mz_ore::now::SYSTEM_TIME.clone(),
Expand All @@ -139,7 +138,14 @@ impl SourceRender for LoadGeneratorSourceConnection {
}
Event::Progress(Some(offset)) => {
cap.downgrade(&offset);
tokio::time::sleep(tick).await;
// We only sleep if we have surpassed the resume offset so that we can
// quickly go over any historical updates that a generator might choose to
// emit.
// TODO(petrosagg): Remove the sleep below and make generators return an
// async stream so that they can drive the rate of production directly
if resume_offset < offset {
tokio::time::sleep(tick).await;
}
}
Event::Progress(None) => return,
}
Expand Down

0 comments on commit f9c8e3f

Please sign in to comment.