diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 884fca0..53256e9 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -10,8 +10,9 @@ type Config struct { Brokers []string `yaml:"brokers"` ClientID string `yaml:"clientId"` - TLS TLSConfig `yaml:"tls"` - SASL SASLConfig `yaml:"sasl"` + TLS TLSConfig `yaml:"tls"` + SASL SASLConfig `yaml:"sasl"` + TopicReplicationFactor int16 `yaml:"topicReplicationFactor"` } // RegisterFlags for all sensitive Kafka SASL configs. @@ -36,6 +37,7 @@ func (c *Config) Validate() error { // SetDefaults for Kafka config func (c *Config) SetDefaults() { c.ClientID = "owl-shop" + c.TopicReplicationFactor = 3 c.SASL.SetDefaults() } diff --git a/pkg/shop/address_service.go b/pkg/shop/address_service.go index 6258a62..653127e 100644 --- a/pkg/shop/address_service.go +++ b/pkg/shop/address_service.go @@ -194,7 +194,7 @@ func (svc *AddressService) createKafkaTopic(ctx context.Context) error { { Topic: svc.topicName, NumPartitions: 6, - ReplicationFactor: 3, + ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor, Configs: []kmsg.CreateTopicsRequestTopicConfig{ {"cleanup.policy", &cleanupPolicy}, }, diff --git a/pkg/shop/customer_service.go b/pkg/shop/customer_service.go index f75f09e..1942511 100644 --- a/pkg/shop/customer_service.go +++ b/pkg/shop/customer_service.go @@ -216,7 +216,7 @@ func (svc *CustomerService) createKafkaTopic(ctx context.Context) error { { Topic: svc.topicName, NumPartitions: 6, - ReplicationFactor: 3, + ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor, Configs: []kmsg.CreateTopicsRequestTopicConfig{ {"cleanup.policy", &cleanupPolicy}, }, diff --git a/pkg/shop/frontend_service.go b/pkg/shop/frontend_service.go index c28c208..d4bd359 100644 --- a/pkg/shop/frontend_service.go +++ b/pkg/shop/frontend_service.go @@ -120,7 +120,7 @@ func (svc *FrontendService) createKafkaTopic(ctx context.Context) error { { Topic: svc.topicName, NumPartitions: 6, - ReplicationFactor: 3, + ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor, Configs: []kmsg.CreateTopicsRequestTopicConfig{ {"cleanup.policy", &cleanupPolicy}, {"retention.bytes", &retentionBytes}, diff --git a/pkg/shop/order_service.go b/pkg/shop/order_service.go index 1059aaa..f9c210f 100644 --- a/pkg/shop/order_service.go +++ b/pkg/shop/order_service.go @@ -197,7 +197,7 @@ func (svc *OrderService) createKafkaTopic(ctx context.Context) error { { Topic: svc.topicName, NumPartitions: 6, - ReplicationFactor: 3, + ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor, Configs: []kmsg.CreateTopicsRequestTopicConfig{ {"cleanup.policy", &cleanupPolicy}, },