Skip to content

Commit

Permalink
chore: sync merge
Browse files Browse the repository at this point in the history
  • Loading branch information
troykessler committed Nov 13, 2024
2 parents 72d5e82 + bb1f6d6 commit e2183b7
Show file tree
Hide file tree
Showing 30 changed files with 519 additions and 301 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: ci

on:
pull_request:

jobs:
docker:
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Build and Test Docker Image
uses: docker/build-push-action@v6
with:
platforms: linux/amd64
tags: ksync:test
11 changes: 11 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM kyve/ksync-e2e-tests:latest

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .

RUN make build
RUN bats -T --print-output-on-failure --verbose-run tests
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ GO_VERSION := $(shell go version | cut -c 14- | cut -d' ' -f1 | cut -d'.' -f1,2)
build: ensure_version
go build -mod=readonly -o ./build/ksync ./cmd/ksync/main.go

###############################################################################
### Tests ###
###############################################################################

test:
docker build --platform linux/amd64 --no-cache -t ksync:test .

###############################################################################
### Checks ###
###############################################################################

ensure_version:
ifneq ($(GO_VERSION),1.22)
$(error ❌ Please run Go v1.22.x..)
endif
endif
51 changes: 20 additions & 31 deletions blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/KYVENetwork/ksync/bootstrap"
"github.com/KYVENetwork/ksync/types"
"github.com/KYVENetwork/ksync/utils"
"os"
"strings"
"time"
)
Expand All @@ -16,40 +15,31 @@ var (
logger = utils.KsyncLogger("block-sync")
)

func StartBlockSync(engine types.Engine, chainRest, storageRest string, blockRpcConfig *types.BlockRpcConfig, poolId *int64, targetHeight int64, backupCfg *types.BackupConfig) error {
return StartDBExecutor(engine, chainRest, storageRest, blockRpcConfig, poolId, targetHeight, 0, 0, false, false, backupCfg)
}

