Skip to content

Commit

Permalink
Merge pull request containerd#3156 from thaJeztah/1.1_backport_issue_…
Browse files Browse the repository at this point in the history
…3118

[release/1.1 backport] runtime/v1/linux/proc/io: io race
  • Loading branch information
dmcgowan authored Apr 1, 2019
2 parents 2bf4d3a + de85314 commit 01cd85f
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions linux/proc/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"io"
"os"
"sync"
"sync/atomic"
"syscall"

"github.com/containerd/containerd/log"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
)
Expand All @@ -38,7 +40,7 @@ var bufPool = sync.Pool{
}

func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
var sameFile io.WriteCloser
var sameFile *countingWriteCloser
for _, i := range []struct {
name string
dest func(wc io.WriteCloser, rc io.Closer)
Expand All @@ -52,7 +54,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(wc, rio.Stdout(), *p)
if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
log.G(ctx).Warn("error copying stdout")
}
wg.Done()
wc.Close()
if rc != nil {
Expand All @@ -69,7 +73,9 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(wc, rio.Stderr(), *p)
if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
log.G(ctx).Warn("error copying stderr")
}
wg.Done()
wc.Close()
if rc != nil {
Expand All @@ -96,14 +102,18 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
}
} else {
if sameFile != nil {
sameFile.count++
i.dest(sameFile, nil)
continue
}
if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err)
}
if stdout == stderr {
sameFile = fw
sameFile = &countingWriteCloser{
WriteCloser: fw,
count: 1,
}
}
}
i.dest(fw, fr)
Expand All @@ -129,6 +139,19 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
return nil
}

// countingWriteCloser masks io.Closer() until close has been invoked a certain number of times.
type countingWriteCloser struct {
io.WriteCloser
count int64
}

func (c *countingWriteCloser) Close() error {
if atomic.AddInt64(&c.count, -1) > 0 {
return nil
}
return c.WriteCloser.Close()
}

// isFifo checks if a file is a fifo
// if the file does not exist then it returns false
func isFifo(path string) (bool, error) {
Expand Down

0 comments on commit 01cd85f

Please sign in to comment.