Skip to content

Commit

Permalink
Force close stdout/stderr after grace
Browse files Browse the repository at this point in the history
This commit changes the force closing of the stdout/stderr file
descriptor from closing immediately to being closed after a grace
period. This allows the created process to close its own file and allows
copying of the data.
  • Loading branch information
dadgar committed May 31, 2018
1 parent 09b90e4 commit 4150296
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
// tree for finding out the pids that the executor and it's child processes
// have forked
pidScanInterval = 5 * time.Second

// processOutputCloseTolerance is the length of time we will wait for the
// launched process to close its stdout/stderr before we force close it. If
// data is writen after this tolerance, we will not capture it.
processOutputCloseTolerance = 2 * time.Second
)

var (
Expand Down Expand Up @@ -285,6 +290,11 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.cmd.Args, err)
}

// Close the files. This is copied from the os/exec package.
e.lro.processOutWriter.Close()
e.lre.processOutWriter.Close()

go e.collectPids()
go e.wait()
ic := e.resConCtx.getIsolationConfig()
Expand Down Expand Up @@ -832,9 +842,10 @@ func (e *UniversalExecutor) collectLogs(we io.Writer, wo io.Writer) {
// log rotator data. The processOutWriter should be attached to the process and
// data will be copied from the reader to the rotator.
type logRotatorWrapper struct {
processOutWriter *os.File
processOutReader *os.File
rotatorWriter *logging.FileRotator
processOutWriter *os.File
processOutReader *os.File
rotatorWriter *logging.FileRotator
hasFinishedCopied chan struct{}
}

// NewLogRotatorWrapper takes a rotator and returns a wrapper that has the
Expand All @@ -846,9 +857,10 @@ func NewLogRotatorWrapper(rotator *logging.FileRotator) (*logRotatorWrapper, err
}

wrap := &logRotatorWrapper{
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
processOutWriter: w,
processOutReader: r,
rotatorWriter: rotator,
hasFinishedCopied: make(chan struct{}, 1),
}
wrap.start()
return wrap, nil
Expand All @@ -860,13 +872,20 @@ func (l *logRotatorWrapper) start() {
go func() {
io.Copy(l.rotatorWriter, l.processOutReader)
l.processOutReader.Close() // in case io.Copy stopped due to write error
close(l.hasFinishedCopied)
}()
return
}

// Close closes the rotator and the process writer to ensure that the Wait
// command exits.
func (l *logRotatorWrapper) Close() error {
// Wait up to the close tolerance before we force close
select {
case <-l.hasFinishedCopied:
case <-time.After(processOutputCloseTolerance):
}
err := l.processOutReader.Close()
l.rotatorWriter.Close()
return l.processOutWriter.Close()
return err
}

0 comments on commit 4150296

Please sign in to comment.