
[BUG] Failed to submit a listener notification task. Event loop shut down? #4441

Closed
ricardorqr opened this issue Apr 18, 2024 · 1 comment
Labels: bug, untriaged
ricardorqr commented Apr 18, 2024

Hello everyone,

Can I get a hand with this error? For some reason, I started seeing the problem below:

2024-04-18T13:38:49,018 [armeria-boss-http-*:21890] ERROR  io.netty.util.concurrent.DefaultPromise - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated

I have no idea where to start investigating. This is my pipeline:

  pipelines.yaml: |
    log-pipeline:
      source:
        http:
          # Explicitly disable SSL
          ssl: false
          # Explicitly disable authentication
          authentication:
            unauthenticated:
          # The default port that will listen for incoming logs
          port: 21890
      # https://github.com/opensearch-project/data-prepper/issues/2147
      buffer:
        bounded_blocking:
          buffer_size: 2000000 # max number of records the buffer accepts
          batch_size: 400 # max number of records the buffer drains after each read
      processor:
        - grok:
            match:
              # This will match logs with a "log" key against the COMMONAPACHELOG pattern (ex: { "log": "actual apache log..." } )
              # You should change this to match what your logs look like. See the grok documentation to get started.
              log: [ "%{COMMONAPACHELOG}" ]
      sink:
        #- stdout:  # print in the console
        - opensearch:
            hosts: [ "https://opensearch-cluster-master:9200" ]
            # Change to your credentials
            username: admin
            password: admin
            # Add a certificate file if you are accessing an OpenSearch cluster with a self-signed certificate  
            #cert: /path/to/cert
            # If you are connecting to an Amazon OpenSearch Service domain without
            # Fine-Grained Access Control, enable these settings. Comment out the
            # username and password above.
            #aws_sigv4: true
            #aws_region: us-east-1
            # Since we are grok matching for apache logs, it makes sense to send them to an OpenSearch index named apache_logs.
            # You should change this to correspond with how your OpenSearch indices are set up.
            index: devopsv2-rico-index
            insecure: true
  data-prepper-config.yaml: |
    ssl: false

And this is the full stack error:

2024-04-18T13:38:49,018 [armeria-boss-http-*:21890] ERROR  io.netty.util.concurrent.DefaultPromise - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:862) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:500) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:185) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at com.linecorp.armeria.internal.common.util.ChannelUtil.close(ChannelUtil.java:189) ~[armeria-1.26.4.jar:?]
	at com.linecorp.armeria.server.Server$ServerStartStopSupport.lambda$doStop$11(Server.java:651) ~[armeria-1.26.4.jar:?]
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
	at com.linecorp.armeria.internal.common.util.ChannelUtil.lambda$close$0(ChannelUtil.java:184) ~[armeria-1.26.4.jar:?]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:756) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:749) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannelHandlerContext.access$1200(AbstractChannelHandlerContext.java:61) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannelHandlerContext$11.run(AbstractChannelHandlerContext.java:732) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]

Just in case, here is the complete data-prepper.log

I appreciate any help in advance.

ricardorqr (Author) commented
Hi guys,

I have fixed this problem. As you can see in the data-prepper.log, there are tons of occurrences of the error below:

2024-04-18T13:37:29,973 [log-pipeline-sink-worker-2-thread-1] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy - operation = Index, error = object mapping for [kubernetes.labels.app] tried to parse field [app] as object, but found a concrete value
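For context, that warning is a field-mapping conflict: OpenSearch locks a field's type to the shape it first sees. Kubernetes label keys containing dots, such as `app.kubernetes.io/name`, are expanded into nested objects, so one document can make `kubernetes.labels.app` an object while another sends a plain `app` label as a string. A minimal sketch of the two conflicting shapes (the label values here are hypothetical):

```python
# Two hypothetical documents that cannot share one mapping for
# "kubernetes.labels.app": one sends it as a string, the other as an object
# (the dotted label key "app.kubernetes.io/name" implies nesting under "app").
doc_string = {"kubernetes": {"labels": {"app": "web"}}}
doc_object = {"kubernetes": {"labels": {"app": {"kubernetes": {"io/name": "nginx"}}}}}

# Whichever shape is indexed first wins; documents with the other shape are then
# rejected, e.g. "tried to parse field [app] as object, but found a concrete value".
print(type(doc_string["kubernetes"]["labels"]["app"]).__name__)  # str
print(type(doc_object["kubernetes"]["labels"]["app"]).__name__)  # dict
```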

Investigating a bit more, I found this post.

Now everything is working. This is my implementation for Fluent Bit.

      luaScripts:
        dedot_file.lua: |
          function dedot_function(tag, timestamp, record)
            if record["kubernetes"] == nil then
              return 0, 0, 0
            end
            dedot_keys(record["kubernetes"]["annotations"])
            dedot_keys(record["kubernetes"]["labels"])
            return 1, timestamp, record
          end
          
          function dedot_keys(map)
            if map == nil then
              return
            end
            local new_map = {}
            local changed_keys = {}
            for k, v in pairs(map) do
              local deslashed = string.gsub(k, "%/", "_")
              local dedotted = string.gsub(deslashed, "%.", "_")
              if dedotted ~= k then
                new_map[dedotted] = v
                changed_keys[k] = true
              end
            end
            for k in pairs(changed_keys) do
              map[k] = nil
            end
            for k, v in pairs(new_map) do
              map[k] = v
            end
          end
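For readers less familiar with Lua, here is a rough Python sketch of the same dedot logic (an illustration only, not part of the Fluent Bit setup): replace `/` and `.` in key names with `_` so OpenSearch does not treat dotted label keys as nested objects.

```python
# Rough Python equivalent of the Lua dedot script above (illustration only).

def dedot_keys(mapping):
    # Replace "/" and "." in every key with "_", keeping values untouched.
    if mapping is None:
        return None
    return {k.replace("/", "_").replace(".", "_"): v for k, v in mapping.items()}

def dedot_record(record):
    # Sanitize the kubernetes.annotations and kubernetes.labels sections, if present.
    kube = record.get("kubernetes")
    if kube is None:
        return record
    for section in ("annotations", "labels"):
        if kube.get(section) is not None:
            kube[section] = dedot_keys(kube[section])
    return record

record = {"kubernetes": {"labels": {"app.kubernetes.io/name": "nginx", "app": "web"}}}
print(dedot_record(record)["kubernetes"]["labels"])
# {'app_kubernetes_io_name': 'nginx', 'app': 'web'}
```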

      ## https://docs.fluentbit.io/manual/pipeline/inputs
      config:
        inputs: |
          [INPUT]
              Name tail
              Path /var/log/containers/*.log
              multiline.parser docker, cri
              Tag kube.*
              Mem_Buf_Limit 5MB
              Skip_Long_Lines On

          [INPUT]
              Name systemd
              Tag host.*
              Systemd_Filter _SYSTEMD_UNIT=kubelet.service
              Read_From_Tail On

        ## https://docs.fluentbit.io/manual/pipeline/filters
        filters: |
          [FILTER]
              Name kubernetes
              Match kube.*
              Merge_Log On
              Keep_Log Off
              K8S-Logging.Parser On
              K8S-Logging.Exclude On
          
          [FILTER]
              Name lua
              Match *
              script /fluent-bit/scripts/dedot_file.lua
              call dedot_function

        ## https://docs.fluentbit.io/manual/pipeline/outputs
        outputs: |
          # Print in the console
          #[OUTPUT]
          #    Name stdout
          #    Match *

          [OUTPUT]
              Name http
              Match *
              Host data-prepper-headless
              Port 21890
              URI /log/ingest
              Format json
