diff --git a/src/storage/src/source/generator.rs b/src/storage/src/source/generator.rs index 5103454e19dac..f6216097555ac 100644 --- a/src/storage/src/source/generator.rs +++ b/src/storage/src/source/generator.rs @@ -113,7 +113,6 @@ impl SourceRender for LoadGeneratorSourceConnection { let Some(resume_offset) = resume_upper.into_option() else { return }; - cap.downgrade(&resume_offset); let mut rows = as_generator(&self.load_generator, self.tick_micros).by_seed( mz_ore::now::SYSTEM_TIME.clone(), @@ -139,7 +138,14 @@ impl SourceRender for LoadGeneratorSourceConnection { } Event::Progress(Some(offset)) => { cap.downgrade(&offset); - tokio::time::sleep(tick).await; + // We only sleep if we have surpassed the resume offset so that we can + // quickly go over any historical updates that a generator might choose to + // emit. + // TODO(petrosagg): Remove the sleep below and make generators return an + // async stream so that they can drive the rate of production directly + if resume_offset < offset { + tokio::time::sleep(tick).await; + } } Event::Progress(None) => return, }