add more docs for kafka #637

Merged
merged 1 commit on Mar 5, 2024
3 changes: 2 additions & 1 deletion .gitignore
@@ -21,4 +21,5 @@ __pycache__/
go-dnscollector
bin/
include/
-docs/_integration/elasticsearch/data/
+docs/_integration/elasticsearch/data/
+docs/_integration/kafka/data/
2 changes: 1 addition & 1 deletion config.yml
@@ -759,7 +759,7 @@ multiplexer:
# # Kafka partition
# partition: 0
# # Channel buffer size for incoming packets; packets beyond this number are dropped.
-# chan-buffer-size: 65535
+# chan-buffer-size: 4096
# # Compression for Kafka messages: none, gzip, lz4, snappy, zstd
# compression: none

18 changes: 18 additions & 0 deletions docs/_integration/kafka/README.md
@@ -0,0 +1,18 @@

# DNS-collector with Kafka

- Copy the [./docs/_integration/kafka](./docs/_integration/kafka) folder and start the Docker stack:

```bash
sudo docker compose up -d
```

- Open the Apache Kafka web interface at `http://127.0.0.1:8080`.

- The `dnscollector` topic should be listed.

- Finally, run DNS-collector from source:

```bash
go run . -config docs/_integration/kafka/config.yml
```
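As a quick check (not part of this PR), a minimal consumer can confirm that messages reach the topic. This sketch uses the same [kafka-go](https://github.com/segmentio/kafka-go) library the logger is built on; the consumer group name is arbitrary:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Consume from the topic the collector writes to, via the host-side listener.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"127.0.0.1:9092"},
		Topic:   "dnscollector",
		GroupID: "dnscollector-check", // hypothetical group name; anything unused works
	})
	defer r.Close()

	// Print the first few messages; with mode "flat-json" each value is a flat JSON document.
	for i := 0; i < 3; i++ {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("partition=%d offset=%d value=%s\n", m.Partition, m.Offset, m.Value)
	}
}
```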
36 changes: 36 additions & 0 deletions docs/_integration/kafka/config.yml
@@ -0,0 +1,36 @@

global:
  trace:
    verbose: true

multiplexer:
  collectors:
    - name: tap
      dnstap:
        listen-ip: 0.0.0.0
        listen-port: 6000
        chan-buffer-size: 4096
  loggers:
    - name: kafka
      kafkaproducer:
        remote-address: 127.0.0.1
        remote-port: 9092
        connect-timeout: 5
        retry-interval: 10
        flush-interval: 30
        tls-support: false
        tls-insecure: false
        sasl-support: false
        sasl-mechanism: PLAIN
        sasl-username: ""
        sasl-password: ""
        mode: flat-json
        buffer-size: 100
        topic: "dnscollector"
        partition: 0
        chan-buffer-size: 4096
        compression: none

  routes:
    - from: [ tap ]
      to: [ kafka ]
60 changes: 60 additions & 0 deletions docs/_integration/kafka/docker-compose.yml
@@ -0,0 +1,60 @@
version: '3.8'
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.1
    container_name: kafka-ui
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
    ports:
      - 8080:8080
    depends_on:
      - kafka

  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9997:9997"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_HOSTNAME: kafka
    volumes:
      - "./data:/var/lib/kafka/data"

  kafka-init-topics:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka-init-topic
    volumes:
      - ./message.json:/data/message.json
    depends_on:
      - kafka
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
      cub kafka-ready -b kafka:29092 1 30 && \
      kafka-topics --create --topic dnscollector --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092 < /data/message.json'"
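The broker advertises two listeners: `PLAINTEXT://kafka:29092` for containers on the compose network (used by kafka-ui and the init job) and `PLAINTEXT_HOST://localhost:9092` for clients on the host, which is where DNS-collector connects. A sketch like the following (not part of the PR; kafka-go assumed) confirms the host listener works and that the init job created the topic:

```go
package main

import (
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial the listener advertised to the host: PLAINTEXT_HOST://localhost:9092.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// With no arguments, ReadPartitions returns the partitions of every topic, so
	// the dnscollector topic should appear twice (it is created with --partitions 2).
	partitions, err := conn.ReadPartitions()
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		fmt.Printf("topic=%s partition=%d\n", p.Topic, p.ID)
	}
}
```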
1 change: 1 addition & 0 deletions docs/_integration/kafka/message.json
@@ -0,0 +1 @@
{}
90 changes: 42 additions & 48 deletions docs/loggers/logger_kafka.md
@@ -4,51 +4,45 @@ Kafka producer, based on [kafka-go](https://github.com/segmentio/kafka-go) library

Options:

-- `remote-address`: (string) remote address
-- `remote-port`: (integer) remote tcp port
-- `connect-timeout`: (integer) connect timeout in second
-- `retry-interval`: (integer) interval in second between retry reconnect
-- `flush-interval`: (integer) interval in second before to flush the buffer
-- `tls-support`: (boolean) enable tls
-- `tls-insecure`: (boolean) insecure skip verify
-- `tls-min-version`: (string) min tls version, default to 1.2
-- `ca-file`: (string) provide CA file to verify the server certificate
-- `cert-file`: (string) provide client certificate file for mTLS
-- `key-file`: (string) provide client private key file for mTLS
-- `sasl-support`: (boolean) enable SASL
-- `sasl-username`: (string) SASL username
-- `sasl-password`: (string) SASL password
-- `sasl-mechanism`: (string) SASL mechanism: `PLAIN` or `SCRAM-SHA-512`
-- `mode`: (string) output format: `text`, `json`, or `flat-json`
-- `buffer-size`: (integer) how many DNS messages will be buffered before being sent
-- `topic`: (integer) kafka topic to forward messages to
-- `partition`: (integer) kafka partition
-- `chan-buffer-size`: (integer) channel buffer size used on incoming dns message, number of messages before to drop it.
-- `compression`: (string) Compression for Kafka messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`
-
-Default values:
-
-```yaml
-kafkaproducer:
-  remote-address: 127.0.0.1
-  remote-port: 9092
-  connect-timeout: 5
-  retry-interval: 10
-  flush-interval: 30
-  tls-support: false
-  tls-insecure: false
-  tls-min-version: 1.2
-  ca-file: ""
-  cert-file: ""
-  key-file: ""
-  sasl-support: false
-  sasl-mechanism: PLAIN
-  sasl-username: ""
-  sasl-password: ""
-  mode: flat-json
-  buffer-size: 100
-  topic: "dnscollector"
-  partition: 0
-  chan-buffer-size: 65535
-  compression: "none"
-```
+- `remote-address` (string) remote address. Defaults to `127.0.0.1`.
+> Specifies the remote address to connect to.
+- `remote-port` (integer) remote TCP port. Defaults to `9092`.
+> Specifies the remote TCP port to connect to.
+- `connect-timeout` (integer) connect timeout in seconds. Defaults to `5` seconds.
+> Specifies the maximum time to wait for a connection attempt to complete.
+- `retry-interval` (integer) interval in seconds between reconnect attempts. Defaults to `10` seconds.
+> Specifies the interval between attempts to reconnect in case of connection failure.
+- `flush-interval` (integer) interval in seconds between buffer flushes. Defaults to `30` seconds.
+> Specifies the interval between buffer flushes.
+- `tls-support` (boolean) enable TLS. Defaults to `false`.
+> Enables or disables TLS (Transport Layer Security) support. If set to true, TLS will be used for secure communication.
+- `tls-insecure` (boolean) insecure skip verify. Defaults to `false`.
+> If set to true, verification of the server certificate is skipped.
+- `tls-min-version` (string) minimum TLS version. Defaults to `1.2`.
+> Specifies the minimum TLS version accepted for the connection.
+- `ca-file` (string) CA file to verify the server certificate. Defaults to `(empty)`.
+> Specifies the path to the CA (Certificate Authority) file used to verify the server's certificate.
+- `cert-file` (string) client certificate file for mTLS. Defaults to `(empty)`.
+> Specifies the path to the client certificate file. Required if mutual TLS is used.
+- `key-file` (string) client private key file for mTLS. Defaults to `(empty)`.
+> Specifies the path to the key file corresponding to the client certificate. Required if mutual TLS is used.
+- `sasl-support` (boolean) enable SASL. Defaults to `false`.
+> Enables or disables SASL (Simple Authentication and Security Layer) support for Kafka.
+- `sasl-username` (string) SASL username. Defaults to `(empty)`.
+> Specifies the SASL username for authentication with Kafka brokers.
+- `sasl-password` (string) SASL password. Defaults to `(empty)`.
+> Specifies the SASL password for authentication with Kafka brokers.
+- `sasl-mechanism` (string) SASL mechanism: `PLAIN` or `SCRAM-SHA-512`. Defaults to `PLAIN`.
+> Specifies the SASL mechanism to use for authentication with Kafka brokers.
+- `mode` (string) output format: `text`, `json`, or `flat-json`. Defaults to `flat-json`.
+> Specifies the output format for Kafka messages.
+- `buffer-size` (integer) number of DNS messages to buffer before sending. Defaults to `100`.
+> Specifies the size of the buffer for DNS messages before they are sent to Kafka.
+- `topic` (string) Kafka topic to forward messages to. Defaults to `dnscollector`.
+> Specifies the Kafka topic to which messages will be forwarded.
+- `partition` (integer) Kafka partition. Defaults to `0`.
+> Specifies the Kafka partition to which messages will be sent.
+- `chan-buffer-size` (integer) incoming channel size; number of packets to buffer before dropping. Defaults to `4096`.
+> Specifies the maximum number of packets that can be buffered before additional packets are dropped.
+- `compression` (string) compression for Kafka messages: `none`, `gzip`, `lz4`, `snappy`, `zstd`. Defaults to `none`.
+> Specifies the compression algorithm to use for Kafka messages.
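For readers wiring these options to their own tooling, the sketch below shows how the SASL and TLS settings roughly map onto kafka-go primitives. It is an illustration under assumptions (broker address, credentials, topic, and payload are placeholders), not the logger's actual implementation:

```go
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
)

// newMechanism mirrors the sasl-mechanism option: PLAIN or SCRAM-SHA-512.
func newMechanism(mechanism, username, password string) (sasl.Mechanism, error) {
	switch mechanism {
	case "PLAIN":
		return plain.Mechanism{Username: username, Password: password}, nil
	case "SCRAM-SHA-512":
		return scram.Mechanism(scram.SHA512, username, password)
	}
	return nil, fmt.Errorf("unsupported sasl mechanism %q", mechanism)
}

func main() {
	mech, err := newMechanism("PLAIN", "user", "password") // placeholder credentials
	if err != nil {
		log.Fatal(err)
	}
	w := &kafka.Writer{
		Addr:  kafka.TCP("127.0.0.1:9092"), // remote-address / remote-port
		Topic: "dnscollector",              // topic
		Transport: &kafka.Transport{
			SASL: mech,                                      // sasl-support / sasl-*
			TLS:  &tls.Config{MinVersion: tls.VersionTLS12}, // tls-support / tls-min-version
		},
	}
	defer w.Close()

	// One flat-json style payload, just to exercise the connection.
	if err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte(`{"dns.qname": "example.com."}`)}); err != nil {
		log.Fatal(err)
	}
}
```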
10 changes: 5 additions & 5 deletions loggers/kafkaproducer.go
@@ -264,7 +264,7 @@ func (k *KafkaProducer) FlushBuffer(buf *[]dnsutils.DNSMessage) {
}

func (k *KafkaProducer) Run() {
k.LogInfo("running in background...")
k.LogInfo("waiting dnsmessage to process...")

// prepare next channels
defaultRoutes, defaultNames := k.RoutingHandler.GetDefaultRoutes()
@@ -276,7 +276,7 @@ func (k *KafkaProducer) Run() {
subprocessors := transformers.NewTransforms(&k.config.OutgoingTransformers, k.logger, k.name, listChannel, 0)

// goroutine to process transformed dns messages
-go k.Process()
+go k.ProcessDM()

// loop to process incoming messages
RUN_LOOP:
@@ -320,7 +320,9 @@ RUN_LOOP:
k.LogInfo("run terminated")
}

-func (k *KafkaProducer) Process() {
+func (k *KafkaProducer) ProcessDM() {
+k.LogInfo("waiting for transformed dns messages to process...")

ctx, cancelKafka := context.WithCancel(context.Background())
defer cancelKafka() // release the resources tied to the context

@@ -336,8 +338,6 @@

go k.ConnectToKafka(ctx, readyTimer)

k.LogInfo("ready to process")

PROCESS_LOOP:
for {
select {
2 changes: 1 addition & 1 deletion pkgconfig/loggers.go
@@ -544,7 +544,7 @@ func (c *ConfigLoggers) SetDefault() {
c.KafkaProducer.FlushInterval = 10
c.KafkaProducer.Topic = "dnscollector"
c.KafkaProducer.Partition = 0
-c.KafkaProducer.ChannelBufferSize = 65535
+c.KafkaProducer.ChannelBufferSize = 4096
c.KafkaProducer.Compression = CompressNone

c.FalcoClient.Enable = false