Skip to content

Commit

Permalink
refactor: Migrate the SocketStream to use the new LineReader
Browse files Browse the repository at this point in the history
  • Loading branch information
jaqx0r committed Jul 17, 2024
1 parent e856661 commit d44be39
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions internal/tailer/logstream/socketstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package logstream

import (
"bytes"
"context"
"fmt"
"net"
Expand Down Expand Up @@ -108,8 +107,8 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa

func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn) {
defer wg.Done()
b := make([]byte, defaultReadBufferSize)
partial := bytes.NewBufferString("")

lr := NewLineReader(ss.sourcename, ss.lines, c, defaultReadBufferSize)
var total int
defer func() {
glog.V(2).Infof("stream(%s): read total %d bytes from %s", ss.sourcename, c, total)
Expand All @@ -119,14 +118,15 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
logErrors.Add(ss.address, 1)
glog.Info(err)
}
lr.Finish(ctx)
logCloses.Add(ss.address, 1)
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, c)

for {
n, err := c.Read(b)
n, err := lr.ReadAndSend(ctx)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ss.sourcename, n, err)

if ss.staleTimer != nil {
Expand All @@ -135,8 +135,6 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake

if n > 0 {
total += n
//nolint:contextcheck
ss.decodeAndSend(ctx, n, b[:n], partial)
ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel)

// No error implies more to read, so restart the loop.
Expand All @@ -146,11 +144,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
}

if IsExitableError(err) {
if partial.Len() > 0 {
ss.sendLine(ctx, partial)
}
glog.V(2).Infof("stream(%s): exiting, conn has error %s", ss.sourcename, err)

return
}

Expand Down

0 comments on commit d44be39

Please sign in to comment.