-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: fix Windows blocking on pipe close #4400
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -364,7 +364,7 @@ func (e *UniversalExecutor) configureLoggers() error { | |
return fmt.Errorf("error creating new stdout log file for %q: %v", e.ctx.Task.Name, err) | ||
} | ||
|
||
r, err := NewLogRotatorWrapper(lro) | ||
r, err := newLogRotatorWrapper(e.logger, lro) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -378,7 +378,7 @@ func (e *UniversalExecutor) configureLoggers() error { | |
return fmt.Errorf("error creating new stderr log file for %q: %v", e.ctx.Task.Name, err) | ||
} | ||
|
||
r, err := NewLogRotatorWrapper(lre) | ||
r, err := newLogRotatorWrapper(e.logger, lre) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -851,11 +851,12 @@ type logRotatorWrapper struct { | |
processOutReader *os.File | ||
rotatorWriter *logging.FileRotator | ||
hasFinishedCopied chan struct{} | ||
logger *log.Logger | ||
} | ||
|
||
// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the | ||
// newLogRotatorWrapper takes a rotator and returns a wrapper that has the | ||
// processOutWriter to attach to the processes stdout or stderr. | ||
func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, error) { | ||
func newLogRotatorWrapper(logger *log.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) { | ||
r, w, err := os.Pipe() | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create os.Pipe for extracting logs: %v", err) | ||
|
@@ -865,7 +866,8 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err | |
processOutWriter: w, | ||
processOutReader: r, | ||
rotatorWriter: rotator, | ||
hasFinishedCopied: make(chan struct{}, 1), | ||
hasFinishedCopied: make(chan struct{}), | ||
logger: logger, | ||
} | ||
wrap.start() | ||
return wrap, nil | ||
|
@@ -875,22 +877,51 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err | |
// called by the constructor and not the user of the wrapper. | ||
func (l *logRotatorWrapper) start() { | ||
go func() { | ||
io.Copy(l.rotatorWriter, l.processOutReader) | ||
l.processOutReader.Close() // in case io.Copy stopped due to write error | ||
close(l.hasFinishedCopied) | ||
defer close(l.hasFinishedCopied) | ||
_, err := io.Copy(l.rotatorWriter, l.processOutReader) | ||
if err != nil { | ||
// Close reader to propagate io error across pipe. | ||
// Note that this may block until the process exits on | ||
// Windows due to | ||
// https://github.com/PowerShell/PowerShell/issues/4254 | ||
// or similar issues. Since this is already running in | ||
// a goroutine its safe to block until the process is | ||
// force-killed. | ||
l.processOutReader.Close() | ||
} | ||
}() | ||
return | ||
} | ||
|
||
// Close closes the rotator and the process writer to ensure that the Wait | ||
// command exits. | ||
func (l *logRotatorWrapper) Close() error { | ||
func (l *logRotatorWrapper) Close() { | ||
// Wait up to the close tolerance before we force close | ||
select { | ||
case <-l.hasFinishedCopied: | ||
case <-time.After(processOutputCloseTolerance): | ||
} | ||
err := l.processOutReader.Close() | ||
|
||
// Closing the read side of a pipe may block on Windows if the process | ||
// is being debugged as in: | ||
// https://github.com/PowerShell/PowerShell/issues/4254 | ||
// The pipe will be closed and cleaned up when the process exits. | ||
closeDone := make(chan struct{}) | ||
go func() { | ||
defer close(closeDone) | ||
err := l.processOutReader.Close() | ||
if err != nil && !strings.Contains(err.Error(), "file already closed") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this logging meant to be temporary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably not? I don't think there's any way to know when it would be safe to remove. The contains check is just to prevent spamming the logs since we close the pipe multiple times. We could probably try to fix that, but I'm not sure it's worth the effort as there's no harm in multiple Closes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What value is logging the error giving if we don't change the outcome in the call site anyway by ignoring the error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Honestly "file already closed" is the only error I've ever seen returned from I put it in in case it helped debug future issues similar to this. I have never seen anything like this behavior before (eg the blocking on Close), so I have little idea where extra logging might be helpful in the future or just noise. |
||
l.logger.Printf("[WARN] executor: error closing read-side of process output pipe: %v", err) | ||
} | ||
|
||
}() | ||
|
||
select { | ||
case <-closeDone: | ||
case <-time.After(processOutputCloseTolerance): | ||
l.logger.Printf("[WARN] executor: timed out waiting for read-side of process output pipe to close") | ||
} | ||
|
||
l.rotatorWriter.Close() | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error was never being checked, so there's no harm in elliding it in favor of logging errors directly from this method. |
||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We added this close for processOutreader in 4150296 to fix this failing test TestExecutor_Start_Wait. Removing this entirely doesn't affect that test (I tested with -count 50 locally). Wondering if we can remove that
l.processOutReader.Close()
line 893 entirely. That would simplify all this even more.That test was failing because without the channel blocked wait on
hasFinishedCopied
, a very short lived command would never get its standard output read and stored in the log file. Now that we added a wait on that channel and a grace period of 2 seconds, I don't see why we need to call close on the processOutreader again in line 893.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding of line 893 (now 890) is that it is to propagate errors from the io.Copy destination (
rotator
) back to the source (processOutReader
).So if
io.Copy
fails to write, it needs to signal toprocessOutReader
that i will never be read again byClose()
ing it.I think this will handle cases like running-out-of-disk where we can no longer write anything, so we signal that to the process by closing its output (and likely causing the process to crash).