Skip to content

Commit

Permalink
refactor: Merge the oneshot and regular socket connection handlers.
Browse files Browse the repository at this point in the history
  • Loading branch information
jaqx0r committed Jul 5, 2024
1 parent bfe5fec commit cf3931d
Showing 1 changed file with 22 additions and 29 deletions.
51 changes: 22 additions & 29 deletions internal/tailer/logstream/socketstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,18 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa
}
glog.V(2).Infof("stream(%s:%s): opened new socket listener %+v", ss.scheme, ss.address, l)

initDone := make(chan struct{})
// signals when a connection has been opened
started := make(chan struct{})
// tracks connection handling routines
var connWg sync.WaitGroup

// Set up for shutdown
wg.Add(1)
go func() {
defer wg.Done()
// If oneshot, wait only for the one conn handler to start, otherwise wait for context Done or stopChan.
<-initDone
// If oneshot, wait only for the one conn handler to start, otherwise
// wait for context Done or stopChan.
<-started
if !ss.oneShot {
<-ctx.Done()
}
Expand All @@ -67,46 +72,34 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa
if err != nil {
glog.Info(err)
}
connWg.Wait()
if !ss.oneShot {
close(ss.lines)
}
}()

acceptConn := func() error {
c, err := l.Accept()
if err != nil {
glog.Info(err)
return err
}
glog.V(2).Infof("stream(%s:%s): got new conn %v", ss.scheme, ss.address, c)
wg.Add(1)
go ss.handleConn(ctx, wg, waker, c)
return nil
}

if ss.oneShot {
wg.Add(1)
go func() {
defer wg.Done()
if err := acceptConn(); err != nil {
glog.Info(err)
}
glog.Infof("stream(%s:%s): oneshot mode, returning", ss.scheme, ss.address)
close(initDone)
}()
return nil
}
var connOnce sync.Once

wg.Add(1)
go func() {
defer wg.Done()
for {
if err := acceptConn(); err != nil {
c, err := l.Accept()
if err != nil {
glog.Info(err)
return
}
glog.V(2).Infof("stream(%s:%s): got new conn %v", ss.scheme, ss.address, c)
connWg.Add(1)
go ss.handleConn(ctx, &connWg, waker, c)
connOnce.Do(func() { close(started) })
if ss.oneShot {
glog.Infof("stream(%s:%s): oneshot mode, exiting accept loop", ss.scheme, ss.address)
return
}
}
}()
close(initDone)

return nil
}

Expand Down

0 comments on commit cf3931d

Please sign in to comment.