Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kfake sasl #424

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/kfake/17_sasl_handshake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kfake

import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(17, 1, 1) }

func (c *Cluster) handleSASLHandshake(creq clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SASLHandshakeRequest)
resp := req.ResponseKind().(*kmsg.SASLHandshakeResponse)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

if creq.cc.saslStage != saslStageBegin {
resp.ErrorCode = kerr.IllegalSaslState.Code
return resp, nil
}

switch req.Mechanism {
case saslPlain:
creq.cc.saslStage = saslStageAuthPlain
case saslScram256:
creq.cc.saslStage = saslStageAuthScram0_256
case saslScram512:
creq.cc.saslStage = saslStageAuthScram0_512
default:
resp.ErrorCode = kerr.UnsupportedSaslMechanism.Code
resp.SupportedMechanisms = []string{saslPlain, saslScram256, saslScram512}
}
return resp, nil
}
83 changes: 83 additions & 0 deletions pkg/kfake/36_sasl_authenticate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package kfake

import (
"errors"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(36, 0, 2) }

func (c *Cluster) handleSASLAuthenticate(creq clientReq) (kmsg.Response, error) {
req := creq.kreq.(*kmsg.SASLAuthenticateRequest)
resp := req.ResponseKind().(*kmsg.SASLAuthenticateResponse)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

switch creq.cc.saslStage {
default:
resp.ErrorCode = kerr.IllegalSaslState.Code
return resp, nil

case saslStageAuthPlain:
u, p, err := saslSplitPlain(req.SASLAuthBytes)
if err != nil {
return nil, err
}
if c.sasls.plain == nil {
return nil, errors.New("invalid sasl")
}
if p != c.sasls.plain[u] {
return nil, errors.New("invalid sasl")
}
creq.cc.saslStage = saslStageComplete

case saslStageAuthScram0_256:
c0, err := scramParseClient0(req.SASLAuthBytes)
if err != nil {
return nil, err
}
if c.sasls.scram256 == nil {
return nil, errors.New("invalid sasl")
}
a, ok := c.sasls.scram256[c0.user]
if !ok {
return nil, errors.New("invalid sasl")
}
s0, serverFirst := scramServerFirst(c0, a)
resp.SASLAuthBytes = serverFirst
creq.cc.saslStage = saslStageAuthScram1
creq.cc.s0 = &s0

case saslStageAuthScram0_512:
c0, err := scramParseClient0(req.SASLAuthBytes)
if err != nil {
return nil, err
}
if c.sasls.scram512 == nil {
return nil, errors.New("invalid sasl")
}
a, ok := c.sasls.scram512[c0.user]
if !ok {
return nil, errors.New("invalid sasl")
}
s0, serverFirst := scramServerFirst(c0, a)
resp.SASLAuthBytes = serverFirst
creq.cc.saslStage = saslStageAuthScram1
creq.cc.s0 = &s0

case saslStageAuthScram1:
serverFinal, err := creq.cc.s0.serverFinal(req.SASLAuthBytes)
if err != nil {
return nil, err
}
resp.SASLAuthBytes = serverFinal
creq.cc.saslStage = saslStageComplete
creq.cc.s0 = nil
}

return resp, nil
}
68 changes: 68 additions & 0 deletions pkg/kfake/50_describe_user_scram_credentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package kfake

import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(50, 0, 0) }

func (c *Cluster) handleDescribeUserSCRAMCredentials(kreq kmsg.Request) (kmsg.Response, error) {
var (
req = kreq.(*kmsg.DescribeUserSCRAMCredentialsRequest)
resp = req.ResponseKind().(*kmsg.DescribeUserSCRAMCredentialsResponse)
)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

describe := make(map[string]bool) // if false, user was duplicated
for _, u := range req.Users {
if _, ok := describe[u.Name]; ok {
describe[u.Name] = true
} else {
describe[u.Name] = false
}
}
if req.Users == nil { // null returns all
for u := range c.sasls.scram256 {
describe[u] = false
}
for u := range c.sasls.scram512 {
describe[u] = false
}
}

addr := func(u string) *kmsg.DescribeUserSCRAMCredentialsResponseResult {
sr := kmsg.NewDescribeUserSCRAMCredentialsResponseResult()
sr.User = u
resp.Results = append(resp.Results, sr)
return &resp.Results[len(resp.Results)-1]
}

for u, duplicated := range describe {
sr := addr(u)
if duplicated {
sr.ErrorCode = kerr.DuplicateResource.Code
continue
}
if a, ok := c.sasls.scram256[u]; ok {
ci := kmsg.NewDescribeUserSCRAMCredentialsResponseResultCredentialInfo()
ci.Mechanism = 1
ci.Iterations = int32(a.iterations)
sr.CredentialInfos = append(sr.CredentialInfos, ci)
}
if a, ok := c.sasls.scram512[u]; ok {
ci := kmsg.NewDescribeUserSCRAMCredentialsResponseResultCredentialInfo()
ci.Mechanism = 2
ci.Iterations = int32(a.iterations)
sr.CredentialInfos = append(sr.CredentialInfos, ci)
}
if len(sr.CredentialInfos) == 0 {
sr.ErrorCode = kerr.ResourceNotFound.Code
}
}

return resp, nil
}
124 changes: 124 additions & 0 deletions pkg/kfake/51_alter_user_scram_credentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package kfake