func PerformBlockSyncValidationChecks(engine types.Engine, chainRest string, blockRpcConfig *types.BlockRpcConfig, blockPoolId *int64, targetHeight int64, checkEndHeight, userInput bool) (continuationHeight int64, err error) {
continuationHeight, err = engine.GetContinuationHeight()
if err != nil {
return continuationHeight, fmt.Errorf("failed to get continuation height from engine: %w", err)
}

func PerformBlockSyncValidationChecks(chainRest string, blockRpcConfig *types.BlockRpcConfig, blockPoolId *int64, continuationHeight, targetHeight int64, checkEndHeight, userInput bool) error {
logger.Info().Msg(fmt.Sprintf("loaded current block height of node: %d", continuationHeight-1))

// perform boundary checks
_, startHeight, endHeight, err := helpers.GetBlockBoundaries(chainRest, blockRpcConfig, blockPoolId)
if err != nil {
return continuationHeight, fmt.Errorf("failed to get block boundaries: %w", err)
return fmt.Errorf("failed to get block boundaries: %w", err)
}

logger.Info().Msg(fmt.Sprintf("retrieved block boundaries, earliest block height = %d, latest block height %d", startHeight, endHeight))

if continuationHeight < startHeight {
return continuationHeight, fmt.Errorf("app is currently at height %d but first available block on pool is %d", continuationHeight, startHeight)
return fmt.Errorf("app is currently at height %d but first available block on pool is %d", continuationHeight, startHeight)
}

if continuationHeight > endHeight {
return continuationHeight, fmt.Errorf("app is currently at height %d but last available block on pool is %d", continuationHeight, endHeight)
return fmt.Errorf("app is currently at height %d but last available block on pool is %d", continuationHeight, endHeight)
}

if targetHeight > 0 && continuationHeight > targetHeight {
return continuationHeight, fmt.Errorf("requested target height is %d but app is already at block height %d", targetHeight, continuationHeight)
return fmt.Errorf("requested target height is %d but app is already at block height %d", targetHeight, continuationHeight)
}

if checkEndHeight && targetHeight > 0 && targetHeight > endHeight {
return continuationHeight, fmt.Errorf("requested target height is %d but last available block on pool is %d", targetHeight, endHeight)
return fmt.Errorf("requested target height is %d but last available block on pool is %d", targetHeight, endHeight)
}

if targetHeight == 0 {
Expand All @@ -66,34 +56,32 @@ func PerformBlockSyncValidationChecks(engine types.Engine, chainRest string, blo
}

if _, err := fmt.Scan(&answer); err != nil {
return continuationHeight, fmt.Errorf("failed to read in user input: %s", err)
return fmt.Errorf("failed to read in user input: %s", err)
}

if strings.ToLower(answer) != "y" {
return continuationHeight, errors.New("aborted block-sync")
return errors.New("aborted block-sync")
}
}

return
return nil
}

func StartBlockSyncWithBinary(engine types.Engine, binaryPath, homePath, chainId, chainRest, storageRest string, blockRpcConfig *types.BlockRpcConfig, blockPoolId *int64, targetHeight int64, backupCfg *types.BackupConfig, appFlags string, rpcServer, optOut, debug bool) {
func StartBlockSyncWithBinary(engine types.Engine, binaryPath, homePath, chainId, chainRest, storageRest string, blockRpcConfig *types.BlockRpcConfig, blockPoolId *int64, targetHeight int64, backupCfg *types.BackupConfig, appFlags string, rpcServer, optOut, debug bool) error {
logger.Info().Msg("starting block-sync")

if err := bootstrap.StartBootstrapWithBinary(engine, binaryPath, homePath, chainRest, storageRest, blockRpcConfig, blockPoolId, appFlags, debug); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to bootstrap node: %s", err))
os.Exit(1)
return fmt.Errorf("failed to bootstrap node: %w", err)
}

// start binary process thread
processId, err := utils.StartBinaryProcessForDB(engine, binaryPath, debug, strings.Split(appFlags, ","))
if err != nil {
panic(err)
return fmt.Errorf("failed to start binary process: %w", err)
}

if err := engine.OpenDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
return fmt.Errorf("failed to open dbs in engine: %w", err)
}

if rpcServer {
Expand All @@ -107,29 +95,30 @@ func StartBlockSyncWithBinary(engine types.Engine, binaryPath, homePath, chainId
currentHeight := engine.GetHeight()

// db executes blocks against app until target height is reached
if err := StartBlockSync(engine, chainRest, storageRest, blockRpcConfig, blockPoolId, targetHeight, backupCfg); err != nil {
if err := StartBlockSyncExecutor(engine, chainRest, storageRest, blockRpcConfig, blockPoolId, targetHeight, 0, 0, false, false, backupCfg); err != nil {
logger.Error().Msg(fmt.Sprintf("%s", err))

// stop binary process thread
if err := utils.StopProcessByProcessId(processId); err != nil {
panic(err)
return fmt.Errorf("failed to stop process by process id: %w", err)
}
os.Exit(1)

return fmt.Errorf("failed to start block-sync executor: %w", err)
}

elapsed := time.Since(start).Seconds()
utils.TrackSyncCompletedEvent(0, targetHeight-currentHeight, targetHeight, elapsed, optOut)

// stop binary process thread
if err := utils.StopProcessByProcessId(processId); err != nil {
panic(err)
return fmt.Errorf("failed to stop process by process id: %w", err)
}

if err := engine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
os.Exit(1)
return fmt.Errorf("failed to close dbs in engine: %w", err)
}

logger.Info().Msg(fmt.Sprintf("block-synced from %d to %d (%d blocks) in %.2f seconds", currentHeight, targetHeight, targetHeight-currentHeight, elapsed))
logger.Info().Msg(fmt.Sprintf("successfully finished block-sync"))
return nil
}
2 changes: 1 addition & 1 deletion blocksync/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
errorCh = make(chan error)
)

func StartDBExecutor(engine types.Engine, chainRest, storageRest string, blockRpcConfig *types.BlockRpcConfig, blockPoolId *int64, targetHeight int64, snapshotPoolId, snapshotInterval int64, pruning, skipWaiting bool, backupCfg *types.BackupConfig) error {
func StartBlockSyncExecutor(engine types.Engine, chainRest, storageRest string, blockRpcConfig *types.BlockRpcConfig, blockPoolId *int64, targetHeight int64, snapshotPoolId, snapshotInterval int64, pruning, skipWaiting bool, backupCfg *types.BackupConfig) error {
continuationHeight, err := engine.GetContinuationHeight()
if err != nil {
return fmt.Errorf("failed to get continuation height from engine: %w", err)
Expand Down
23 changes: 11 additions & 12 deletions cmd/ksync/commands/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ func init() {

backupCmd.Flags().BoolVar(&optOut, "opt-out", false, "disable the collection of anonymous usage data")

rootCmd.AddCommand(backupCmd)
RootCmd.AddCommand(backupCmd)
}

var backupCmd = &cobra.Command{
Use: "backup",
Short: "Backup data directory",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
utils.TrackBackupEvent(backupCompression, backupKeepRecent, optOut)

// if no home path was given get the default one
Expand All @@ -42,45 +42,44 @@ var backupCmd = &cobra.Command{
// load tendermint config
config, err := tendermint_v34.LoadConfig(homePath)
if err != nil {
logger.Error().Str("err", err.Error()).Msg("failed to load config.toml")
return
return fmt.Errorf("failed to load config.toml: %w", err)
}

// load block store
blockStoreDB, blockStore, err := tendermint_v34.GetBlockstoreDBs(config)
defer blockStoreDB.Close()

if err != nil {
logger.Error().Str("err", err.Error()).Msg("failed to load blockstore db")
return
return fmt.Errorf("fail to load blockstore db: %w", err)
}

// load state store
stateDB, _, err := tendermint_v34.GetStateDBs(config)
defer stateDB.Close()

if err != nil {
logger.Error().Str("err", err.Error()).Msg("failed to load state db")
return
return fmt.Errorf("fail to load state db: %w", err)
}

// load genesis file
defaultDocProvider := nm.DefaultGenesisDocProviderFunc(config)
_, genDoc, err := nm.LoadStateFromDBOrGenesisDocProvider(stateDB, defaultDocProvider)
if err != nil {
return fmt.Errorf("fail to load state from database: %w", err)
}

// create backup config
backupCfg, err := backup.GetBackupConfig(homePath, 2, backupKeepRecent, backupCompression, backupDest)
if err != nil {
logger.Error().Str("err", err.Error()).Msg("failed to create backup config")
return
return fmt.Errorf("fail to load backup config: %w", err)
}

// create backup
if err = backup.CreateBackup(backupCfg, genDoc.ChainID, blockStore.Height(), false); err != nil {
logger.Error().Str("err", err.Error()).Msg("failed to create backup")
return
return fmt.Errorf("fail to create backup: %w", err)
}

logger.Info().Int64("height", blockStore.Height()).Msg("finished backup at block height")
return nil
},
}
57 changes: 29 additions & 28 deletions cmd/ksync/commands/blocksync.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package commands

import (
"errors"
"fmt"
"github.com/KYVENetwork/ksync/backup"
"github.com/KYVENetwork/ksync/blocksync"
"github.com/KYVENetwork/ksync/engines"
"github.com/KYVENetwork/ksync/sources"
"github.com/KYVENetwork/ksync/utils"
"github.com/spf13/cobra"
"os"
"strings"
)

Expand Down Expand Up @@ -46,88 +46,89 @@ func init() {
blockSyncCmd.Flags().BoolVarP(&debug, "debug", "d", false, "show logs from tendermint app")
blockSyncCmd.Flags().BoolVarP(&y, "yes", "y", false, "automatically answer yes for all questions")

rootCmd.AddCommand(blockSyncCmd)
RootCmd.AddCommand(blockSyncCmd)
}

var blockSyncCmd = &cobra.Command{
Use: "block-sync",
Short: "Start fast syncing blocks with KSYNC",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
chainRest = utils.GetChainRest(chainId, chainRest)
storageRest = strings.TrimSuffix(storageRest, "/")

// if no binary was provided at least the home path needs to be defined
if binaryPath == "" && homePath == "" {
logger.Error().Msg(fmt.Sprintf("flag 'home' is required"))
os.Exit(1)
return errors.New("flag 'home' is required")
}

if binaryPath == "" {
logger.Info().Msg("To start the syncing process, start your chain binary with --with-tendermint=false")
logger.Info().Msg("to start the syncing process, start your chain binary with --with-tendermint=false")
}

if homePath == "" {
homePath = utils.GetHomePathFromBinary(binaryPath)
logger.Info().Msgf("Loaded home path \"%s\" from binary path", homePath)
logger.Info().Msgf("loaded home path \"%s\" from binary path", homePath)
}

defaultEngine := engines.EngineFactory(engine, homePath, rpcServerPort)

if source == "" && blockPoolId == "" {
s, err := defaultEngine.GetChainId()
if err != nil {
logger.Error().Msgf("Failed to load chain-id from engine: %s", err.Error())
os.Exit(1)
return fmt.Errorf("failed to load chain-id from engine: %w", err)
}
source = s
logger.Info().Msgf("Loaded source \"%s\" from genesis file", source)
logger.Info().Msgf("loaded source \"%s\" from genesis file", source)
}

if engine == "" && binaryPath != "" {
engine = utils.GetEnginePathFromBinary(binaryPath)
logger.Info().Msgf("Loaded engine \"%s\" from binary path", engine)
logger.Info().Msgf("loaded engine \"%s\" from binary path", engine)
}

bId, _, err := sources.GetPoolIds(chainId, source, blockPoolId, "", registryUrl, true, false)
if err != nil {
logger.Error().Msg(fmt.Sprintf("failed to load pool-ids: %s", err))
os.Exit(1)
return fmt.Errorf("failed to load pool-ids: %w", err)
}

backupCfg, err := backup.GetBackupConfig(homePath, backupInterval, backupKeepRecent, backupCompression, backupDest)
if err != nil {
logger.Error().Str("err", err.Error()).Msg("could not get backup config")
return
return fmt.Errorf("could not get backup config: %w", err)
}

if reset {
if err := defaultEngine.ResetAll(true); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to reset tendermint application: %s", err))
os.Exit(1)
return fmt.Errorf("could not reset tendermint application: %w", err)
}
}

if err := defaultEngine.OpenDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
return fmt.Errorf("failed to open dbs in engine: %w", err)
}

// perform validation checks before booting state-sync process
continuationHeight, err := blocksync.PerformBlockSyncValidationChecks(defaultEngine, chainRest, nil, &bId, targetHeight, true, !y)
continuationHeight, err := defaultEngine.GetContinuationHeight()
if err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
return fmt.Errorf("failed to get continuation height: %w", err)
}

// perform validation checks before booting state-sync process
if err := blocksync.PerformBlockSyncValidationChecks(chainRest, nil, &bId, continuationHeight, targetHeight, true, !y); err != nil {
return fmt.Errorf("block-sync validation checks failed: %w", err)
}

if err := defaultEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
os.Exit(1)
return fmt.Errorf("failed to close dbs in engine: %w", err)
}

sources.IsBinaryRecommendedVersion(binaryPath, registryUrl, source, continuationHeight, !y)
if err := sources.IsBinaryRecommendedVersion(binaryPath, registryUrl, source, continuationHeight, !y); err != nil {
return fmt.Errorf("failed to check if binary has the recommended version: %w", err)
}

consensusEngine := engines.EngineSourceFactory(engine, homePath, registryUrl, source, rpcServerPort, continuationHeight)
consensusEngine, err := engines.EngineSourceFactory(engine, homePath, registryUrl, source, rpcServerPort, continuationHeight)
if err != nil {
return fmt.Errorf("failed to create consensus engine for source: %w", err)
}

blocksync.StartBlockSyncWithBinary(consensusEngine, binaryPath, homePath, chainId, chainRest, storageRest, nil, &bId, targetHeight, backupCfg, appFlags, rpcServer, optOut, debug)
return blocksync.StartBlockSyncWithBinary(consensusEngine, binaryPath, homePath, chainId, chainRest, storageRest, nil, &bId, targetHeight, backupCfg, appFlags, rpcServer, optOut, debug)
},
}
Loading

0 comments on commit e2183b7

Please sign in to comment.