Skip to content

Commit

Permalink
KIP-590: support new admin EnvelopeRequest & DefaultPrincipalData
Browse files Browse the repository at this point in the history
- generation: defines the new request & misc type
- kversion: adds a new request
- client: supports the new admin request for error parsing
- kerr: adds a new error

This required some minor code generation changes, but includes zero
functional changes.
  • Loading branch information
twmb committed Nov 10, 2020
1 parent 724a2c1 commit 56ab90c
Show file tree
Hide file tree
Showing 7 changed files with 505 additions and 131 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,6 @@ a protocol is supported by code generation.
- [KIP-570](https://cwiki.apache.org/confluence/display/KAFKA/KIP-570%3A+Add+leader+epoch+in+StopReplicaRequest) (leader epoch in stop replica; 2.6.0)
- [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients) (exponential backoff; 2.6.0)
- [KIP-588](https://cwiki.apache.org/confluence/display/KAFKA/KIP-588%3A+Allow+producers+to+recover+gracefully+from+transaction+timeouts) (producer recovery from txn timeout; 2.7.0)
- [KIP-590](https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller) (support for forwarding admin requests; 2.7.0)
- [KIP-595](https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum) (new APIs for raft protocol; 2.7.0)
- [KIP-599](https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations) (throttle create/delete topic/partition; 2.7.0)
19 changes: 19 additions & 0 deletions generate/definitions/58_envelope
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Introduced for KIP-590, EnvelopeRequest is what brokers use to wrap an
// incoming request before forwarding it to another broker.
EnvelopeRequest => key 58, max version 0, flexible v0+, admin
// The embedded request header and data.
RequestData: bytes
// Value of the initial client principal when the request is redirected by a broker.
RequestPrincipal: nullable-bytes
// The original client's address in bytes.
ClientHostAddress: bytes

EnvelopeResponse =>
// The embedded response header and data.
ResponseData: nullable-bytes
// The error code, or 0 if there was no error.
//
// NOT_CONTROLLER is returned when the request is not sent to the controller.
//
// CLUSTER_AUTHORIZATION_FAILED is returned if inter-broker authorization failed.
ErrorCode: int16
11 changes: 11 additions & 0 deletions generate/definitions/misc
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,14 @@ GroupMemberAssignment => not top level
Partitions: [int32]
// UserData is arbitrary client data for a given client in the group.
UserData: bytes

// DefaultPrincipalData is the encoded princpal data. This is used in an
// envelope request from broker to broker.
DefaultPrincipalData => not top level, with version field, flexible v0+
Version: int16
// The principal type.
Type: string
// The principal name.
Name: string
// Whether the principal was authenticated by a delegation token on the forwarding broker.
TokenAuthenticated: bool
2 changes: 2 additions & 0 deletions pkg/kerr/kerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ var (
InconsistentVoterSet = &Error{"INCONSISTENT_VOTER_SET", 94, false, "Indicates that either the sender or recipient of a voter-only request is not one of the expected voters."}
InvalidUpdateVersion = &Error{"INVALID_UPDATE_VERSION", 95, false, "The given update version was invalid."}
FeatureUpdateFailed = &Error{"FEATURE_UPDATE_FAILED", 96, false, "Unable to update finalized features due to an unexpected server error."}
PrincipalDeserializationFailure = &Error{"PRINCIPAL_DESERIALIZATION_FAILURE", 97, false, "Request principal deserialization failed during forwarding. This indicates an internal error on the broker cluster security setup."}
)

var code2err = map[int16]error{
Expand Down Expand Up @@ -241,4 +242,5 @@ var code2err = map[int16]error{
94: InconsistentVoterSet,
95: InvalidUpdateVersion,
96: FeatureUpdateFailed,
97: PrincipalDeserializationFailure,
}
2 changes: 2 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,8 @@ func (cl *Client) handleAdminReq(ctx context.Context, req kmsg.Request) Response
code = t.ErrorCode
case *kmsg.UpdateFeaturesResponse:
code = t.ErrorCode
case *kmsg.EnvelopeResponse:
code = t.ErrorCode
}
if err := kerr.ErrorForCode(code); err == kerr.NotController {
// There must be a last broker if we were able to issue
Expand Down
Loading

0 comments on commit 56ab90c

Please sign in to comment.