feat(stream): add synced logstore #20204

Merged: 73 commits, Feb 7, 2025

Changes from all commits

Commits (73)
e69b09b
add interfaces
kwannoel Jan 10, 2025
3200bf7
interim
kwannoel Jan 13, 2025
73350a3
separate select
kwannoel Jan 13, 2025
c142833
poll writer and reader
kwannoel Jan 14, 2025
578a729
directly read from upstream
kwannoel Jan 14, 2025
f1e5881
match different upstream items
kwannoel Jan 15, 2025
fb0d8ee
docs
kwannoel Jan 15, 2025
38f7f45
add skeleton for read path
kwannoel Jan 15, 2025
8dee24c
refactor common fn
kwannoel Jan 16, 2025
6bcd06c
handle full read path
kwannoel Jan 16, 2025
11bc153
write barrier
kwannoel Jan 16, 2025
4eba587
write chunk
kwannoel Jan 17, 2025
c4554fa
wired up write_barrier
kwannoel Jan 17, 2025
884d48a
wire up write_chunk
kwannoel Jan 17, 2025
869aa3e
track metrics in buffer
kwannoel Jan 17, 2025
252e219
truncate offset after read
kwannoel Jan 17, 2025
fefbd28
refactor into functions
kwannoel Jan 17, 2025
e088f0b
add constructor
kwannoel Jan 17, 2025
5b2a2a5
fmt
kwannoel Jan 17, 2025
43d157b
remove unnecessary async
kwannoel Jan 17, 2025
cc0659f
do not lock mutex across await point
kwannoel Jan 17, 2025
629d72a
fix warnings
kwannoel Jan 17, 2025
990ee59
fmt
kwannoel Jan 20, 2025
c49bbd3
refactor to executor
kwannoel Jan 20, 2025
9789162
renaming
kwannoel Jan 20, 2025
5230aec
impl execute
kwannoel Jan 20, 2025
97e55b3
fix warn + docs
kwannoel Jan 20, 2025
4d605f0
refactor persisted logstore
kwannoel Jan 20, 2025
626d918
init state store iter
kwannoel Jan 20, 2025
5e937d0
fix warn
kwannoel Jan 20, 2025
a46c2a9
pass executor rather than msg stream
kwannoel Jan 20, 2025
a360b99
defer state_store clone
kwannoel Jan 20, 2025
474692e
add vnodes
kwannoel Jan 20, 2025
972d0df
test outline pt 1
kwannoel Jan 21, 2025
f764389
add more test lines
kwannoel Jan 21, 2025
f369e82
check test results
kwannoel Jan 21, 2025
da0f78a
fix calls
kwannoel Jan 21, 2025
bf2c8c6
make test compile
kwannoel Jan 21, 2025
f8f50b5
yield first barrier
kwannoel Jan 22, 2025
1bb6a3a
logging
kwannoel Jan 22, 2025
e4c7609
bias to upstream side
kwannoel Jan 22, 2025
0209ff3
fmt
kwannoel Jan 22, 2025
8406619
allow unused
kwannoel Jan 22, 2025
4f6f8fb
fix test lints
kwannoel Jan 23, 2025
8402e35
fix
kwannoel Jan 23, 2025
b054284
use prev
kwannoel Jan 27, 2025
9a36b0d
fix deps
kwannoel Jan 27, 2025
6655816
remove LS
kwannoel Jan 27, 2025
293b160
use pub(crate) instead of pub
kwannoel Jan 27, 2025
4c619ce
use expect_first_barrier + yield before init state store
kwannoel Jan 27, 2025
0e343bc
fix warn
kwannoel Jan 27, 2025
69b3fc8
apply vnode update to write path
kwannoel Jan 27, 2025
e512989
update read path
kwannoel Jan 27, 2025
63a375d
no need lock
kwannoel Jan 27, 2025
61a8b30
fix test
kwannoel Jan 28, 2025
e6e2092
need to handle watermark
kwannoel Jan 28, 2025
a7bf527
Apply suggestions from code review
kwannoel Feb 3, 2025
5862b88
no need read_metrics
kwannoel Feb 4, 2025
43d94bd
fix vis
kwannoel Feb 4, 2025
5457a54
remove unnecessary read metrics from test
kwannoel Feb 5, 2025
74ea90a
remove unnecessary fields
kwannoel Feb 5, 2025
49b0aef
just pass epoch
kwannoel Feb 5, 2025
4ff702d
UpdateVnodes should not be in buffer
kwannoel Feb 5, 2025
c4b9c1a
fix test
kwannoel Feb 5, 2025
ffee918
move vnode bitmap updates to top level
kwannoel Feb 5, 2025
06ce11a
fmt + fix async -> sync
kwannoel Feb 5, 2025
3eb0ecc
await
kwannoel Feb 5, 2025
3c68523
fix control flow
kwannoel Feb 5, 2025
6c72967
fmt
kwannoel Feb 5, 2025
3559fa2
increase timeout
kwannoel Feb 5, 2025
eed0d41
return pending if no results from the logstore side
kwannoel Feb 6, 2025
1d8ef0f
fix compat
kwannoel Feb 6, 2025
7eb5c8a
increase timeout
kwannoel Feb 7, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions ci/workflows/pull-request.yml
@@ -114,7 +114,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 30
timeout_in_minutes: 35
retry: *auto-retry

- label: "slow end-to-end test"
@@ -160,7 +160,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 30
timeout_in_minutes: 35
retry: *auto-retry

- label: "end-to-end test for opendal (parallel)"
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
@@ -88,6 +88,7 @@ risingwave_hummock_test = { path = "../storage/hummock_test", features = [
"test",
] }
serde_yaml = "0.9"
tracing-subscriber = "0.3.17"
tracing-test = "0.2"

[features]
src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
@@ -339,7 +339,7 @@ impl LogStoreBufferSender {
{
if *flushed {
// Since we iterate from new data to old data, when we meet flushed data, the
// rest should all be flushed.
// rest should have been flushed.
break;
}
flush_fn(chunk, *epoch, *start_seq_id, *end_seq_id)?;
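
The comment tweak in this hunk tightens the stated invariant: the buffer is scanned from newest to oldest, so the first already-flushed item proves that everything older was flushed by an earlier pass. A minimal standalone sketch of that early-exit pattern (names assumed, not RisingWave's):

struct Item {
    flushed: bool,
}

// Scan newest -> oldest and flush until we hit an already-flushed item;
// everything older than it must have been flushed by an earlier pass.
fn flush_pending(buffer: &mut [Item]) -> usize {
    let mut flushed_now = 0;
    for item in buffer.iter_mut().rev() { // buffer is ordered oldest -> newest
        if item.flushed {
            break; // the rest (older items) have been flushed already
        }
        item.flushed = true;
        flushed_now += 1;
    }
    flushed_now
}

fn main() {
    let mut buf = vec![
        Item { flushed: true },
        Item { flushed: false },
        Item { flushed: false },
    ];
    assert_eq!(flush_pending(&mut buf), 2);
}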
10 changes: 5 additions & 5 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
@@ -32,11 +32,11 @@ use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
use crate::common::log_store_impl::kv_log_store::writer::KvLogStoreWriter;
use crate::executor::monitor::StreamingMetrics;

mod buffer;
mod reader;
pub(crate) mod buffer;
pub mod reader;
pub(crate) mod serde;
#[cfg(test)]
mod test_utils;
pub mod test_utils;
mod writer;

pub(crate) use reader::{REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY};
@@ -55,7 +55,7 @@ pub(crate) const FIRST_SEQ_ID: SeqIdType = 0;
pub(crate) type ReaderTruncationOffsetType = (u64, Option<SeqIdType>);

#[derive(Clone)]
pub(crate) struct KvLogStoreReadMetrics {
pub struct KvLogStoreReadMetrics {
pub storage_read_count: LabelGuardedIntCounter<5>,
pub storage_read_size: LabelGuardedIntCounter<5>,
}
@@ -190,7 +190,7 @@ impl KvLogStoreMetrics {
}

#[cfg(test)]
fn for_test() -> Self {
pub(crate) fn for_test() -> Self {
KvLogStoreMetrics {
storage_write_count: LabelGuardedIntCounter::test_int_counter(),
storage_write_size: LabelGuardedIntCounter::test_int_counter(),
225 changes: 126 additions & 99 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -51,7 +51,9 @@ use crate::common::log_store_impl::kv_log_store::buffer::{
use crate::common::log_store_impl::kv_log_store::serde::{
merge_log_store_item_stream, KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::KvLogStoreMetrics;
use crate::common::log_store_impl::kv_log_store::{
KvLogStoreMetrics, KvLogStoreReadMetrics, SeqIdType,
};

pub(crate) const REWIND_BASE_DELAY: Duration = Duration::from_secs(1);
pub(crate) const REWIND_BACKOFF_FACTOR: u64 = 2;
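
These constants drive the reader's retry backoff when it rewinds. A sketch of how they would typically combine; the cap REWIND_MAX_DELAY is re-exported in mod.rs but its value is not shown in this diff, so the value below is an assumption:

use std::time::Duration;

const REWIND_BASE_DELAY: Duration = Duration::from_secs(1);
const REWIND_BACKOFF_FACTOR: u64 = 2;
// Assumed cap; the real REWIND_MAX_DELAY value is not visible in this diff.
const REWIND_MAX_DELAY: Duration = Duration::from_secs(180);

// Exponential backoff: base * factor^attempt, saturating, capped at max.
fn rewind_delay(rewind_count: u32) -> Duration {
    let factor = REWIND_BACKOFF_FACTOR
        .saturating_pow(rewind_count)
        .min(u32::MAX as u64) as u32;
    REWIND_BASE_DELAY.saturating_mul(factor).min(REWIND_MAX_DELAY)
}

fn main() {
    assert_eq!(rewind_delay(0), Duration::from_secs(1));
    assert_eq!(rewind_delay(3), Duration::from_secs(8));
    assert_eq!(rewind_delay(30), Duration::from_secs(180)); // capped
}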
@@ -197,7 +199,7 @@ impl<S: StateStoreRead> KvLogStoreReader<S> {
}
}

struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
pub struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
state_store: S,
iter: S::Iter,
// Called to decide whether to rebuild the iter. Once it returns true, the closure should reset itself.
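
A sketch of the kind of self-resetting FnMut() -> bool predicate this field expects, mirroring the timeout-based rebuild defined below (shape assumed for illustration):

use std::time::{Duration, Instant};

// Returns true at most once per `timeout`, resetting its own clock when it
// fires, so the caller rebuilds the iterator at most once per interval.
fn rebuild_every(timeout: Duration) -> impl FnMut() -> bool + Send {
    let mut last = Instant::now();
    move || {
        if last.elapsed() >= timeout {
            last = Instant::now();
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut should_rebuild = rebuild_every(Duration::from_millis(20));
    assert!(!should_rebuild());
    std::thread::sleep(Duration::from_millis(25));
    assert!(should_rebuild()); // fires once...
    assert!(!should_rebuild()); // ...then resets
}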
@@ -230,7 +232,7 @@ impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F>
}
}

mod timeout_auto_rebuild {
pub(crate) mod timeout_auto_rebuild {
use std::time::{Duration, Instant};

use risingwave_hummock_sdk::key::TableKeyRange;
@@ -240,7 +242,7 @@ mod timeout_auto_rebuild {

use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter;

pub(super) type TimeoutAutoRebuildIter<S: StateStoreRead> =
pub(crate) type TimeoutAutoRebuildIter<S: StateStoreRead> =
AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;

pub(super) async fn iter_with_timeout_rebuild<S: StateStoreRead>(
@@ -345,57 +347,14 @@ impl<S: StateStoreRead + Clone> KvLogStoreReader<S> {
) -> impl Future<
Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>,
> + Send {
let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch {
// start from the next epoch of last_persisted_epoch
Included(
self.serde
.serialize_pk_epoch_prefix(last_persisted_epoch.next_epoch()),
)
} else {
Unbounded
};
let range_end = self.serde.serialize_pk_epoch_prefix(
self.first_write_epoch
.expect("should have set first write epoch"),
);

let serde = self.serde.clone();
let table_id = self.table_id;
let read_metrics = self.metrics.persistent_log_read_metrics.clone();
let streams_future = try_join_all(serde.vnodes().iter_vnodes().map(|vnode| {
let key_range = prefixed_range_with_vnode(
(range_start.clone(), Excluded(range_end.clone())),
vnode,
);
let state_store = self.state_store.clone();
async move {
// rebuild the iter every 10 minutes to avoid pinning hummock version for too long
iter_with_timeout_rebuild(
state_store,
key_range,
HummockEpoch::MAX,
ReadOptions {
// This stream lives for a long time, so the prefetch object's connection may break. Hence, use short-connection prefetch.
prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
cache_policy: CachePolicy::Fill(CacheHint::Low),
table_id,
..Default::default()
},
Duration::from_secs(10 * 60),
)
.await
}
}));

streams_future.map_err(Into::into).map_ok(|streams| {
// TODO: set chunk size by config
Box::pin(merge_log_store_item_stream(
streams,
serde,
1024,
read_metrics,
))
})
read_persisted_log_store(
&self.serde,
self.table_id,
&self.metrics,
self.state_store.clone(),
self.first_write_epoch.expect("should have init"),
last_persisted_epoch,
)
}
}

@@ -510,50 +469,17 @@ impl<S: StateStoreRead + Clone> LogReader for KvLogStoreReader<S> {
let state_store = self.state_store.clone();
let table_id = self.table_id;
let read_metrics = self.metrics.flushed_buffer_read_metrics.clone();
async move {
let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
let range_start =
serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id));
let range_end =
serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id));
let state_store = &state_store;

// Use MAX EPOCH here because the epoch to consume may be below the safe
// epoch
async move {
Ok::<_, anyhow::Error>(
state_store
.iter(
(Included(range_start), Included(range_end)),
HummockEpoch::MAX,
ReadOptions {
prefetch_options:
PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::Fill(CacheHint::Low),
table_id,
..Default::default()
},
)
.await?,
)
}
}))
.instrument_await("Wait Create Iter Stream")
.await?;

let chunk = serde
.deserialize_stream_chunk(
iters,
start_seq_id,
end_seq_id,
item_epoch,
&read_metrics,
)
.instrument_await("Deserialize Stream Chunk")
.await?;

Ok((chunk_id, chunk, item_epoch))
}
read_flushed_chunk(
serde,
state_store,
vnode_bitmap,
chunk_id,
start_seq_id,
end_seq_id,
item_epoch,
table_id,
read_metrics,
)
.boxed()
};

@@ -672,6 +598,107 @@ impl<S: StateStoreRead + Clone> LogReader for KvLogStoreReader<S> {
}
}

#[allow(clippy::too_many_arguments)]
Contributor:

Are the read_flushed_chunk and read_persisted_log_store just a simple copy-paste from the original code without any change in the logic?

Contributor Author (kwannoel):

Yes. These are refactored and reused in the synced_log_store executor.

pub(crate) async fn read_flushed_chunk(
serde: LogStoreRowSerde,
state_store: impl StateStoreRead,
vnode_bitmap: Bitmap,
chunk_id: ChunkId,
start_seq_id: SeqIdType,
end_seq_id: SeqIdType,
item_epoch: u64,
table_id: TableId,
read_metrics: KvLogStoreReadMetrics,
) -> LogStoreResult<(ChunkId, StreamChunk, u64)> {
tracing::trace!("reading flushed chunk from buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}");
let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
let range_start = serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id));
let range_end = serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id));
let state_store = &state_store;

// Use MAX EPOCH here because the epoch to consume may be below the safe
// epoch
async move {
Ok::<_, anyhow::Error>(
state_store
.iter(
(Included(range_start), Included(range_end)),
HummockEpoch::MAX,
ReadOptions {
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::Fill(CacheHint::Low),
table_id,
..Default::default()
},
)
.await?,
)
}
}))
.instrument_await("Wait Create Iter Stream")
.await?;

let chunk = serde
.deserialize_stream_chunk(iters, start_seq_id, end_seq_id, item_epoch, &read_metrics)
.instrument_await("Deserialize Stream Chunk")
.await?;

Ok((chunk_id, chunk, item_epoch))
}
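
A toy model of the range the function above constructs: assuming the serialized pk orders as (vnode, epoch, seq_id), plain tuple comparison reproduces the inclusive [start_seq_id, end_seq_id] scan within one epoch per vnode (layout assumed for illustration, not taken from this diff):

fn main() {
    // Assumed key layout: (vnode, epoch, seq_id), compared lexicographically.
    let key = |vnode: u8, epoch: u64, seq_id: i32| (vnode, epoch, seq_id);
    let start = key(3, 100, 0); // like serialize_log_store_pk(vnode, epoch, Some(start_seq_id))
    let end = key(3, 100, 41); // like serialize_log_store_pk(vnode, epoch, Some(end_seq_id))
    let row = key(3, 100, 7);
    // Both bounds are Included above, so the scan covers the whole flushed chunk.
    assert!(start <= row && row <= end);
}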

pub(crate) fn read_persisted_log_store<S: StateStoreRead + Clone>(
serde: &LogStoreRowSerde,
table_id: TableId,
metrics: &KvLogStoreMetrics,
state_store: S,
first_write_epoch: u64,
last_persisted_epoch: Option<u64>,
) -> impl Future<Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>>
+ Send {
let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch {
// start from the next epoch of last_persisted_epoch
Included(serde.serialize_pk_epoch_prefix(last_persisted_epoch.next_epoch()))
} else {
Unbounded
};
let range_end = serde.serialize_pk_epoch_prefix(first_write_epoch);

let serde = serde.clone();
let read_metrics = metrics.persistent_log_read_metrics.clone();
let streams_future = try_join_all(serde.vnodes().iter_vnodes().map(|vnode| {
let key_range =
prefixed_range_with_vnode((range_start.clone(), Excluded(range_end.clone())), vnode);
let state_store = state_store.clone();
async move {
// rebuild the iter every 10 minutes to avoid pinning hummock version for too long
iter_with_timeout_rebuild(
state_store,
key_range,
HummockEpoch::MAX,
ReadOptions {
// This stream lives for a long time, so the prefetch object's connection may break. Hence, use short-connection prefetch.
prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
cache_policy: CachePolicy::Fill(CacheHint::Low),
table_id,
..Default::default()
},
Duration::from_secs(10 * 60),
)
.await
}
}));

streams_future.map_err(Into::into).map_ok(|streams| {
// TODO: set chunk size by config
Box::pin(merge_log_store_item_stream(
streams,
serde,
1024,
read_metrics,
))
})
}
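
The bound logic at the top of read_persisted_log_store can be seen in isolation. A toy version (with next_epoch approximated as +1, a simplification of Hummock's actual epoch arithmetic) shows the window it reads: strictly after the last persisted epoch, and strictly before the first epoch this instance wrote itself:

use std::ops::Bound::{self, Excluded, Included, Unbounded};

// Resume strictly after the last persisted epoch (if any) and stop before
// the first epoch this log store instance wrote itself.
fn epoch_range(
    last_persisted_epoch: Option<u64>,
    first_write_epoch: u64,
) -> (Bound<u64>, Bound<u64>) {
    let start = match last_persisted_epoch {
        Some(epoch) => Included(epoch + 1), // `next_epoch()` approximated as +1
        None => Unbounded,
    };
    (start, Excluded(first_write_epoch))
}

fn main() {
    assert_eq!(epoch_range(None, 100), (Unbounded, Excluded(100)));
    assert_eq!(epoch_range(Some(42), 100), (Included(43), Excluded(100)));
}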

#[cfg(test)]
mod tests {
use std::collections::{Bound, HashSet};
5 changes: 5 additions & 0 deletions src/stream/src/executor/mod.rs
@@ -110,6 +110,7 @@ mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
pub mod test_utils;
mod utils;

@@ -390,6 +391,10 @@ impl Barrier {
.map_or(false, |actors| actors.contains(&actor_id))
}

pub fn is_checkpoint(&self) -> bool {
self.kind == BarrierKind::Checkpoint
}

/// Get the initial split assignments for the actor with `actor_id`.
///
/// This should only be called on the initial barrier received by the executor. It must be
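
The hunk above adds a small Barrier::is_checkpoint helper. A toy model (types assumed, not RisingWave's actual ones) of why the synced log store wants it: truncating consumed log entries is only safe once a checkpoint barrier confirms the state is durable:

#[derive(PartialEq)]
enum BarrierKind {
    Barrier,
    Checkpoint,
}

struct Barrier {
    kind: BarrierKind,
}

impl Barrier {
    fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }
}

// Only a checkpoint barrier makes it safe to advance the truncate offset,
// since non-checkpoint barriers do not guarantee durability of the state.
fn on_barrier(barrier: &Barrier, consumed_up_to: u64, truncate_offset: &mut u64) {
    if barrier.is_checkpoint() {
        *truncate_offset = consumed_up_to;
    }
}

fn main() {
    let mut offset = 0;
    on_barrier(&Barrier { kind: BarrierKind::Barrier }, 10, &mut offset);
    assert_eq!(offset, 0);
    on_barrier(&Barrier { kind: BarrierKind::Checkpoint }, 10, &mut offset);
    assert_eq!(offset, 10);
}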