Skip to content

Commit

Permalink
kadm: add state config altering using the older AlterConfigs request
Browse files Browse the repository at this point in the history
Closes #352.
  • Loading branch information
twmb committed Feb 25, 2023
1 parent 3322f31 commit a9369be
Showing 1 changed file with 98 additions and 11 deletions.
109 changes: 98 additions & 11 deletions pkg/kadm/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ const (
SubtractConfig
)

// AlterConfig is an individual key/value operation to perform when
// incrementally altering configs.
// AlterConfig is an individual key/value operation to perform when altering
// configs.
//
// This package includes a StringPtr function to aid in building config values.
type AlterConfig struct {
Op IncrementalOp // Op is the incremental alter operation to perform.
Op IncrementalOp // Op is the incremental alter operation to perform. This is ignored for State alter functions.
Name string // Name is the name of the config to alter.
Value *string // Value is the value to use when altering, if any.
}
Expand Down Expand Up @@ -216,9 +216,8 @@ func (rs AlterConfigsResponses) On(name string, fn func(*AlterConfigsResponse) e
// broker reimplementations support this request even if they do not support
// all other requests from Kafka v2.3).
//
// This admin client does not support the original AlterConfigs request. The
// original request is problematic: any existing dynamic configurations that
// were not specified in the AlterConfigs request itself would be lost.
// If you want to alter the entire configs state using the older AlterConfigs
// request, use AlterTopicConfigsState.
//
// This may return *ShardErrors. You may consider checking
// ValidateAlterTopicConfigs before using this method.
Expand All @@ -227,7 +226,7 @@ func (cl *Client) AlterTopicConfigs(ctx context.Context, configs []AlterConfig,
}

// ValidateAlterTopicConfigs validates an incremental alter config for the given
// topics with the config alterations.
// topics.
//
// This returns exactly what AlterTopicConfigs returns, but does not actually
// alter configurations.
Expand All @@ -244,9 +243,8 @@ func (cl *Client) ValidateAlterTopicConfigs(ctx context.Context, configs []Alter
// broker reimplementations support this request even if they do not support
// all other requests from Kafka v2.3).
//
// This admin client does not support the original AlterConfigs request. The
// original request is problematic: any existing dynamic configurations that
// were not specified in the AlterConfigs request itself would be lost.
// If you want to alter the entire configs state using the older AlterConfigs
// request, use AlterBrokerConfigsState.
//
// This may return *ShardErrors. You may consider checking
// ValidateAlterBrokerConfigs before using this method.
Expand All @@ -262,7 +260,7 @@ func (cl *Client) AlterBrokerConfigs(ctx context.Context, configs []AlterConfig,
}

// ValidateAlterBrokerConfigs validates an incremental alter config for the given
// brokers with the config alterations.
// brokers.
//
// This returns exactly what AlterBrokerConfigs returns, but does not actually
// alter configurations.
Expand Down Expand Up @@ -323,3 +321,92 @@ func (cl *Client) alterConfigs(
return nil
})
}

// AlterTopicConfigsState alters the full state of topic configurations.
// All prior configuration is lost.
//
// This may return *ShardErrors. You may consider checking
// ValidateAlterTopicConfigs before using this method.
func (cl *Client) AlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
return cl.alterConfigsState(ctx, false, configs, kmsg.ConfigResourceTypeTopic, topics)
}

// ValidateAlterTopicConfigs validates an AlterTopicConfigsState for the given
// topics.
//
// This returns exactly what AlterTopicConfigsState returns, but does not
// actually alter configurations.
func (cl *Client) ValidateAlterTopicConfigsState(ctx context.Context, configs []AlterConfig, topics ...string) (AlterConfigsResponses, error) {
return cl.alterConfigsState(ctx, true, configs, kmsg.ConfigResourceTypeTopic, topics)
}

// AlterBrokerConfigs alters the full state of broker configurations. If
// broker are specified, this updates each specific broker. If no brokers are
// specified, this updates whole-cluster broker configuration values.
// All prior configuration is lost.
//
// This may return *ShardErrors. You may consider checking
// ValidateAlterBrokerConfigs before using this method.
func (cl *Client) AlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
var names []string
if len(brokers) == 0 {
names = append(names, "")
}
for _, broker := range brokers {
names = append(names, strconv.Itoa(int(broker)))
}
return cl.alterConfigsState(ctx, false, configs, kmsg.ConfigResourceTypeBroker, names)
}

// ValidateAlterBrokerConfigs validates an AlterBrokerconfigsState for the
// given brokers.
//
// This returns exactly what AlterBrokerConfigs returns, but does not actually
// alter configurations.
func (cl *Client) ValidateAlterBrokerConfigsState(ctx context.Context, configs []AlterConfig, brokers ...int32) (AlterConfigsResponses, error) {
var names []string
if len(brokers) == 0 {
names = append(names, "")
}
for _, broker := range brokers {
names = append(names, strconv.Itoa(int(broker)))
}
return cl.alterConfigsState(ctx, true, configs, kmsg.ConfigResourceTypeBroker, names)
}

func (cl *Client) alterConfigsState(
ctx context.Context,
dry bool,
configs []AlterConfig,
kind kmsg.ConfigResourceType,
names []string,
) (AlterConfigsResponses, error) {
req := kmsg.NewPtrAlterConfigsRequest()
req.ValidateOnly = dry
for _, name := range names {
rr := kmsg.NewAlterConfigsRequestResource()
rr.ResourceType = kind
rr.ResourceName = name
for _, config := range configs {
rc := kmsg.NewAlterConfigsRequestResourceConfig()
rc.Name = config.Name
rc.Value = config.Value
rr.Configs = append(rr.Configs, rc)
}
req.Resources = append(req.Resources, rr)
}

shards := cl.cl.RequestSharded(ctx, req)

var rs []AlterConfigsResponse
return rs, shardErrEach(req, shards, func(kr kmsg.Response) error {
resp := kr.(*kmsg.AlterConfigsResponse)
for _, r := range resp.Resources {
rs = append(rs, AlterConfigsResponse{ // we are not storing in a map, no existence check possible
Name: r.ResourceName,
Err: kerr.ErrorForCode(r.ErrorCode),
})
}
return nil
})
}

0 comments on commit a9369be

Please sign in to comment.