-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for SASL plain text authentication #648
Changes from 10 commits
3834ba1
369d017
d3b1e0e
5ac5287
0a075f2
2270962
6fbcb5f
a95632e
8b75447
9d69627
d0d6717
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package sarama | |
|
||
import ( | ||
"crypto/tls" | ||
"encoding/binary" | ||
"fmt" | ||
"io" | ||
"net" | ||
|
@@ -82,6 +83,22 @@ func (b *Broker) Open(conf *Config) error { | |
b.conn = newBufConn(b.conn) | ||
|
||
b.conf = conf | ||
|
||
if conf.Net.SASL.Enable { | ||
b.connErr = b.doSASLPlainAuth() | ||
if b.connErr != nil { | ||
err = b.conn.Close() | ||
if err == nil { | ||
Logger.Printf("Closed connection to broker %s\n", b.addr) | ||
} else { | ||
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) | ||
} | ||
b.conn = nil | ||
atomic.StoreInt32(&b.opened, 0) | ||
return | ||
} | ||
} | ||
|
||
b.done = make(chan bool) | ||
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1) | ||
|
||
|
@@ -454,3 +471,52 @@ func (b *Broker) responseReceiver() { | |
} | ||
close(b.done) | ||
} | ||
|
||
// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149) | ||
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9 | ||
// | ||
// In SASL Plain, Kafka expects the auth header to be in the following format | ||
// Message format (from https://tools.ietf.org/html/rfc4616): | ||
// | ||
// message = [authzid] UTF8NUL authcid UTF8NUL passwd | ||
// authcid = 1*SAFE ; MUST accept up to 255 octets | ||
// authzid = 1*SAFE ; MUST accept up to 255 octets | ||
// passwd = 1*SAFE ; MUST accept up to 255 octets | ||
// UTF8NUL = %x00 ; UTF-8 encoded NUL character | ||
// | ||
// SAFE = UTF1 / UTF2 / UTF3 / UTF4 | ||
// ;; any UTF-8 encoded Unicode character except NUL | ||
// | ||
// When credentials are valid, Kafka returns a 4 byte array of null characters. | ||
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way | ||
// of responding to bad credentials but thats how its being done today. | ||
func (b *Broker) doSASLPlainAuth() error { | ||
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) | ||
authBytes := make([]byte, length+4) //4 byte length header + auth data | ||
binary.BigEndian.PutUint32(authBytes, uint32(length)) | ||
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)) | ||
|
||
err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) | ||
if err != nil { | ||
Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error()) | ||
return err | ||
} | ||
|
||
_, err = b.conn.Write(authBytes) | ||
if err != nil { | ||
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) | ||
return err | ||
} | ||
|
||
header := make([]byte, 4) | ||
n, err := io.ReadFull(b.conn, header) | ||
// If the credentials are valid, we would get a 4 byte response filled with null characters. | ||
// Otherwise, the broker closes the connection and we get an EOF | ||
if err != nil { | ||
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error()) | ||
return err | ||
} | ||
|
||
Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) | ||
return nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose this is too different from the normal API packets we send over the wire in order to reuse our main There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. This authentication preceeds the normal kafka message exchanges and follows a different format (no correlation IDs, etc.). Besides, cant use the send function, as it would deadlock |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,17 @@ type Config struct { | |
Config *tls.Config | ||
} | ||
|
||
// SASL based authentication with broker. While there are multiple SASL authentication methods | ||
// the current implementation is limited to plaintext (SASL/PLAIN) authentication | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we currently only support plain-text, should we enforce that SASL is only used over TLS to avoid leaking credentials on the wire? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the comments. Will address your other comments and send out a new PR soon. On Thu, Apr 28, 2016 at 3:46 PM Evan Huus [email protected] wrote:
~shriram |
||
SASL struct { | ||
// Whether or not to use SASL authentication when connecting to the broker | ||
// (defaults to false). | ||
Enable bool | ||
//username and password for SASL/PLAIN authentication | ||
User string | ||
Password string | ||
} | ||
|
||
// KeepAlive specifies the keep-alive period for an active network connection. | ||
// If zero, keep-alives are disabled. (default is 0: disabled). | ||
KeepAlive time.Duration | ||
|
@@ -257,6 +268,14 @@ func (c *Config) Validate() error { | |
if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil { | ||
Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.") | ||
} | ||
if c.Net.SASL.Enable == false { | ||
if c.Net.SASL.User != "" { | ||
Logger.Println("Net.SASL is disabled but a non-empty username was provided.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these are good checks; I don't know a lot about SASL, but is it valid to have empty usernames and passwords? If not you should do the opposite check as well - if it's enabled but the user/pass is blank return a configuration error |
||
} | ||
if c.Net.SASL.Password != "" { | ||
Logger.Println("Net.SASL is disabled but a non-empty password was provided.") | ||
} | ||
} | ||
if c.Producer.RequiredAcks > 1 { | ||
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.") | ||
} | ||
|
@@ -294,6 +313,10 @@ func (c *Config) Validate() error { | |
return ConfigurationError("Net.WriteTimeout must be > 0") | ||
case c.Net.KeepAlive < 0: | ||
return ConfigurationError("Net.KeepAlive must be >= 0") | ||
case c.Net.SASL.Enable == true && c.Net.SASL.User == "": | ||
return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled") | ||
case c.Net.SASL.Enable == true && c.Net.SASL.Password == "": | ||
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled") | ||
} | ||
|
||
// validate the Metadata values | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as name, I prefer
sendAndReceiveSASLPlainAuth
which is more in line with naming we have elsewhereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed