Memory leak with Kafka and ElasticLogger ? #567

Closed
secure-xxx opened this issue Jan 23, 2024 · 16 comments
Labels: bug (Something isn't working)

@secure-xxx

[image]

In version 0.32 (with the kafka logger) the memory leak is absent or minimal. In newer versions it keeps growing. The same config was used.

#529

@secure-xxx

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-yml
data:
  config.yml: |
    global:
      trace:
        verbose: true

    multiplexer:
      # Listen on tcp/6000 for incoming protobuf messages from PowerDNS servers
      collectors:
        - name: powerdns
          powerdns:
            listen-ip: 0.0.0.0
            listen-port: 6000

      # Exposes prometheus metrics
      loggers:
        - name: prom
          prometheus:
            listen-ip: 0.0.0.0
            listen-port: 8080
            basic-auth-login: admin
            basic-auth-pwd: XXXXXXXXXX
        - name: kafka
          kafkaproducer:
            remote-address: XXXXXXXX
            remote-port: XXXXXX
            connect-timeout: 5
            retry-interval: 10
            flush-interval: 30
            tls-support: false
            tls-insecure: false
            tls-min-version: 1.2
            ca-file: ""
            cert-file: ""
            key-file: ""
            sasl-support: false
            sasl-mechanism: PLAIN
            sasl-username: ""
            sasl-password: ""
            mode: json
            buffer-size: 0
            topic: "XXX"
            partition: 0
            chan-buffer-size: 65535
            compression: "none"

      # Routes DNS messages from the powerdns collector to the prometheus and kafka loggers
      routes:
        - from: [powerdns]
          to: [prom, kafka]

@dmachard

Could you please try using version 0.40.2?

For sure, the DNS collector is consuming more memory than it did at the beginning of the project.

You may want to adjust the following settings (it depends on your traffic volume); a placement sketch follows the list:

  • The chan-buffer-size setting can be reduced
  • The prometheus logger can consume a lot of memory; to limit that, try adjusting the following settings:
    requesters-cache-size: 250000
    requesters-cache-ttl: 3600
    domains-cache-size: 500000
    domains-cache-ttl: 3600
    noerror-domains-cache-size: 100000
    noerror-domains-cache-ttl: 3600
    servfail-domains-cache-size: 10000
    servfail-domains-cache-ttl: 3600
    nonexistent-domains-cache-size: 10000
    nonexistent-domains-cache-ttl: 3600
    default-domains-cache-size: 1000
    default-domains-cache-ttl: 3600
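
For example, in the config posted above these would sit under the existing logger blocks, roughly like this (a sketch only: the cache values are the suggested starting points, and 4096 for chan-buffer-size is just an illustrative reduction; tune both to your traffic):

  loggers:
    - name: prom
      prometheus:
        listen-ip: 0.0.0.0
        listen-port: 8080
        # ... existing options kept as-is, plus the cache settings:
        requesters-cache-size: 250000
        requesters-cache-ttl: 3600
        domains-cache-size: 500000
        domains-cache-ttl: 3600
        # ... and the remaining *-cache-size / *-cache-ttl pairs listed above
    - name: kafka
      kafkaproducer:
        # ... existing options kept as-is
        chan-buffer-size: 4096   # illustrative reduction from 65535; tune to your traffic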
    

@misaki-kawakami

misaki-kawakami commented Feb 28, 2024

I am seeing the same problem but with an elasticsearch client. When the collector runs on instances where we have a lot of traffic, it uses up all the memory within a few hours. I'm not familiar enough with the Go programming language to analyse the problem more deeply, but I've tried using pprof and this is the output. Maybe it will give you some clues.

[image: pprof output]

@dmachard

Thanks for the pprof.
Could you share your config file? How much traffic?

@misaki-kawakami

4k queries per second

multiplexer:
  collectors:
    - name: xdp
      transforms:
        extract:
          add-payload: true
        normalize:
          qname-lowercase: true
      xdp-sniffer:
        device: enp3s0

  loggers:
    - elasticsearch:
        index: recursor
        server: xxxx
        flush-interval: 1
      name: elastic_1
    - elasticsearch:
        index: recursor
        server: xxxx
        flush-interval: 1
      name: elastic_2

  routes:
    - from:
      - xdp
      to:
      - elastic_1
      - elastic_2

@dmachard

dmachard commented Feb 29, 2024

@misaki-kawakami thanks for reporting this issue, could you try with the fix in the branch https://github.com/dmachard/go-dnscollector/tree/fix-memory-leak ?

Please also add (or update) the bulk-size (in bytes) and chan-buffer-size options:

chan-buffer-size: 2048
bulk-size: 1048576 # in bytes
flush-interval: 10
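
In your config, these would go under each elasticsearch logger block, roughly like this (a sketch; server and index are kept as you posted them, and flush-interval is set to the suggested 10 seconds):

  loggers:
    - name: elastic_1
      elasticsearch:
        server: xxxx
        index: recursor
        chan-buffer-size: 2048
        bulk-size: 1048576 # in bytes
        flush-interval: 10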

If the following error message appears, you need to increase the chan-buffer-size:

logger[elastic] buffer is full, 7855 packet(s) dropped

@secure-xxx Regarding kafka, I need to do more testing, but I identified an incorrect default value for the channel size.

@misaki-kawakami

misaki-kawakami commented Mar 1, 2024

Thank you. I no longer notice the memory growing over time 👍

@dmachard

dmachard commented Mar 1, 2024

Thank you for testing, it's really appreciated.

@dmachard

dmachard commented Mar 3, 2024

> Thank you. I no longer notice the memory growing over time 👍

Could you run some new tests? I made some optimizations to reduce CPU and memory usage.

@dmachard

dmachard commented Mar 5, 2024

@secure-xxx I did more tests with the kafka producer and observed no memory leaks with it. In your case, it's clearly the prometheus logger. Have you tried adjusting the settings as I suggested?

@misaki-kawakami

I have tested your optimization changes. I did a stress test with about 7k queries/second. Unfortunately, no data seems to be sent. I don't see any error messages in the log. How should I adjust the settings?
Output from the pprof:
[Screenshot from 2024-03-06 17-22-51: pprof output]

ROUTINE ======================== …sDM in loggers/elasticsearch.go
    1.06GB     1.07GB (flat, cum) 85.13% of Total
         .          .    226:	ec.LogInfo("waiting transformed dnsmessage to process...")
         .          .    227:
         .          .    228:	// buffer
         .          .    229:	buffer := bytes.NewBuffer(nil)
         .          .    230:	// create a new encoder that writes to the buffer
         .          .    231:	encoder := json.NewEncoder(buffer)
         .          .    232:
         .          .    233:	flushInterval := time.Duration(ec.config.Loggers.ElasticSearchClient.FlushInterval) * time.Second
         .          .    234:	flushTimer := time.NewTimer(flushInterval)
         .          .    235:
         .          .    236:PROCESS_LOOP:
         .          .    237:	for {
         .          .    238:		select {
         .          .    239:		case <-ec.stopProcess:
         .          .    240:			ec.doneProcess <- true
         .          .    241:			break PROCESS_LOOP
         .          .    242:
         .          .    243:		// incoming dns message to process
         .          .    244:		case dm, opened := <-ec.outputChan:
         .          .    245:			if !opened {
         .          .    246:				ec.LogInfo("output channel closed!")
         .          .    247:				return
         .          .    248:			}
         .          .    249:
         .          .    250:			// append dns message to buffer
         .          .    251:			flat, err := dm.Flatten()
         .     4.51MB    252:			if err != nil {
         .          .    253:				ec.LogError("flattening DNS message failed: %e", err)
         .          .    254:			}
         .          .    255:
         .          .    256:			buffer.WriteString("{ \"create\" : {}}\n")
         .          .    257:			//	buffer.WriteString("\n")
         .          .    258:			encoder.Encode(flat)
         .     9.17MB    259:
         .          .    260:			// Send data and reset buffer
         .          .    261:			if buffer.Len() >= ec.config.Loggers.ElasticSearchClient.BulkSize {
         .          .    262:				bufCopy := make([]byte, buffer.Len())
    1.05GB     1.05GB    263:				copy(bufCopy, buffer.Bytes())
         .          .    264:				buffer.Reset()
         .          .    265:
         .          .    266:				select {
         .          .    267:				case ec.sendChan <- bufCopy:
         .          .    268:				default:
         .          .    269:					ec.LogError("send buffer is full, bulk dropped")
         .          .    270:				}
         .          .    271:			}
         .          .    272:
         .          .    273:		// flush the buffer every ?
         .          .    274:		case <-flushTimer.C:
         .          .    275:
         .          .    276:			// Send data and reset buffer
         .          .    277:			if buffer.Len() > 0 {
         .          .    278:				bufCopy := make([]byte, buffer.Len())
   12.37MB    12.37MB    279:				copy(bufCopy, buffer.Bytes())
         .          .    280:				buffer.Reset()
         .          .    281:
         .          .    282:				select {
         .          .    283:				case ec.sendChan <- bufCopy:
         .          .    284:				default:

@dmachard

dmachard commented Mar 6, 2024

I'm surprised because I ran stress tests on my side with 4k qps without issue.
Could you share the config used?

@misaki-kawakami

Added this to elastic loggers:

chan-buffer-size: 32768
bulk-size: 1048576
flush-interval: 1

@dmachard

dmachard commented Mar 8, 2024

Thanks, I will run more tests with your settings and QPS.

@dmachard

After investigation, the memory increase is expected: the bulks of DNS messages are sent in an ordered way to avoid overwhelming the API of your elastic instance. But on the DNS-collector side, that costs memory...

However, I have made improvements to limit memory usage. Could you try with the following config and the branch https://github.com/dmachard/go-dnscollector/tree/fix-elastic-leak ?

chan-buffer-size: 4096
bulk-size: 5242880 # chunks of 5MB
flush-interval: 10 # flush every 10s
compression: gzip
bulk-channel-size: 10 
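
In the config posted earlier, that would look roughly like this for each elasticsearch logger (a sketch; server and index values kept as posted):

  loggers:
    - name: elastic_1
      elasticsearch:
        server: xxxx
        index: recursor
        chan-buffer-size: 4096
        bulk-size: 5242880 # chunks of 5MB
        flush-interval: 10 # flush every 10s
        compression: gzip
        bulk-channel-size: 10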

If you observe the following error, try increasing the bulk-channel-size:

ERROR: 2024/03/10 14:30:44.201259 logger - [elastic] elasticsearch - Send buffer is full, bulk dropped

@dmachard changed the title from "Memory leak" to "Memory leak with Kafka and ElasticLogger ?" on Mar 12, 2024
@misaki-kawakami

I have tested your tweaks and I no longer see such a large increase in memory usage. Everything is working fine.

@dmachard added this to the v0.42.0 milestone on Mar 14, 2024
@dmachard added the "bug" label and removed the "needs more investigation" label on Mar 14, 2024