Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling random write in block cache #1495

Merged
merged 3 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Bug Fixes**
- Flush shall only sync the blocks to storage and not delete them from local cache.

- Random write has been re-enabled in block cache.

## 2.3.1 (Unreleased)
**NOTICE**
Expand Down
81 changes: 25 additions & 56 deletions component/block_cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,36 @@ import (
type BlockCache struct {
internal.BaseComponent

blockSize uint64 // Size of each block to be cached
memSize uint64 // Mem size to be used for caching at the startup
tmpPath string // Disk path where these blocks will be cached
diskSize uint64 // Size of disk space allocated for the caching
diskTimeout uint32 // Timeout for which disk blocks will be cached
workers uint32 // Number of threads working to fetch the blocks
prefetch uint32 // Number of blocks to be prefetched
diskPolicy *tlru.TLRU // Disk cache eviction policy
blockPool *BlockPool // Pool of blocks
threadPool *ThreadPool // Pool of threads
fileLocks *common.LockMap // Locks for each file_blockid to avoid multiple threads to fetch same block
fileNodeMap sync.Map // Map holding files that are there in our cache
maxDiskUsageHit bool // Flag to indicate if we have hit max disk usage
noPrefetch bool // Flag to indicate if prefetch is disabled
prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read
enableRandomWrite bool // Enable random write in block cache
blockSize uint64 // Size of each block to be cached
memSize uint64 // Mem size to be used for caching at the startup
tmpPath string // Disk path where these blocks will be cached
diskSize uint64 // Size of disk space allocated for the caching
diskTimeout uint32 // Timeout for which disk blocks will be cached
workers uint32 // Number of threads working to fetch the blocks
prefetch uint32 // Number of blocks to be prefetched
diskPolicy *tlru.TLRU // Disk cache eviction policy
blockPool *BlockPool // Pool of blocks
threadPool *ThreadPool // Pool of threads
fileLocks *common.LockMap // Locks for each file_blockid to avoid multiple threads to fetch same block
fileNodeMap sync.Map // Map holding files that are there in our cache
maxDiskUsageHit bool // Flag to indicate if we have hit max disk usage
noPrefetch bool // Flag to indicate if prefetch is disabled
prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read

lazyWrite bool // Flag to indicate if lazy write is enabled
fileCloseOpt sync.WaitGroup // Wait group to wait for all async close operations to complete
}

// Structure defining your config parameters
type BlockCacheOptions struct {
BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"`
MemSize uint64 `config:"mem-size-mb" yaml:"mem-size-mb,omitempty"`
TmpPath string `config:"path" yaml:"path,omitempty"`
DiskSize uint64 `config:"disk-size-mb" yaml:"disk-size-mb,omitempty"`
DiskTimeout uint32 `config:"disk-timeout-sec" yaml:"timeout-sec,omitempty"`
PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"`
Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"`
PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"`
EnableRandomWrite bool `config:"enable-random-write" yaml:"enable-random-write,omitempty"`
BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"`
MemSize uint64 `config:"mem-size-mb" yaml:"mem-size-mb,omitempty"`
TmpPath string `config:"path" yaml:"path,omitempty"`
DiskSize uint64 `config:"disk-size-mb" yaml:"disk-size-mb,omitempty"`
DiskTimeout uint32 `config:"disk-timeout-sec" yaml:"timeout-sec,omitempty"`
PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"`
Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"`
PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"`
}

const (
Expand Down Expand Up @@ -213,7 +211,6 @@ func (bc *BlockCache) Configure(_ bool) error {
bc.prefetchOnOpen = conf.PrefetchOnOpen
bc.prefetch = uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU())))
bc.noPrefetch = false
bc.enableRandomWrite = conf.EnableRandomWrite

