From e3281273b85f40851e4eb696beb6ba171d9b9a01 Mon Sep 17 00:00:00 2001
From: abushwang <abushwangs@gmail.com>
Date: Tue, 19 Nov 2024 19:08:06 +0800
Subject: [PATCH] fuse passthrough: address review comments

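Address review feedback on the FUSE passthrough support:

- document that cache.Reader.GetReaderAt should return an *os.File when
  the blob is backed by a file, so the fd can be used for passthrough
- force DirectoryCacheConfig.Direct to true whenever passthrough is
  enabled, so directoryCache.Get returns an *os.File instead of a buffer
- fall back to the regular read path instead of returning EIO when
  obtaining the passthrough fd fails in node.Open
- reuse already-cached chunks in prefetchEntireFile and drop redundant
  zero initializers in reader.go
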
Signed-off-by: abushwang <abushwangs@gmail.com>
---
 cache/cache.go                     |  2 ++
 cmd/containerd-stargz-grpc/main.go |  4 ++++
 fs/config/config.go                |  2 +-
 fs/layer/node.go                   |  7 ++++---
 fs/reader/reader.go                | 30 ++++++++++++++++++++++++------
 5 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/cache/cache.go b/cache/cache.go
index c21399f7f..ea43a6690 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -82,6 +82,8 @@ type BlobCache interface {
 type Reader interface {
 	io.ReaderAt
 	Close() error
+
+	// GetReaderAt returns the underlying io.ReaderAt. If the blob is backed by a file, it should return an *os.File so that the fd can be used for FUSE passthrough
 	GetReaderAt() io.ReaderAt
 }
 
diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go
index df12670a2..c80469784 100644
--- a/cmd/containerd-stargz-grpc/main.go
+++ b/cmd/containerd-stargz-grpc/main.go
@@ -138,6 +138,10 @@ func main() {
 	// Configure FUSE passthrough
 	if config.Config.Config.FuseConfig.PassThrough, err = isFusePthEnable(); err != nil {
 		log.G(ctx).Warnf("failed to check FUSE passthrough support")
+	} else if config.Config.Config.FuseConfig.PassThrough {
+		// Always set Direct to true to ensure that
+		// (*directoryCache).Get always returns an *os.File instead of a buffer
+		config.Config.Config.DirectoryCacheConfig.Direct = true
 	}
 
 	// Configure keychain
diff --git a/fs/config/config.go b/fs/config/config.go
index 4d9db30f2..44b87e8d6 100644
--- a/fs/config/config.go
+++ b/fs/config/config.go
@@ -150,5 +150,5 @@ type FuseConfig struct {
 	EntryTimeout int64 `toml:"entry_timeout"`
 
 	// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
-	PassThrough bool `toml:"passthrough"`
+	PassThrough bool `toml:"passthrough" default:"false"`
 }
diff --git a/fs/layer/node.go b/fs/layer/node.go
index 0c30e80bd..02f9e6488 100644
--- a/fs/layer/node.go
+++ b/fs/layer/node.go
@@ -356,10 +356,11 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu
 		if getter, ok := ra.(reader.PassthroughFdGetter); ok {
 			fd, err := getter.GetPassthroughFd()
 			if err != nil {
-				n.fs.s.report(fmt.Errorf("node.Open: %v", err))
-				return nil, 0, syscall.EIO
+				n.fs.s.report(fmt.Errorf("passthrough mode disabled due to node.Open error: %v", err))
+				n.fs.passThrough = false
+			} else {
+				f.InitFd(int(fd))
 			}
-			f.InitFd(int(fd))
 		}
 	}
 
diff --git a/fs/reader/reader.go b/fs/reader/reader.go
index 086fb031b..4da2fa0e1 100644
--- a/fs/reader/reader.go
+++ b/fs/reader/reader.go
@@ -496,9 +496,9 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
 
 func (sf *file) GetPassthroughFd() (uintptr, error) {
 	var (
-		offset           int64 = 0
+		offset           int64
 		firstChunkOffset int64 = -1
-		totalSize        int64 = 0
+		totalSize        int64
 	)
 
 	for {
@@ -538,9 +538,9 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {
 
 func (sf *file) prefetchEntireFile() error {
 	var (
-		offset           int64 = 0
+		offset           int64
 		firstChunkOffset int64 = -1
-		totalSize        int64 = 0
+		totalSize        int64
 	)
 	combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer)
 	combinedBuffer.Reset()
@@ -554,10 +554,28 @@ func (sf *file) prefetchEntireFile() error {
 		if firstChunkOffset == -1 {
 			firstChunkOffset = chunkOffset
 		}
+
+		id := genID(sf.id, chunkOffset, chunkSize)
 		b := sf.gr.bufPool.Get().(*bytes.Buffer)
 		b.Reset()
 		b.Grow(int(chunkSize))
 		ip := b.Bytes()[:chunkSize]
+
+		// Check if the content exists in the cache
+		if r, err := sf.gr.cache.Get(id); err == nil {
+			n, err := r.ReadAt(ip, 0)
+			if (err == nil || err == io.EOF) && int64(n) == chunkSize {
+				combinedBuffer.Write(ip[:n])
+				totalSize += int64(n)
+				offset = chunkOffset + int64(n)
+				r.Close()
+				sf.gr.putBuffer(b)
+				continue
+			}
+			r.Close()
+		}
+
+		// Cache miss: read the whole chunk from the underlying reader
 		if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
 			sf.gr.putBuffer(b)
 			return fmt.Errorf("failed to read data: %w", err)
@@ -572,8 +590,8 @@ func (sf *file) prefetchEntireFile() error {
 		sf.gr.putBuffer(b)
 	}
 	combinedIp := combinedBuffer.Bytes()
-	id := genID(sf.id, firstChunkOffset, totalSize)
-	sf.gr.cacheData(combinedIp, id)
+	combinedId := genID(sf.id, firstChunkOffset, totalSize)
+	sf.gr.cacheData(combinedIp, combinedId)
 	return nil
 }