diff --git a/cache/cache.go b/cache/cache.go index c21399f7f..ea43a6690 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -82,6 +82,8 @@ type BlobCache interface { type Reader interface { io.ReaderAt Close() error + + // If a blob is backed by a file, it should return *os.File so that it can be used for FUSE passthrough GetReaderAt() io.ReaderAt } diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index df12670a2..c80469784 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -138,6 +138,10 @@ func main() { // Configure FUSE passthrough if config.Config.Config.FuseConfig.PassThrough, err = isFusePthEnable(); err != nil { log.G(ctx).Warnf("failed to check FUSE passthrough support") + } else if config.Config.Config.FuseConfig.PassThrough { + // Always set Direct to true to ensure that + // *directoryCache.Get always return *os.File instead of buffer + config.Config.Config.DirectoryCacheConfig.Direct = true } // Configure keychain diff --git a/fs/config/config.go b/fs/config/config.go index 4d9db30f2..44b87e8d6 100644 --- a/fs/config/config.go +++ b/fs/config/config.go @@ -150,5 +150,5 @@ type FuseConfig struct { EntryTimeout int64 `toml:"entry_timeout"` // PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false. - PassThrough bool `toml:"passthrough"` + PassThrough bool `toml:"passthrough" default:"false"` } diff --git a/fs/layer/node.go b/fs/layer/node.go index 0c30e80bd..02f9e6488 100644 --- a/fs/layer/node.go +++ b/fs/layer/node.go @@ -356,10 +356,11 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu if getter, ok := ra.(reader.PassthroughFdGetter); ok { fd, err := getter.GetPassthroughFd() if err != nil { - n.fs.s.report(fmt.Errorf("node.Open: %v", err)) - return nil, 0, syscall.EIO + n.fs.s.report(fmt.Errorf("passThrough model failed due to node.Open: %v", err)) + n.fs.passThrough = false + } else { + f.InitFd(int(fd)) } - f.InitFd(int(fd)) } } diff --git a/fs/reader/reader.go b/fs/reader/reader.go index 086fb031b..4da2fa0e1 100644 --- a/fs/reader/reader.go +++ b/fs/reader/reader.go @@ -496,9 +496,9 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) { func (sf *file) GetPassthroughFd() (uintptr, error) { var ( - offset int64 = 0 + offset int64 firstChunkOffset int64 = -1 - totalSize int64 = 0 + totalSize int64 ) for { @@ -538,9 +538,9 @@ func (sf *file) GetPassthroughFd() (uintptr, error) { func (sf *file) prefetchEntireFile() error { var ( - offset int64 = 0 + offset int64 firstChunkOffset int64 = -1 - totalSize int64 = 0 + totalSize int64 ) combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer) combinedBuffer.Reset() @@ -554,10 +554,28 @@ func (sf *file) prefetchEntireFile() error { if firstChunkOffset == -1 { firstChunkOffset = chunkOffset } + + id := genID(sf.id, chunkOffset, chunkSize) b := sf.gr.bufPool.Get().(*bytes.Buffer) b.Reset() b.Grow(int(chunkSize)) ip := b.Bytes()[:chunkSize] + + // Check if the content exists in the cache + if r, err := sf.gr.cache.Get(id); err == nil { + n, err := r.ReadAt(ip, 0) + if (err == nil || err == io.EOF) && int64(n) == chunkSize { + combinedBuffer.Write(ip[:n]) + totalSize += int64(n) + offset = chunkOffset + int64(n) + r.Close() + sf.gr.putBuffer(b) + continue + } + r.Close() + } + + // cache miss, prefetch the whole chunk if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF { sf.gr.putBuffer(b) return fmt.Errorf("failed to read data: %w", err) @@ -572,8 +590,8 @@ func (sf *file) prefetchEntireFile() error { sf.gr.putBuffer(b) } combinedIp := combinedBuffer.Bytes() - id := genID(sf.id, firstChunkOffset, totalSize) - sf.gr.cacheData(combinedIp, id) + combinedId := genID(sf.id, firstChunkOffset, totalSize) + sf.gr.cacheData(combinedIp, combinedId) return nil }