Skip to content

Commit

Permalink
chore: Pass contexts instead of storing them in the logstreams.
Browse files Browse the repository at this point in the history
We don't use the contexts in struct anyway, so this is neater.

Fixes a lint warning.
  • Loading branch information
jaqx0r committed Jun 11, 2024
1 parent f35753e commit c967ed5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
9 changes: 4 additions & 5 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
)

type dgramStream struct {
ctx context.Context
cancel context.CancelFunc

lines chan<- *logline.LogLine
Expand All @@ -33,9 +32,9 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker,
if address == "" {
return nil, ErrEmptySocketAddress
}
ss := &dgramStream{scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
ss.ctx, ss.cancel = context.WithCancel(ctx)
if err := ss.stream(ss.ctx, wg, waker, oneShot); err != nil {
ctx, cancel := context.WithCancel(ctx)
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
return ss, nil
Expand Down Expand Up @@ -110,7 +109,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial)
decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial)
ss.mu.Lock()
ss.lastReadTime = time.Now()
ss.mu.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ var fileTruncates = expvar.NewMap("file_truncates_total")
// a new goroutine and closes itself down. The shared context is used for
// cancellation.
type fileStream struct {
ctx context.Context
cancel context.CancelFunc
lines chan<- *logline.LogLine

lines chan<- *logline.LogLine

pathname string // Given name for the underlying file on the filesystem

Expand All @@ -47,11 +47,11 @@ type fileStream struct {

// newFileStream creates a new log stream from a regular file.
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
fs := &fileStream{pathname: pathname, lastReadTime: time.Now(), lines: lines}
fs.ctx, fs.cancel = context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: lines}
// Stream from the start of the file when in one shot mode.
streamFromStart := oneShot == OneShotEnabled
if err := fs.stream(fs.ctx, wg, waker, fi, oneShot, streamFromStart); err != nil {
if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil {
return nil, err
}
return fs, nil
Expand Down
12 changes: 6 additions & 6 deletions internal/tailer/logstream/socketstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
)

type socketStream struct {
ctx context.Context
cancel context.CancelFunc
lines chan<- *logline.LogLine

lines chan<- *logline.LogLine

oneShot OneShotMode
scheme string // URL Scheme to listen with, either tcp or unix
Expand All @@ -33,9 +33,9 @@ func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker,
if address == "" {
return nil, ErrEmptySocketAddress
}
ss := &socketStream{ctx: ctx, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
ss.ctx, ss.cancel = context.WithCancel(ctx)
if err := ss.stream(ss.ctx, wg, waker); err != nil {
ctx, cancel := context.WithCancel(ctx)
ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
if err := ss.stream(ctx, wg, waker); err != nil {
return nil, err
}
return ss, nil
Expand Down Expand Up @@ -142,7 +142,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial)
decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial)
ss.mu.Lock()
ss.lastReadTime = time.Now()
ss.mu.Unlock()
Expand Down

0 comments on commit c967ed5

Please sign in to comment.