Skip to content

Commit

Permalink
fuse passthrough: prefetchEntireFile just retry once to avoid excepti…
Browse files Browse the repository at this point in the history
…on stuck

Signed-off-by: abushwang <[email protected]>
  • Loading branch information
wswsmao committed Dec 2, 2024
1 parent 56c7b0f commit 9b54e47
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions fs/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,30 +515,33 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {

id := genID(sf.id, firstChunkOffset, totalSize)

for {
// cache.PassThrough() is necessary to take over files
r, err := sf.gr.cache.Get(id, cache.PassThrough())
if err != nil {
if err := sf.prefetchEntireFile(); err != nil {
return 0, err
}
continue
// cache.PassThrough() is necessary to take over files
r, err := sf.gr.cache.Get(id, cache.PassThrough())
if err != nil {
if id, err = sf.prefetchEntireFile(); err != nil {
return 0, err
}

readerAt := r.GetReaderAt()
file, ok := readerAt.(*os.File)
if !ok {
r.Close()
return 0, fmt.Errorf("The cached ReaderAt is not of type *os.File, fd obtain failed")
// just retry once to avoid exception stuck
r, err = sf.gr.cache.Get(id, cache.PassThrough())
if err != nil {
return 0, err
}
}

fd := file.Fd()
readerAt := r.GetReaderAt()
file, ok := readerAt.(*os.File)
if !ok {
r.Close()
return fd, nil
return 0, fmt.Errorf("the cached ReaderAt is not of type *os.File, fd obtain failed")
}

fd := file.Fd()
r.Close()
return fd, nil
}

func (sf *file) prefetchEntireFile() error {
func (sf *file) prefetchEntireFile() (string, error) {
var (
offset int64
firstChunkOffset int64 = -1
Expand Down Expand Up @@ -581,11 +584,11 @@ func (sf *file) prefetchEntireFile() error {
// cache miss, prefetch the whole chunk
if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
sf.gr.putBuffer(b)
return fmt.Errorf("failed to read data: %w", err)
return "", fmt.Errorf("failed to read data: %w", err)
}
if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
sf.gr.putBuffer(b)
return err
return "", err
}
combinedBuffer.Write(ip)
totalSize += chunkSize
Expand All @@ -595,7 +598,7 @@ func (sf *file) prefetchEntireFile() error {
combinedIP := combinedBuffer.Bytes()
combinedID := genID(sf.id, firstChunkOffset, totalSize)
sf.gr.cacheData(combinedIP, combinedID)
return nil
return combinedID, nil
}

func (gr *reader) verifyOneChunk(entryID uint32, ip []byte, chunkDigestStr string) error {
Expand Down

0 comments on commit 9b54e47

Please sign in to comment.