[exporterhelper] Awkwardness due to API between queue sender and batch sender #10368

Closed
Tracked by #8122
carsonip opened this issue Jun 7, 2024 · 13 comments · Fixed by #11637

Comments

@carsonip
Contributor

carsonip commented Jun 7, 2024

Is your feature request related to a problem? Please describe.

Related to batch sender in #8122

Currently, queue sender and batch sender work together fine, but the config sending_queue.num_consumers and batch sender concurrencyLimit make it a little awkward to use. The awkwardness comes from the fact that it is impossible for batch sender to get more requests than queue sender's num_consumers, and may be forced to export a smaller batch due to it.
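
To make the constraint concrete, here is a minimal Go sketch of the push model as I understand it from the description above. It is not the exporterhelper code: `pushBatchItems` and its parameters are hypothetical, and it assumes every request carries the same number of items.

```go
package main

import "fmt"

// pushBatchItems models the push design: every queue consumer blocks inside
// the batcher while its request waits in the current batch, so a batch can
// hold at most numConsumers requests. All names here are hypothetical.
func pushBatchItems(numConsumers, itemsPerRequest, minSizeItems int) (items int, undersized bool) {
	for blocked := 0; blocked < numConsumers; blocked++ {
		items += itemsPerRequest
		if items >= minSizeItems {
			return items, false // flushed because min_size_items was reached
		}
	}
	// Every consumer is now blocked, so no further request can arrive and
	// the batcher has to export a batch smaller than min_size_items.
	return items, true
}

func main() {
	items, undersized := pushBatchItems(10, 50, 1000)
	fmt.Println(items, undersized) // 500 true
}
```

With 10 consumers and 50 items per request, the batch stops at 500 items even though min_size_items asks for 1000.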

I am also not sure whether this presents a performance problem at scale, where a lot of goroutines would be involved.

The batch sender concurrency check has also been prone to unfavorable goroutine scheduling, as reported in #9952 .

Describe the solution you'd like

Ideally, queue sender can keep sending to batch sender without an artificial limit, such that batching is only triggered by min_size_items and flush_timeout.

I don't have an idea how to implement this yet. Also, this may require a change in contract between exporterhelper parts.

@dmitryax
Member

dmitryax commented Jun 12, 2024

We still need to limit concurrency in the queue or batch configuration. Otherwise, the queue becomes useless.

From the user configuration perspective, I think we should provide:

batcher:
  # The maximum number of batches that can be exported concurrently. 
  # This can only be used with enabled sending_queue.
  max_concurrency: 

If this option is defined and both sending_queue and batcher are enabled, sending_queue.num_consumers will be ignored.

As you mentioned, this will complicate the contract between the queue and batcher senders. I think we can move the queue consumers into some sort of another component which will be replaced by the batcher if max_concurrency is provided.

@jmacd
Contributor

jmacd commented Jun 21, 2024

@moh-osman3 and I studied the same problem -- we want to avoid use of a queue_sender but we want concurrency in the batcher. I reason that another user will come along asking not to use the queue sender, not to use the batch sender, and still want concurrency. Therefore, I propose we try to (and @moh-osman3 has been investigating how to) add a new sender stage purely for concurrency control.

In such a future, I think the existing queue_sender num_consumers should dictate how many workers perform the CPU- and I/O-intensive work of gathering items from the queue, not how many export threads are used.

@moh-osman3
Contributor

@dmitryax @carsonip PR #10478 shows an implementation of the concurrency_sender, which limits the number of goroutines used for export. It removes the concurrencyLimit from the batch_sender so that batch exports are triggered only by size/timeout, and concurrency is limited by the new sender independently of queue_sender.num_consumers. Any thoughts on this approach?

@dmitryax
Member

dmitryax commented Jul 8, 2024

@jmacd, I just wanted to clarify that the existing implementation of the batcher sender is concurrent. There is no limit if queue sender is not enabled. This issue is about limiting the concurrency of the batcher sender.

@dmitryax
Member

dmitryax commented Aug 6, 2024

I believe the solution should be to change the API between the queue and the batcher from a push model (queue->batcher) to a pull model (queue<-batcher). If both the queue and the batcher are enabled, we treat batchers as the queue consumers. The number of concurrent batchers should be configurable; like the number of queue consumers, it represents the maximum number of outgoing requests.

The workflow would be the following:

  1. Pick a batcher/consumer from the pool, make it an active batcher
  2. The active batcher picks items from the queue until the minimum batch size (or flush timeout) is reached
  3. The batcher sends the data and marks itself as busy. Once the request is completed asynchronously, the batcher puts itself back in the pool
  4. Repeat steps 1-3 while a batcher/consumer is available in the pool. Otherwise, wait

The maximum number of concurrent batchers can be configured by the same num_workers option as the queue has. The queue's num_workers is ignored if batcher is enabled.

If the queue isn't enabled, the batch workers would listen for incoming requests and, in a similar way, control the concurrency of outgoing requests.

@axw
Contributor

axw commented Aug 7, 2024

@dmitryax that sounds good to me. I wrote down related thoughts at #10478 (comment).

Re num_workers config: too few workers and we may not achieve optimal throughput, too many workers and we may never reach the minimum batch size. I think starting with configurable num_workers is the way to go, but it might be worth later adding auto scaling based on queue depth.

@dmitryax
Member

dmitryax commented Aug 7, 2024

> too many workers and we may never reach the minimum batch size.

num_workers doesn't affect this. Only one active batcher can consume metrics from the queue until the batch is ready to be exported. Other batchers are either inactive in the pool or busy sending completed batches.

@dmitryax
Member

dmitryax commented Aug 7, 2024

I think we can start with the same default value for the num_workers as queue has: 10

@axw
Contributor

axw commented Aug 7, 2024

> num_workers doesn't affect this. Only one active batcher can consume metrics from the queue until the batch is ready to be exported. Other batchers are either inactive in the pool or busy sending completed batches.

Ah ok, I understand now. Sounds good.

FWIW we took that approach in an earlier version of Elastic APM and then switched to having multiple concurrently polling queue consumers to make better use of CPUs. It may not be relevant here, since in that case the queue consumer was doing more CPU-heavy work (JSON document encoding) than the batcher would be doing. (And the reason we did it there was so the bytes-based threshold could be based on the compressed total encoded documents size.)

@moh-osman3
Contributor

> num_workers doesn't affect this. Only one active batcher can consume metrics from the queue until the batch is ready to be exported. Other batchers are either inactive in the pool or busy sending completed batches.

Hmm why can we only have one active batcher? Why shouldn't all batchers read from the queue and try to form batches?

@dmitryax
Member

dmitryax commented Sep 4, 2024

> Hmm why can we only have one active batcher? Why shouldn't all batchers read from the queue and try to form batches?

Because we can't get consistent batching in that case. Let's say we have 200 items in the queue, and the minimum batch size is configured to be 100. The expected behavior is to get 2 batches of 100 items each sent out right away. However, if we have, let's say, 4 batchers reading from the queue concurrently, they can get smaller batches like 50,50,50,50 and be blocked until the timeout is reached. Once the timeout is passed, they will send 4 batches of 50 items.
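
The arithmetic in this example can be checked with a small Go sketch. It assumes items are dealt round-robin to the concurrent readers, which is my simplification (real goroutine scheduling is arbitrary), and `simulate` is a hypothetical name.

```go
package main

import "fmt"

// simulate deals `items` queued items round-robin to `readers` batchers.
// A batcher flushes as soon as it holds minSize items. It returns how many
// batches were flushed immediately and the sizes of the partial batches
// that would have to wait for the flush timeout.
func simulate(items, readers, minSize int) (flushed int, waiting []int) {
	partial := make([]int, readers)
	for i := 0; i < items; i++ {
		r := i % readers
		partial[r]++
		if partial[r] == minSize {
			flushed++
			partial[r] = 0
		}
	}
	for _, p := range partial {
		if p > 0 {
			waiting = append(waiting, p)
		}
	}
	return flushed, waiting
}

func main() {
	fmt.Println(simulate(200, 1, 100)) // 2 []
	fmt.Println(simulate(200, 4, 100)) // 0 [50 50 50 50]
}
```

A single reader sends two full batches right away, while four concurrent readers each end up holding 50 items until the timeout.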

@dmitryax
Member

dmitryax commented Sep 4, 2024

@sfc-gh-sili is currently helping with this issue. She prepared a design doc based on our discussions: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit#heading=h.ypo1293baglx Feel free to take a look and comment

@carsonip
Contributor Author

carsonip commented Sep 4, 2024

@dmitryax @sfc-gh-sili Thanks for the design doc, it looks good at a high level. Moving batching into queue sender when sending queue is enabled makes sense to me, and it should fix the awkwardness described in this issue, despite a little bit of extra complexity to handle both sending_queue={false,true}. I am happy to review the PRs when they are ready.

mx-psi pushed a commit that referenced this issue Sep 6, 2024
…11042)

#### Description

This PR moves `exporter/exporterhelper/request.go` to
`exporter/internal/request.go` to avoid a circular dependency.

Context: As part of the effort to move from a queue->batch pushing model
to a queue->batch pulling model in exporter, we will be depending on
`Request` from `exporter/exporterbatcher`. However,
`exporter/exporterhelper` already depends on `exporter/exporterbatcher`,
so we need to move `Request` out of `exporter/exporterhelper`.

#### Link to tracking issue

#10368

#### Testing

Ran `opentelemetry-collector$ make` to make sure all tests still pass.
mx-psi added a commit that referenced this issue Sep 6, 2024
…guration (#11041)

#### Description

This PR changes initialization of `batchSender` and `queueSender` to
AFTER configuration. That way we get to access `queueConfig` and
`batcherConfig` in the same place.

Context: This is some pre-work for changing queue->batch from a pushing
model to a pulling model. We will be initializing a
`queueBatchSender(queueConfig, batcherConfig)` if both queue and batcher
are enabled and a `batchSender(batcherConfig)` if only batcher is
enabled. This change enables us to achieve the goal without changing
the config API.

#### Link to tracking issue

#10368

#### Testing

Ran `opentelemetry-collector$ make` to make sure all tests still pass.

Co-authored-by: Pablo Baeyens <[email protected]>
bogdandrutu pushed a commit that referenced this issue Sep 26, 2024
…guration - #2 (#11240)

This PR follows
#11041.

The previous PR changed the initialization of `batchSender` and
`queueSender` to AFTER configuration, because that enables us to access
`queueConfig` and `batcherConfig` in the same place.

I noticed since then that there is another API for queue configuration,
and this PR takes care of that other API.

#### Link to tracking issue

#10368

#### Testing

Ran `opentelemetry-collector$ make` to make sure all tests still pass.
dmitryax added a commit that referenced this issue Oct 5, 2024
… to a class function instead of a callback (#11338)

#### Description

Why this change?
Each request from the queue contains multiple items, and those items
could be merge-split into multiple batches when they are sent out (see
#8122
for more about exporter batcher). We would like to keep track of those
cases and only call `onProcessingFinished` when all such batches have
gone out. In this PR, `onProcessingFinished` is changed from a callback
to a method because it is easier to keep track of indices than of
functions.
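
One way to picture the index-based book-keeping is the sketch below. It is not the code from this PR: `tracker`, `add`, and `done` are hypothetical names, and the `finished` field merely stands in for `onProcessingFinished`.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical helper: it remembers, per queue index, how many
// batches still carry items from that request.
type tracker struct {
	mu       sync.Mutex
	pending  map[uint64]int
	finished func(index uint64) // stands in for onProcessingFinished
}

func newTracker(finished func(uint64)) *tracker {
	return &tracker{pending: make(map[uint64]int), finished: finished}
}

// add records that the request at index was spread over n batches.
func (t *tracker) add(index uint64, n int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending[index] += n
}

// done is called once per exported batch that contained part of the request.
// The finished hook fires only when the last such batch has gone out.
func (t *tracker) done(index uint64) {
	t.mu.Lock()
	t.pending[index]--
	last := t.pending[index] == 0
	if last {
		delete(t.pending, index)
	}
	t.mu.Unlock()
	if last {
		t.finished(index)
	}
}

func main() {
	tr := newTracker(func(i uint64) { fmt.Println("finished request", i) })
	tr.add(7, 3) // request 7 was split across three batches
	tr.done(7)
	tr.done(7)
	tr.done(7) // only this call prints "finished request 7"
}
```

Storing a counter per index avoids keeping one closure alive for every in-flight request.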

#### Link to tracking issue
#8122
#10368

#### Testing
`exporter/internal/queue/persistent_queue_test.go`

#### Documentation

This is an internal change invisible to the users.

---------

Co-authored-by: Dmitrii Anoshin <[email protected]>
dmitryax pushed a commit that referenced this issue Oct 26, 2024
)

#### Description

This PR is a bare minimum implementation of a component called queue
batcher. On completion, this component will replace `consumers` in
`queue_sender`, thus moving queue-batch to a pulling model instead of a
pushing model.

Limitations of the current code
* This implements only the case where batching is disabled, which means
no merging or splitting of requests and no timeout flushing.
* This implementation does not enforce an upper bound on concurrency.

All these code paths currently panic, and they will be
replaced with actual implementations in coming PRs. This PR is split from
#11507 for
easier review.

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing


#### Link to tracking issue

#8122
#10368
dmitryax pushed a commit that referenced this issue Oct 29, 2024
#### Description

This PR follows
#11532 and
implements support for a limited worker pool for queue batcher.

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing

#### Link to tracking issue

#8122
#10368
dmitryax pushed a commit that referenced this issue Oct 30, 2024
…d exports (#11546)

#### Description

This PR follows
#11540 and
implements support for item-count based batching for queue batcher.

Limitation: This PR supports merging requests but not splitting them.
In other words, it supports specifying a minimum request size but not a
maximum request size.

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing

#### Link to tracking issue

#8122
#10368
dmitryax pushed a commit that referenced this issue Nov 6, 2024
…batcher (#11580)

#### Description

This PR follows
#11546 and
adds support for splitting (i.e. support for setting a maximum request size).
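
As a rough illustration of what splitting by a maximum request size means in terms of item counts, consider the sketch below. `splitBySize` is a hypothetical helper working on counts only; the actual batcher splits real requests.

```go
package main

import "fmt"

// splitBySize cuts a request of `items` items into batches of at most
// maxSize items each. A maxSize of zero or less means "no limit".
func splitBySize(items, maxSize int) []int {
	if items <= 0 {
		return nil
	}
	if maxSize <= 0 {
		return []int{items}
	}
	var out []int
	for items > maxSize {
		out = append(out, maxSize)
		items -= maxSize
	}
	return append(out, items)
}

func main() {
	fmt.Println(splitBySize(250, 100)) // [100 100 50]
	fmt.Println(splitBySize(250, 0))   // [250]
}
```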

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing

#### Link to tracking issue

#8122
#10368
dmitryax pushed a commit that referenced this issue Nov 11, 2024
…equest.export() (#11636)

#### Description

This PR changes queue batcher to use `exportFunc` instead of
`request.export()`. This makes testing easier and avoids passing
unnecessary details to the exporter batcher.

#### Link to tracking issue

#8122
#10368
dmitryax pushed a commit that referenced this issue Nov 13, 2024
…tdown (#11666)

#### Description

This PR changes exporter queue batcher to flush the current batch on
shutdown.

#### Link to tracking issue

#10368
#8122
djaglowski pushed a commit to djaglowski/opentelemetry-collector that referenced this issue Nov 21, 2024
#### Description

This PR adds a public function `GetNextItem` to queue (both persistent
queue and bounded memory queue)

Why this change?
Instead of blocking until consumption of the item is done, we would like
to separate the API for reading and committing consumption.

Before:
`Consume(consumeFunc)`

After:
`idx, item = Read()`
`OnProcessingFinished(idx)`
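
A simplified sketch of the read/commit split is shown below. `memQueue` is a hypothetical, non-thread-safe stand-in, and its method signatures are assumptions for illustration rather than the queue's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// memQueue is a hypothetical in-memory queue used only to illustrate the
// split between reading an item and committing its consumption.
type memQueue struct {
	items    []string
	next     uint64
	inFlight map[uint64]string
}

// Read hands out the next item together with an index, without waiting for
// the caller to finish processing it. ok is false when the queue is empty.
func (q *memQueue) Read() (idx uint64, item string, ok bool) {
	if len(q.items) == 0 {
		return 0, "", false
	}
	idx, item = q.next, q.items[0]
	q.items = q.items[1:]
	q.next++
	q.inFlight[idx] = item
	return idx, item, true
}

// OnProcessingFinished commits the item that Read returned under idx.
func (q *memQueue) OnProcessingFinished(idx uint64) error {
	if _, ok := q.inFlight[idx]; !ok {
		return errors.New("unknown index")
	}
	delete(q.inFlight, idx)
	return nil
}

func main() {
	q := &memQueue{items: []string{"req-a", "req-b"}, inFlight: map[uint64]string{}}
	i0, a, _ := q.Read()
	i1, b, _ := q.Read() // a second read does not wait for the first to finish
	fmt.Println(i0, a, i1, b)
	// Items can be committed in any order once their batches are exported.
	fmt.Println(q.OnProcessingFinished(i1), q.OnProcessingFinished(i0))
}
```

Decoupling the two calls is what lets a batcher hold several items from the queue before any of them has been exported.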

#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368

djaglowski pushed a commit to djaglowski/opentelemetry-collector that referenced this issue Nov 21, 2024
… that operates on batch sender (open-telemetry#11448)

#### Description

As part of the effort to solve
open-telemetry#10368,
we no longer guarantee that a `batchSender` is initialized when `batcher` is
enabled. Therefore, we would like to remove the interface that sets
`mergeFunc` and `mergeSplitFunc` through a callback that operates on
`batchSender`. Instead, users should use the alternative
`WithBatchFuncs`, a callback that operates on `baseExporter`.

Context:
open-telemetry#11414

#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368

---------

Co-authored-by: Bogdan Drutu <[email protected]>
djaglowski pushed a commit to djaglowski/opentelemetry-collector that referenced this issue Nov 21, 2024
bogdandrutu pushed a commit that referenced this issue Nov 22, 2024
#### Description

This PR precedes
#11637. It
* Introduces a noop feature gate that will be used for queue batcher.
* Updates exporter tests to run with both the feature gate on and off.

#### Link to tracking issue
#10368
#8122


dmitryax added a commit that referenced this issue Dec 2, 2024
#### Description

This PR solves
#10368.

Previously we used a pushing model between the queue and the batcher,
which constrained the batch size by
`sending_queue.num_consumers`: the batcher cannot accumulate more
requests than the `sending_queue.num_consumers` blocked goroutines provide.

This PR changes it to a pulling model. We read from the queue until the
size threshold is met or the timeout expires, then allocate a worker to
asynchronously send out the request.

#### Link to tracking issue
Fixes
#10368
#8122

---------

Co-authored-by: Dmitrii Anoshin <[email protected]>