Skip to content

Commit

Permalink
Enabling random write in block cache (#1495)
Browse files Browse the repository at this point in the history
* Enabling random write
  • Loading branch information
souravgupta-msft authored Aug 8, 2024
1 parent 78a335b commit f649724
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 69 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Bug Fixes**
- Flush shall only sync the blocks to storage and not delete them from local cache.

- Random write has been re-enabled in block cache.

## 2.3.1 (Unreleased)
**NOTICE**
Expand Down
81 changes: 25 additions & 56 deletions component/block_cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,36 @@ import (
type BlockCache struct {
internal.BaseComponent

blockSize uint64 // Size of each block to be cached
memSize uint64 // Mem size to be used for caching at the startup
tmpPath string // Disk path where these blocks will be cached
diskSize uint64 // Size of disk space allocated for the caching
diskTimeout uint32 // Timeout for which disk blocks will be cached
workers uint32 // Number of threads working to fetch the blocks
prefetch uint32 // Number of blocks to be prefetched
diskPolicy *tlru.TLRU // Disk cache eviction policy
blockPool *BlockPool // Pool of blocks
threadPool *ThreadPool // Pool of threads
fileLocks *common.LockMap // Locks for each file_blockid to avoid multiple threads to fetch same block
fileNodeMap sync.Map // Map holding files that are there in our cache
maxDiskUsageHit bool // Flag to indicate if we have hit max disk usage
noPrefetch bool // Flag to indicate if prefetch is disabled
prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read
enableRandomWrite bool // Enable random write in block cache
blockSize uint64 // Size of each block to be cached
memSize uint64 // Mem size to be used for caching at the startup
tmpPath string // Disk path where these blocks will be cached
diskSize uint64 // Size of disk space allocated for the caching
diskTimeout uint32 // Timeout for which disk blocks will be cached
workers uint32 // Number of threads working to fetch the blocks
prefetch uint32 // Number of blocks to be prefetched
diskPolicy *tlru.TLRU // Disk cache eviction policy
blockPool *BlockPool // Pool of blocks
threadPool *ThreadPool // Pool of threads
fileLocks *common.LockMap // Locks for each file_blockid to avoid multiple threads to fetch same block
fileNodeMap sync.Map // Map holding files that are there in our cache
maxDiskUsageHit bool // Flag to indicate if we have hit max disk usage
noPrefetch bool // Flag to indicate if prefetch is disabled
prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read

lazyWrite bool // Flag to indicate if lazy write is enabled
fileCloseOpt sync.WaitGroup // Wait group to wait for all async close operations to complete
}

// Structure defining your config parameters
type BlockCacheOptions struct {
BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"`
MemSize uint64 `config:"mem-size-mb" yaml:"mem-size-mb,omitempty"`
TmpPath string `config:"path" yaml:"path,omitempty"`
DiskSize uint64 `config:"disk-size-mb" yaml:"disk-size-mb,omitempty"`
DiskTimeout uint32 `config:"disk-timeout-sec" yaml:"timeout-sec,omitempty"`
PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"`
Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"`
PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"`
EnableRandomWrite bool `config:"enable-random-write" yaml:"enable-random-write,omitempty"`
BlockSize float64 `config:"block-size-mb" yaml:"block-size-mb,omitempty"`
MemSize uint64 `config:"mem-size-mb" yaml:"mem-size-mb,omitempty"`
TmpPath string `config:"path" yaml:"path,omitempty"`
DiskSize uint64 `config:"disk-size-mb" yaml:"disk-size-mb,omitempty"`
DiskTimeout uint32 `config:"disk-timeout-sec" yaml:"timeout-sec,omitempty"`
PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"`
Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"`
PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"`
}

const (
Expand Down Expand Up @@ -213,7 +211,6 @@ func (bc *BlockCache) Configure(_ bool) error {
bc.prefetchOnOpen = conf.PrefetchOnOpen
bc.prefetch = uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU())))
bc.noPrefetch = false
bc.enableRandomWrite = conf.EnableRandomWrite

if defaultMemSize && (uint64(bc.prefetch)*uint64(bc.blockSize)) > bc.memSize {
bc.prefetch = (MIN_PREFETCH * 2) + 1
Expand Down Expand Up @@ -275,8 +272,8 @@ func (bc *BlockCache) Configure(_ bool) error {
return fmt.Errorf("config error in %s [memory limit too low for configured prefetch]", bc.Name())
}

log.Info("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v, enable-random-write %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch, bc.enableRandomWrite)
log.Info("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch)

bc.blockPool = NewBlockPool(bc.blockSize, bc.memSize)
if bc.blockPool == nil {
Expand Down Expand Up @@ -978,21 +975,6 @@ func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error)
return dataWritten, nil
}

// blockRandomWrite reports whether a write to the block at index must be rejected.
// A write is rejected only when all of the following hold:
//   - random write is disabled (bc.enableRandomWrite is false)
//   - index is lower than the block index computed from the handle size
//   - shouldCommitAndDownload reports that the block needs a commit or a download
func (bc *BlockCache) blockRandomWrite(handle *handlemap.Handle, index uint64) bool {
	// With random write enabled nothing is ever blocked.
	if bc.enableRandomWrite {
		return false
	}

	// Blocks at or past the index derived from the handle size are never blocked here.
	if index >= bc.getBlockIndex(uint64(handle.Size)) {
		return false
	}

	// Block the write if either a commit or a download would be required for this block.
	needCommit, needDownload := shouldCommitAndDownload(int64(index), handle)
	return needCommit || needDownload
}

func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64) (*Block, error) {
// Check the given block index is already available or not
index := bc.getBlockIndex(offset)
Expand All @@ -1008,13 +990,6 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)

node, found := handle.GetValue(fmt.Sprintf("%v", index))
if !found {
// block not present in the buffer list
// check if it is a random write case and should be blocked
if bc.blockRandomWrite(handle, index) {
log.Err("BlockCache::WriteFile : Random write detection for write offset %v and block %v, where handle size is %v", offset, index, handle.Size)
return nil, fmt.Errorf("blocking random write for write offset %v and block %v where handle size is %v", offset, index, handle.Size)
}

// If too many buffers are piled up for this file then try to evict some of those which are already uploaded
if handle.Buffers.Cooked.Len()+handle.Buffers.Cooking.Len() >= int(bc.prefetch) {
bc.waitAndFreeUploadedBlocks(handle, 1)
Expand Down Expand Up @@ -1666,10 +1641,4 @@ func init() {

blockCachePrefetchOnOpen := config.AddBoolFlag("block-cache-prefetch-on-open", false, "Start prefetching on open or wait for first read.")
config.BindPFlag(compName+".prefetch-on-open", blockCachePrefetchOnOpen)

// flag is hidden and its default value is false.
// It will block the random write cases where data-integrity issues have been observed.
blockCacheEnableRandomWrite := config.AddBoolFlag("block-cache-enable-random-write", false, "Enable random write in block cache")
config.BindPFlag(compName+".enable-random-write", blockCacheEnableRandomWrite)
blockCacheEnableRandomWrite.Hidden = true
}
28 changes: 17 additions & 11 deletions component/block_cache/block_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlock() {
}

func (suite *blockCacheTestSuite) TestWriteFileMultiBlockWithOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
tobj, err := setupPipeline(cfg)
tobj, err := setupPipeline("")
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
Expand Down Expand Up @@ -962,8 +961,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlockWithOverwrite() {
}

func (suite *blockCacheTestSuite) TestWritefileWithAppend() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
tobj, err := setupPipeline(cfg)
tobj, err := setupPipeline("")
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
Expand Down Expand Up @@ -1514,7 +1512,7 @@ func (suite *blockCacheTestSuite) TestRandomWriteFileOneBlock() {
}

func (suite *blockCacheTestSuite) TestRandomWriteFlushAndOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

Expand Down Expand Up @@ -1652,15 +1650,15 @@ func (suite *blockCacheTestSuite) TestDisableRandomWrite() {
suite.assert.Equal(fs.Size(), int64(0))
}

func (suite *blockCacheTestSuite) TestDisableRandomWriteExistingFile() {
func (suite *blockCacheTestSuite) TestRandomWriteExistingFile() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)

path := "testDisableRandomWriteExistingFile"
path := "testRandomWriteExistingFile"
storagePath := filepath.Join(tobj.fake_storage_path, path)

// write using block cache
Expand Down Expand Up @@ -1693,8 +1691,16 @@ func (suite *blockCacheTestSuite) TestDisableRandomWriteExistingFile() {

// write randomly in new handle at offset 2MB
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: nh, Offset: int64(2 * _1MB), Data: dataBuff[0:10]})
suite.assert.NotNil(err)
suite.assert.Equal(n, 0)
suite.assert.Nil(err)
suite.assert.Equal(n, 10)
suite.assert.True(nh.Dirty())

err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: nh})
suite.assert.Nil(err)

fs, err = os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(5*_1MB))
}

func (suite *blockCacheTestSuite) TestPreventRaceCondition() {
Expand Down Expand Up @@ -2055,7 +2061,7 @@ func (suite *blockCacheTestSuite) TestBlockParallelReadAndWriteValidation() {
}

func (suite *blockCacheTestSuite) TestBlockOverwriteValidation() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

Expand Down Expand Up @@ -2152,7 +2158,7 @@ func (suite *blockCacheTestSuite) TestBlockOverwriteValidation() {
}

func (suite *blockCacheTestSuite) TestBlockFailOverwrite() {
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10\n enable-random-write: true"
cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

Expand Down
1 change: 0 additions & 1 deletion testdata/config/azure_block_perf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ block_cache:
parallelism: 128
disk-timeout-sec: 200
prefetch-on-open: true
enable-random-write: true

attr_cache:
timeout-sec: 7200
Expand Down

0 comments on commit f649724

Please sign in to comment.