diff --git a/kaiax/compress/impl/compress.go b/kaiax/compress/impl/compress.go index 3f0222434..757c748d8 100644 --- a/kaiax/compress/impl/compress.go +++ b/kaiax/compress/impl/compress.go @@ -17,6 +17,7 @@ package compress import ( + "bytes" "errors" "fmt" "path/filepath" @@ -119,6 +120,43 @@ func (c *CompressModule) FindReceiptsFromChunkWithBlkHash(dbm database.DBManager return findReceiptsFromChunkWithBlkHash(dbm, number, hash) } +func (c *CompressModule) restoreFragmentByRewind() { + for _, compressTyp := range []CompressionType{HeaderCompressType, BodyCompressType, ReceiptCompressType} { + var ( + lastCompressDeleteKeyPrefix, lastCompressDeleteValuePrefix = getLsatCompressionDeleteKeyPrefix(compressTyp), getLsatCompressionDeleteValuePrefix(compressTyp) + miscDB = c.Dbm.GetMiscDB() + ) + key, err := miscDB.Get(lastCompressDeleteKeyPrefix) + if err != nil { + return + } + value, err := miscDB.Get(lastCompressDeleteValuePrefix) + if err != nil { + return + } + if bytes.Equal(key, lastCompressionCleared) && bytes.Equal(value, lastCompressionCleared) { + // No reserved last compression data. Noting to do + return + } + + var ( + db = getCompressDB(c.Dbm, compressTyp) + from, to = parseCompressKey(compressTyp, key) + ) + // 1. restore last compression data + if err := db.Put(key, value); err != nil { + logger.Crit(fmt.Sprintf("Failed to restore last compressed data. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to)) + } + // 2. clear last compression data + if err := miscDB.Put(lastCompressDeleteKeyPrefix, lastCompressionCleared); err != nil { + logger.Crit("Failed to clear last decompression key") + } + if err := miscDB.Put(lastCompressDeleteValuePrefix, lastCompressionCleared); err != nil { + logger.Crit("Failed to clear last decompression value") + } + } +} + // TODO-hyunsooda: Move to `compress_test.go` func (c *CompressModule) testCopyOriginData(compressTyp CompressionType, copyTestDB database.Batch, from, to uint64) { // Copy origin diff --git a/kaiax/compress/impl/compress_test.go b/kaiax/compress/impl/compress_test.go index 77f5f3795..096f03ff2 100644 --- a/kaiax/compress/impl/compress_test.go +++ b/kaiax/compress/impl/compress_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/kaiachain/kaia/blockchain/types" + "github.com/stretchr/testify/assert" ) func TestCompressStorage(t *testing.T) { @@ -86,6 +87,27 @@ func TestRewind(t *testing.T) { // 3. Restart the compression module // - start compression from block number 50 to 100 + /* + [Phase1: Setup] + Compression completed + 0 ------------ 50 ------------ 99 + ^ + next compression block number + Chunks = C1|C2|C3|C4|C5|C6|C7|C8|C9|C10 + + [Phase2: Rewind] + Once `setHead` is invoked, + 0 ------------ 50 ---- 55 + ^ + next compression block number + Chunks = C1|C2|C3|C4|C5 + + [Phase3: Compress again] + compressed data range 50-59 is restored and Sync is started from 55. Finally, + 0 ------------ 50 ------------ 99 + Chunks = C1|C2|C3|C4|C5|C6|C7|C8|C9|C10 + */ + for i := nBlocks - 1; i >= setHeadTo; i-- { num := uint64(i) hash := dbm.ReadCanonicalHash(num) @@ -103,6 +125,7 @@ func TestRewind(t *testing.T) { newBodies []*types.Body newReceipts []types.Receipts ) + for i := setHeadTo; i < nBlocks; i++ { h := genHeader() h.Number = big.NewInt(int64(i)) @@ -122,8 +145,18 @@ func TestRewind(t *testing.T) { canonicalBodies := append(bodies[:setHeadTo], newBodies[:]...) canonicalReceipts := append(receipts[:setHeadTo], newReceipts[:]...) - go mCompress.Compress() + // expected next compression block number should be equal or less than `setHeadTo`. + // Given the value of `setHeadTo` is 55 and chunk size is 10, + // The rewinded next compression block number should be 50. + nextCompressionNumber := readSubsequentCompressionBlkNumber(dbm, HeaderCompressType) + assert.Equal(t, int(nextCompressionNumber), setHeadTo-(setHeadTo%int(mCompress.getCompressChunk()))) + + go mCompress.Start() // fragment restore invoked before starting compression time.Sleep(time.Second) waitCompression(mCompress) checkCompressedIntegrity(t, dbm, 0, nBlocks-1, canonicalHeaders, canonicalBodies, canonicalReceipts, false) + + // Once completed the compression, next compression block number reaches to `nBlocks - 1` + nextCompressionNumber = readSubsequentCompressionBlkNumber(dbm, HeaderCompressType) + assert.Equal(t, int(nextCompressionNumber), nBlocks-1) } diff --git a/kaiax/compress/impl/db.go b/kaiax/compress/impl/db.go index fd18a1347..492e2b9e9 100644 --- a/kaiax/compress/impl/db.go +++ b/kaiax/compress/impl/db.go @@ -311,6 +311,15 @@ func deleteDataFromChunk(dbm database.DBManager, compressTyp CompressionType, nu return 0, nil } if from <= number && number <= to { + // Temporally store compression to MiscDB to restore `from` to setHead block + miscDB := dbm.GetMiscDB() + lastCompressDeleteKeyPrefix, lastCompressDeleteValuePrefix := getLsatCompressionDeleteKeyPrefix(compressTyp), getLsatCompressionDeleteValuePrefix(compressTyp) + if err := miscDB.Put(lastCompressDeleteKeyPrefix, it.Key()); err != nil { + logger.Crit(fmt.Sprintf("Failed to store temporal compressed data by rewind. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to)) + } + if err := miscDB.Put(lastCompressDeleteValuePrefix, it.Value()); err != nil { + logger.Crit(fmt.Sprintf("Failed to store temporal compressed data by rewind. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to)) + } // delete compression and return the starting number so that the compression moduel can start work from there if err := db.Delete(it.Key()); err != nil { logger.Crit(fmt.Sprintf("Failed to delete compressed data. err(%s) type(%s), from=%d, to=%d", err.Error(), compressTyp.String(), from, to)) diff --git a/kaiax/compress/impl/init.go b/kaiax/compress/impl/init.go index c8ff7dcb4..5c3437169 100644 --- a/kaiax/compress/impl/init.go +++ b/kaiax/compress/impl/init.go @@ -154,6 +154,7 @@ func (c *CompressModule) Init(opts *InitOpts) error { func (c *CompressModule) Start() error { logger.Info("[Compression] Compression started") + c.restoreFragmentByRewind() go c.Compress() return nil } diff --git a/kaiax/compress/impl/schema.go b/kaiax/compress/impl/schema.go index 7ba6a537a..f51bf2b92 100644 --- a/kaiax/compress/impl/schema.go +++ b/kaiax/compress/impl/schema.go @@ -40,6 +40,14 @@ var ( compressReceiptPrefix = []byte("CompressdReceipt-") compressBodyPrefix = []byte("CompressdBody-") + lastHeaderCompressionDeleteKey = []byte("LastHeaderCompressionDeleteKey") + lastHeaderCompressionDeleteValue = []byte("LastHeaderCompressionDeleteValue") + lastBodyCompressionDeleteKey = []byte("LastBodyCompressionDeleteKey") + lastBodyCompressionDeleteValue = []byte("LastBodyCompressionDeleteValue") + lastReceiptsCompressionDeleteKey = []byte("LastReceiptsCompressionDeleteKey") + lastReceiptsCompressionDeleteValue = []byte("LastReceiptsCompressionDeleteValue") + lastCompressionCleared = []byte("NoLastCompressionkeyReserved") + // Create a writer that caches compressors. // For this operation type we supply a nil Reader. encoder, _ = zstd.NewWriter(nil) @@ -59,6 +67,32 @@ func (typ CompressionType) String() string { } } +func getLsatCompressionDeleteKeyPrefix(typ CompressionType) []byte { + switch typ { + case HeaderCompressType: + return lastHeaderCompressionDeleteKey + case BodyCompressType: + return lastBodyCompressionDeleteKey + case ReceiptCompressType: + return lastReceiptsCompressionDeleteKey + default: + panic("unreacahble") + } +} + +func getLsatCompressionDeleteValuePrefix(typ CompressionType) []byte { + switch typ { + case HeaderCompressType: + return lastHeaderCompressionDeleteValue + case BodyCompressType: + return lastBodyCompressionDeleteValue + case ReceiptCompressType: + return lastReceiptsCompressionDeleteValue + default: + panic("unreacahble") + } +} + // Compressed range is represented as `to-from` func getCompressKey(typ CompressionType, from, to uint64) []byte { bFrom, bTo := make([]byte, 8), make([]byte, 8) diff --git a/kaiax/compress/impl/util_test.go b/kaiax/compress/impl/util_test.go index c89c12327..e61d93ea1 100644 --- a/kaiax/compress/impl/util_test.go +++ b/kaiax/compress/impl/util_test.go @@ -479,9 +479,10 @@ func runCompress(t *testing.T, nBlocks int) (*CompressModule, database.DBManager }) ) assert.Nil(t, err) + dbm.SetCompressModule(mCompress) headers, bodies, receipts := readOriginData(t, dbm, nBlocks) mCompress.setCompressChunk(10) - go mCompress.Compress() + go mCompress.Start() waitCompression(mCompress) return mCompress, dbm, headers, bodies, receipts }