Skip to content

Commit

Permalink
refactor: Remove IsComplete from the LogStream interface.
Browse files Browse the repository at this point in the history
Cleanup happens in the tailer when the stream channel closes, so we can remove
the GC function to do cleanup.

Removing the method means we need a new way to check for completion in test,
which we can do by looking at the value coming off the channel.
  • Loading branch information
jaqx0r committed Jul 5, 2024
1 parent 7aebf03 commit f2d8c1c
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 42 deletions.
4 changes: 2 additions & 2 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
cancel()
wg.Wait()

if !ds.IsComplete() {
if v := <-ds.Lines(); v != nil {
t.Errorf("expecting dgramstream to be complete because socket closed")
}
}))
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {

checkLineDiff()

if !ds.IsComplete() {
if v := <-ds.Lines(); v != nil {
t.Errorf("expecting dgramstream to be complete because cancel")
}
}))
Expand Down
6 changes: 0 additions & 6 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
return nil
}

func (fs *fileStream) IsComplete() bool {
fs.mu.RLock()
defer fs.mu.RUnlock()
return fs.completed
}

// Lines implements the LogStream interface, returning the output lines channel.
func (fs *fileStream) Lines() <-chan *logline.LogLine {
return fs.lines
Expand Down
12 changes: 6 additions & 6 deletions internal/tailer/logstream/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestFileStreamRead(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because stopped")
}
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestFileStreamReadOneShot(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because stopped")
}
cancel()
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because stopped")
}
cancel()
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because stopped")
}
cancel()
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestFileStreamPartialRead(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because cancellation")
}
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestFileStreamReadToEOFOnCancel(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because cancellation")
}
}
4 changes: 2 additions & 2 deletions internal/tailer/logstream/filestream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestFileStreamRotation(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because stopped")
}
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestFileStreamURL(t *testing.T) {

checkLineDiff()

if !fs.IsComplete() {
if v := <-fs.Lines(); v != nil {
t.Errorf("expecting filestream to be complete because stopped")
}
}
Expand Down
1 change: 0 additions & 1 deletion internal/tailer/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ var (

// LogStream.
type LogStream interface {
IsComplete() bool // True if the logstream has completed work and cannot recover. The caller should clean up this logstream, creating a new logstream on a pathname if necessary.
Lines() <-chan *logline.LogLine // Returns the output channel of this LogStream.
}

Expand Down
8 changes: 4 additions & 4 deletions internal/tailer/logstream/pipestream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) {

cancel()

if !ps.IsComplete() {
if v := <-ps.Lines(); v != nil {
t.Errorf("expecting pipestream to be complete because fifo closed")
}
})(t)
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {

checkLineDiff()

if !ps.IsComplete() {
if v := <-ps.Lines(); v != nil {
t.Errorf("expecting pipestream to be complete because cancelled")
}
})(t)
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestPipeStreamReadURL(t *testing.T) {

cancel()

if !ps.IsComplete() {
if v := <-ps.Lines(); v != nil {
t.Errorf("expecting pipestream to be complete because fifo closed")
}
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestPipeStreamReadStdin(t *testing.T) {
checkLineDiff()

cancel()
if !ps.IsComplete() {
if v := <-ps.Lines(); v != nil {
t.Errorf("expecting pipestream to be complete beacuse fifo closed")
}
}
4 changes: 2 additions & 2 deletions internal/tailer/logstream/socketstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {

checkLineDiff()

if !ss.IsComplete() {
if v := <-ss.Lines(); v != nil {
t.Errorf("expecting socketstream to be complete because socket closed")
}

Expand Down Expand Up @@ -115,7 +115,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {

checkLineDiff()

if !ss.IsComplete() {
if v := <-ss.Lines(); v != nil {
t.Errorf("expecting socketstream to be complete because cancel")
}
}))
Expand Down
19 changes: 0 additions & 19 deletions internal/tailer/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,26 +376,7 @@ func (t *Tailer) StartGcPoller(ctx context.Context) {
case <-ctx.Done():
return
case <-t.gcWaker.Wake():
if err := t.RemoveCompletedLogstreams(); err != nil {
glog.Info(err)
}
}
}
}()
}

// RemoveCompletedLogstreams checks if current logstreams have completed,
// removing it from the map if so.
func (t *Tailer) RemoveCompletedLogstreams() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for name, l := range t.logstreams {
if l.IsComplete() {
glog.Infof("%s is complete", name)
delete(t.logstreams, name)
logCount.Add(-1)
continue
}
}
return nil
}

0 comments on commit f2d8c1c

Please sign in to comment.