if defaultMemSize && (uint64(bc.prefetch)*uint64(bc.blockSize)) > bc.memSize {
bc.prefetch = (MIN_PREFETCH * 2) + 1
Expand Down Expand Up @@ -275,8 +272,8 @@ func (bc *BlockCache) Configure(_ bool) error {
return fmt.Errorf("config error in %s [memory limit too low for configured prefetch]", bc.Name())
}

log.Info("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v, enable-random-write %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch, bc.enableRandomWrite)
log.Info("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch)

bc.blockPool = NewBlockPool(bc.blockSize, bc.memSize)
if bc.blockPool == nil {
Expand Down Expand Up @@ -978,21 +975,6 @@ func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
return dataWritten, nil
}

// block random write if,
// - random write is disabled
// - block index of write offset is less than than the block index of the handle size
// - block index is present in the blockList map which indicates that it has been staged earlier
func (bc *BlockCache) blockRandomWrite(handle *handlemap.Handle, index uint64) bool {
if !bc.enableRandomWrite && index < bc.getBlockIndex(uint64(handle.Size)) {
shouldCommit, shouldDownload := shouldCommitAndDownload(int64(index), handle)
if !shouldCommit && !shouldDownload {
return false
}
return true
}
return false
}

func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) (*Block, error) {
// Check the given block index is already available or not
index := bc.getBlockIndex(offset)
Expand All @@ -1008,13 +990,6 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)

node, found := handle.GetValue(fmt.Sprintf("%v", index))
if !found {
// block not present in the buffer list
// check if it is a random write case and should be blocked
if bc.blockRandomWrite(handle, index) {
log.Err("BlockCache::WriteFile : Random write detection for write offset %v and block %v, where handle size is %v", offset, index, handle.Size)
return nil, fmt.Errorf("blocking random write for write offset %v and block %v where handle size is %v", offset, index, handle.Size)
}

// If too many buffers are piled up for this file then try to evict some of those which are already uploaded
if handle.Buffers.Cooked.Len()+handle.Buffers.Cooking.Len() >= int(bc.prefetch) {
bc.waitAndFreeUploadedBlocks(handle, 1)
Expand Down Expand Up @@ -1666,10 +1641,4 @@ func init() {

blockCachePrefetchOnOpen := config.AddBoolFlag("block-cache-prefetch-on-open", false, "Start prefetching on open or wait for first read.")
config.BindPFlag(compName+".prefetch-on-open", blockCachePrefetchOnOpen)

// flag is hidden and its default value is false.
// It will block the random write cases where data-integrity issues have been observed.
blockCacheEnableRandomWrite := config.AddBoolFlag("block-cache-enable-random-write", false, "Enable random write in block cache")
config.BindPFlag(compName+".enable-random-write", blockCacheEnableRandomWrite)
blockCacheEnableRandomWrite.Hidden = true
}
28 changes: 17 additions & 11 deletions component/block_cache/block_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlock() {
}

func (suite *blockCacheTestSuite) TestWriteFileMultiBlockWithOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
tobj, err := setupPipeline(cfg)
tobj, err := setupPipeline("")
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
Expand Down Expand Up @@ -962,8 +961,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlockWithOverwrite() {
}

func (suite *blockCacheTestSuite) TestWritefileWithAppend() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
tobj, err := setupPipeline(cfg)
tobj, err := setupPipeline("")
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
Expand Down Expand Up @@ -1514,7 +1512,7 @@ func (suite *blockCacheTestSuite) TestRandomWriteFileOneBlock() {
}

func (suite *blockCacheTestSuite) TestRandomWriteFlushAndOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

Expand Down Expand Up @@ -1652,15 +1650,15 @@ func (suite *blockCacheTestSuite) TestDisableRandomWrite() {
suite.assert.Equal(fs.Size(), int64(0))
}

func (suite *blockCacheTestSuite) TestDisableRandomWriteExistingFile() {
func (suite *blockCacheTestSuite) TestRandomWriteExistingFile() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)

path := "testDisableRandomWriteExistingFile"
path := "testRandomWriteExistingFile"
storagePath := filepath.Join(tobj.fake_storage_path, path)

// write using block cache
Expand Down Expand Up @@ -1693,8 +1691,16 @@ func (suite *blockCacheTestSuite) TestDisableRandomWriteExistingFile() {

// write randomly in new handle at offset 2MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: nh, Offset: int64(2 * _1MB), Data: dataBuff[0:10]})
suite.assert.NotNil(err)
suite.assert.Equal(n, 0)
suite.assert.Nil(err)
suite.assert.Equal(n, 10)
suite.assert.True(nh.Dirty())

err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: nh})
suite.assert.Nil(err)

fs, err = os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(5*_1MB))
}

func (suite *blockCacheTestSuite) TestPreventRaceCondition() {
Expand Down Expand Up @@ -2055,7 +2061,7 @@ func (suite *blockCacheTestSuite) TestBlockParallelReadAndWriteValidation() {
}

func (suite *blockCacheTestSuite) TestBlockOverwriteValidation() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

Expand Down Expand Up @@ -2152,7 +2158,7 @@ func (suite *blockCacheTestSuite) TestBlockOverwriteValidation() {
}

func (suite *blockCacheTestSuite) TestBlockFailOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

Expand Down
1 change: 0 additions & 1 deletion testdata/config/azure_block_perf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ block_cache:
parallelism: 128
disk-timeout-sec: 200
prefetch-on-open: true
enable-random-write: true

attr_cache:
timeout-sec: 7200
Expand Down