[FEAT] buffered channel drain single collect #52
For some reason I cannot see the full diff even though this PR is targeting main
...
Anyway, looks good, but I saw a change of behavior here: https://github.com/calyptia/plugin/blob/f16bef3495ec8e7b50eb620377ac6867fcceb3f2/cshared.go#L190-L192
Before, if the channel was closed, it returned ok; now it returns an error.
I don't mind either one, I just want to be sure what we want to do here.
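To make the two options concrete, here is a minimal sketch of both behaviors on a closed channel. It is not the code in cshared.go: `receiveOK`, `receiveErr`, and `errChannelClosed` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errChannelClosed is a hypothetical sentinel error for this sketch.
var errChannelClosed = errors.New("message channel closed")

// receiveOK sketches the earlier behavior: a closed channel is treated
// as a normal end of input and no error is returned.
func receiveOK(ch <-chan string) (string, error) {
	msg, ok := <-ch
	if !ok {
		return "", nil
	}
	return msg, nil
}

// receiveErr sketches the newer behavior: a closed channel is reported
// to the caller as an error.
func receiveErr(ch <-chan string) (string, error) {
	msg, ok := <-ch
	if !ok {
		return "", errChannelClosed
	}
	return msg, nil
}

func main() {
	ch := make(chan string, 1)
	ch <- "record"
	close(ch)

	// A value buffered before close is still delivered with ok == true.
	msg, err := receiveErr(ch)
	fmt.Println(msg, err)

	// Once the buffer is empty, the two variants differ.
	_, err = receiveErr(ch)
	fmt.Println("receiveErr:", err)
	_, err = receiveOK(ch)
	fmt.Println("receiveOK:", err)
}
```

Either variant still delivers values that were buffered before the close; the only difference is what the caller sees once the buffer is empty.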
@nicolasparada do you still see the issue with the latency/delays that devo is reporting?
No, getting nanosecond measurements :)
@pwhelan can you please resolve the conflicts here.
Due to the revert in #44 I was unable to rebase #52 interactively so the changes will have to be applied in a single patch. This unfortunately means the loss of git history, but so be it.
Here is the changelog:
* test: add a sleep in the infinite call to give time for collect to be accidentally invoked after Done().
* input: invoke Collect only once and only ever once. Tests have been updated to enforce this behavior.
* test: improve dangling test.
* test: add latency test.
Signed-off-by: Phillip Whelan <[email protected]>
… issues. Signed-off-by: Phillip Whelan <[email protected]>
f16bef3 to 266c000
Signed-off-by: Phillip Whelan <[email protected]>
@nicolasparada The tests can fail intermittently. I opened an issue on the backlog: https://app.asana.com/0/1205296321537082/1205660623899255/f. I mention it in case you can see the root cause, and so that you run the tests a few times to confirm they are sane.
It is done. I had to reapply with a patch.
Signed-off-by: Phillip Whelan <[email protected]>
@niedbalski I have changed CI to run each test individually... which should be easy enough to revert later when we work on hot-reload.
Here is a log of me running fluent-bit -c test.conf:
Fluent Bit v2.1.10
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2023/10/05 16:28:06] [ info] Configuration:
[2023/10/05 16:28:06] [ info] flush time | 1.000000 seconds
[2023/10/05 16:28:06] [ info] grace | 5 seconds
[2023/10/05 16:28:06] [ info] daemon | 0
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info] inputs:
[2023/10/05 16:28:06] [ info] http_loader
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info] filters:
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info] outputs:
[2023/10/05 16:28:06] [ info] stdout.0
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info] collectors:
[2023/10/05 16:28:06] [ info] [fluent bit] version=2.1.10, commit=1a41f49dc2, pid=602128
[2023/10/05 16:28:06] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/10/05 16:28:06] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/10/05 16:28:06] [ info] [cmetrics] version=0.6.3
[2023/10/05 16:28:06] [ info] [ctraces ] version=0.3.1
[2023/10/05 16:28:06] [ info] [input:http_loader:http_loader.0] initializing
[2023/10/05 16:28:06] [ info] [input:http_loader:http_loader.0] storage_strategy='memory' (memory only)
[2023/10/05 16:28:06] [ info] http_loader: defaulting method to GET
[2023/10/05 16:28:06] [ info] http_loader: defaulting skip to `{{or (ge .Response.StatusCode 400) (empty .Response.Body)}}`
[2023/10/05 16:28:06] [debug] Using storage key "http_loader_v2_da76aa41"
[2023/10/05 16:28:06] [debug] Data loaded from local disk storage with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:06] [debug] Successfully json unmarshalled loaded data http://localhost:8888/test.json
[2023/10/05 16:28:06] [debug] Failed to json unmarshal stored request body, returning original string ""
[2023/10/05 16:28:06] [debug] Successfully processed loaded data with index index 2
[2023/10/05 16:28:06] [debug] [input:http_loader:http_loader.0] [thread init] initialization OK
[2023/10/05 16:28:06] [ info] [input:http_loader:http_loader.0] thread instance initialized
[2023/10/05 16:28:06] [debug] [http_loader:http_loader.0] created event channels: read=30 write=34
[2023/10/05 16:28:06] [debug] [stdout:stdout.0] created event channels: read=37 write=38
[2023/10/05 16:28:06] [ info] [sp] stream processor started
[2023/10/05 16:28:06] [ info] [output:stdout:stdout.0] worker #0 started
[2023/10/05 16:28:07] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:07] [debug] Collect invoked
[2023/10/05 16:28:07] [debug] Sending request: method="GET" url="http://localhost:8888/test.json" headers=map[User-Agent:[Fluent-Bit HTTP Loader Plugin]] body=""
[2023/10/05 16:28:07] [debug] Received response: status_code=200 headers=map[Content-Length:[112] Content-Type:[application/json] Date:[Thu, 05 Oct 2023 19:28:07 GMT] Last-Modified:[Thu, 05 Oct 2023 19:26:49 GMT] Server:[SimpleHTTP/0.6 Python/3.11.5]] body="{\n\t\"records\": [\n\t\t{\"foo\": \"bar\"},\n\t\t{\"foo\": \"bat\"},\n\t\t{\"foo\": \"bad\"},\n\t\t{\"foo\": \"bath\"},\n\t\t{\"foo\": \"barf\"}\n\t]\n}\n"
[2023/10/05 16:28:07] [debug] Sending record: map[value:records]
[2023/10/05 16:28:07] [debug] Initiating store process with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:07] [debug] Successfully stored data in local disk storage with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:07] [ warn] Error updating metadata in cloud API with key "http_loader_v2_da76aa41": cloud api: client not initialized or pipelineID missing
[2023/10/05 16:28:08] [debug] [input chunk] update output instances with new chunk size diff=26, records=1, input=http_loader.0
[2023/10/05 16:28:09] [debug] [task] created task=0x7fff9001fc60 id=0 OK
[2023/10/05 16:28:09] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/10/05 16:28:09] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
{"date":1696534087.430507,"value":"records"}
[2023/10/05 16:28:09] [debug] [out flush] cb_destroy coro_id=0
[2023/10/05 16:28:09] [debug] [task] destroy task=0x7fff9001fc60 (task_id=0)
[2023/10/05 16:28:10] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:11] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:12] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:13] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:14] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:15] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:16] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:17] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:17] [debug] Sending request: method="GET" url="http://localhost:8888/test.json" headers=map[User-Agent:[Fluent-Bit HTTP Loader Plugin]] body=""
[2023/10/05 16:28:17] [debug] Received response: status_code=200 headers=map[Content-Length:[112] Content-Type:[application/json] Date:[Thu, 05 Oct 2023 19:28:17 GMT] Last-Modified:[Thu, 05 Oct 2023 19:26:49 GMT] Server:[SimpleHTTP/0.6 Python/3.11.5]] body="{\n\t\"records\": [\n\t\t{\"foo\": \"bar\"},\n\t\t{\"foo\": \"bat\"},\n\t\t{\"foo\": \"bad\"},\n\t\t{\"foo\": \"bath\"},\n\t\t{\"foo\": \"barf\"}\n\t]\n}\n"
[2023/10/05 16:28:17] [debug] Sending record: map[value:records]
[2023/10/05 16:28:17] [debug] Initiating store process with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:17] [debug] Successfully stored data in local disk storage with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:17] [ warn] Error updating metadata in cloud API with key "http_loader_v2_da76aa41": cloud api: client not initialized or pipelineID missing
[2023/10/05 16:28:18] [debug] [input chunk] update output instances with new chunk size diff=26, records=1, input=http_loader.0
[2023/10/05 16:28:19] [debug] [task] created task=0x7fff9005fee0 id=0 OK
[2023/10/05 16:28:19] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/10/05 16:28:19] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
{"date":1696534097.481164,"value":"records"}
[2023/10/05 16:28:19] [debug] [out flush] cb_destroy coro_id=1
[2023/10/05 16:28:19] [debug] [task] destroy task=0x7fff9005fee0 (task_id=0)
[2023/10/05 16:28:20] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:21] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:22] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
^C[2023/10/05 16:28:23] [engine] caught signal (SIGINT)
[2023/10/05 16:28:23] [ warn] [engine] service will shutdown in max 5 seconds
[2023/10/05 16:28:23] [debug] [input:http_loader:http_loader.0] thread pause instance
[2023/10/05 16:28:23] [ info] [engine] service has stopped (0 pending tasks)
[2023/10/05 16:28:23] [debug] [input:http_loader:http_loader.0] thread pause instance
[2023/10/05 16:28:23] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/10/05 16:28:23] [ info] [output:stdout:stdout.0] thread worker #0 stopped
[2023/10/05 16:28:23] [debug] [GO] running exit callback
2023/10/05 16:28:23 calling FLBPluginExit(): name="http_loader"
[2023/10/05 16:28:23] [debug] [input:http_loader:http_loader.0] thread exit instance
This is with vanilla fluent-bit v2.1.10 (possibly compiled from master). It also works with
@nicolasparada are we still good here?
Looks good. Splitting an array and sending all the records now takes nanoseconds, and the stop condition on the http_loader is working fine :)
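For readers following along, the "split an array and send them all" case can be sketched as below. This is only an illustration of draining a buffered channel in a single pass, not the plugin's actual implementation; `drain` is a hypothetical helper, and the record values are borrowed from the test.json body in the log above.

```go
package main

import "fmt"

// drain removes every value currently buffered in ch without blocking
// and returns them in order. It stops as soon as the buffer is empty
// or the channel has been closed and emptied.
func drain(ch <-chan string) []string {
	var out []string
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return out // channel closed and fully drained
			}
			out = append(out, v)
		default:
			return out // nothing buffered right now
		}
	}
}

func main() {
	// The producer splits an array and sends each element; the buffer
	// lets it do so without waiting for a reader on every send.
	ch := make(chan string, 8)
	for _, r := range []string{"bar", "bat", "bad", "bath", "barf"} {
		ch <- r
	}

	// One pass picks up everything that was queued.
	fmt.Println(drain(ch))
}
```

The `default` branch is what keeps the drain from blocking once the buffer is empty, so a single pass returns promptly regardless of how many records were queued.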
Awesome work 👍
I added some comments that do not need to be handled right away. Overall, it looks good to me.
This is a minor change for #44 that only ever invokes Collect once. Only once and only ever once! This has been documented, and tests have been added to make sure it remains this way. This is per @nicolasparada. This assumption might also have been made by @niedbalski. There is at least one plugin I know of which might be affected by this, but it might be only that one plugin.