Skip to content

Commit

Permalink
[Retention] (1) Added retention (2) Stop implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
hyunsooda committed Dec 18, 2024
1 parent 14757e7 commit 6831749
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 40 deletions.
15 changes: 8 additions & 7 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ const (
)

const (
DefaultTriesInMemory = 128
DefaultBlockInterval = 128
DefaultPruningRetention = 172800 // 2*params.DefaultStakeUpdateInterval
MaxPrefetchTxs = 20000
DefaultChunkBlockSize = uint64(10000)
MB_1 = uint64(1000000)
DefaultCompressChunkCap = MB_1 * 100 // 100MB
DefaultTriesInMemory = 128
DefaultBlockInterval = 128
DefaultPruningRetention = 172800 // 2*params.DefaultStakeUpdateInterval
MaxPrefetchTxs = 20000
MB_1 = uint64(1000000)
DefaultChunkBlockSize = uint64(10000)
DefaultCompressChunkCap = MB_1 * 100 // 100MB
DefaultCompressRetention = 86400

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
// Changelog:
Expand Down
1 change: 1 addition & 0 deletions cmd/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ func (kCfg *KaiaConfig) SetKaiaConfig(ctx *cli.Context, stack *node.Node) {

cfg.CompressBlockChunkSize = ctx.Uint64(CompressBlockChunkSizeFlag.Name)
cfg.CompressChunkCap = ctx.Uint64(CompressChunkCapFlag.Name)
cfg.CompressRetention = ctx.Uint64(CompressRetentionFlag.Name)

if gcmode := ctx.String(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" {
log.Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
Expand Down
1 change: 1 addition & 0 deletions cmd/utils/flaggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ var FlagGroups = []FlagGroup{
DBNoPerformanceMetricsFlag,
CompressBlockChunkSizeFlag,
CompressChunkCapFlag,
CompressRetentionFlag,
TxPruningFlag,
TxPruningRetentionFlag,
ReceiptPruningFlag,
Expand Down
12 changes: 10 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,21 +489,29 @@ var (
Category: "DATABASE",
}
CompressBlockChunkSizeFlag = &cli.Uint64Flag{
Name: "database.compress-check-block-size",
Name: "database.compress-chunk-block-size",
Usage: "Number of blocks which chunk can accommodate",
Value: blockchain.DefaultChunkBlockSize,
Aliases: []string{},
EnvVars: []string{"KLAYTN_COMPRESS_BLOCK_CHUNK_SIZE", "KAIA_COMPRESS_BLOCK_CHUNK_SIZE"},
Category: "DATABASE",
}
CompressChunkCapFlag = &cli.Uint64Flag{
Name: "database.compress-check-cap",
Name: "database.compress-chunk-cap",
Usage: "Byte size which chunk can accommodate",
Value: blockchain.DefaultCompressChunkCap,
Aliases: []string{},
EnvVars: []string{"KLAYTN_COMPRESS_CHUNK_CAP", "KAIA_COMPRESS_CHUNK_CAP"},
Category: "DATABASE",
}
// CompressRetentionFlag sets how many blocks behind the current head must
// remain uncompressed. Blocks inside this window are left untouched.
CompressRetentionFlag = &cli.Uint64Flag{
	Name:  "database.compress-retention",
	Usage: "Number of blocks from the latest block where compression should not be performed",
	Value: blockchain.DefaultCompressRetention,
	Aliases: []string{},
	// Fixed typo: was "KLAYTN_COMPRESS_RETENTIONP" (stray trailing P),
	// which would have made the legacy env var silently unmatched.
	EnvVars:  []string{"KLAYTN_COMPRESS_RETENTION", "KAIA_COMPRESS_RETENTION"},
	Category: "DATABASE",
}
SenderTxHashIndexingFlag = &cli.BoolFlag{
Name: "sendertxhashindexing",
Usage: "Enables storing mapping information of senderTxHash to txHash",
Expand Down
1 change: 1 addition & 0 deletions cmd/utils/nodeflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ var CommonNodeFlags = []cli.Flag{
altsrc.NewBoolFlag(LevelDBNoBufferPoolFlag),
altsrc.NewUint64Flag(CompressBlockChunkSizeFlag),
altsrc.NewUint64Flag(CompressChunkCapFlag),
altsrc.NewUint64Flag(CompressRetentionFlag),
altsrc.NewBoolFlag(DBNoPerformanceMetricsFlag),
altsrc.NewBoolFlag(RocksDBSecondaryFlag),
altsrc.NewUint64Flag(RocksDBCacheSizeFlag),
Expand Down
85 changes: 70 additions & 15 deletions kaiax/compress/impl/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ import (
"github.com/stretchr/testify/assert"
)

// SEC_TEN is the fixed back-off period a compression worker waits while idle
// before re-evaluating whether it can compress.
// NOTE(review): Go convention prefers MixedCaps (e.g. tenSeconds) over
// ALL_CAPS, but renaming would touch every call site — confirm before changing.
const SEC_TEN = time.Second * 10

// stopCompress sends one stop signal per compression type (header, body,
// receipts), waits until every signal has been consumed from the channel,
// and then clears the reported idle state for each type.
func (c *CompressModule) stopCompress() {
	// One stop signal per running compression goroutine.
	for range allCompressTypes {
		c.terminateCompress <- struct{}{}
	}
	// Wait for all signals to be drained. The original spun in a tight
	// loop, pegging a CPU core; yield briefly between checks instead.
	for len(c.terminateCompress) != 0 {
		time.Sleep(10 * time.Millisecond)
	}
	for _, compressTyp := range allCompressTypes {
		c.setIdleState(compressTyp, nil)
	}
}

func (c *CompressModule) Compress() {
go c.compressHeader()
go c.compressBody()
Expand All @@ -52,30 +68,68 @@ func (c *CompressModule) compressReceipts() {
c.compress(ReceiptCompressType, compressReceipts)
}

// idle parks the compression worker for compressTyp until either a stop
// signal arrives or the fixed back-off period (SEC_TEN) elapses.
// nBlocks is the estimated number of blocks remaining before compression can
// resume; it is only used to report the "ideal" idle time via setIdleState.
// Returns true when a stop signal was received (caller must terminate),
// false when the back-off elapsed (caller should re-evaluate and continue).
func (c *CompressModule) idle(compressTyp CompressionType, nBlocks uint64) bool {
	// Publish the idle state once, up front.
	idealIdleTime := time.Second * time.Duration(nBlocks)
	c.setIdleState(compressTyp, &IdleState{true, idealIdleTime})
	logger.Info("[Compression] Enter idle state", "type", compressTyp.String(), "idle", SEC_TEN, "ideal idle time", idealIdleTime)

	// Block on a single timer. The original looped over a select with a
	// `default` case, which made the select non-blocking and recreated the
	// time.After timer on every iteration — the timeout could never fire
	// and the goroutine busy-spun until a stop signal arrived.
	timer := time.NewTimer(SEC_TEN)
	defer timer.Stop()
	select {
	case <-c.terminateCompress:
		logger.Info("[Compression] Stop signal received", "type", compressTyp.String())
		return true
	case <-timer.C:
		return false
	}
}

func (c *CompressModule) compress(compressTyp CompressionType, compressFn CompressFn) {
var (
SEC_TEN = time.Second * 10
totalChunks = 0
)
totalChunks := 0
for {
select {
case <-c.terminateCompress:
logger.Info("[Compression] Stop signal received", "type", compressTyp.String())
return
default:
}

var (
curBlkNum = c.Chain.CurrentBlock().NumberU64()
residualBlkCnt = curBlkNum % c.getCompressChunk()
nextCompressionBlkNum = readSubsequentCompressionBlkNumber(c.Dbm, compressTyp)
curBlkNum = c.Chain.CurrentBlock().NumberU64()
residualBlkCnt = curBlkNum % c.getCompressChunk()
nextCompressionBlkNum = readSubsequentCompressionBlkNumber(c.Dbm, compressTyp)
nextCompressionDistance = curBlkNum - nextCompressionBlkNum
// Do not wait if the next compression block number is far away. Start migration right now.
noWait = curBlkNum > nextCompressionBlkNum && curBlkNum-nextCompressionBlkNum > c.getCompressChunk()
noWait = curBlkNum > nextCompressionBlkNum && nextCompressionDistance > c.getCompressChunk()
originFrom = readSubsequentCompressionBlkNumber(c.Dbm, compressTyp)
from = originFrom
)
// 1. Idle check
if curBlkNum < c.getCompressChunk() || (residualBlkCnt != 0 && !noWait) {
idealIdleTime := time.Second * time.Duration(c.getCompressChunk()-residualBlkCnt)
c.setIdleState(compressTyp, &IdleState{true, idealIdleTime})
logger.Info("[Compression] Enter idle state", "type", compressTyp.String(), "idle", SEC_TEN, "ideal idle time", idealIdleTime)
time.Sleep(SEC_TEN)
if c.idle(compressTyp, c.getCompressChunk()-residualBlkCnt) {
return
}
continue
}
if c.getCompressRetention() > nextCompressionDistance {
if c.idle(compressTyp, c.getCompressRetention()-nextCompressionDistance) {
return
}
continue
}
c.setIdleState(compressTyp, nil)
// 2. Main loop (compression)
for {
select {
case <-c.terminateCompress:
logger.Info("[Compression] Stop signal received", "type", compressTyp.String())
return
default:
}
subsequentBlkNumber, compressedSize, err := compressFn(c.Dbm, from, 0, curBlkNum, c.getCompressChunk(), c.getChunkCap(), true)
if err != nil {
logger.Warn("[Compression] failed to compress chunk", "type", compressTyp.String(), "err", err)
Expand All @@ -88,6 +142,7 @@ func (c *CompressModule) compress(compressTyp CompressionType, compressFn Compre
from = subsequentBlkNumber
totalChunks++
}

}
}

Expand Down Expand Up @@ -128,7 +183,7 @@ func (c *CompressModule) FindReceiptsFromChunkWithBlkHash(dbm database.DBManager
}

func (c *CompressModule) restoreFragmentByRewind() {
for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
for _, compressTyp := range allCompressTypes {
var (
lastCompressDeleteKeyPrefix, lastCompressDeleteValuePrefix = getLsatCompressionDeleteKeyPrefix(compressTyp), getLsatCompressionDeleteValuePrefix(compressTyp)
miscDB = c.Dbm.GetMiscDB()
Expand Down Expand Up @@ -297,7 +352,7 @@ func (c *CompressModule) testCompressPerformance(from, to uint64) error {
totalBodyFinderElapsedTime time.Duration
totalReceiptsFinderElapsedTime time.Duration
)
for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
for _, compressTyp := range allCompressTypes {
from = originFrom
writeSubsequentCompressionBlkNumber(c.Dbm, compressTyp, from)
for {
Expand Down Expand Up @@ -415,7 +470,7 @@ func TestCompressFinder(t *testing.T, setup func(t *testing.T) (*blockchain.Bloc
originBodyElapsed time.Duration
originReceiptsElapsed time.Duration
)
for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
for _, compressTyp := range allCompressTypes {
var (
r = rand.Uint64() % (5000)
hash = mCompress.Dbm.ReadCanonicalHash(r)
Expand Down
62 changes: 52 additions & 10 deletions kaiax/compress/impl/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"math/big"
"testing"

"github.com/kaiachain/kaia/blockchain"
"github.com/kaiachain/kaia/blockchain/types"
"github.com/stretchr/testify/assert"
)

func TestCompressStorage(t *testing.T) {
for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
for _, compressTyp := range allCompressTypes {
switch compressTyp {
case HeaderCompressType:
testHeaderCompress(t)
Expand All @@ -38,7 +39,7 @@ func TestCompressStorage(t *testing.T) {
}

func TestDecompressStorage(t *testing.T) {
for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
for _, compressTyp := range allCompressTypes {
switch compressTyp {
case HeaderCompressType:
testHeaderDecompress(t)
Expand All @@ -51,7 +52,7 @@ func TestDecompressStorage(t *testing.T) {
}

func TestDeleteStorage(t *testing.T) {
for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
for _, compressTyp := range allCompressTypes {
switch compressTyp {
case HeaderCompressType:
testCompressedHeaderDelete(t)
Expand Down Expand Up @@ -88,14 +89,14 @@ func TestRewind(t *testing.T) {
next compression block number
Chunks = C1|C2|C3|C4|C5|C6|C7|C8|C9|C10
[Phase2: Rewind]
[Phase2: Rewind] (`Stop` called)
Once `setHead` is invoked,
0 ------------ 50 ---- 55
^
next compression block number
Chunks = C1|C2|C3|C4|C5
[Phase3: Compress again]
[Phase3: Compress again] (`Start` called)
compressed data range 50-59 is restored and Sync is started from 55. Finally,
0 ------------ 50 ------------ 99
Chunks = C1|C2|C3|C4|C5|C6|C7|C8|C9|C10
Expand All @@ -107,6 +108,7 @@ func TestRewind(t *testing.T) {
mCompress, dbm, headers, bodies, receipts = runCompress(t, nBlocks)
)

mCompress.Stop()
for i := nBlocks - 1; i >= setHeadTo; i-- {
num := uint64(i)
hash := dbm.ReadCanonicalHash(num)
Expand Down Expand Up @@ -147,14 +149,54 @@ func TestRewind(t *testing.T) {
// expected next compression block number should be equal or less than `setHeadTo`.
// Given the value of `setHeadTo` is 55 and chunk size is 10,
// The rewinded next compression block number should be 50.
nextCompressionNumber := readSubsequentCompressionBlkNumber(dbm, HeaderCompressType)
assert.Equal(t, int(nextCompressionNumber), setHeadTo-(setHeadTo%int(mCompress.getCompressChunk())))
for _, compressTyp := range allCompressTypes {
nextCompressionNumber := readSubsequentCompressionBlkNumber(dbm, compressTyp)
assert.Equal(t, int(nextCompressionNumber), setHeadTo-(setHeadTo%int(mCompress.getCompressChunk())))
}

go mCompress.Start() // fragment restore invoked before starting compression
mCompress.Start() // fragment restore invoked before starting compression
waitCompression(mCompress)
checkCompressedIntegrity(t, dbm, 0, nBlocks-1, canonicalHeaders, canonicalBodies, canonicalReceipts, false)

// Once completed the compression, next compression block number reaches to `nBlocks - 1`
nextCompressionNumber = readSubsequentCompressionBlkNumber(dbm, HeaderCompressType)
assert.Equal(t, int(nextCompressionNumber), nBlocks-1)
for _, compressTyp := range allCompressTypes {
nextCompressionNumber := readSubsequentCompressionBlkNumber(dbm, compressTyp)
assert.Equal(t, int(nextCompressionNumber), nBlocks-1)
}
}

// TestRetention verifies that compression is skipped while the configured
// retention window covers the whole chain, and that it resumes and catches
// up to the head once retention is reset to zero.
func TestRetention(t *testing.T) {
	const nBlocks = 100
	chain, dbm := initMock(t, nBlocks)

	mCompress := NewCompression()
	err := mCompress.Init(&InitOpts{
		ChunkBlockSize: blockchain.DefaultChunkBlockSize,
		ChunkCap:       blockchain.DefaultCompressChunkCap,
		Chain:          chain,
		Dbm:            dbm,
	})
	assert.Nil(t, err)
	dbm.SetCompressModule(mCompress)

	mCompress.setCompressChunk(10)
	// Retention spans the entire chain, so no block is old enough to compress.
	mCompress.setCompressRetention(uint64(nBlocks))
	go mCompress.Start()
	waitCompression(mCompress)

	// Nothing was compressed: the next-compression pointer never advanced.
	for _, typ := range allCompressTypes {
		assert.Equal(t, readSubsequentCompressionBlkNumber(dbm, typ), uint64(0))
	}
	mCompress.Stop()

	// With retention cleared, compression proceeds up to the chain head.
	mCompress.setCompressRetention(0)
	mCompress.Start()
	waitCompression(mCompress)
	for _, typ := range allCompressTypes {
		assert.Equal(t, int(readSubsequentCompressionBlkNumber(dbm, typ)), nBlocks-1)
	}
}
Loading

0 comments on commit 6831749

Please sign in to comment.