Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SASL plain text authentication #648

Merged
merged 11 commits into from
May 5, 2016
62 changes: 62 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"crypto/tls"
"encoding/binary"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -82,6 +83,18 @@ func (b *Broker) Open(conf *Config) error {
b.conn = newBufConn(b.conn)

b.conf = conf

if conf.Net.SASL.Enable {
err = b.doSASLPlainAuth()
if err != nil {
Logger.Printf("SASL authentication with broker %s failed\n", b.addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you log every case in doSASLPlainAuth and both cases here, that seems a bit much?

_ = b.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go's mutexes are not re-entrant, so this will deadlock; to handle the error please use connErr and follow the pattern established a few lines up when we fail to connect at all; note you will still need to close b.conn since it has now been established

return
} else {
Logger.Printf("SASL authentication with broker %s succeeded\n", b.addr)
}
}

b.done = make(chan bool)
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

Expand Down Expand Up @@ -454,3 +467,52 @@ func (b *Broker) responseReceiver() {
}
close(b.done)
}

// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
//
// In SASL Plain, Kafka expects the auth header to be in the following format
// Message format (from https://tools.ietf.org/html/rfc4616):
//
// message = [authzid] UTF8NUL authcid UTF8NUL passwd
// authcid = 1*SAFE ; MUST accept up to 255 octets
// authzid = 1*SAFE ; MUST accept up to 255 octets
// passwd = 1*SAFE ; MUST accept up to 255 octets
// UTF8NUL = %x00 ; UTF-8 encoded NUL character
//
// SAFE = UTF1 / UTF2 / UTF3 / UTF4
// ;; any UTF-8 encoded Unicode character except NUL
//
// When credentials are valid, Kafka returns a 4 byte array of null characters.
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
// of responding to bad credentials but thats how its being done today.
func (b *Broker) doSASLPlainAuth() error {
Copy link
Contributor

@wvanbergen wvanbergen May 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as name, I prefer sendAndReceiveSASLPlainAuth which is more in line with naming we have elsewhere

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))

err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
Logger.Printf("Failed to set write deadline when doing SASL auth: %s\n", err.Error())
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we really be returning nil here (and in the case we failed to write)? the connection will potentially continue unauthenticated in that case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry. There seems to be some mess up from my older version of code and the newer version. This was intended to return err .

}

_, err = b.conn.Write(authBytes)
if err != nil {
Logger.Printf("Failed to write SASL auth header to broker: %s\n", err.Error())
return nil
}

header := make([]byte, 4)
n, err := io.ReadFull(b.conn, header)
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
Logger.Printf("Failed to read response while authenticating with SASL: %s\n", err.Error())
return err
}

Logger.Printf("SASL authentication successful:%v - %v\n", n, header)
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this is too different from the normal API packets we send over the wire in order to reuse our main sendAndReceive method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. This authentication preceeds the normal kafka message exchanges and follows a different format (no correlation IDs, etc.). Besides, cant use the send function, as it would deadlock

23 changes: 23 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type Config struct {
Config *tls.Config
}

// SASL based authentication with broker. While there are multiple SASL authentication methods
// the current implementation is limited to plaintext (SASL/PLAIN) authentication
Copy link
Contributor

@eapache eapache Apr 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we currently only support plain-text, should we enforce that SASL is only used over TLS to avoid leaking credentials on the wire?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments.
With regard to enforcing use of TLS (not https) for SASL, I thought I would
keep the auth separate from the encryption to give people a choice should
the opt to use non tls transports with password authentication (as a
framework, it seemed harsh to enforce policies/preferences on the user).

Will address your other comments and send out a new PR soon.

On Thu, Apr 28, 2016 at 3:46 PM Evan Huus [email protected] wrote:

In config.go
#648 (comment):

@@ -33,6 +33,17 @@ type Config struct {
Config *tls.Config
}

  •   // SASL based authentication with broker. While there are multiple SASL authentication methods
    
  •   // the current implementation is limited to plaintext (SASL/PLAIN) authentication
    

since we currently only support plain-text, should we enforce that SASL is
only used over HTTPS to avoid leaking credentials on the wire?


You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub
https://github.com/Shopify/sarama/pull/648/files/3834ba1ce06295021c65e824e3aa9337caf5f03f#r61490796

~shriram

SASL struct {
// Whether or not to use SASL authentication when connecting to the broker
// (defaults to false).
Enable bool
//username and password for SASL/PLAIN authentication
User string
Password string
}

// KeepAlive specifies the keep-alive period for an active network connection.
// If zero, keep-alives are disabled. (default is 0: disabled).
KeepAlive time.Duration
Expand Down Expand Up @@ -257,6 +268,14 @@ func (c *Config) Validate() error {
if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil {
Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
}
if c.Net.SASL.Enable == false {
if c.Net.SASL.User != "" {
Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are good checks; I don't know a lot about SASL, but is it valid to have empty usernames and passwords? If not you should do the opposite check as well - if it's enabled but the user/pass is blank return a configuration error

}
if c.Net.SASL.Password != "" {
Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
}
}
if c.Producer.RequiredAcks > 1 {
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
}
Expand Down Expand Up @@ -294,6 +313,10 @@ func (c *Config) Validate() error {
return ConfigurationError("Net.WriteTimeout must be > 0")
case c.Net.KeepAlive < 0:
return ConfigurationError("Net.KeepAlive must be >= 0")
case (c.Net.SASL.Enable == true) && (c.Net.SASL.User == ""):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think generally these parentheses are not needed?

return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
case (c.Net.SASL.Enable == true) && (c.Net.SASL.Password == ""):
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
}

// validate the Metadata values
Expand Down