Skip to content

Commit

Permalink
[Rewind] Restore behavior fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
hyunsooda committed Dec 17, 2024
1 parent 75c768b commit 315fdf7
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 2 deletions.
38 changes: 38 additions & 0 deletions kaiax/compress/impl/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package compress

import (
"bytes"
"errors"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -119,6 +120,43 @@ func (c *CompressModule) FindReceiptsFromChunkWithBlkHash(dbm database.DBManager
return findReceiptsFromChunkWithBlkHash(dbm, number, hash)
}

// restoreFragmentByRewind restores compression chunks that were deleted by a
// rewind (setHead) before the node stopped, so already-compressed data is not
// lost across restarts.
//
// For each compression type, deleteDataFromChunk reserves the last deleted
// chunk's key/value pair in MiscDB. If such a pair exists here, it is written
// back into the corresponding compression DB and the MiscDB markers are reset
// to the cleared sentinel.
func (c *CompressModule) restoreFragmentByRewind() {
	miscDB := c.Dbm.GetMiscDB() // loop-invariant; fetch once
	for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} {
		lastCompressDeleteKeyPrefix := getLsatCompressionDeleteKeyPrefix(compressTyp)
		lastCompressDeleteValuePrefix := getLsatCompressionDeleteValuePrefix(compressTyp)

		// A Get error means nothing is reserved for this type (e.g., key not
		// found). Move on to the next type rather than returning, so one
		// missing type does not skip restoration of the remaining types.
		key, err := miscDB.Get(lastCompressDeleteKeyPrefix)
		if err != nil {
			continue
		}
		value, err := miscDB.Get(lastCompressDeleteValuePrefix)
		if err != nil {
			continue
		}
		if bytes.Equal(key, lastCompressionCleared) && bytes.Equal(value, lastCompressionCleared) {
			// No reserved last compression data for this type. Nothing to do.
			continue
		}

		db := getCompressDB(c.Dbm, compressTyp)
		from, to := parseCompressKey(compressTyp, key)
		// 1. Restore the last compression data.
		if err := db.Put(key, value); err != nil {
			logger.Crit(fmt.Sprintf("Failed to restore last compressed data. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to))
		}
		// 2. Clear the reserved markers so the restore is not replayed on the
		// next start.
		if err := miscDB.Put(lastCompressDeleteKeyPrefix, lastCompressionCleared); err != nil {
			logger.Crit("Failed to clear last decompression key")
		}
		if err := miscDB.Put(lastCompressDeleteValuePrefix, lastCompressionCleared); err != nil {
			logger.Crit("Failed to clear last decompression value")
		}
	}
}

// TODO-hyunsooda: Move to `compress_test.go`
func (c *CompressModule) testCopyOriginData(compressTyp CompressionType, copyTestDB database.Batch, from, to uint64) {
// Copy origin
Expand Down
35 changes: 34 additions & 1 deletion kaiax/compress/impl/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/kaiachain/kaia/blockchain/types"
"github.com/stretchr/testify/assert"
)

func TestCompressStorage(t *testing.T) {
Expand Down Expand Up @@ -86,6 +87,27 @@ func TestRewind(t *testing.T) {
// 3. Restart the compression module
// - start compression from block number 50 to 100

/*
[Phase1: Setup]
Compression completed
0 ------------ 50 ------------ 99
^
next compression block number
Chunks = C1|C2|C3|C4|C5|C6|C7|C8|C9|C10
[Phase2: Rewind]
Once `setHead` is invoked,
0 ------------ 50 ---- 55
^
next compression block number
Chunks = C1|C2|C3|C4|C5
[Phase3: Compress again]
compressed data range 50-59 is restored and Sync is started from 55. Finally,
0 ------------ 50 ------------ 99
Chunks = C1|C2|C3|C4|C5|C6|C7|C8|C9|C10
*/

for i := nBlocks - 1; i >= setHeadTo; i-- {
num := uint64(i)
hash := dbm.ReadCanonicalHash(num)
Expand All @@ -103,6 +125,7 @@ func TestRewind(t *testing.T) {
newBodies []*types.Body
newReceipts []types.Receipts
)

for i := setHeadTo; i < nBlocks; i++ {
h := genHeader()
h.Number = big.NewInt(int64(i))
Expand All @@ -122,8 +145,18 @@ func TestRewind(t *testing.T) {
canonicalBodies := append(bodies[:setHeadTo], newBodies[:]...)
canonicalReceipts := append(receipts[:setHeadTo], newReceipts[:]...)

go mCompress.Compress()
// expected next compression block number should be equal or less than `setHeadTo`.
// Given the value of `setHeadTo` is 55 and chunk size is 10,
// The rewound next compression block number should be 50.
nextCompressionNumber := readSubsequentCompressionBlkNumber(dbm, HeaderCompressType)
assert.Equal(t, int(nextCompressionNumber), setHeadTo-(setHeadTo%int(mCompress.getCompressChunk())))

go mCompress.Start() // fragment restore invoked before starting compression
time.Sleep(time.Second)
waitCompression(mCompress)
checkCompressedIntegrity(t, dbm, 0, nBlocks-1, canonicalHeaders, canonicalBodies, canonicalReceipts, false)

// Once completed the compression, next compression block number reaches to `nBlocks - 1`
nextCompressionNumber = readSubsequentCompressionBlkNumber(dbm, HeaderCompressType)
assert.Equal(t, int(nextCompressionNumber), nBlocks-1)
}
9 changes: 9 additions & 0 deletions kaiax/compress/impl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,15 @@ func deleteDataFromChunk(dbm database.DBManager, compressTyp CompressionType, nu
return 0, nil
}
if from <= number && number <= to {
// Temporarily store the compression chunk in MiscDB so blocks from `from` to the setHead block can be restored later
miscDB := dbm.GetMiscDB()
lastCompressDeleteKeyPrefix, lastCompressDeleteValuePrefix := getLsatCompressionDeleteKeyPrefix(compressTyp), getLsatCompressionDeleteValuePrefix(compressTyp)
if err := miscDB.Put(lastCompressDeleteKeyPrefix, it.Key()); err != nil {
logger.Crit(fmt.Sprintf("Failed to store temporal compressed data by rewind. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to))
}
if err := miscDB.Put(lastCompressDeleteValuePrefix, it.Value()); err != nil {
logger.Crit(fmt.Sprintf("Failed to store temporal compressed data by rewind. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to))
}
// delete compression and return the starting number so that the compression module can start work from there
if err := db.Delete(it.Key()); err != nil {
logger.Crit(fmt.Sprintf("Failed to delete compressed data. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to))
Expand Down
1 change: 1 addition & 0 deletions kaiax/compress/impl/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (c *CompressModule) Init(opts *InitOpts) error {

// Start launches the compression module. It first restores any compression
// fragment deleted by a rewind (setHead) so no compressed data is lost, then
// runs the compression loop in a background goroutine. Always returns nil.
func (c *CompressModule) Start() error {
	logger.Info("[Compression] Compression started")
	// Must run before Compress so restored chunks are visible to the loop.
	c.restoreFragmentByRewind()
	go c.Compress()
	return nil
}
Expand Down
34 changes: 34 additions & 0 deletions kaiax/compress/impl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ var (
compressReceiptPrefix = []byte("CompressdReceipt-")
compressBodyPrefix = []byte("CompressdBody-")

lastHeaderCompressionDeleteKey = []byte("LastHeaderCompressionDeleteKey")
lastHeaderCompressionDeleteValue = []byte("LastHeaderCompressionDeleteValue")
lastBodyCompressionDeleteKey = []byte("LastBodyCompressionDeleteKey")
lastBodyCompressionDeleteValue = []byte("LastBodyCompressionDeleteValue")
lastReceiptsCompressionDeleteKey = []byte("LastReceiptsCompressionDeleteKey")
lastReceiptsCompressionDeleteValue = []byte("LastReceiptsCompressionDeleteValue")
lastCompressionCleared = []byte("NoLastCompressionkeyReserved")

// Create a writer that caches compressors.
// For this operation type we supply a nil Reader.
encoder, _ = zstd.NewWriter(nil)
Expand All @@ -59,6 +67,32 @@ func (typ CompressionType) String() string {
}
}

// getLsatCompressionDeleteKeyPrefix returns the MiscDB key under which the
// key of the last rewind-deleted compression chunk is reserved for the given
// compression type. It panics on an unknown CompressionType (programmer bug).
//
// NOTE(review): the name carries pre-existing misnomers ("Lsat" for "Last";
// the returned value is a full key, not a prefix). Renaming requires touching
// call sites in other files, so it is kept as-is here.
func getLsatCompressionDeleteKeyPrefix(typ CompressionType) []byte {
	switch typ {
	case HeaderCompressType:
		return lastHeaderCompressionDeleteKey
	case BodyCompressType:
		return lastBodyCompressionDeleteKey
	case ReceiptCompressType:
		return lastReceiptsCompressionDeleteKey
	default:
		// Fixed typo in panic message ("unreacahble" -> "unreachable").
		panic("unreachable")
	}
}

// getLsatCompressionDeleteValuePrefix returns the MiscDB key under which the
// value of the last rewind-deleted compression chunk is reserved for the
// given compression type. It panics on an unknown CompressionType
// (programmer bug).
//
// NOTE(review): the name carries pre-existing misnomers ("Lsat" for "Last";
// the returned value is a full key, not a prefix). Renaming requires touching
// call sites in other files, so it is kept as-is here.
func getLsatCompressionDeleteValuePrefix(typ CompressionType) []byte {
	switch typ {
	case HeaderCompressType:
		return lastHeaderCompressionDeleteValue
	case BodyCompressType:
		return lastBodyCompressionDeleteValue
	case ReceiptCompressType:
		return lastReceiptsCompressionDeleteValue
	default:
		// Fixed typo in panic message ("unreacahble" -> "unreachable").
		panic("unreachable")
	}
}

// Compressed range is represented as `to-from`
func getCompressKey(typ CompressionType, from, to uint64) []byte {
bFrom, bTo := make([]byte, 8), make([]byte, 8)
Expand Down
3 changes: 2 additions & 1 deletion kaiax/compress/impl/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,10 @@ func runCompress(t *testing.T, nBlocks int) (*CompressModule, database.DBManager
})
)
assert.Nil(t, err)
dbm.SetCompressModule(mCompress)
headers, bodies, receipts := readOriginData(t, dbm, nBlocks)
mCompress.setCompressChunk(10)
go mCompress.Compress()
go mCompress.Start()
waitCompression(mCompress)
return mCompress, dbm, headers, bodies, receipts
}
Expand Down

0 comments on commit 315fdf7

Please sign in to comment.