diff --git a/pkg/kfake/17_sasl_handshake.go b/pkg/kfake/17_sasl_handshake.go new file mode 100644 index 00000000..34028bc8 --- /dev/null +++ b/pkg/kfake/17_sasl_handshake.go @@ -0,0 +1,35 @@ +package kfake + +import ( + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func init() { regKey(17, 1, 1) } + +func (c *Cluster) handleSASLHandshake(creq clientReq) (kmsg.Response, error) { + req := creq.kreq.(*kmsg.SASLHandshakeRequest) + resp := req.ResponseKind().(*kmsg.SASLHandshakeResponse) + + if err := checkReqVersion(req.Key(), req.Version); err != nil { + return nil, err + } + + if creq.cc.saslStage != saslStageBegin { + resp.ErrorCode = kerr.IllegalSaslState.Code + return resp, nil + } + + switch req.Mechanism { + case saslPlain: + creq.cc.saslStage = saslStageAuthPlain + case saslScram256: + creq.cc.saslStage = saslStageAuthScram0_256 + case saslScram512: + creq.cc.saslStage = saslStageAuthScram0_512 + default: + resp.ErrorCode = kerr.UnsupportedSaslMechanism.Code + resp.SupportedMechanisms = []string{saslPlain, saslScram256, saslScram512} + } + return resp, nil +} diff --git a/pkg/kfake/36_sasl_authenticate.go b/pkg/kfake/36_sasl_authenticate.go new file mode 100644 index 00000000..ec0e2a97 --- /dev/null +++ b/pkg/kfake/36_sasl_authenticate.go @@ -0,0 +1,83 @@ +package kfake + +import ( + "errors" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func init() { regKey(36, 0, 2) } + +func (c *Cluster) handleSASLAuthenticate(creq clientReq) (kmsg.Response, error) { + req := creq.kreq.(*kmsg.SASLAuthenticateRequest) + resp := req.ResponseKind().(*kmsg.SASLAuthenticateResponse) + + if err := checkReqVersion(req.Key(), req.Version); err != nil { + return nil, err + } + + switch creq.cc.saslStage { + default: + resp.ErrorCode = kerr.IllegalSaslState.Code + return resp, nil + + case saslStageAuthPlain: + u, p, err := saslSplitPlain(req.SASLAuthBytes) + if err != nil { + return nil, err + } + if c.sasls.plain == nil { + return nil, errors.New("invalid sasl") + } + if p != c.sasls.plain[u] { + return nil, errors.New("invalid sasl") + } + creq.cc.saslStage = saslStageComplete + + case saslStageAuthScram0_256: + c0, err := scramParseClient0(req.SASLAuthBytes) + if err != nil { + return nil, err + } + if c.sasls.scram256 == nil { + return nil, errors.New("invalid sasl") + } + a, ok := c.sasls.scram256[c0.user] + if !ok { + return nil, errors.New("invalid sasl") + } + s0, serverFirst := scramServerFirst(c0, a) + resp.SASLAuthBytes = serverFirst + creq.cc.saslStage = saslStageAuthScram1 + creq.cc.s0 = &s0 + + case saslStageAuthScram0_512: + c0, err := scramParseClient0(req.SASLAuthBytes) + if err != nil { + return nil, err + } + if c.sasls.scram512 == nil { + return nil, errors.New("invalid sasl") + } + a, ok := c.sasls.scram512[c0.user] + if !ok { + return nil, errors.New("invalid sasl") + } + s0, serverFirst := scramServerFirst(c0, a) + resp.SASLAuthBytes = serverFirst + creq.cc.saslStage = saslStageAuthScram1 + creq.cc.s0 = &s0 + + case saslStageAuthScram1: + serverFinal, err := creq.cc.s0.serverFinal(req.SASLAuthBytes) + if err != nil { + return nil, err + } + resp.SASLAuthBytes = serverFinal + creq.cc.saslStage = saslStageComplete + creq.cc.s0 = nil + } + + return resp, nil +} diff --git a/pkg/kfake/50_describe_user_scram_credentials.go b/pkg/kfake/50_describe_user_scram_credentials.go new file mode 100644 index 00000000..0cb107d2 --- /dev/null +++ b/pkg/kfake/50_describe_user_scram_credentials.go @@ -0,0 +1,68 @@ +package kfake + +import ( + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func init() { regKey(50, 0, 0) } + +func (c *Cluster) handleDescribeUserSCRAMCredentials(kreq kmsg.Request) (kmsg.Response, error) { + var ( + req = kreq.(*kmsg.DescribeUserSCRAMCredentialsRequest) + resp = req.ResponseKind().(*kmsg.DescribeUserSCRAMCredentialsResponse) + ) + + if err := checkReqVersion(req.Key(), req.Version); err != nil { + return nil, err + } + + describe := make(map[string]bool) // if false, user was duplicated + for _, u := range req.Users { + if _, ok := describe[u.Name]; ok { + describe[u.Name] = true + } else { + describe[u.Name] = false + } + } + if req.Users == nil { // null returns all + for u := range c.sasls.scram256 { + describe[u] = false + } + for u := range c.sasls.scram512 { + describe[u] = false + } + } + + addr := func(u string) *kmsg.DescribeUserSCRAMCredentialsResponseResult { + sr := kmsg.NewDescribeUserSCRAMCredentialsResponseResult() + sr.User = u + resp.Results = append(resp.Results, sr) + return &resp.Results[len(resp.Results)-1] + } + + for u, duplicated := range describe { + sr := addr(u) + if duplicated { + sr.ErrorCode = kerr.DuplicateResource.Code + continue + } + if a, ok := c.sasls.scram256[u]; ok { + ci := kmsg.NewDescribeUserSCRAMCredentialsResponseResultCredentialInfo() + ci.Mechanism = 1 + ci.Iterations = int32(a.iterations) + sr.CredentialInfos = append(sr.CredentialInfos, ci) + } + if a, ok := c.sasls.scram512[u]; ok { + ci := kmsg.NewDescribeUserSCRAMCredentialsResponseResultCredentialInfo() + ci.Mechanism = 2 + ci.Iterations = int32(a.iterations) + sr.CredentialInfos = append(sr.CredentialInfos, ci) + } + if len(sr.CredentialInfos) == 0 { + sr.ErrorCode = kerr.ResourceNotFound.Code + } + } + + return resp, nil +} diff --git a/pkg/kfake/51_alter_user_scram_credentials.go b/pkg/kfake/51_alter_user_scram_credentials.go new file mode 100644 index 00000000..7f853b56 --- /dev/null +++ b/pkg/kfake/51_alter_user_scram_credentials.go @@ -0,0 +1,124 @@ +package kfake + +import ( + "bytes" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func init() { regKey(51, 0, 0) } + +func (c *Cluster) handleAlterUserSCRAMCredentials(b *broker, kreq kmsg.Request) (kmsg.Response, error) { + var ( + req = kreq.(*kmsg.AlterUserSCRAMCredentialsRequest) + resp = req.ResponseKind().(*kmsg.AlterUserSCRAMCredentialsResponse) + ) + + if err := checkReqVersion(req.Key(), req.Version); err != nil { + return nil, err + } + + addr := func(u string) *kmsg.AlterUserSCRAMCredentialsResponseResult { + sr := kmsg.NewAlterUserSCRAMCredentialsResponseResult() + sr.User = u + resp.Results = append(resp.Results, sr) + return &resp.Results[len(resp.Results)-1] + } + doneu := func(u string, code int16) *kmsg.AlterUserSCRAMCredentialsResponseResult { + sr := addr(u) + sr.ErrorCode = code + return sr + } + + users := make(map[string]int16) + + // Validate everything up front, keeping track of all (and duplicate) + // users. If we are not controller, we fail with our users map. + for _, d := range req.Deletions { + if d.Name == "" { + users[d.Name] = kerr.UnacceptableCredential.Code + continue + } + if d.Mechanism != 1 && d.Mechanism != 2 { + users[d.Name] = kerr.UnsupportedSaslMechanism.Code + continue + } + users[d.Name] = 0 + } + for _, u := range req.Upsertions { + if u.Name == "" || u.Iterations < 4096 || u.Iterations > 16384 { // Kafka min/max + users[u.Name] = kerr.UnacceptableCredential.Code + continue + } + if u.Mechanism != 1 && u.Mechanism != 2 { + users[u.Name] = kerr.UnsupportedSaslMechanism.Code + continue + } + if code, deleting := users[u.Name]; deleting && code == 0 { + users[u.Name] = kerr.DuplicateResource.Code + continue + } + users[u.Name] = 0 + } + + if b != c.controller { + for u := range users { + doneu(u, kerr.NotController.Code) + } + return resp, nil + } + + // Add anything that failed validation. + for u, code := range users { + if code != 0 { + doneu(u, code) + } + } + + // Process all deletions, adding ResourceNotFound as necessary. + for _, d := range req.Deletions { + if users[d.Name] != 0 { + continue + } + m := c.sasls.scram256 + if d.Mechanism == 2 { + m = c.sasls.scram512 + } + if m == nil { + doneu(d.Name, kerr.ResourceNotFound.Code) + continue + } + if _, ok := m[d.Name]; !ok { + doneu(d.Name, kerr.ResourceNotFound.Code) + continue + } + delete(m, d.Name) + doneu(d.Name, 0) + } + + // Process all upsertions. + for _, u := range req.Upsertions { + if users[u.Name] != 0 { + continue + } + m := &c.sasls.scram256 + mech := saslScram256 + if u.Mechanism == 2 { + m = &c.sasls.scram512 + mech = saslScram512 + } + if *m == nil { + *m = make(map[string]scramAuth) + } + (*m)[u.Name] = scramAuth{ + mechanism: mech, + iterations: int(u.Iterations), + saltedPass: bytes.Clone(u.SaltedPassword), + salt: bytes.Clone(u.Salt), + } + doneu(u.Name, 0) + } + + return resp, nil +} diff --git a/pkg/kfake/NOTES b/pkg/kfake/NOTES index fda018b2..35c30e43 100644 --- a/pkg/kfake/NOTES +++ b/pkg/kfake/NOTES @@ -26,10 +26,10 @@ MISC x OffsetForLeaderEpoch SASL -* SaslHandshake -* SaslAuthenticate -* DescribeUserScramCredentials -* AlterUserScramCredentials +x SaslHandshake +x SaslAuthenticate +x DescribeUserScramCredentials +x AlterUserScramCredentials TXNS * AddPartitionsToTxn @@ -37,6 +37,11 @@ TXNS * EndTxn * TxnOffsetCommit +ACLS +* DescribeACLs +* CreateACLs +* DeleteACLs + LOW-PRIO * DeleteRecords * DescribeConfigs @@ -47,11 +52,6 @@ LOW-PRIO * DescribeTransactions * ListTransactions -ACLS -* DescribeACLs -* CreateACLs -* DeleteACLs - * AlterPartitionAssignments * ListPartitionReassignments diff --git a/pkg/kfake/client_conn.go b/pkg/kfake/client_conn.go index 46477d3d..2d6b89f1 100644 --- a/pkg/kfake/client_conn.go +++ b/pkg/kfake/client_conn.go @@ -16,6 +16,9 @@ type ( b *broker conn net.Conn respCh chan clientResp + + saslStage saslStage + s0 *scramServer0 } clientReq struct { diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go index 5e69d12a..ad93f935 100644 --- a/pkg/kfake/cluster.go +++ b/pkg/kfake/cluster.go @@ -15,14 +15,9 @@ import ( // TODO // -// * Handle requests concurrently, i.e. JoinGroup -// * Actually, just spin out concurrent group manager that then hooks back -// into the control loop -// // * Add raft and make the brokers independent // // * Support multiple replicas -- we just pass this through -// * Support per-partition leader epoch type ( @@ -44,6 +39,7 @@ type ( data data pids pids groups groups + sasls sasls die chan struct{} dead atomic.Bool @@ -110,6 +106,35 @@ func NewCluster(opts ...Opt) (c *Cluster, err error) { } }() + for mu, p := range cfg.sasls { + switch mu.m { + case saslPlain: + if c.sasls.plain == nil { + c.sasls.plain = make(map[string]string) + } + c.sasls.plain[mu.u] = p + case saslScram256: + if c.sasls.scram256 == nil { + c.sasls.scram256 = make(map[string]scramAuth) + } + c.sasls.scram256[mu.u] = newScramAuth(saslScram256, p) + case saslScram512: + if c.sasls.scram512 == nil { + c.sasls.scram512 = make(map[string]scramAuth) + } + c.sasls.scram512[mu.u] = newScramAuth(saslScram512, p) + default: + return nil, fmt.Errorf("unknown SASL mechanism %v", mu.m) + } + } + cfg.sasls = nil + + if cfg.enableSASL && c.sasls.empty() { + c.sasls.scram256 = map[string]scramAuth{ + "admin": newScramAuth(saslScram256, "admin"), + } + } + for i := 0; i < cfg.nbrokers; i++ { var port int if len(cfg.ports) > 0 { @@ -206,6 +231,13 @@ func (c *Cluster) run() { goto afterControl } + if c.cfg.enableSASL { + if allow := c.handleSASL(creq); !allow { + err = errors.New("not allowed given SASL state") + goto afterControl + } + } + switch k := kmsg.Key(kreq.Key()); k { case kmsg.Produce: kresp, err = c.handleProduce(creq.cc.b, kreq) @@ -233,6 +265,8 @@ func (c *Cluster) run() { kresp, err = c.handleDescribeGroups(creq) case kmsg.ListGroups: kresp, err = c.handleListGroups(creq) + case kmsg.SASLHandshake: + kresp, err = c.handleSASLHandshake(creq) case kmsg.ApiVersions: kresp, err = c.handleApiVersions(kreq) case kmsg.CreateTopics: @@ -243,10 +277,16 @@ func (c *Cluster) run() { kresp, err = c.handleInitProducerID(kreq) case kmsg.OffsetForLeaderEpoch: kresp, err = c.handleOffsetForLeaderEpoch(creq.cc.b, kreq) + case kmsg.SASLAuthenticate: + kresp, err = c.handleSASLAuthenticate(creq) case kmsg.CreatePartitions: kresp, err = c.handleCreatePartitions(creq.cc.b, kreq) case kmsg.DeleteGroups: kresp, err = c.handleDeleteGroups(creq) + case kmsg.DescribeUserSCRAMCredentials: + kresp, err = c.handleDescribeUserSCRAMCredentials(kreq) + case kmsg.AlterUserSCRAMCredentials: + kresp, err = c.handleAlterUserSCRAMCredentials(creq.cc.b, kreq) default: err = fmt.Errorf("unahndled key %v", k) } diff --git a/pkg/kfake/config.go b/pkg/kfake/config.go index aae808b0..a2066e3c 100644 --- a/pkg/kfake/config.go +++ b/pkg/kfake/config.go @@ -21,6 +21,9 @@ type cfg struct { minSessionTimeout time.Duration maxSessionTimeout time.Duration + + enableSASL bool + sasls map[struct{ m, u string }]string // cleared after client initialization } // NumBrokers sets the number of brokers to start in the fake cluster. @@ -67,3 +70,19 @@ func GroupMinSessionTimeout(d time.Duration) Opt { func GroupMaxSessionTimeout(d time.Duration) Opt { return opt{func(cfg *cfg) { cfg.maxSessionTimeout = d }} } + +// EnableSASL enables SASL authentication for the cluster. If you do not +// configure a bootstrap user / pass, the default superuser is "admin" / +// "admin" with the SCRAM-SHA-256 SASL mechanisms. +func EnableSASL() Opt { + return opt{func(cfg *cfg) { cfg.enableSASL = true }} +} + +// Superuser seeds the cluster with a superuser. The method must be either +// PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. +// Note that PLAIN superusers cannot be deleted. +// SCRAM superusers can be modified with AlterUserScramCredentials. +// If you delete all SASL users, the kfake cluster will be unusable. +func Superuser(method, user, pass string) Opt { + return opt{func(cfg *cfg) { cfg.sasls[struct{ m, u string }{method, user}] = pass }} +} diff --git a/pkg/kfake/go.mod b/pkg/kfake/go.mod index 2b80a0a2..2e81ffaf 100644 --- a/pkg/kfake/go.mod +++ b/pkg/kfake/go.mod @@ -5,4 +5,5 @@ go 1.20 require ( github.com/twmb/franz-go v1.13.0 github.com/twmb/franz-go/pkg/kmsg v1.4.0 + golang.org/x/crypto v0.7.0 ) diff --git a/pkg/kfake/go.sum b/pkg/kfake/go.sum index d3852dcb..c4727521 100644 --- a/pkg/kfake/go.sum +++ b/pkg/kfake/go.sum @@ -2,3 +2,5 @@ github.com/twmb/franz-go v1.13.0 h1:J4VyTXVlOhiCDCXS56ut2ZRAylaimPXnIqtCq9Wlfbw= github.com/twmb/franz-go v1.13.0/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= diff --git a/pkg/kfake/sasl.go b/pkg/kfake/sasl.go new file mode 100644 index 00000000..9140157b --- /dev/null +++ b/pkg/kfake/sasl.go @@ -0,0 +1,296 @@ +package kfake + +import ( + "bytes" + "crypto/hmac" + "crypto/sha256" + "crypto/sha512" + "encoding/base64" + "errors" + "fmt" + "regexp" + "strings" + + "github.com/twmb/franz-go/pkg/kmsg" + "golang.org/x/crypto/pbkdf2" +) + +// TODO server-error-value in serverFinal + +const ( + saslPlain = "PLAIN" + saslScram256 = "SCRAM-SHA-256" + saslScram512 = "SCRAM-SHA-512" + scramIterations = 4096 +) + +type ( + sasls struct { + plain map[string]string // user => pass + scram256 map[string]scramAuth // user => scram auth + scram512 map[string]scramAuth // user => scram auth + } + + saslStage uint8 +) + +func (s sasls) empty() bool { + return len(s.plain) == 0 && len(s.scram256) == 0 && len(s.scram512) == 0 +} + +const ( + saslStageBegin saslStage = iota + saslStageAuthPlain + saslStageAuthScram0_256 + saslStageAuthScram0_512 + saslStageAuthScram1 + saslStageComplete +) + +func (c *Cluster) handleSASL(creq clientReq) (allow bool) { + switch creq.cc.saslStage { + case saslStageBegin: + switch creq.kreq.(type) { + case *kmsg.ApiVersionsRequest, + *kmsg.SASLHandshakeRequest: + return true + default: + return false + } + case saslStageAuthPlain, + saslStageAuthScram0_256, + saslStageAuthScram0_512, + saslStageAuthScram1: + switch creq.kreq.(type) { + case *kmsg.ApiVersionsRequest, + *kmsg.SASLAuthenticateRequest: + return true + default: + return false + } + case saslStageComplete: + return true + default: + panic("unreachable") + } +} + +/////////// +// PLAIN // +/////////// + +func saslSplitPlain(auth []byte) (user, pass string, err error) { + parts := strings.SplitN(string(auth), "\x00", 3) + if len(parts) != 3 { + return "", "", errors.New("invalid plain auth") + } + if len(parts[0]) != 0 && parts[0] != parts[1] { + return "", "", errors.New("authzid is not equal to username") // see below + } + return parts[1], parts[2], nil +} + +/////////// +// SCRAM // +/////////// + +func newScramAuth(mechanism, pass string) scramAuth { + var saltedPass []byte + salt := randBytes(10) + switch mechanism { + case saslScram256: + saltedPass = pbkdf2.Key([]byte(pass), salt, scramIterations, sha256.Size, sha256.New) + case saslScram512: + saltedPass = pbkdf2.Key([]byte(pass), salt, scramIterations, sha512.Size, sha512.New) + default: + panic("unreachable") + } + return scramAuth{ + mechanism: mechanism, + iterations: scramIterations, + saltedPass: saltedPass, + salt: salt, + } +} + +type scramAuth struct { + mechanism string // scram 256 or 512 + iterations int + saltedPass []byte + salt []byte +} + +// client-first-message +type scramClient0 struct { + user string + bare []byte // client-first-message-bare + nonce []byte // nonce in client0 +} + +var scramUnescaper = strings.NewReplacer("=3D", "=", "=2C", ",") + +func scramParseClient0(client0 []byte) (scramClient0, error) { + m := reClient0.FindSubmatch(client0) + if len(m) == 0 { + return scramClient0{}, errors.New("invalid client0") + } + var ( + zid = string(m[1]) + bare = bytes.Clone(m[2]) + user = string(m[3]) + nonce = bytes.Clone(m[4]) + ext = string(m[5]) + ) + if len(ext) != 0 { + return scramClient0{}, errors.New("invalid extensions") + } + if zid != "" && zid != user { + return scramClient0{}, errors.New("authzid is not equal to username") // Kafka & Redpanda enforce that a present zid == username + } + return scramClient0{ + user: scramUnescaper.Replace(user), + bare: bare, + nonce: nonce, + }, nil +} + +func scramServerFirst(client0 scramClient0, auth scramAuth) (scramServer0, []byte) { + nonce := append(client0.nonce, base64.RawStdEncoding.EncodeToString(randBytes(16))...) + serverFirst := []byte(fmt.Sprintf("r=%s,s=%s,i=%d", + nonce, + base64.StdEncoding.EncodeToString(auth.salt), + scramIterations, + )) + return scramServer0{ + a: auth, + c0bare: client0.bare, + s0: serverFirst, + }, serverFirst +} + +// server-first-message +type scramServer0 struct { + a scramAuth + c0bare []byte + s0 []byte +} + +// validates client-final-message and replies with server-final-message +func (s *scramServer0) serverFinal(clientFinal []byte) ([]byte, error) { + m := reClientFinal.FindSubmatch(clientFinal) + if len(m) == 0 { + return nil, errors.New("invalid client-final-message") + } + var ( + finalWithoutProof = m[1] + channel = m[2] + clientProof64 = m[3] + h = sha256.New + ) + if s.a.mechanism == saslScram512 { + h = sha512.New + } + if !bytes.Equal(channel, []byte("biws")) { // "biws" == base64("n,,") + return nil, errors.New("invalid channel binding") + } + clientProof, err := base64.StdEncoding.DecodeString(string(clientProof64)) + if err != nil { + return nil, errors.New("client proof is not std-base64") + } + if len(clientProof) != h().Size() { + return nil, fmt.Errorf("len(client proof) %d != expected %d", len(clientProof), h().Size()) + } + + var clientKey []byte // := HMAC(SaltedPass, "Client Key") + { + mac := hmac.New(h, s.a.saltedPass) + mac.Write([]byte("Client Key")) + clientKey = mac.Sum(nil) + } + + var storedKey []byte // := H(ClientKey) + { + h := h() + h.Write(clientKey) + storedKey = h.Sum(nil) + } + + var authMessage []byte // := client-first-bare-message + "," + server-first-message + "," + client-final-message-without-proof + { + authMessage = append(s.c0bare, ',') + authMessage = append(authMessage, s.s0...) + authMessage = append(authMessage, ',') + authMessage = append(authMessage, finalWithoutProof...) + } + + var clientSignature []byte // := HMAC(StoredKey, AuthMessage) + { + mac := hmac.New(h, storedKey) + mac.Write(authMessage) + clientSignature = mac.Sum(nil) + } + + usedKey := clientProof // := ClientKey XOR ClientSignature + { + for i, b := range clientSignature { + usedKey[i] ^= b + } + h := h() + h.Write(usedKey) + usedKey = h.Sum(nil) + } + if !bytes.Equal(usedKey, storedKey) { + return nil, errors.New("invalid password") + } + + var serverKey []byte // := HMAC(SaltedPass, "Server Key") + { + mac := hmac.New(h, s.a.saltedPass) + mac.Write([]byte("Server Key")) + serverKey = mac.Sum(nil) + } + var serverSignature []byte // := HMAC(ServerKey, AuthMessage) + { + mac := hmac.New(h, serverKey) + mac.Write(authMessage) + serverSignature = mac.Sum(nil) + } + + serverFinal := []byte(fmt.Sprintf("v=%s", base64.StdEncoding.EncodeToString(serverSignature))) + return serverFinal, nil +} + +var reClient0, reClientFinal *regexp.Regexp + +func init() { + // https://datatracker.ietf.org/doc/html/rfc5802#section-7 + const ( + valueSafe = "[\x01-\x2b\x2d-\x3c\x3e-\x7f]+" // all except \0 - , + value = "[\x01-\x2b\x2d-\x7f]+" // all except \0 , + printable = "[\x21-\x2b\x2d-\x7e]+" // all except , (and DEL, unnoted) + saslName = "(?:[\x01-\x2b\x2d-\x3c\x3e-\x7f]|=2C|=3D)+" // valueSafe | others; kafka is lazy here + b64 = `[a-zA-Z0-9/+]+={0,3}` // we are lazy here matching up to 3 = + ext = "(?:,[a-zA-Z]+=[\x01-\x2b\x2d-\x7f]+)*" + ) + + // 0: entire match + // 1: authzid + // 2: client-first-message-bare + // 3: username + // 4: nonce + // 5: ext + client0 := fmt.Sprintf("^n,(?:a=(%s))?,((?:m=%s,)?n=(%s),r=(%s)(%s))$", saslName, value, saslName, printable, ext) + + // We reject extensions in client0. Kafka does not validate the nonce + // and some clients may generate it incorrectly (i.e. old franz-go), so + // we do not validate it. + // + // 0: entire match + // 1: channel-final-message-without-proof + // 2: channel binding + // 3: proof + clientFinal := fmt.Sprintf("^(c=(%s),r=%s),p=(%s)$", b64, printable, b64) + + reClient0 = regexp.MustCompile(client0) + reClientFinal = regexp.MustCompile(clientFinal) +}