Skip to content

Commit

Permalink
fix: Handle oneShot mode in the dgramStream.
Browse files Browse the repository at this point in the history
In this mode we merely tell datagrams to exit when a zero byte read occurs in
oneshot mode.  This is the same as before except we must also explicitly tell
the stream to do it in this mode.
  • Loading branch information
jaqx0r committed Jun 8, 2024
1 parent aa61830 commit 6ab8d9c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
13 changes: 10 additions & 3 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type dgramStream struct {
stopChan chan struct{} // Close to start graceful shutdown.
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine) (LogStream, error) {
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
if address == "" {
return nil, ErrEmptySocketAddress
}
ss := &dgramStream{ctx: ctx, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})}
if err := ss.stream(ctx, wg, waker); err != nil {
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
return ss, nil
Expand All @@ -50,7 +50,7 @@ func (ss *dgramStream) LastReadTime() time.Time {
// The read buffer size for datagrams.
const datagramReadBufferSize = 131072

func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error {
func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
c, err := net.ListenPacket(ss.scheme, ss.address)
if err != nil {
logErrors.Add(ss.address, 1)
Expand Down Expand Up @@ -89,6 +89,13 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
}
return
}
select {
case <-ss.stopChan:
glog.V(2).Infof("%v: exiting because zero byte read after Stop", c)
Expand Down
4 changes: 2 additions & 2 deletions internal/tailer/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st
default:
glog.V(2).Infof("%v: %q in path pattern %q, treating as path", ErrUnsupportedURLScheme, u.Scheme, pathname)
case "unixgram":
return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines)
return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot)
case "unix":
return newSocketStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot)
case "tcp":
return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot)
case "udp":
return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines)
return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot)
case "", "file":
path = u.Path
}
Expand Down
4 changes: 0 additions & 4 deletions internal/tailer/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,6 @@ func (t *Tailer) TailPath(pathname string) error {
if err != nil {
return err
}
if t.oneShot {
glog.V(2).Infof("Starting oneshot read at startup of %q", pathname)
l.Stop()
}
t.logstreams[pathname] = l
glog.Infof("Tailing %s", pathname)
logCount.Add(1)
Expand Down

0 comments on commit 6ab8d9c

Please sign in to comment.