Skip to content
This repository has been archived by the owner on Sep 25, 2023. It is now read-only.

Commit

Permalink
Fixed segmentio#428 by making reader.read respect context closure.
Browse files Browse the repository at this point in the history
  • Loading branch information
feldgendler committed Apr 14, 2020
1 parent e0af1cf commit 4d6d176
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
35 changes: 29 additions & 6 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,25 +1337,48 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
return
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
r.stats.fetches.observe(1)
r.stats.offset.observe(offset)
// readBatch wraps the call to conn.ReadBatchWith to make it interruptible.
// Conn methods are written in a non-interruptible style, so the only way to
// interrupt them is to close the connection in another goroutine.
func (r *reader) readBatch(ctx context.Context, conn *Conn) (*Batch, error) {
done := make(chan struct{})
defer close(done)

t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))
go func() {
select {
case <-ctx.Done():
conn.Close()
case <-done:
return
}
}()

batch := conn.ReadBatchWith(ReadBatchConfig{
MinBytes: r.minBytes,
MaxBytes: r.maxBytes,
IsolationLevel: r.isolationLevel,
})
return batch, ctx.Err()
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
r.stats.fetches.observe(1)
r.stats.offset.observe(offset)

t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))

batch, err := r.readBatch(ctx, conn)
if err != nil {
return offset, err
}

highWaterMark := batch.HighWaterMark()

t1 := time.Now()
r.stats.waitTime.observeDuration(t1.Sub(t0))

var msg Message
var err error
var size int64
var bytes int64

Expand Down
21 changes: 21 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,3 +1313,24 @@ func getOffsets(t *testing.T, config ReaderConfig) offsetFetchResponseV1 {

return offsets
}

func TestReaderClose(t *testing.T) {
t.Parallel()

r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: makeTopic(),
MaxWait: 2 * time.Second,
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

_, _ = r.FetchMessage(ctx)

t0 := time.Now()
r.Close()
if time.Since(t0) > 100*time.Millisecond {
t.Errorf("r.Close took too long")
}
}

0 comments on commit 4d6d176

Please sign in to comment.