Skip to content

Commit

Permalink
support userscramcredentials apis (#1168)
Browse files Browse the repository at this point in the history
* userscramcredentials protocols

* alteruserscramcredentials working

* describeuserscramcredentials working

* gofmt -s -w alteruserscramcredentials_test.go

* fix typo

* add tests for deletion

* gofmt

* improve test

* separate alteruserscramcredentials_test and describeuserscramcredentials_test

* add protocol tests

* remove unused v1 constant

* change iterations from int32 to int

* keep errors with results
  • Loading branch information
petedannemann authored Jul 28, 2023
1 parent 861e102 commit 6193fa9
Show file tree
Hide file tree
Showing 9 changed files with 739 additions and 102 deletions.
107 changes: 107 additions & 0 deletions alteruserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alteruserscramcredentials"
)

// AlterUserScramCredentialsRequest represents a request sent to a kafka broker to
// alter user scram credentials.
type AlterUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of credentials to delete.
Deletions []UserScramCredentialsDeletion

// List of credentials to upsert.
Upsertions []UserScramCredentialsUpsertion
}

type ScramMechanism int8

const (
ScramMechanismUnknown ScramMechanism = iota // 0
ScramMechanismSha256 // 1
ScramMechanismSha512 // 2
)

type UserScramCredentialsDeletion struct {
Name string
Mechanism ScramMechanism
}

type UserScramCredentialsUpsertion struct {
Name string
Mechanism ScramMechanism
Iterations int
Salt []byte
SaltedPassword []byte
}

// AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user
// credentials request.
type AlterUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of altered user scram credentials.
Results []AlterUserScramCredentialsResponseUser
}

type AlterUserScramCredentialsResponseUser struct {
User string
Error error
}

// AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions))
upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions))

for deletionIdx, deletion := range req.Deletions {
deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{
Name: deletion.Name,
Mechanism: int8(deletion.Mechanism),
}
}

for upsertionIdx, upsertion := range req.Upsertions {
upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{
Name: upsertion.Name,
Mechanism: int8(upsertion.Mechanism),
Iterations: int32(upsertion.Iterations),
Salt: upsertion.Salt,
SaltedPassword: upsertion.SaltedPassword,
}
}

m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{
Deletions: deletions,
Upsertions: upsertions,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err)
}

res := m.(*alteruserscramcredentials.Response)
responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results))

for responseIdx, responseResult := range res.Results {
responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{
User: responseResult.User,
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
}
}
ret := &AlterUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Results: responseEntries,
}

return ret, nil
}
73 changes: 73 additions & 0 deletions alteruserscramcredentials_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestAlterUserScramCredentials(t *testing.T) {
// https://issues.apache.org/jira/browse/KAFKA-10259
if !ktesting.KafkaIsAtLeast("2.7.0") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

name := makeTopic()

createRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Upsertions: []UserScramCredentialsUpsertion{
{
Name: name,
Mechanism: ScramMechanismSha512,
Iterations: 15000,
Salt: []byte("my-salt"),
SaltedPassword: []byte("my-salted-password"),
},
},
})

if err != nil {
t.Fatal(err)
}

if len(createRes.Results) != 1 {
t.Fatalf("expected 1 createResult; got %d", len(createRes.Results))
}

if createRes.Results[0].User != name {
t.Fatalf("expected createResult with user: %s, got %s", name, createRes.Results[0].User)
}

if createRes.Results[0].Error != nil {
t.Fatalf("didn't expect an error in createResult, got %v", createRes.Results[0].Error)
}

deleteRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Deletions: []UserScramCredentialsDeletion{
{
Name: name,
Mechanism: ScramMechanismSha512,
},
},
})

if err != nil {
t.Fatal(err)
}

if len(deleteRes.Results) != 1 {
t.Fatalf("expected 1 deleteResult; got %d", len(deleteRes.Results))
}

if deleteRes.Results[0].User != name {
t.Fatalf("expected deleteResult with user: %s, got %s", name, deleteRes.Results[0].User)
}

if deleteRes.Results[0].Error != nil {
t.Fatalf("didn't expect an error in deleteResult, got %v", deleteRes.Results[0].Error)
}
}
97 changes: 97 additions & 0 deletions describeuserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/describeuserscramcredentials"
)

// DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to
// describe user scram credentials.
type DescribeUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of Scram users to describe
Users []UserScramCredentialsUser
}

type UserScramCredentialsUser struct {
Name string
}

// DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user
// credentials request.
type DescribeUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// Top level error that occurred while attempting to describe
// the user scram credentials.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Error error

// List of described user scram credentials.
Results []DescribeUserScramCredentialsResponseResult
}

type DescribeUserScramCredentialsResponseResult struct {
User string
CredentialInfos []DescribeUserScramCredentialsCredentialInfo
Error error
}

type DescribeUserScramCredentialsCredentialInfo struct {
Mechanism ScramMechanism
Iterations int
}

// DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns
// the response.
func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
users := make([]describeuserscramcredentials.RequestUser, len(req.Users))

for userIdx, user := range req.Users {
users[userIdx] = describeuserscramcredentials.RequestUser{
Name: user.Name,
}
}

m, err := c.roundTrip(ctx, req.Addr, &describeuserscramcredentials.Request{
Users: users,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeUserScramCredentials: %w", err)
}

res := m.(*describeuserscramcredentials.Response)
responseResults := make([]DescribeUserScramCredentialsResponseResult, len(res.Results))

for responseIdx, responseResult := range res.Results {
credentialInfos := make([]DescribeUserScramCredentialsCredentialInfo, len(responseResult.CredentialInfos))

for credentialInfoIdx, credentialInfo := range responseResult.CredentialInfos {
credentialInfos[credentialInfoIdx] = DescribeUserScramCredentialsCredentialInfo{
Mechanism: ScramMechanism(credentialInfo.Mechanism),
Iterations: int(credentialInfo.Iterations),
}
}
responseResults[responseIdx] = DescribeUserScramCredentialsResponseResult{
User: responseResult.User,
CredentialInfos: credentialInfos,
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
}
}
ret := &DescribeUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Results: responseResults,
}

return ret, nil
}
Loading

0 comments on commit 6193fa9

Please sign in to comment.