Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configuring target kafka version #676

Merged
merged 1 commit into from
Jun 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ type Config struct {
// in the background while user code is working, greatly improving throughput.
// Defaults to 256.
ChannelBufferSize int
// The version of Kafka that Sarama will assume it is running against.
// Defaults to the oldest supported stable version. Since Kafka provides
// backwards-compatibility, setting it to a version older than you have
// will not break anything, although it may prevent you from using the
// latest features. Setting it to a version greater than you are actually
// running may lead to random breakage.
Version KafkaVersion
}

// NewConfig returns a new configuration instance with sane defaults.
Expand Down Expand Up @@ -258,9 +265,9 @@ func NewConfig() *Config {
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest

c.ChannelBufferSize = 256

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = V0_8_2_0

return c
}
Expand Down
38 changes: 38 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,41 @@ func newBufConn(conn net.Conn) *bufConn {
func (bc *bufConn) Read(b []byte) (n int, err error) {
return bc.buf.Read(b)
}

// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
// it's a struct rather than just typing the array directly to make it opaque and stop people
// generating their own arbitrary versions
version [4]uint
}

func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
return KafkaVersion{
version: [4]uint{major, minor, veryMinor, patch},
}
}

// IsAtLeast return true if and only if the version it is called on is
// greater than or equal to the version passed in:
// V1.IsAtLeast(V2) // false
// V2.IsAtLeast(V1) // true
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
for i := range v.version {
if v.version[i] > other.version[i] {
return true
} else if v.version[i] < other.version[i] {
return false
}
}
return true
}

// Effective constants defining the supported kafka versions.
var (
V0_8_2_0 = newKafkaVersion(0, 8, 2, 0)
V0_8_2_1 = newKafkaVersion(0, 8, 2, 1)
V0_8_2_2 = newKafkaVersion(0, 8, 2, 2)
V0_9_0_0 = newKafkaVersion(0, 9, 0, 0)
V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
)
21 changes: 21 additions & 0 deletions utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sarama

import "testing"

func TestVersionCompare(t *testing.T) {
if V0_8_2_0.IsAtLeast(V0_8_2_1) {
t.Error("0.8.2.0 >= 0.8.2.1")
}
if !V0_8_2_1.IsAtLeast(V0_8_2_0) {
t.Error("! 0.8.2.1 >= 0.8.2.0")
}
if !V0_8_2_0.IsAtLeast(V0_8_2_0) {
t.Error("! 0.8.2.0 >= 0.8.2.0")
}
if !V0_9_0_0.IsAtLeast(V0_8_2_1) {
t.Error("! 0.9.0.0 >= 0.8.2.1")
}
if V0_8_2_1.IsAtLeast(V0_10_0_0) {
t.Error("0.8.2.1 >= 0.10.0.0")
}
}