Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement the OrderedBatchProducer #4134

Merged
merged 27 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
724a4a6
feat: implement the `OrderedBatchProducer`
WenyXu Jun 12, 2024
016d793
test: add test of cancel safety
WenyXu Jun 12, 2024
dceabaf
chore: apply suggestions from CR
WenyXu Jun 12, 2024
6f1545e
chore: apply suggestions from CR
WenyXu Jun 12, 2024
a1bc532
refactor: simplify the `BackgroundProducerWorker`
WenyXu Jun 13, 2024
23b0951
feat: implement the OrderedBatchProducer v2
WenyXu Jun 13, 2024
1fbea06
refactor: switch to `OrderedBatchProducer`
WenyXu Jun 12, 2024
51bb93b
chore: rename to `MAX_FLUSH_QUEUE_SIZE`
WenyXu Jun 12, 2024
df600a0
refactor: switch to `OrderedBatchProducerV2`
WenyXu Jun 13, 2024
605bcdb
refactor: remove `OrderedBatchProducerV1`
WenyXu Jun 13, 2024
ac898ed
test: add tests
WenyXu Jun 13, 2024
70bc87a
refactor: make config configurable
WenyXu Jun 13, 2024
9db1dde
refactor: minor refactor
WenyXu Jun 13, 2024
a95186a
chore: remove unused code
WenyXu Jun 13, 2024
b414bf0
chore: remove `benchmarks` crate
WenyXu Jun 13, 2024
cc0c8fc
chore: update config doc
WenyXu Jun 13, 2024
a8684db
chore: remove unused comment
WenyXu Jun 14, 2024
710378d
refactor: refactor client registry
WenyXu Jun 14, 2024
7d3a377
refactor: rename `max_batch_size` to `max_batch_bytes`
WenyXu Jun 14, 2024
2ffa89c
chore: use constant value
WenyXu Jun 17, 2024
4931fa6
chore: ensure serialized meta < ESTIMATED_META_SIZE
WenyXu Jun 17, 2024
5246dbd
chore: apply suggestions from CR
WenyXu Jun 17, 2024
44eda34
chore: remove the `CHANNEL_SIZE`
WenyXu Jun 17, 2024
e3c9b4d
chore: apply suggestions from CR
WenyXu Jun 18, 2024
66c0e36
fix: ensure serialized meta < ESTIMATED_META_SIZE
WenyXu Jun 18, 2024
57a3ad8
chore: apply suggestions from CR
WenyXu Jun 18, 2024
826e6d9
chore: apply suggestions from CR
WenyXu Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,18 @@ pub enum Error {
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,
actual
))]
MetaLengthExceededLimit {
#[snafu(implicit)]
location: Location,
limit: usize,
actual: usize,
},
}

impl ErrorExt for Error {
Expand Down
11 changes: 9 additions & 2 deletions src/log-store/src/kafka/util/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MissingKeySnafu, MissingValueSnafu,
Result,
DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

Expand Down Expand Up @@ -90,6 +90,13 @@ impl TryFrom<Record> for KafkaRecord {

fn try_from(record: Record) -> Result<Self> {
let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
ensure!(
key.len() < ESTIMATED_META_SIZE,
MetaLengthExceededLimitSnafu {
limit: ESTIMATED_META_SIZE,
actual: key.len()
}
);
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
Ok(KafkaRecord {
key: Some(key),
value: Some(record.data),
Expand Down