Skip to content

Commit

Permalink
extract configuration to buildNatsConfig method
Browse files Browse the repository at this point in the history
  • Loading branch information
adriandieter committed Dec 11, 2024
1 parent e133384 commit 87c3588
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type JetStreamController interface {

// ReadOnly returns true when no changes should be made by the controller.
ReadOnly() bool

// ValidNamespace ok if the controllers namespace restriction allows the given namespace.
ValidNamespace(string string) bool

Expand Down Expand Up @@ -62,16 +63,30 @@ func (c *jsController) ValidNamespace(targetNamespace string) bool {
}

func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js jetstream.JetStream) error) error {

// Build single use client
// TODO(future feature): Use client-pool
serverUrl := strings.Join(opts.Servers, ",")
// TODO(future-feature): Use client-pool instead of single use client
cfg := c.buildNatsConfig(opts)

jsClient, closer, err := CreateJetStreamClient(cfg, true)
if err != nil {
return fmt.Errorf("create jetstream client: %w", err)
}
defer closer.Close()

return op(jsClient)
}

// buildNatsConfig uses given opts to override the base NatsConfig.
func (c *jsController) buildNatsConfig(opts *connectionOptions) *NatsConfig {

// Build nats config from opts and controller base config.
// Takes opts values if present.
serverUrls := strings.Join(opts.Servers, ",")

// Takes opts values if present
cfg := &NatsConfig{
CRDConnect: false,
ClientName: c.config.ClientName,
ServerURL: or(serverUrl, c.config.ServerURL),
ServerURL: or(serverUrls, c.config.ServerURL),
TLSFirst: c.config.TLSFirst, // TODO(review): should this value depend on any opts? There is no TLSFirst in the spec
}

Expand All @@ -95,13 +110,7 @@ func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js j
cfg.Key = c.config.Key
}

client, closer, err := CreateJetStreamClient(cfg, true)
if err != nil {
return fmt.Errorf("create jetstream client: %w", err)
}
defer closer.Close()

return op(client)
return cfg
}

// or returns the value if it is not the null value. Otherwise, the fallback value is returned
Expand All @@ -112,7 +121,7 @@ func or[T comparable](v T, fallback T) T {
return v
}

// updateReadyCondition returns the conditions with an added or updated ready condition
// updateReadyCondition returns the given conditions with an added or updated ready condition.
func updateReadyCondition(conditions []api.Condition, status v1.ConditionStatus, reason string, message string) []api.Condition {

var currentStatus v1.ConditionStatus
Expand Down

0 comments on commit 87c3588

Please sign in to comment.