From 3834ba1ce06295021c65e824e3aa9337caf5f03f Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Thu, 28 Apr 2016 15:27:39 -0400 Subject: [PATCH 1/9] Add support for SASL plain text authentication --- broker.go | 34 ++++++++++++++++++++++++++++++++++ config.go | 20 ++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/broker.go b/broker.go index c4596f909..15e2def54 100644 --- a/broker.go +++ b/broker.go @@ -9,6 +9,8 @@ import ( "sync" "sync/atomic" "time" + "bytes" + "encoding/binary" ) // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. @@ -82,6 +84,38 @@ func (b *Broker) Open(conf *Config) error { b.conn = newBufConn(b.conn) b.conf = conf + + if conf.Net.SASL.Enable { + // + // Begin SASL/PLAIN authentication + // + authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) + buf := new(bytes.Buffer) + + err = binary.Write(buf, binary.BigEndian, int32(len(authBytes))) + if err != nil { + Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error()) + } + + err = binary.Write(buf, binary.BigEndian, authBytes) + if err != nil { + Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error()) + } + + b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) + b.conn.Write(buf.Bytes()) + + header := make([]byte, 4) + n, err := io.ReadFull(b.conn, header) + if err != nil { + Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error()) + } + Logger.Printf("SASL authentication successful:\n%v\n%v\n%v", n, header, string(header)) + // + // End SASL/PLAIN authentication + // + } + b.done = make(chan bool) b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1) diff --git a/config.go b/config.go index 188b4b2d7..c56304a36 100644 --- a/config.go +++ b/config.go @@ -33,6 +33,17 @@ type Config struct { Config *tls.Config } + // SASL based authentication with broker. While there are multiple SASL authentication methods + // the current implementation is limited to plaintext (SASL/PLAIN) authentication + SASL struct { + // Whether or not to use SASL authentication when connecting to the broker + // (defaults to false). + Enable bool + //username and password for SASL/PLAIN authentication + User string + Password string + } + // KeepAlive specifies the keep-alive period for an active network connection. // If zero, keep-alives are disabled. (default is 0: disabled). KeepAlive time.Duration @@ -222,6 +233,7 @@ func NewConfig() *Config { c.Net.DialTimeout = 30 * time.Second c.Net.ReadTimeout = 30 * time.Second c.Net.WriteTimeout = 30 * time.Second + c.Net.SASL.Enable = false c.Metadata.Retry.Max = 3 c.Metadata.Retry.Backoff = 250 * time.Millisecond @@ -256,6 +268,14 @@ func (c *Config) Validate() error { if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil { Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.") } + if c.Net.SASL.Enable == false { + if c.Net.SASL.User != "" { + Logger.Println("Net.SASL is disabled but a non-empty username was provided.") + } + if c.Net.SASL.Password != "" { + Logger.Println("Net.SASL is disabled but a non-empty password was provided.") + } + } if c.Producer.RequiredAcks > 1 { Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.") } From d3b1e0ebfdc2f0383f453765ff6e0421c3ebd2cf Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 2 May 2016 08:15:18 -0400 Subject: [PATCH 2/9] throw configuration error when SASL is set but username or password is empty --- config.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index f90e7d21b..2169d2a8f 100644 --- a/config.go +++ b/config.go @@ -234,7 +234,6 @@ func NewConfig() *Config { c.Net.DialTimeout = 30 * time.Second c.Net.ReadTimeout = 30 * time.Second c.Net.WriteTimeout = 30 * time.Second - c.Net.SASL.Enable = false c.Metadata.Retry.Max = 3 c.Metadata.Retry.Backoff = 250 * time.Millisecond @@ -314,6 +313,10 @@ func (c *Config) Validate() error { return ConfigurationError("Net.WriteTimeout must be > 0") case c.Net.KeepAlive < 0: return ConfigurationError("Net.KeepAlive must be >= 0") + case (c.Net.SASL.Enable == true) && (c.Net.SASL.User == ""): + return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled") + case (c.Net.SASL.Enable == true) && (c.Net.SASL.Password == ""): + return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled") } // validate the Metadata values From 5ac5287ba1a32c08baae541e62177a3ba6853ca3 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 2 May 2016 09:50:04 -0400 Subject: [PATCH 3/9] refactor SASL PLAIN auth into its own function --- broker.go | 82 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/broker.go b/broker.go index 15e2def54..07eda5e59 100644 --- a/broker.go +++ b/broker.go @@ -86,34 +86,14 @@ func (b *Broker) Open(conf *Config) error { b.conf = conf if conf.Net.SASL.Enable { - // - // Begin SASL/PLAIN authentication - // - authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) - buf := new(bytes.Buffer) - - err = binary.Write(buf, binary.BigEndian, int32(len(authBytes))) - if err != nil { - Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error()) - } - - err = binary.Write(buf, binary.BigEndian, authBytes) - if err != nil { - Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error()) - } - - b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) - b.conn.Write(buf.Bytes()) - - header := make([]byte, 4) - n, err := io.ReadFull(b.conn, header) + err = b.doSASLPlainAuth() if err != nil { - Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error()) + Logger.Printf("SASL authentication with broker %s failed\n", b.addr) + _ = b.Close() + return + } else { + Logger.Printf("SASL authentication with broker %s succeeded\n", b.addr) } - Logger.Printf("SASL authentication successful:\n%v\n%v\n%v", n, header, string(header)) - // - // End SASL/PLAIN authentication - // } b.done = make(chan bool) @@ -488,3 +468,53 @@ func (b *Broker) responseReceiver() { } close(b.done) } + +// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149) +// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9 +// +// In SASL Plain, Kafka expects the auth header to be in the following format +// Message format (from https://tools.ietf.org/html/rfc4616): +// +// message = [authzid] UTF8NUL authcid UTF8NUL passwd +// authcid = 1*SAFE ; MUST accept up to 255 octets +// authzid = 1*SAFE ; MUST accept up to 255 octets +// passwd = 1*SAFE ; MUST accept up to 255 octets +// UTF8NUL = %x00 ; UTF-8 encoded NUL character +// +// SAFE = UTF1 / UTF2 / UTF3 / UTF4 +// ;; any UTF-8 encoded Unicode character except NUL +// +// When credentials are valid, Kafka returns a 4 byte array of null characters. +// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way +// of responding to bad credentials but thats how its being done today. +func (b *Broker) doSASLPlainAuth() error { + authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) + buf := new(bytes.Buffer) + + err := binary.Write(buf, binary.BigEndian, int32(len(authBytes))) + if err != nil { + Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error()) + return err + } + + err = binary.Write(buf, binary.BigEndian, authBytes) + if err != nil { + Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error()) + return err + } + + b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) + b.conn.Write(buf.Bytes()) + + header := make([]byte, 4) + n, err := io.ReadFull(b.conn, header) + // If the credentials are valid, we would get a 4 byte response filled with null characters. + // Otherwise, the broker closes the connection and we get an EOF + if err != nil { + Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error()) + return err + } + + Logger.Printf("SASL authentication successful:%v - %v", n, header) + return nil +} From 0a075f2fed8115dae34d202d97cc29550cc7fa41 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 2 May 2016 09:50:48 -0400 Subject: [PATCH 4/9] go fmt --- broker.go | 6 +++--- config.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/broker.go b/broker.go index 07eda5e59..b25ce7501 100644 --- a/broker.go +++ b/broker.go @@ -1,7 +1,9 @@ package sarama import ( + "bytes" "crypto/tls" + "encoding/binary" "fmt" "io" "net" @@ -9,8 +11,6 @@ import ( "sync" "sync/atomic" "time" - "bytes" - "encoding/binary" ) // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. @@ -486,7 +486,7 @@ func (b *Broker) responseReceiver() { // // When credentials are valid, Kafka returns a 4 byte array of null characters. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way -// of responding to bad credentials but thats how its being done today. +// of responding to bad credentials but thats how its being done today. func (b *Broker) doSASLPlainAuth() error { authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) buf := new(bytes.Buffer) diff --git a/config.go b/config.go index 2169d2a8f..46602b72d 100644 --- a/config.go +++ b/config.go @@ -40,7 +40,7 @@ type Config struct { // (defaults to false). Enable bool //username and password for SASL/PLAIN authentication - User string + User string Password string } From 2270962248a5d10bc464625bb3c19b847e66f212 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 2 May 2016 16:10:02 -0400 Subject: [PATCH 5/9] remove use of dynamic buffer --- broker.go | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/broker.go b/broker.go index b25ce7501..a72d4f3f5 100644 --- a/broker.go +++ b/broker.go @@ -1,7 +1,6 @@ package sarama import ( - "bytes" "crypto/tls" "encoding/binary" "fmt" @@ -488,23 +487,13 @@ func (b *Broker) responseReceiver() { // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way // of responding to bad credentials but thats how its being done today. func (b *Broker) doSASLPlainAuth() error { - authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password) - buf := new(bytes.Buffer) - - err := binary.Write(buf, binary.BigEndian, int32(len(authBytes))) - if err != nil { - Logger.Printf("Failed to encode payload size (SASL credentials): %s", err.Error()) - return err - } - - err = binary.Write(buf, binary.BigEndian, authBytes) - if err != nil { - Logger.Printf("Failed to encode payload (SASL credentials): %s", err.Error()) - return err - } + length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) + authBytes := make([]byte, length + 4) //4 byte length header + auth data + binary.BigEndian.PutUint32(authBytes, uint32(length)) + copy(authBytes[4:], []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)) b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) - b.conn.Write(buf.Bytes()) + b.conn.Write(authBytes) header := make([]byte, 4) n, err := io.ReadFull(b.conn, header) From 6fbcb5f07e963954c9a34e6a52e2ca5dc3da6b88 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 2 May 2016 16:10:32 -0400 Subject: [PATCH 6/9] go fmt --- broker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker.go b/broker.go index a72d4f3f5..ae88bced9 100644 --- a/broker.go +++ b/broker.go @@ -488,9 +488,9 @@ func (b *Broker) responseReceiver() { // of responding to bad credentials but thats how its being done today. func (b *Broker) doSASLPlainAuth() error { length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) - authBytes := make([]byte, length + 4) //4 byte length header + auth data + authBytes := make([]byte, length+4) //4 byte length header + auth data binary.BigEndian.PutUint32(authBytes, uint32(length)) - copy(authBytes[4:], []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)) + copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)) b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) b.conn.Write(authBytes) From 8b754479b56de82381915d15b187bf7b7a364599 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Mon, 2 May 2016 16:41:58 -0400 Subject: [PATCH 7/9] error checks when doing SASL auth --- broker.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/broker.go b/broker.go index ae88bced9..7a2238636 100644 --- a/broker.go +++ b/broker.go @@ -492,18 +492,27 @@ func (b *Broker) doSASLPlainAuth() error { binary.BigEndian.PutUint32(authBytes, uint32(length)) copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)) - b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) - b.conn.Write(authBytes) + err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) + if err != nil { + Logger.Printf("Failed to set write deadline when doing SASL auth: %s\n", err.Error()) + return nil + } + + _, err = b.conn.Write(authBytes) + if err != nil { + Logger.Printf("Failed to write SASL auth header to broker: %s\n", err.Error()) + return nil + } header := make([]byte, 4) n, err := io.ReadFull(b.conn, header) // If the credentials are valid, we would get a 4 byte response filled with null characters. // Otherwise, the broker closes the connection and we get an EOF if err != nil { - Logger.Printf("Failed to read response while authenticating with SASL: %s", err.Error()) + Logger.Printf("Failed to read response while authenticating with SASL: %s\n", err.Error()) return err } - Logger.Printf("SASL authentication successful:%v - %v", n, header) + Logger.Printf("SASL authentication successful:%v - %v\n", n, header) return nil } From 9d69627aa9448a18395a02bf6a298dcb1b9a874f Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Tue, 3 May 2016 13:20:58 -0400 Subject: [PATCH 8/9] bug fixes --- broker.go | 28 ++++++++++++++++------------ config.go | 4 ++-- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/broker.go b/broker.go index 7a2238636..5138a6957 100644 --- a/broker.go +++ b/broker.go @@ -85,13 +85,17 @@ func (b *Broker) Open(conf *Config) error { b.conf = conf if conf.Net.SASL.Enable { - err = b.doSASLPlainAuth() - if err != nil { - Logger.Printf("SASL authentication with broker %s failed\n", b.addr) - _ = b.Close() + b.connErr = b.doSASLPlainAuth() + if b.connErr != nil { + err = b.conn.Close() + if err == nil { + Logger.Printf("Closed connection to broker %s\n", b.addr) + } else { + Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) + } + b.conn = nil + atomic.StoreInt32(&b.opened, 0) return - } else { - Logger.Printf("SASL authentication with broker %s succeeded\n", b.addr) } } @@ -494,14 +498,14 @@ func (b *Broker) doSASLPlainAuth() error { err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) if err != nil { - Logger.Printf("Failed to set write deadline when doing SASL auth: %s\n", err.Error()) - return nil + Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error()) + return err } _, err = b.conn.Write(authBytes) if err != nil { - Logger.Printf("Failed to write SASL auth header to broker: %s\n", err.Error()) - return nil + Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) + return err } header := make([]byte, 4) @@ -509,10 +513,10 @@ func (b *Broker) doSASLPlainAuth() error { // If the credentials are valid, we would get a 4 byte response filled with null characters. // Otherwise, the broker closes the connection and we get an EOF if err != nil { - Logger.Printf("Failed to read response while authenticating with SASL: %s\n", err.Error()) + Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error()) return err } - Logger.Printf("SASL authentication successful:%v - %v\n", n, header) + Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) return nil } diff --git a/config.go b/config.go index 46602b72d..d9cf469fe 100644 --- a/config.go +++ b/config.go @@ -313,9 +313,9 @@ func (c *Config) Validate() error { return ConfigurationError("Net.WriteTimeout must be > 0") case c.Net.KeepAlive < 0: return ConfigurationError("Net.KeepAlive must be >= 0") - case (c.Net.SASL.Enable == true) && (c.Net.SASL.User == ""): + case c.Net.SASL.Enable == true && c.Net.SASL.User == "": return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled") - case (c.Net.SASL.Enable == true) && (c.Net.SASL.Password == ""): + case c.Net.SASL.Enable == true && c.Net.SASL.Password == "": return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled") } From d0d6717d96f3af442f1636905fc4cbae55821a67 Mon Sep 17 00:00:00 2001 From: Shriram Rajagopalan Date: Tue, 3 May 2016 21:15:51 -0400 Subject: [PATCH 9/9] rename doSASLPlainAuth to sendAndReceiveSASLPlainAuth --- broker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker.go b/broker.go index 5138a6957..a9b0cf437 100644 --- a/broker.go +++ b/broker.go @@ -85,7 +85,7 @@ func (b *Broker) Open(conf *Config) error { b.conf = conf if conf.Net.SASL.Enable { - b.connErr = b.doSASLPlainAuth() + b.connErr = b.sendAndReceiveSASLPlainAuth() if b.connErr != nil { err = b.conn.Close() if err == nil { @@ -490,7 +490,7 @@ func (b *Broker) responseReceiver() { // When credentials are valid, Kafka returns a 4 byte array of null characters. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way // of responding to bad credentials but thats how its being done today. -func (b *Broker) doSASLPlainAuth() error { +func (b *Broker) sendAndReceiveSASLPlainAuth() error { length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) authBytes := make([]byte, length+4) //4 byte length header + auth data binary.BigEndian.PutUint32(authBytes, uint32(length))