Skip to content

Commit

Permalink
Refine err msg in SetOffsetAt() (#1001)
Browse files Browse the repository at this point in the history
  • Loading branch information
haorenfsa authored Oct 18, 2022
1 parent bef2911 commit 40898d3
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,12 +1083,16 @@ func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
}
r.mutex.Unlock()

if len(r.config.Brokers) < 1 {
return errors.New("no brokers in config")
}
var conn *Conn
var err error
for _, broker := range r.config.Brokers {
conn, err := r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
if err != nil {
continue
}

deadline, _ := ctx.Deadline()
conn.SetDeadline(deadline)
offset, err := conn.ReadOffset(t)
Expand All @@ -1099,7 +1103,7 @@ func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {

return r.SetOffset(offset)
}
return fmt.Errorf("error setting offset for timestamp %+v", t)
return fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}

// Stats returns a snapshot of the reader stats since the last time the method
Expand Down

0 comments on commit 40898d3

Please sign in to comment.