Skip to content

Commit

Permalink
Merge pull request #22 from zerodha/refactor
Browse files Browse the repository at this point in the history
Fully refactor the codebase. This is a breaking major version change.
  • Loading branch information
knadh authored Jun 21, 2024
2 parents 38d15df + 6156bde commit c0e47b7
Show file tree
Hide file tree
Showing 18 changed files with 2,133 additions and 2,064 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ go.work
config.toml
/dist
kaf-relay.bin
kaf-relay
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ builds:
archives:
- format: tar.gz
files:
- config.toml.sample
- config.sample.toml
- README.md
- LICENSE
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ BIN := kaf-relay
.PHONY: build
build: $(BIN)

$(BIN):
$(BIN): $(shell find . -type f -name "*.go") go.mod go.sum
CGO_ENABLED=0 go build -o ${BIN} --ldflags="-X 'main.buildString=${BUILDSTR}'"

.PHONY: clean
Expand Down
104 changes: 0 additions & 104 deletions config.go

This file was deleted.

135 changes: 135 additions & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
[app]
log_level = "debug"
metrics_server_addr = ":7081"


# Map of topics from the source to sync to the target.
# source_topic => target_topic:optional_target_partition
# If the target partition is not specified, whatever partition a message
# was received from the source, the same partition is written to the target.
[[topics]]
source_topic1 = "target_topic1:1"
source_topic2 = "target_topic2"


[source_pool]
# Kafka client config common to all upstream sources ([[sources]]).
initial_offset = "start"
instance_id = "client_instance_id"
group_id = "consumer_group"

# Frequency at which source servers are polled for health/lag.
healthcheck_interval = "3s"

# Max difference in total offsets across all topics on a source
# against other sources, which when breached, the source is marked
# as unhealthy because of a lag.
offset_lag_threshold = 1000

# Maximum number of connect/fetch retries before exiting. -1 for infinite.
max_retries = -1

# Kafka exponential retry-backoff config for reconnection attempts.
# If both min and max values are the same, then it's a static wait time
# between attempts.
backoff_enable = true
backoff_min = "2s"
backoff_max = "10s"

# Wait timeout of a request/response to a Kafka instance to determine
# whether it's healthy or not.
request_timeout = "100ms"


[[sources]]
name = "node1"
servers = ["127.0.0.1:9092"]
session_timeout = "6s"
enable_auth = true
sasl_mechanism = "PLAIN"
username = "user-x"
password = "pass-x"
max_wait_time = "10ms"
max_failovers = -1 # infinite

enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""

enable_log = false

[[sources]]
name = "node2"
servers = ["node2:9092"]
session_timeout = "6s"
enable_auth = true
sasl_mechanism = "PLAIN"
username = "user-x"
password = "pass-x"
max_wait_time = "10ms"
max_failovers = -1 # infinite

enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""

enable_log = false


# Destination kafka producer configuration
[target]
name = "node3"
servers = ["127.0.0.1:9095"]
enable_log = false
enable_auth = true
sasl_mechanism = "PLAIN" # PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
username = "user-y"
password = "pass-y"
enable_idempotency = true
commit_ack_type = "cluster"
flush_frequency = "20ms"
session_timeout = "6s"

enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""

# -1 for infinite.
max_retries = -1
flush_batch_size = 1000
batch_size = 1000
max_message_bytes = 10000000

# Kafka exponential retry-backoff config for reconnection attempts.
# If both min and max values are the same, then it's a static wait time
# between attempts.
backoff_enable = true
backoff_min = "2s"
backoff_max = "10s"

# Wait timeout of a request/response to a Kafka instance to determine
# whether it's healthy or not.
request_timeout = "100ms"


# Custom go-plugin filter to load to filter messages when relaying
[filters.test]
enabled = false
path = "test.bin"
config = '''
{
"address": ["127.0.0.1:6379"],
"username": "",
"password": "",
"db": 10,
"max_active": 50,
"max_idle": 20,
"dial_timeout": 3000,
"read_timeout": 3000,
"write_timeout": 3000,
"idle_timeout": 30000
}
'''
120 changes: 0 additions & 120 deletions config.toml.sample

This file was deleted.

Loading

0 comments on commit c0e47b7

Please sign in to comment.