diff --git a/.gitignore b/.gitignore
index f3265cca..217b1d3b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,4 +21,5 @@ __pycache__/
 go-dnscollector
 bin/
 include/
-docs/_integration/elasticsearch/data/
\ No newline at end of file
+docs/_integration/elasticsearch/data/
+docs/_integration/kafka/data/
\ No newline at end of file
diff --git a/config.yml b/config.yml
index 8cfebbb3..5e83a1f0 100644
--- a/config.yml
+++ b/config.yml
@@ -760,7 +760,7 @@ multiplexer:
 #       # Kafka partition
 #       partition: 0
 #       # Channel buffer size for incoming packets, number of packet before to drop it.
-#       chan-buffer-size: 65535
+#       chan-buffer-size: 4096
 #       # Compression for Kafka messages: none, gzip, lz4, snappy, zstd
 #       compression: none
diff --git a/docs/_integration/kafka/README.md b/docs/_integration/kafka/README.md
new file mode 100644
index 00000000..63b59221
--- /dev/null
+++ b/docs/_integration/kafka/README.md
@@ -0,0 +1,18 @@
+
+# DNS-collector with Kafka
+
+- Copy the `./docs/_integration/kafka` folder and start the Docker stack:
+
+  ```bash
+  sudo docker compose up -d
+  ```
+
+- Open the Apache Kafka web interface at `http://127.0.0.1:8080`.
+
+- The `dnscollector` topic should be listed.
+
+- Finally, run DNS-collector from source:
+
+  ```bash
+  go run . -config docs/_integration/kafka/config.yml
+  ```
diff --git a/docs/_integration/kafka/config.yml b/docs/_integration/kafka/config.yml
new file mode 100644
index 00000000..6efa0890
--- /dev/null
+++ b/docs/_integration/kafka/config.yml
@@ -0,0 +1,36 @@
+
+global:
+  trace:
+    verbose: true
+
+multiplexer:
+  collectors:
+    - name: tap
+      dnstap:
+        listen-ip: 0.0.0.0
+        listen-port: 6000
+        chan-buffer-size: 4096
+  loggers:
+    - name: kafka
+      kafkaproducer:
+        remote-address: 127.0.0.1
+        remote-port: 9092
+        connect-timeout: 5
+        retry-interval: 10
+        flush-interval: 30
+        tls-support: false
+        tls-insecure: false
+        sasl-support: false
+        sasl-mechanism: PLAIN
+        sasl-username: ""
+        sasl-password: ""
+        mode: flat-json
+        buffer-size: 100
+        topic: "dnscollector"
+        partition: 0
+        chan-buffer-size: 4096
+        compression: none
+
+  routes:
+    - from: [ tap ]
+      to: [ kafka ]
\ No newline at end of file
diff --git a/docs/_integration/kafka/docker-compose.yml b/docs/_integration/kafka/docker-compose.yml
new file mode 100644
index 00000000..cf6354ac
--- /dev/null
+++ b/docs/_integration/kafka/docker-compose.yml
@@ -0,0 +1,60 @@
+version: '3.8'
+services:
+  kafka-ui:
+    image: provectuslabs/kafka-ui:v0.7.1
+    container_name: kafka-ui
+    environment:
+      DYNAMIC_CONFIG_ENABLED: true
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+    ports:
+      - 8080:8080
+    depends_on:
+      - kafka
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.6.0
+    hostname: zookeeper
+    container_name: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka:
+    image: confluentinc/cp-kafka:7.6.0
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+      - "9997:9997"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_JMX_PORT: 9997
+      KAFKA_JMX_HOSTNAME: kafka
+    volumes:
+      - "./data:/var/lib/kafka/data"
+
+  kafka-init-topics:
+    image: confluentinc/cp-kafka:7.6.0
+    container_name: kafka-init-topic
+    volumes:
+      - ./message.json:/data/message.json
+    depends_on:
+      - kafka
+    command: "bash -c 'echo Waiting for Kafka to be ready... && \
+               cub kafka-ready -b kafka:29092 1 30 && \
+               kafka-topics --create --topic dnscollector --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092 && \
+               kafka-console-producer --bootstrap-server kafka:29092 --topic dnscollector < /data/message.json'"
diff --git a/docs/_integration/kafka/message.json b/docs/_integration/kafka/message.json
new file mode 100644
index 00000000..9e26dfee
--- /dev/null
+++ b/docs/_integration/kafka/message.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/docs/loggers/logger_kafka.md b/docs/loggers/logger_kafka.md
index 027fb8a0..c75f7063 100644
--- a/docs/loggers/logger_kafka.md
+++ b/docs/loggers/logger_kafka.md
@@ -4,51 +4,45 @@
 Kafka producer, based on [kafka-go](https://github.com/segmentio/kafka-go) library.
 
 Options:
 
-- `remote-address`: (string) remote address
-- `remote-port`: (integer) remote tcp port
-- `connect-timeout`: (integer) connect timeout in second
-- `retry-interval`: (integer) interval in second between retry reconnect
-- `flush-interval`: (integer) interval in second before to flush the buffer
-- `tls-support`: (boolean) enable tls
-- `tls-insecure`: (boolean) insecure skip verify
-- `tls-min-version`: (string) min tls version, default to 1.2
-- `ca-file`: (string) provide CA file to verify the server certificate
-- `cert-file`: (string) provide client certificate file for mTLS
-- `key-file`: (string) provide client private key file for mTLS
-- `sasl-support`: (boolean) enable SASL
-- `sasl-username`: (string) SASL username
-- `sasl-password`: (string) SASL password
-- `sasl-mechanism`: (string) SASL mechanism: `PLAIN` or `SCRAM-SHA-512`
-- `mode`: (string) output format: `text`, `json`, or `flat-json`
-- `buffer-size`: (integer) how many DNS messages will be buffered before being sent
-- `topic`: (integer) kafka topic to forward messages to
-- `partition`: (integer) kafka partition
-- `chan-buffer-size`: (integer) channel buffer size used on incoming dns message, number of messages before to drop it.
-- `compression`: (string) Compression for Kafka messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`
-
-Default values:
-
-```yaml
-kafkaproducer:
-  remote-address: 127.0.0.1
-  remote-port: 9092
-  connect-timeout: 5
-  retry-interval: 10
-  flush-interval: 30
-  tls-support: false
-  tls-insecure: false
-  tls-min-version: 1.2
-  ca-file: ""
-  cert-file: ""
-  key-file: ""
-  sasl-support: false
-  sasl-mechanism: PLAIN
-  sasl-username: ""
-  sasl-password: ""
-  mode: flat-json
-  buffer-size: 100
-  topic: "dnscollector"
-  partition: 0
-  chan-buffer-size: 65535
-  compression: "none"
-```
+- `remote-address` (string) remote address. Defaults to `127.0.0.1`.
+  > Specifies the remote address to connect to.
+- `remote-port` (integer) remote TCP port. Defaults to `9092`.
+  > Specifies the remote TCP port to connect to.
+- `connect-timeout` (integer) connect timeout in seconds. Defaults to `5` seconds.
+  > Specifies the maximum time to wait for a connection attempt to complete.
+- `retry-interval` (integer) interval in seconds between reconnect attempts. Defaults to `10` seconds.
+  > Specifies the interval between attempts to reconnect after a connection failure.
+- `flush-interval` (integer) interval in seconds between buffer flushes. Defaults to `10` seconds.
+  > Specifies how often the message buffer is flushed to Kafka.
+- `tls-support` (boolean) enable TLS. Defaults to `false`.
+  > Enables or disables TLS (Transport Layer Security) support. If set to true, TLS is used for secure communication.
+- `tls-insecure` (boolean) insecure skip verify. Defaults to `false`.
+  > If set to true, skips verification of the server certificate.
+- `tls-min-version` (string) minimum TLS version. Defaults to `1.2`.
+  > Specifies the minimum TLS version the client will accept.
+- `ca-file` (string) CA file to verify the server certificate. Defaults to `(empty)`.
+  > Specifies the path to the CA (Certificate Authority) file used to verify the server's certificate.
+- `cert-file` (string) client certificate file for mTLS. Defaults to `(empty)`.
+  > Specifies the path to the client certificate file, required only when the broker requests mutual TLS.
+- `key-file` (string) client private key file for mTLS. Defaults to `(empty)`.
+  > Specifies the path to the private key matching `cert-file`, required only when the broker requests mutual TLS.
+- `sasl-support` (boolean) enable SASL. Defaults to `false`.
+  > Enables or disables SASL (Simple Authentication and Security Layer) support for Kafka.
+- `sasl-username` (string) SASL username. Defaults to `(empty)`.
+  > Specifies the SASL username for authentication with Kafka brokers.
+- `sasl-password` (string) SASL password. Defaults to `(empty)`.
+  > Specifies the SASL password for authentication with Kafka brokers.
+- `sasl-mechanism` (string) SASL mechanism: `PLAIN` or `SCRAM-SHA-512`. Defaults to `PLAIN`.
+  > Specifies the SASL mechanism used for authentication with Kafka brokers.
+- `mode` (string) output format: `text`, `json`, or `flat-json`. Defaults to `flat-json`.
+  > Specifies the output format for Kafka messages.
+- `buffer-size` (integer) how many DNS messages are buffered before being sent. Defaults to `100`.
+  > Specifies the number of DNS messages held in the buffer before a batch is sent to Kafka.
+- `topic` (string) Kafka topic to forward messages to. Defaults to `dnscollector`.
+  > Specifies the Kafka topic to which messages are forwarded.
+- `partition` (integer) Kafka partition. Defaults to `0`.
+  > Specifies the Kafka partition to which messages are sent.
+- `chan-buffer-size` (integer) incoming channel size, number of packets buffered before dropping. Defaults to `4096`.
+  > Specifies the maximum number of packets that can be buffered before additional packets are dropped.
+- `compression` (string) compression for Kafka messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`. Defaults to `none`.
+  > Specifies the compression algorithm to use for Kafka messages.
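Since the logger is built on kafka-go, the quickest sanity check of the documented `flat-json` mode is a tiny consumer written against the same library. This is a reviewer's sketch, not part of the patch: the broker address, topic, and partition mirror the defaults above, while the `dns.qname` key is an assumption about how the flattened JSON names its fields.

```go
// Minimal kafka-go consumer for the `dnscollector` topic (sketch only).
// Broker/topic/partition mirror the documented defaults; "dns.qname" is
// an assumed flat-json field name, adjust to the real output if needed.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"127.0.0.1:9092"}, // remote-address:remote-port
		Topic:     "dnscollector",             // topic
		Partition: 0,                          // partition
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		// In flat-json mode every Kafka record is one flattened JSON object.
		var flat map[string]interface{}
		if err := json.Unmarshal(m.Value, &flat); err != nil {
			log.Printf("skipping non-JSON record: %v", err)
			continue
		}
		fmt.Printf("offset=%d qname=%v\n", m.Offset, flat["dns.qname"])
	}
}
```

With the docker stack up and a dnstap source feeding the collector, each forwarded DNS message should print as one offset/qname pair.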
diff --git a/loggers/kafkaproducer.go b/loggers/kafkaproducer.go
index 8c28dd66..c7d5e044 100644
--- a/loggers/kafkaproducer.go
+++ b/loggers/kafkaproducer.go
@@ -264,7 +264,7 @@ func (k *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
 }
 
 func (k *KafkaProducer) Run() {
-	k.LogInfo("running in background...")
+	k.LogInfo("waiting for dns messages to process...")
 
 	// prepare next channels
 	defaultRoutes, defaultNames := k.RoutingHandler.GetDefaultRoutes()
@@ -276,7 +276,7 @@
 	subprocessors := transformers.NewTransforms(&k.config.OutgoingTransformers, k.logger, k.name, listChannel, 0)
 
 	// goroutine to process transformed dns messages
-	go k.Process()
+	go k.ProcessDM()
 
 	// loop to process incoming messages
 RUN_LOOP:
@@ -320,7 +320,9 @@ RUN_LOOP:
 	k.LogInfo("run terminated")
 }
 
-func (k *KafkaProducer) Process() {
+func (k *KafkaProducer) ProcessDM() {
+	k.LogInfo("waiting for transformed dns messages to process...")
+
 	ctx, cancelKafka := context.WithCancel(context.Background())
 	defer cancelKafka() // release the resources tied to the context
@@ -336,8 +338,6 @@
 	go k.ConnectToKafka(ctx, readyTimer)
 
-	k.LogInfo("ready to process")
-
 PROCESS_LOOP:
 	for {
 		select {
diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go
index d363f62f..98b61e2b 100644
--- a/pkgconfig/loggers.go
+++ b/pkgconfig/loggers.go
@@ -544,7 +544,7 @@ func (c *ConfigLoggers) SetDefault() {
 	c.KafkaProducer.FlushInterval = 10
 	c.KafkaProducer.Topic = "dnscollector"
 	c.KafkaProducer.Partition = 0
-	c.KafkaProducer.ChannelBufferSize = 65535
+	c.KafkaProducer.ChannelBufferSize = 4096
 	c.KafkaProducer.Compression = CompressNone
 
 	c.FalcoClient.Enable = false
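On the `Run`/`ProcessDM` rename: the new name makes the logger's two-stage shape easier to see. `Run` drains the incoming channel and hands transformed messages to `ProcessDM`, which owns the Kafka connection, the batch buffer, and the flush timer. The sketch below shows only that shape; every name in it is invented and none of it is the collector's real API.

```go
// Two-goroutine pipeline sketch mirroring the Run/ProcessDM split: a
// receive loop hands messages to a processing loop that batches them
// and flushes on a timer. All names here are illustrative.
package main

import (
	"fmt"
	"time"
)

func main() {
	in := make(chan string, 4096)          // incoming channel (chan-buffer-size)
	transformed := make(chan string, 4096) // "Run" -> "ProcessDM" hand-off
	done := make(chan struct{})

	// "ProcessDM": owns the batch buffer and the periodic flush.
	go func() {
		defer close(done)
		buf := make([]string, 0, 100)
		ticker := time.NewTicker(500 * time.Millisecond) // flush-interval
		defer ticker.Stop()
		for {
			select {
			case m, ok := <-transformed:
				if !ok {
					fmt.Println("final flush:", len(buf))
					return
				}
				buf = append(buf, m)
				if len(buf) >= 100 { // buffer-size reached
					fmt.Println("flush:", len(buf))
					buf = buf[:0]
				}
			case <-ticker.C: // flush-interval elapsed
				if len(buf) > 0 {
					fmt.Println("timed flush:", len(buf))
					buf = buf[:0]
				}
			}
		}
	}()

	// "Run": receive, transform, forward.
	go func() {
		for m := range in {
			transformed <- "transformed:" + m
		}
		close(transformed)
	}()

	for i := 0; i < 250; i++ {
		in <- fmt.Sprintf("msg-%d", i)
	}
	close(in)
	<-done // wait for the processing loop to drain
}
```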
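The 65535 to 4096 default change only shrinks the bounded channel that `chan-buffer-size` describes; the drop-rather-than-block behavior documented above is the standard non-blocking send pattern, sketched here with illustrative names rather than the collector's actual code.

```go
// Bounded-channel, drop-when-full sketch of what `chan-buffer-size`
// controls. `dnsMessage` and `send` are illustrative names only.
package main

import "fmt"

type dnsMessage struct{ qname string }

func main() {
	const chanBufferSize = 4096 // the new default set in pkgconfig/loggers.go
	ch := make(chan dnsMessage, chanBufferSize)

	dropped := 0
	send := func(m dnsMessage) {
		select {
		case ch <- m: // buffer has room: accept immediately
		default: // buffer full: drop instead of blocking the collector
			dropped++
		}
	}

	// Simulate a burst larger than the buffer with no consumer attached.
	for i := 0; i < 5000; i++ {
		send(dnsMessage{qname: fmt.Sprintf("host%d.example.com", i)})
	}
	fmt.Printf("buffered=%d dropped=%d\n", len(ch), dropped)
}
```

With a 4096-slot buffer the example reports 904 of 5000 messages dropped, which is exactly the trade-off the smaller default makes: less memory held per logger in exchange for earlier drops under burst.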