From b42feb0c06ef349333538ad7667a970b46dc2632 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Sun, 11 Aug 2024 23:40:03 +0530 Subject: [PATCH 1/3] return error when download fails during overwriting a block --- component/block_cache/block_cache.go | 28 +++++++---- component/block_cache/block_cache_test.go | 57 ++++++++++++++++++++++- 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/component/block_cache/block_cache.go b/component/block_cache/block_cache.go index 31abae94c..def3a9988 100644 --- a/component/block_cache/block_cache.go +++ b/component/block_cache/block_cache.go @@ -610,10 +610,7 @@ func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Bl log.Err("BlockCache::getBlock : Failed to download block %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index) // Remove this node from handle so that next read retries to download the block again - _ = handle.Buffers.Cooking.Remove(block.node) - handle.RemoveValue(fmt.Sprintf("%v", block.id)) - block.ReUse() - bc.blockPool.Release(block) + bc.releaseFailedBlock(handle, block) return nil, fmt.Errorf("failed to download block") } @@ -1036,10 +1033,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path) // Remove this node from handle so that next read retries to download the block again - _ = handle.Buffers.Cooking.Remove(block.node) - handle.RemoveValue(fmt.Sprintf("%v", block.id)) - block.ReUse() - bc.blockPool.Release(block) + bc.releaseFailedBlock(handle, block) return nil, fmt.Errorf("failed to download block") } } else { @@ -1078,6 +1072,15 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) log.Debug("BlockCache::getOrCreateBlock : Waiting for download to finish for committed block %v for %v=>%s", block.id, handle.ID, handle.Path) <-block.state block.Unblock() + + // if the block failed to download, it can't be used for overwriting + if block.IsFailed() { + log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path) + + // Remove this node from handle so that next read retries to download the block again + bc.releaseFailedBlock(handle, block) + return nil, fmt.Errorf("failed to download block") + } } else if block.flags.IsSet(BlockFlagUploading) { // If the block is being staged, then wait till it is uploaded, // and then write to the same block and move it back to cooking queue @@ -1090,6 +1093,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) } block.node = handle.Buffers.Cooking.PushBack(block) } + block.flags.Clear(BlockFlagUploading) block.flags.Clear(BlockFlagDownloading) block.flags.Clear(BlockFlagSynced) @@ -1123,6 +1127,14 @@ func (bc *BlockCache) stageBlocks(handle *handlemap.Handle, cnt int) error { return nil } +// remove the block which failed to download so that it can be used again +func (bc *BlockCache) releaseFailedBlock(handle *handlemap.Handle, block *Block) { + _ = handle.Buffers.Cooking.Remove(block.node) + handle.RemoveValue(fmt.Sprintf("%v", block.id)) + block.ReUse() + bc.blockPool.Release(block) +} + func (bc *BlockCache) printCooking(handle *handlemap.Handle) { //nolint nodeList := handle.Buffers.Cooking node := nodeList.Front() diff --git a/component/block_cache/block_cache_test.go b/component/block_cache/block_cache_test.go index 4bcabcb93..7aa2caffa 100644 --- a/component/block_cache/block_cache_test.go +++ b/component/block_cache/block_cache_test.go @@ -2255,13 +2255,68 @@ func (suite *blockCacheTestSuite) TestBlockFailOverwrite() { size: _1MB, } - // write offset 0 where block 0 download will fail + // write at offset 0 where block 0 download will fail n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:1*_1MB]}) suite.assert.NotNil(err) suite.assert.Contains(err.Error(), "failed to download block") suite.assert.Equal(n, 0) suite.assert.False(h.Dirty()) + err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h}) + suite.assert.Nil(err) + + fs, err := os.Stat(storagePath) + suite.assert.Nil(err) + suite.assert.Equal(fs.Size(), int64(0)) +} + +func (suite *blockCacheTestSuite) TestBlockDownloadFailed() { + cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10" + tobj, err := setupPipeline(cfg) + defer tobj.cleanupPipeline() + + suite.assert.Nil(err) + suite.assert.NotNil(tobj.blockCache) + + path := getTestFileName(suite.T().Name()) + storagePath := filepath.Join(tobj.fake_storage_path, path) + + // write using block cache + options := internal.CreateFileOptions{Name: path, Mode: 0777} + h, err := tobj.blockCache.CreateFile(options) + suite.assert.Nil(err) + suite.assert.NotNil(h) + suite.assert.Equal(h.Size, int64(0)) + suite.assert.False(h.Dirty()) + + h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR}) + suite.assert.Nil(err) + suite.assert.NotNil(h) + suite.assert.Equal(h.Size, int64(0)) + suite.assert.False(h.Dirty()) + + // updating the size to replicate the download failure + h.Size = int64(4 * _1MB) + + data := make([]byte, _1MB) + n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data}) + suite.assert.NotNil(err) + suite.assert.Contains(err.Error(), "failed to download block") + suite.assert.Equal(n, 0) + + // 1-4MB data being prefetched in blocks 1-3 + suite.assert.Equal(h.Buffers.Cooking.Len(), 3) + + // write at offset 1MB where block 1 download will fail + n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(_1MB), Data: dataBuff[:1*_1MB]}) + suite.assert.NotNil(err) + suite.assert.Contains(err.Error(), "failed to download block") + suite.assert.Equal(n, 0) + suite.assert.False(h.Dirty()) + + err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h}) + suite.assert.Nil(err) + fs, err := os.Stat(storagePath) suite.assert.Nil(err) suite.assert.Equal(fs.Size(), int64(0)) From bb3f357aa6f3059113694b7bcd32cb55a7f37ec7 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Mon, 12 Aug 2024 10:41:36 +0530 Subject: [PATCH 2/3] Adding changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8e852aab..bd3d0287d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Flush shall only sync the blocks to storage and not delete them from local cache. - Random write has been re-enabled in block cache. - Writing to an uncommitted block which has been deleted from the in-memory cache. +- Return failure if download of a block failed which is to be updated. ## 2.3.1 (Unreleased) **NOTICE** From d99a04cef3febd7bc7fee94a48a64626dbb5bdc5 Mon Sep 17 00:00:00 2001 From: souravgupta Date: Mon, 12 Aug 2024 10:47:39 +0530 Subject: [PATCH 3/3] Updating changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd3d0287d..c4384ee48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ - Flush shall only sync the blocks to storage and not delete them from local cache. - Random write has been re-enabled in block cache. - Writing to an uncommitted block which has been deleted from the in-memory cache. -- Return failure if download of a block failed which is to be updated. +- Check download status of a block before updating and return error if it failed to download. ## 2.3.1 (Unreleased) **NOTICE**