diff --git a/modules/redpanda/examples_test.go b/modules/redpanda/examples_test.go index 822522255c..68cb314418 100644 --- a/modules/redpanda/examples_test.go +++ b/modules/redpanda/examples_test.go @@ -17,6 +17,8 @@ func ExampleRun() { redpanda.WithEnableSASL(), redpanda.WithEnableKafkaAuthorization(), redpanda.WithEnableWasmTransform(), + redpanda.WithBootstrapConfig("data_transforms_per_core_memory_reservation", 33554432), + redpanda.WithBootstrapConfig("data_transforms_per_function_memory_limit", 16777216), redpanda.WithNewServiceAccount("superuser-1", "test"), redpanda.WithNewServiceAccount("superuser-2", "test"), redpanda.WithNewServiceAccount("no-superuser", "test"), diff --git a/modules/redpanda/mounts/bootstrap.yaml.tpl b/modules/redpanda/mounts/bootstrap.yaml.tpl index 643236bbef..cf5bb6ea5f 100644 --- a/modules/redpanda/mounts/bootstrap.yaml.tpl +++ b/modules/redpanda/mounts/bootstrap.yaml.tpl @@ -22,3 +22,8 @@ data_transforms_enabled: true {{- if .AutoCreateTopics }} auto_create_topics_enabled: true {{- end }} + +{{- range $key, $value := .ExtraBootstrapConfig }} +{{ $key }}: {{ $value }} +{{- end }} + diff --git a/modules/redpanda/options.go b/modules/redpanda/options.go index f78766115b..ae15e33001 100644 --- a/modules/redpanda/options.go +++ b/modules/redpanda/options.go @@ -43,6 +43,10 @@ type options struct { // Listeners is a list of custom listeners that can be provided to access the // containers form within docker networks Listeners []listener + + // ExtraBootstrapConfig is a map of configs to be interpolated into the + // container's bootstrap.yml + ExtraBootstrapConfig map[string]any } func defaultOptions() options { @@ -55,6 +59,7 @@ func defaultOptions() options { AutoCreateTopics: false, EnableTLS: false, Listeners: make([]listener, 0), + ExtraBootstrapConfig: make(map[string]any, 0), } } @@ -155,3 +160,13 @@ func WithListener(lis string) Option { }) } } + +// WithBootstrapConfig adds an arbitrary config kvp to the Redpanda container. +// Per the name, this config will be interpolated into the generated bootstrap +// config file, which is particularly useful for configs requiring a restart +// when otherwise applied to a running Redpanda instance. +func WithBootstrapConfig(cfg string, val any) Option { + return func(o *options) { + o.ExtraBootstrapConfig[cfg] = val + } +} diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 608396ec29..7f46617536 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -281,6 +281,7 @@ func renderBootstrapConfig(settings options) ([]byte, error) { KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization, AutoCreateTopics: settings.AutoCreateTopics, EnableWasmTransform: settings.EnableWasmTransform, + ExtraBootstrapConfig: settings.ExtraBootstrapConfig, } tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl) @@ -355,6 +356,7 @@ type redpandaBootstrapConfigTplParams struct { KafkaAPIEnableAuthorization bool AutoCreateTopics bool EnableWasmTransform bool + ExtraBootstrapConfig map[string]any } type redpandaConfigTplParams struct { diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 948ea4c741..09bad2c0d0 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -2,6 +2,7 @@ package redpanda_test import ( "context" + "encoding/json" "fmt" "io" "net/http" @@ -573,3 +574,58 @@ func TestRedpandaListener_NoNetwork(t *testing.T) { require.Contains(t, err.Error(), "container must be attached to at least one network") } + +func TestRedpandaBootstrapConfig(t *testing.T) { + ctx := context.Background() + + container, err := redpanda.RunContainer(ctx, + redpanda.WithEnableWasmTransform(), + // These configs would require a restart if applied to a live Redpanda instance + redpanda.WithBootstrapConfig("data_transforms_per_core_memory_reservation", 33554432), + redpanda.WithBootstrapConfig("data_transforms_per_function_memory_limit", 16777216), + ) + require.NoError(t, err) + + httpCl := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + ForceAttemptHTTP2: true, + TLSHandshakeTimeout: 10 * time.Second, + }, + } + adminAPIUrl, err := container.AdminAPIAddress(ctx) + require.NoError(t, err) + + { + // Check that the configs reflect specified values + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/cluster_config", adminAPIUrl), nil) + require.NoError(t, err) + resp, err := httpCl.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + var data map[string]any + err = json.NewDecoder(resp.Body).Decode(&data) + require.NoError(t, err) + reservation := int(data["data_transforms_per_core_memory_reservation"].(float64)) + require.Equal(t, 33554432, reservation) + pf_limit := int(data["data_transforms_per_function_memory_limit"].(float64)) + require.Equal(t, 16777216, pf_limit) + } + + { + // Check that no restart is required. i.e. that the configs were applied via bootstrap config + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/cluster_config/status", adminAPIUrl), nil) + require.NoError(t, err) + resp, err := httpCl.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + var data []map[string]any + err = json.NewDecoder(resp.Body).Decode(&data) + require.NoError(t, err) + require.Len(t, data, 1) + needs_restart := data[0]["restart"].(bool) + require.False(t, needs_restart) + } +}