import (
"bytes"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(51, 0, 0) }

func (c *Cluster) handleAlterUserSCRAMCredentials(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
var (
req = kreq.(*kmsg.AlterUserSCRAMCredentialsRequest)
resp = req.ResponseKind().(*kmsg.AlterUserSCRAMCredentialsResponse)
)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

addr := func(u string) *kmsg.AlterUserSCRAMCredentialsResponseResult {
sr := kmsg.NewAlterUserSCRAMCredentialsResponseResult()
sr.User = u
resp.Results = append(resp.Results, sr)
return &resp.Results[len(resp.Results)-1]
}
doneu := func(u string, code int16) *kmsg.AlterUserSCRAMCredentialsResponseResult {
sr := addr(u)
sr.ErrorCode = code
return sr
}

users := make(map[string]int16)

// Validate everything up front, keeping track of all (and duplicate)
// users. If we are not controller, we fail with our users map.
for _, d := range req.Deletions {
if d.Name == "" {
users[d.Name] = kerr.UnacceptableCredential.Code
continue
}
if d.Mechanism != 1 && d.Mechanism != 2 {
users[d.Name] = kerr.UnsupportedSaslMechanism.Code
continue
}
users[d.Name] = 0
}
for _, u := range req.Upsertions {
if u.Name == "" || u.Iterations < 4096 || u.Iterations > 16384 { // Kafka min/max
users[u.Name] = kerr.UnacceptableCredential.Code
continue
}
if u.Mechanism != 1 && u.Mechanism != 2 {
users[u.Name] = kerr.UnsupportedSaslMechanism.Code
continue
}
if code, deleting := users[u.Name]; deleting && code == 0 {
users[u.Name] = kerr.DuplicateResource.Code
continue
}
users[u.Name] = 0
}

if b != c.controller {
for u := range users {
doneu(u, kerr.NotController.Code)
}
return resp, nil
}

// Add anything that failed validation.
for u, code := range users {
if code != 0 {
doneu(u, code)
}
}

// Process all deletions, adding ResourceNotFound as necessary.
for _, d := range req.Deletions {
if users[d.Name] != 0 {
continue
}
m := c.sasls.scram256
if d.Mechanism == 2 {
m = c.sasls.scram512
}
if m == nil {
doneu(d.Name, kerr.ResourceNotFound.Code)
continue
}
if _, ok := m[d.Name]; !ok {
doneu(d.Name, kerr.ResourceNotFound.Code)
continue
}
delete(m, d.Name)
doneu(d.Name, 0)
}

// Process all upsertions.
for _, u := range req.Upsertions {
if users[u.Name] != 0 {
continue
}
m := &c.sasls.scram256
mech := saslScram256
if u.Mechanism == 2 {
m = &c.sasls.scram512
mech = saslScram512
}
if *m == nil {
*m = make(map[string]scramAuth)
}
(*m)[u.Name] = scramAuth{
mechanism: mech,
iterations: int(u.Iterations),
saltedPass: bytes.Clone(u.SaltedPassword),
salt: bytes.Clone(u.Salt),
}
doneu(u.Name, 0)
}

return resp, nil
}
18 changes: 9 additions & 9 deletions pkg/kfake/NOTES
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ MISC
x OffsetForLeaderEpoch

SASL
* SaslHandshake
* SaslAuthenticate
* DescribeUserScramCredentials
* AlterUserScramCredentials
x SaslHandshake
x SaslAuthenticate
x DescribeUserScramCredentials
x AlterUserScramCredentials

TXNS
* AddPartitionsToTxn
* AddOffsetsToTxn
* EndTxn
* TxnOffsetCommit

ACLS
* DescribeACLs
* CreateACLs
* DeleteACLs

LOW-PRIO
* DeleteRecords
* DescribeConfigs
Expand All @@ -47,11 +52,6 @@ LOW-PRIO
* DescribeTransactions
* ListTransactions

ACLS
* DescribeACLs
* CreateACLs
* DeleteACLs

* AlterPartitionAssignments
* ListPartitionReassignments

Expand Down
3 changes: 3 additions & 0 deletions pkg/kfake/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type (
b *broker
conn net.Conn
respCh chan clientResp

saslStage saslStage
s0 *scramServer0
}

clientReq struct {
Expand Down
Loading