add more docs for kafka (#637)
change default channel size to 4096
dmachard authored Mar 5, 2024
1 parent 06c9293 commit 9bff9a7
Showing 9 changed files with 166 additions and 56 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -21,4 +21,5 @@ __pycache__/
go-dnscollector
bin/
include/
docs/_integration/elasticsearch/data/
docs/_integration/elasticsearch/data/
docs/_integration/kafka/data/
2 changes: 1 addition & 1 deletion config.yml
@@ -760,7 +760,7 @@ multiplexer:
# # Kafka partition
# partition: 0
# # Channel buffer size for incoming packets: number of packets buffered before dropping.
# chan-buffer-size: 65535
# chan-buffer-size: 4096
# # Compression for Kafka messages: none, gzip, lz4, snappy, zstd
# compression: none

18 changes: 18 additions & 0 deletions docs/_integration/kafka/README.md
@@ -0,0 +1,18 @@

# DNS-collector with Kafka

- Copy the `./docs/_integration/kafka` folder and start the Docker stack:

```bash
sudo docker compose up -d
```

- Go to the Apache Kafka web interface at `http://127.0.0.1:8080`.

- The `dnscollector` topic should be available.

- Finally, run DNS-collector from source:

```bash
go run . -config docs/_integration/kafka/config.yml
```
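
- Optionally, check that messages are reaching the broker. One way to do this — a sketch, assuming the `kafka-console-consumer` tool bundled with the `confluentinc/cp-kafka` image — is to consume the topic from inside the broker container:

```bash
# Read the dnscollector topic from the beginning, using the broker's internal listener.
sudo docker exec -it kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic dnscollector \
  --from-beginning
```
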
36 changes: 36 additions & 0 deletions docs/_integration/kafka/config.yml
@@ -0,0 +1,36 @@

global:
trace:
verbose: true

multiplexer:
collectors:
- name: tap
dnstap:
listen-ip: 0.0.0.0
listen-port: 6000
chan-buffer-size: 4096
loggers:
- name: kafka
kafkaproducer:
remote-address: 127.0.0.1
remote-port: 9092
connect-timeout: 5
retry-interval: 10
flush-interval: 30
tls-support: false
tls-insecure: false
sasl-support: false
sasl-mechanism: PLAIN
sasl-username: false
sasl-password: false
mode: flat-json
buffer-size: 100
topic: "dnscollector"
partition: 0
chan-buffer-size: 4096
compression: none

routes:
- from: [ tap ]
to: [ kafka ]
60 changes: 60 additions & 0 deletions docs/_integration/kafka/docker-compose.yml
@@ -0,0 +1,60 @@
version: '3.8'
services:
kafka-ui:
image: provectuslabs/kafka-ui:v0.7.1
container_name: kafka-ui
environment:
DYNAMIC_CONFIG_ENABLED: true
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
ports:
- 8080:8080
depends_on:
- kafka

zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9997:9997"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_JMX_HOSTNAME: kafka
volumes:
- "./data:/var/lib/kafka/data"


kafka-init-topics:
image: confluentinc/cp-kafka:7.6.0
container_name: kafka-init-topic
volumes:
- ./message.json:/data/message.json
depends_on:
- kafka
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:29092 1 30 && \
kafka-topics --create --topic dnscollector --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092 < /data/message.json'"
1 change: 1 addition & 0 deletions docs/_integration/kafka/message.json
@@ -0,0 +1 @@
{}
90 changes: 42 additions & 48 deletions docs/loggers/logger_kafka.md
@@ -4,51 +4,45 @@ Kafka producer, based on [kafka-go](https://github.com/segmentio/kafka-go) library.

Options:

- `remote-address`: (string) remote address
- `remote-port`: (integer) remote tcp port
- `connect-timeout`: (integer) connect timeout in second
- `retry-interval`: (integer) interval in second between retry reconnect
- `flush-interval`: (integer) interval in second before to flush the buffer
- `tls-support`: (boolean) enable tls
- `tls-insecure`: (boolean) insecure skip verify
- `tls-min-version`: (string) min tls version, default to 1.2
- `ca-file`: (string) provide CA file to verify the server certificate
- `cert-file`: (string) provide client certificate file for mTLS
- `key-file`: (string) provide client private key file for mTLS
- `sasl-support`: (boolean) enable SASL
- `sasl-username`: (string) SASL username
- `sasl-password`: (string) SASL password
- `sasl-mechanism`: (string) SASL mechanism: `PLAIN` or `SCRAM-SHA-512`
- `mode`: (string) output format: `text`, `json`, or `flat-json`
- `buffer-size`: (integer) how many DNS messages will be buffered before being sent
- `topic`: (integer) kafka topic to forward messages to
- `partition`: (integer) kafka partition
- `chan-buffer-size`: (integer) channel buffer size used on incoming dns message, number of messages before to drop it.
- `compression`: (string) Compression for Kafka messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`

Default values:

```yaml
kafkaproducer:
remote-address: 127.0.0.1
remote-port: 9092
connect-timeout: 5
retry-interval: 10
flush-interval: 30
tls-support: false
tls-insecure: false
tls-min-version: 1.2
ca-file: ""
cert-file: ""
key-file: ""
sasl-support: false
sasl-mechanism: PLAIN
sasl-username: ""
sasl-password: ""
mode: flat-json
buffer-size: 100
topic: "dnscollector"
partition: 0
chan-buffer-size: 65535
compression: "none"
```
- `remote-address` (string) remote address. Defaults to `127.0.0.1`.
> Specifies the remote address to connect to.
- `remote-port` (integer) remote TCP port. Defaults to `9092`.
> Specifies the remote TCP port to connect to.
- `connect-timeout` (integer) connect timeout in seconds. Defaults to `5` seconds.
> Specifies the maximum time to wait for a connection attempt to complete.
- `retry-interval` (integer) interval in seconds between reconnect attempts. Defaults to `10` seconds.
> Specifies the interval between attempts to reconnect in case of connection failure.
- `flush-interval` (integer) interval in seconds before the buffer is flushed. Defaults to `30` seconds.
> Specifies the interval between buffer flushes.
- `tls-support` (boolean) enable TLS. Defaults to `false`.
> Enables or disables TLS (Transport Layer Security) support. If set to true, TLS is used for secure communication.
- `tls-insecure` (boolean) insecure skip verify. Defaults to `false`.
> If set to true, verification of the server certificate is skipped.
- `tls-min-version` (string) minimum TLS version. Defaults to `1.2`.
> Specifies the minimum TLS version that will be accepted.
- `ca-file` (string) CA file used to verify the server certificate. Defaults to `(empty)`.
> Specifies the path to the CA (Certificate Authority) file used to verify the server's certificate.
- `cert-file` (string) client certificate file for mTLS. Defaults to `(empty)`.
> Specifies the path to the certificate file to be used. Required if TLS support is enabled.
- `key-file` (string) client private key file for mTLS. Defaults to `(empty)`.
> Specifies the path to the key file corresponding to the certificate file. Required if TLS support is enabled.
- `sasl-support` (boolean) enable SASL. Defaults to `false`.
> Enables or disables SASL (Simple Authentication and Security Layer) support for Kafka.
- `sasl-username` (string) SASL username. Defaults to `(empty)`.
> Specifies the SASL username for authentication with Kafka brokers.
- `sasl-password` (string) SASL password. Defaults to `(empty)`.
> Specifies the SASL password for authentication with Kafka brokers.
- `sasl-mechanism` (string) SASL mechanism: `PLAIN` or `SCRAM-SHA-512`. Defaults to `PLAIN`.
> Specifies the SASL mechanism to use for authentication with Kafka brokers.
- `mode` (string) output format: `text`, `json`, or `flat-json`. Defaults to `flat-json`.
> Specifies the output format for Kafka messages.
- `buffer-size` (integer) number of DNS messages buffered before being sent. Defaults to `100`.
> Specifies the size of the buffer for DNS messages before they are sent to Kafka.
- `topic` (string) Kafka topic to forward messages to. Defaults to `dnscollector`.
> Specifies the Kafka topic to which messages will be forwarded.
- `partition` (integer) Kafka partition. Defaults to `0`.
> Specifies the Kafka partition to which messages will be sent.
- `chan-buffer-size` (integer) incoming channel size: number of packets buffered before dropping. Defaults to `4096`.
> Specifies the maximum number of packets that can be buffered before additional packets are dropped.
- `compression` (string) compression for Kafka messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`. Defaults to `none`.
> Specifies the compression algorithm to use for Kafka messages.
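
For reference, a minimal logger declaration relying on the defaults listed above could look like the following sketch; only a handful of options are set explicitly, and it mirrors the integration example in `docs/_integration/kafka/config.yml`:

```yaml
multiplexer:
  loggers:
    - name: kafka
      kafkaproducer:
        remote-address: 127.0.0.1   # Kafka broker address
        remote-port: 9092           # Kafka broker port
        mode: flat-json             # text, json or flat-json
        topic: "dnscollector"       # topic that DNS messages are forwarded to
        chan-buffer-size: 4096      # incoming channel size (new default)
```
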
10 changes: 5 additions & 5 deletions loggers/kafkaproducer.go
@@ -264,7 +264,7 @@ func (k *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
}

func (k *KafkaProducer) Run() {
k.LogInfo("running in background...")
k.LogInfo("waiting dnsmessage to process...")

// prepare next channels
defaultRoutes, defaultNames := k.RoutingHandler.GetDefaultRoutes()
@@ -276,7 +276,7 @@ func (k *KafkaProducer) Run() {
subprocessors := transformers.NewTransforms(&k.config.OutgoingTransformers, k.logger, k.name, listChannel, 0)

// goroutine to process transformed dns messages
go k.Process()
go k.ProcessDM()

// loop to process incoming messages
RUN_LOOP:
@@ -320,7 +320,9 @@ RUN_LOOP:
k.LogInfo("run terminated")
}

func (k *KafkaProducer) Process() {
func (k *KafkaProducer) ProcessDM() {
k.LogInfo("waiting transformed dnsmessage to process...")

ctx, cancelKafka := context.WithCancel(context.Background())
defer cancelKafka() // release the resources tied to the context

@@ -336,8 +338,6 @@

go k.ConnectToKafka(ctx, readyTimer)

k.LogInfo("ready to process")

PROCESS_LOOP:
for {
select {
2 changes: 1 addition & 1 deletion pkgconfig/loggers.go
@@ -544,7 +544,7 @@ func (c *ConfigLoggers) SetDefault() {
c.KafkaProducer.FlushInterval = 10
c.KafkaProducer.Topic = "dnscollector"
c.KafkaProducer.Partition = 0
c.KafkaProducer.ChannelBufferSize = 65535
c.KafkaProducer.ChannelBufferSize = 4096
c.KafkaProducer.Compression = CompressNone

c.FalcoClient.Enable = false
