Ensure ordering of offset commit requests #2941

Closed · wants to merge 3 commits
Changes from 1 commit
107 changes: 107 additions & 0 deletions offset_manager_test.go
@@ -3,6 +3,8 @@ package sarama
import (
    "errors"
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "testing"
    "time"
@@ -105,6 +107,111 @@ func TestNewOffsetManager(t *testing.T) {
}
}

// Test that the correct sequence of offset commit messages is sent to a broker when
// multiple goroutines for a group are committing offsets at the same time
func TestOffsetManagerCommitSequence(t *testing.T) {
    mu := &sync.Mutex{}
Contributor: Same as below, just go ahead and declare the variable instead: var mu sync.Mutex

Member Author: Superseded by the switch to sync/atomic.Pointer, which I've declared as per your suggestion.

    lastOffset := map[int32]int64{}
    outOfOrder := ""
    seedBroker := NewMockBroker(t, 1)
    defer seedBroker.Close()
    seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
        "MetadataRequest": func(req *request) (rest encoderWithHeader) {
            resp := new(MetadataResponse)
            resp.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
            return resp
        },
        "FindCoordinatorRequest": func(req *request) (rest encoderWithHeader) {
            resp := new(FindCoordinatorResponse)
            resp.Coordinator = &Broker{id: seedBroker.brokerID, addr: seedBroker.Addr()}
            return resp
        },
        "OffsetFetchRequest": func(r *request) (rest encoderWithHeader) {
            req := r.body.(*OffsetFetchRequest)
            resp := new(OffsetFetchResponse)
            resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
            for topic, partitions := range req.partitions {
                for _, partition := range partitions {
                    if _, ok := resp.Blocks[topic]; !ok {
                        resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
                    }
                    resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
                        Offset: 0,
                        Err:    ErrNoError,
                    }
                }
            }
            return resp
        },
        "OffsetCommitRequest": func(r *request) (rest encoderWithHeader) {
            req := r.body.(*OffsetCommitRequest)
            func() {
                mu.Lock()
                defer mu.Unlock()
                if outOfOrder == "" {
Contributor: Prefer reordering this to an early return/guard condition:

    if outOfOrder != "" {
        return
    }

Member Author: Superseded by the switch to sync/atomic.Pointer.

                    for partition, offset := range req.blocks["topic"] {
                        last := lastOffset[partition]
                        if last > offset.offset {
                            outOfOrder =
                                fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
                                    partition, last, offset.offset)
Contributor: Consider using a sync/atomic.Value holding a pointer to a struct{ partition, last, offset IntType }-shaped struct, to avoid the inherent serialization caused by mutex locking.

Member Author (prestona, Jul 25, 2024): Good idea. I ended up using an atomic.Pointer, as it avoids the need to make a type assertion when calling Load().
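As a rough, self-contained sketch of the pattern under discussion (this is not the code from the follow-up commits; the outOfOrderCommit type and field names are illustrative), an atomic.Pointer[T] can record the first out-of-order commit without a mutex, and its Load() returns a typed pointer, whereas atomic.Value's Load() would require a type assertion:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // outOfOrderCommit is an illustrative shape; the real change may differ.
    type outOfOrderCommit struct {
        partition    int32
        last, offset int64
    }

    func main() {
        var firstErr atomic.Pointer[outOfOrderCommit]

        // Record only the first failure: CompareAndSwap from nil ignores later attempts.
        firstErr.CompareAndSwap(nil, &outOfOrderCommit{partition: 3, last: 42, offset: 41})
        firstErr.CompareAndSwap(nil, &outOfOrderCommit{partition: 7, last: 10, offset: 9}) // no effect

        // Load() already returns *outOfOrderCommit; atomic.Value would need a type assertion here.
        if ooo := firstErr.Load(); ooo != nil {
            fmt.Printf("out of order commit to partition %d, current committed offset: %d, offset in request: %d\n",
                ooo.partition, ooo.last, ooo.offset)
        }
    }

Note that this only covers recording the first failure; in the test itself, the per-partition lastOffset map would still need its own synchronization (or a per-partition atomic).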

                        }
                        lastOffset[partition] = offset.offset
                    }
                }
            }()

            // Potentially yield, to try and avoid each Go routine running sequentially to completion
            runtime.Gosched()

            resp := new(OffsetCommitResponse)
            resp.Errors = map[string]map[int32]KError{}
            resp.Errors["topic"] = map[int32]KError{}
            for partition := range req.blocks["topic"] {
                resp.Errors["topic"][partition] = ErrNoError
            }
            return resp
        },
    })
    testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
    if err != nil {
        t.Fatal(err)
    }
    defer safeClose(t, testClient)
    om, err := NewOffsetManagerFromClient("group", testClient)
    if err != nil {
        t.Error(err)
Contributor: Can the test keep going after this? om is almost certainly a nil pointer, so the first use is going to hit a nil-pointer panic, right?

Member Author: Well spotted. Substituted with t.Fatal(...).
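Presumably, based on that reply, the follow-up commit replaces the t.Error above with something along these lines (a sketch, not the actual diff):

    om, err := NewOffsetManagerFromClient("group", testClient)
    if err != nil {
        t.Fatal(err) // stop the test here; continuing with a nil om would just panic later
    }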

    }
    defer safeClose(t, om)

    const numPartitions = 10
    const commitsPerPartition = 1000

    wg := &sync.WaitGroup{}
Contributor: Prefer simply declaring a variable (var wg sync.WaitGroup) rather than assigning a zero value / pointer to zero value.

Member Author: Fixed.

    for p := 0; p < numPartitions; p++ {
        pom, err := om.ManagePartition("topic", int32(p))
        if err != nil {
            t.Error(err)
        }

        wg.Add(1)
        go func() {
            for c := 0; c < commitsPerPartition; c++ {
                pom.MarkOffset(int64(c+1), "")
                om.Commit()
            }
            wg.Done()
        }()
    }

    wg.Wait()
    mu.Lock()
    defer mu.Unlock()
Contributor: This lock should be unnecessary. The wg.Wait() ensures all writes should be complete, and performs all the synchronization necessary.

Member Author: Superseded by the switch to sync/atomic.Pointer. The intent was to ensure that the error message assigned to outOfOrder (by the goroutine running the handler in the mock broker) was guaranteed to be observed by the goroutine running the test function.

Contributor: Yeah, I figured it was added out of an abundance of caution. The helpful information here (even apart from this code review) is that the wg.Wait() itself is synchronizing: all calls to wg.Done() happen before the return of wg.Wait(), the same as if you had used a mutex.
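A minimal, self-contained sketch of that guarantee (not part of this PR): a write made before wg.Done() is visible after wg.Wait() returns, with no additional locking, because each Done() call synchronizes before the Wait() call it unblocks.

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        var result string // written by the worker goroutine, read by main

        wg.Add(1)
        go func() {
            defer wg.Done() // Done() synchronizes before the Wait() it unblocks
            result = "done" // this write happens before Done()
        }()

        wg.Wait()           // once Wait returns, the worker's writes are visible
        fmt.Println(result) // safe to read without a mutex; prints "done"
    }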

    if outOfOrder != "" {
        t.Error(outOfOrder)
    }
}

var offsetsautocommitTestTable = []struct {
    name string
    set  bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable