
Implement wrapper around KafkaConsumer to support batch consumption with head of line blocking #907

Closed
nscuro opened this issue Nov 13, 2023 · 3 comments
Assignees
Labels
component/api-server, enhancement (New feature or request), p3 (Nice-to-have features), size/L (High effort), spike/research (Requires more research before implementation)

Comments

@nscuro
Member

nscuro commented Nov 13, 2023

We have multiple use cases where Kafka records are assembled into batches before they're processed.

While batch semantics can be achieved with Kafka Streams, it introduces additional overhead:

  • Record offsets become eligible for committing once a record successfully passes through a sub-topology
  • It is thus not possible to delay offset commits until the batch is processed, as record offsets may be committed before that, risking message loss when processing the batch fails
  • Assembling record batches requires Kafka Streams state stores (in-memory or RocksDB), which in turn necessitate a changelog topic for fault tolerance
  • Batches can only be assembled per-partition, because each state store is bound to a single Kafka Streams task, which itself is bound to one partition

An example of trying to address such batching use cases in Kafka Streams can be seen here: https://github.com/DependencyTrack/hyades-apiserver/pull/305/files

For simple batching use cases, ideally it should work like this:

  • Subscribe to N partitions of topic foo
  • Poll records from all N partitions
  • Put records into an in-memory batch until a given max size is reached
  • (Optionally send records that failed to deserialize to a dead-letter topic)
  • When the max batch size or a given timeout is reached, "flush" / process the records (i.e. write to database, make an HTTP call, ...)
  • When flushing was successful, commit offsets
  • When flushing was unsuccessful, do not commit offsets and either:
    • Fail the consumer entirely in case of non-retryable errors
    • Restart consumer from last committed offset in case of retryable errors

Essentially, implement a batch consumer with head-of-line blocking.
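
A minimal sketch of such a consumer loop, built directly on the plain KafkaConsumer, could look like the following. The BatchProcessor interface and RetryableException class are hypothetical helpers introduced here for illustration, not existing hyades-apiserver code; rebalance handling and dead-lettering of records that fail to deserialize are omitted for brevity:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class BatchingConsumer<K, V> {

    /** Flushes a complete batch, e.g. to the database. Hypothetical helper interface. */
    @FunctionalInterface
    public interface BatchProcessor<K, V> {
        void flush(List<ConsumerRecord<K, V>> batch) throws Exception;
    }

    /** Hypothetical marker for errors that warrant a retry from the last committed offset. */
    public static class RetryableException extends RuntimeException {
        public RetryableException(Throwable cause) {
            super(cause);
        }
    }

    private final KafkaConsumer<K, V> consumer;
    private final BatchProcessor<K, V> processor;
    private final int maxBatchSize;
    private final Duration maxBatchAge;

    public BatchingConsumer(final Properties props, final String topic, final BatchProcessor<K, V> processor,
                            final int maxBatchSize, final Duration maxBatchAge) {
        // Offsets must only be committed after a successful flush.
        props.put("enable.auto.commit", "false");
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(List.of(topic));
        this.processor = processor;
        this.maxBatchSize = maxBatchSize;
        this.maxBatchAge = maxBatchAge;
    }

    public void run() {
        final List<ConsumerRecord<K, V>> batch = new ArrayList<>(maxBatchSize);
        Instant batchStart = Instant.now();

        while (true) {
            for (final ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(250))) {
                if (batch.isEmpty()) {
                    batchStart = Instant.now(); // The flush timeout counts from the batch's first record.
                }
                batch.add(record);
            }

            final boolean full = batch.size() >= maxBatchSize;
            final boolean timedOut = !batch.isEmpty()
                    && Duration.between(batchStart, Instant.now()).compareTo(maxBatchAge) >= 0;
            if (!full && !timedOut) {
                continue;
            }

            try {
                processor.flush(batch);
                consumer.commitSync(); // Commit only after the batch was flushed successfully.
                batch.clear();
            } catch (RetryableException e) {
                // Head-of-line blocking: rewind to the last committed offsets and retry the same records.
                // A real implementation would add a backoff here.
                rewindToCommitted();
                batch.clear();
            } catch (Exception e) {
                consumer.close(); // Non-retryable error: fail the consumer entirely.
                throw new IllegalStateException("Batch flush failed with non-retryable error", e);
            }
        }
    }

    private void rewindToCommitted() {
        final Set<TopicPartition> assignment = consumer.assignment();
        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
        for (final TopicPartition partition : assignment) {
            final OffsetAndMetadata offset = committed.get(partition);
            if (offset != null) {
                consumer.seek(partition, offset.offset());
            } else {
                consumer.seekToBeginning(List.of(partition));
            }
        }
    }
}
```

Since the consumer only returns to poll() once a batch is flushed, max.poll.interval.ms would need to be sized to accommodate the batch timeout plus the worst-case flush duration, otherwise the consumer gets kicked out of the group.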

HOL blocking semantics are often undesirable, but in certain cases they are useful:

  • Assuming all records are flushed to the same "sink" (e.g. database), failure to flush one record (e.g. database is down) always implies others can't be flushed either. There is no point in proceeding to later records in the topic.
  • Retries are simple: they merely involve restarting from the last committed offset. No additional state keeping is necessary, and no retry or changelog topic is required.

Areas where I think this might be useful:

  • Ingestion of ScanResults and vulnerabilities from the dtrack.vuln-analysis.result topic
  • Ingestion of AnalysisResults from the dtrack.repo-meta-analysis.result topic
  • Buffering of ScanResults when tracking vulnerability scan completion (see https://github.com/DependencyTrack/hyades-apiserver/pull/305/files)
  • Ingestion of mirrored vulnerabilities from the dtrack.vulnerability topic
  • Performing vulnerability analysis with Snyk or OSS Index, both of which support PURL batching
@nscuro nscuro added the enhancement, p3, size/L, component/api-server, and spike/research labels Nov 13, 2023
@nscuro
Member Author

nscuro commented Nov 14, 2023

I just realized that Confluent's Parallel Consumer is doing exactly that: #346

Update: Confluent Parallel Consumer has no batch timeout behavior (it doesn't wait for batches to become full). So ultimately we need to build our own batching consumer. But perhaps PC can be used as an interim solution in the meantime.
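
For reference, a rough sketch of how batch processing looks with parallel-consumer. Method names reflect its 0.5.x API and should be double-checked against the version in use; processBatch stands in for the actual flush logic:

```java
import java.util.List;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class ParallelConsumerBatchExample {

    // kafkaConsumer is a plain KafkaConsumer created elsewhere with enable.auto.commit=false.
    static void start(final Consumer<String, byte[]> kafkaConsumer) {
        final ParallelConsumerOptions<String, byte[]> options = ParallelConsumerOptions.<String, byte[]>builder()
                .consumer(kafkaConsumer)
                .ordering(ProcessingOrder.PARTITION) // Preserve per-partition ordering.
                .batchSize(500)                      // Hand up to 500 records to the callback at once.
                .build();

        final ParallelStreamProcessor<String, byte[]> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("dtrack.vuln-analysis.result"));

        processor.poll(context -> {
            // Batches are dispatched as soon as records are available; there is no
            // option to wait for a batch to fill up (the missing timeout behavior).
            final List<ConsumerRecord<String, byte[]>> records = context.getConsumerRecordsFlattened();
            processBatch(records); // Hypothetical flush logic (database write, HTTP call, ...).
        });
    }

    static void processBatch(final List<ConsumerRecord<String, byte[]>> records) {
        // Placeholder for the actual batch flush.
    }
}
```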

@nscuro nscuro self-assigned this Nov 24, 2023
@nscuro nscuro mentioned this issue Feb 2, 2024
34 tasks
nscuro added a commit to DependencyTrack/hyades-apiserver that referenced this issue Feb 2, 2024
Decoupled from #509

This merely adds an API on top of which processors can be implemented. We can migrate processors one-by-one from Kafka Streams to this API. The majority of this work was already done in #509, but it got out of date due to changed priorities. At the very least, said PR is good to take inspiration from.

Relates to DependencyTrack/hyades#346
Relates to DependencyTrack/hyades#901
Relates to DependencyTrack/hyades#907

Signed-off-by: nscuro <[email protected]>
Further commits by nscuro referencing this issue were added to DependencyTrack/hyades-apiserver between Feb 3 and Mar 25, 2024.
@nscuro
Member Author

nscuro commented Jun 5, 2024

Closing, as consumers in the API server have been migrated to Confluent parallel-consumer.

@nscuro nscuro closed this as completed Jun 5, 2024