Skip to content

Commit

Permalink
Merge pull request #5650 from onflow/yurii/5136-querying-kvstore
Browse files Browse the repository at this point in the history
[Dynamic Protocol State] Querying of KV Store using `Snapshot` API
  • Loading branch information
durkmurder authored Apr 12, 2024
2 parents 01fcf31 + 3ed352f commit 83ef8ad
Show file tree
Hide file tree
Showing 70 changed files with 612 additions and 430 deletions.
12 changes: 7 additions & 5 deletions cmd/bootstrap/cmd/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/signature"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/state/protocol/protocol_state/kvstore"
)

// constructRootHeader constructs a header for the root block.
Expand All @@ -28,11 +29,12 @@ func constructRootBlock(rootHeader *flow.Header, setup *flow.EpochSetup, commit
Payload: nil,
}
block.SetPayload(flow.Payload{
Guarantees: nil,
Seals: nil,
Receipts: nil,
Results: nil,
ProtocolStateID: inmem.ProtocolStateFromEpochServiceEvents(setup, commit).ID(),
Guarantees: nil,
Seals: nil,
Receipts: nil,
Results: nil,
// TODO: shortcut in bootstrapping; we will probably have to start with a non-empty KV store in the future
ProtocolStateID: kvstore.NewDefaultKVStore(inmem.ProtocolStateFromEpochServiceEvents(setup, commit).ID()).ID(),
})
return block
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func main() {
}).
Component("consensus participant", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
mutableProtocolState := protocol_state.NewMutableProtocolState(
node.Storage.ProtocolState,
node.Storage.EpochProtocolState,
node.Storage.ProtocolKVStore,
node.State.Params(),
node.Storage.Headers,
Expand Down
6 changes: 3 additions & 3 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func (fnb *FlowNodeBuilder) initStorage() error {
Setups: setups,
EpochCommits: epochCommits,
VersionBeacons: versionBeacons,
ProtocolState: protocolState,
EpochProtocolState: protocolState,
ProtocolKVStore: protocolKVStores,
Commits: commits,
}
Expand Down Expand Up @@ -1284,7 +1284,7 @@ func (fnb *FlowNodeBuilder) initState() error {
fnb.Storage.QuorumCertificates,
fnb.Storage.Setups,
fnb.Storage.EpochCommits,
fnb.Storage.ProtocolState,
fnb.Storage.EpochProtocolState,
fnb.Storage.ProtocolKVStore,
fnb.Storage.VersionBeacons,
)
Expand Down Expand Up @@ -1333,7 +1333,7 @@ func (fnb *FlowNodeBuilder) initState() error {
fnb.Storage.QuorumCertificates,
fnb.Storage.Setups,
fnb.Storage.EpochCommits,
fnb.Storage.ProtocolState,
fnb.Storage.EpochProtocolState,
fnb.Storage.ProtocolKVStore,
fnb.Storage.VersionBeacons,
fnb.RootSnapshot,
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/common/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func InitProtocolState(db *badger.DB, storages *storage.All) (protocol.State, er
storages.QuorumCertificates,
storages.Setups,
storages.EpochCommits,
storages.ProtocolState,
storages.EpochProtocolState,
storages.ProtocolKVStore,
storages.VersionBeacons,
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/read-badger/cmd/protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var protocolStateCmd = &cobra.Command{
}

log.Info().Msgf("getting protocol state by block id: %v", blockID)
protocolState, err := storages.ProtocolState.ByBlockID(blockID)
protocolState, err := storages.EpochProtocolState.ByBlockID(blockID)
if err != nil {
log.Error().Err(err).Msgf("could not get protocol state for block id: %v", blockID)
return
Expand Down
20 changes: 15 additions & 5 deletions consensus/integration/epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/state/protocol/protocol_state/kvstore"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/unittest"
)

// should be able to reach consensus when identity table contains nodes which are joining in next epoch.
func TestUnweightedNode(t *testing.T) {
//unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
// stop after building 2 blocks to ensure we can tolerate 0-weight (joining next
// epoch) identities, but don't cross an epoch boundary
stopper := NewStopper(2, 0)
Expand Down Expand Up @@ -245,22 +246,31 @@ func withNextEpoch(
encodableSnapshot.LatestSeal.ResultID = encodableSnapshot.LatestResult.ID()

// update protocol state
protocolState := encodableSnapshot.ProtocolState
epochProtocolState := encodableSnapshot.EpochProtocolState

// setup ID has changed, need to update it
convertedEpochSetup, _ := protocol.ToEpochSetup(inmem.NewEpoch(*currEpoch))
protocolState.CurrentEpoch.SetupID = convertedEpochSetup.ID()
epochProtocolState.CurrentEpoch.SetupID = convertedEpochSetup.ID()
// create next epoch protocol state
convertedEpochSetup, _ = protocol.ToEpochSetup(inmem.NewEpoch(*encodableSnapshot.Epochs.Next))
convertedEpochCommit, _ := protocol.ToEpochCommit(inmem.NewEpoch(*encodableSnapshot.Epochs.Next))
protocolState.NextEpoch = &flow.EpochStateContainer{
epochProtocolState.NextEpoch = &flow.EpochStateContainer{
SetupID: convertedEpochSetup.ID(),
CommitID: convertedEpochCommit.ID(),
ActiveIdentities: flow.DynamicIdentityEntryListFromIdentities(nextEpochIdentities),
}

// need to fix genesis block to contain the correct protocol state ID
encodableSnapshot.SealingSegment.Blocks[0].Payload.ProtocolStateID = protocolState.ID()
updatedKVStore := kvstore.NewDefaultKVStore(epochProtocolState.ID())
version, data, err := updatedKVStore.VersionedEncode()
if err != nil {
panic(err)
}
encodableSnapshot.KVStore = storage.KeyValueStoreData{
Version: version,
Data: data,
}
encodableSnapshot.SealingSegment.Blocks[0].Payload.ProtocolStateID = updatedKVStore.ID()

return inmem.SnapshotFromEncodable(encodableSnapshot)
}
4 changes: 3 additions & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/state/protocol/protocol_state/kvstore"
protocol_state "github.com/onflow/flow-go/state/protocol/protocol_state/state"
"github.com/onflow/flow-go/state/protocol/util"
storage "github.com/onflow/flow-go/storage/badger"
Expand Down Expand Up @@ -280,7 +281,8 @@ func createRootBlockData(participantData *run.ParticipantData) (*flow.Block, *fl
},
)

root.SetPayload(flow.Payload{ProtocolStateID: inmem.ProtocolStateFromEpochServiceEvents(setup, commit).ID()})
epochProtocolStateID := inmem.ProtocolStateFromEpochServiceEvents(setup, commit).ID()
root.SetPayload(flow.Payload{ProtocolStateID: kvstore.NewDefaultKVStore(epochProtocolStateID).ID()})
result := unittest.BootstrapExecutionResultFixture(root, unittest.GenesisStateCommitment)
result.ServiceEvents = []flow.ServiceEvent{setup.ServiceEvent(), commit.ServiceEvent()}

Expand Down
3 changes: 1 addition & 2 deletions consensus/recovery/protocol/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ import (
// as a consensus follower, when a block is received and saved,
// if it's not finalized yet, this block should be returned by latest
func TestSaveBlockAsReplica(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
participants := unittest.IdentityListFixture(5, unittest.WithAllRoles())
rootSnapshot := unittest.RootSnapshotFixture(participants)
protocolState, err := rootSnapshot.ProtocolState()
require.NoError(t, err)
rootProtocolStateID := protocolState.Entry().ID()
rootProtocolStateID := protocolState.ID()
b0, err := rootSnapshot.Head()
require.NoError(t, err)
util.RunWithFullProtocolState(t, rootSnapshot, func(db *badger.DB, state *protocol.ParticipantState) {
Expand Down
9 changes: 4 additions & 5 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ type Suite struct {
}

func TestHandler(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
suite.Run(t, new(Suite))
}

Expand Down Expand Up @@ -527,7 +526,7 @@ func (suite *Suite) TestGetProtocolStateSnapshotByBlockID_BlockNotFinalizedAtHei
rootSnapshot := unittest.RootSnapshotFixture(identities)
rootProtocolState, err := rootSnapshot.ProtocolState()
require.NoError(suite.T(), err)
rootProtocolStateID := rootProtocolState.Entry().ID()
rootProtocolStateID := rootProtocolState.ID()
util.RunWithFullProtocolState(suite.T(), rootSnapshot, func(db *badger.DB, state *bprotocol.ParticipantState) {
rootBlock, err := rootSnapshot.Head()
suite.Require().NoError(err)
Expand Down Expand Up @@ -566,7 +565,7 @@ func (suite *Suite) TestGetProtocolStateSnapshotByBlockID_DifferentBlockFinalize
rootSnapshot := unittest.RootSnapshotFixture(identities)
rootProtocolState, err := rootSnapshot.ProtocolState()
require.NoError(suite.T(), err)
rootProtocolStateID := rootProtocolState.Entry().ID()
rootProtocolStateID := rootProtocolState.ID()
util.RunWithFullProtocolState(suite.T(), rootSnapshot, func(db *badger.DB, state *bprotocol.ParticipantState) {
rootBlock, err := rootSnapshot.Head()
suite.Require().NoError(err)
Expand Down Expand Up @@ -617,7 +616,7 @@ func (suite *Suite) TestGetProtocolStateSnapshotByBlockID_UnexpectedErrorBlockID
rootSnapshot := unittest.RootSnapshotFixture(identities)
rootProtocolState, err := rootSnapshot.ProtocolState()
require.NoError(suite.T(), err)
rootProtocolStateID := rootProtocolState.Entry().ID()
rootProtocolStateID := rootProtocolState.ID()
util.RunWithFullProtocolState(suite.T(), rootSnapshot, func(db *badger.DB, state *bprotocol.ParticipantState) {
rootBlock, err := rootSnapshot.Head()
suite.Require().NoError(err)
Expand Down Expand Up @@ -770,7 +769,7 @@ func (suite *Suite) TestGetProtocolStateSnapshotByHeight_NonFinalizedBlocks() {
rootSnapshot := unittest.RootSnapshotFixture(identities)
rootProtocolState, err := rootSnapshot.ProtocolState()
require.NoError(suite.T(), err)
rootProtocolStateID := rootProtocolState.Entry().ID()
rootProtocolStateID := rootProtocolState.ID()
util.RunWithFullProtocolState(suite.T(), rootSnapshot, func(db *badger.DB, state *bprotocol.ParticipantState) {
rootBlock, err := rootSnapshot.Head()
suite.Require().NoError(err)
Expand Down
6 changes: 4 additions & 2 deletions engine/collection/test/cluster_switchover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
bcluster "github.com/onflow/flow-go/state/cluster/badger"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/state/protocol/protocol_state/kvstore"
protocol_state "github.com/onflow/flow-go/state/protocol/protocol_state/state"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -98,7 +99,8 @@ func NewClusterSwitchoverTestCase(t *testing.T, conf ClusterSwitchoverTestConf)
commit.ClusterQCs = rootClusterQCs

seal.ResultID = result.ID()
root.Payload.ProtocolStateID = inmem.ProtocolStateFromEpochServiceEvents(setup, commit).ID()
root.Payload.ProtocolStateID = kvstore.NewDefaultKVStore(
inmem.ProtocolStateFromEpochServiceEvents(setup, commit).ID()).ID()
tc.root, err = inmem.SnapshotFromBootstrapState(root, result, seal, qc)
require.NoError(t, err)

Expand Down Expand Up @@ -135,7 +137,7 @@ func NewClusterSwitchoverTestCase(t *testing.T, conf ClusterSwitchoverTestConf)
// take first collection node and use its storage as data source for stateMutator
refNode := tc.nodes[0]
stateMutator := protocol_state.NewMutableProtocolState(
refNode.ProtocolStateSnapshots,
refNode.EpochProtocolState,
refNode.ProtocolKVStore,
refNode.State.Params(),
refNode.Headers,
Expand Down
5 changes: 2 additions & 3 deletions engine/common/follower/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
// Each worker submits batchesPerWorker*blocksPerBatch blocks
// In total we will submit workers*batchesPerWorker*blocksPerBatch
func TestFollowerHappyPath(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
allIdentities := unittest.CompleteIdentitySet()
rootSnapshot := unittest.RootSnapshotFixture(allIdentities)
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
Expand All @@ -64,7 +63,7 @@ func TestFollowerHappyPath(t *testing.T) {
all.QuorumCertificates,
all.Setups,
all.EpochCommits,
all.ProtocolState,
all.EpochProtocolState,
all.ProtocolKVStore,
all.VersionBeacons,
rootSnapshot,
Expand All @@ -90,7 +89,7 @@ func TestFollowerHappyPath(t *testing.T) {
require.NoError(t, err)
rootProtocolState, err := rootSnapshot.ProtocolState()
require.NoError(t, err)
rootProtocolStateID := rootProtocolState.Entry().ID()
rootProtocolStateID := rootProtocolState.ID()

// Hack EFM.
// Since root snapshot is created with 1000 views for first epoch, we will forcefully enter EFM to avoid errors
Expand Down
3 changes: 0 additions & 3 deletions engine/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func sendBlock(exeNode *testmock.ExecutionNode, from flow.Identifier, proposal *
// create another child block which will trigger the parent
// block to be incorporated and be passed to the ingestion engine
func TestExecutionFlow(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
hub := stub.NewNetworkHub()

chainID := flow.Testnet
Expand Down Expand Up @@ -368,7 +367,6 @@ func makeSuccessBlock(t *testing.T, conID *flow.Identity, colID *flow.Identity,
// Test a successful tx should change the statecommitment,
// but a failed Tx should not change the statecommitment.
func TestFailedTxWillNotChangeStateCommitment(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
hub := stub.NewNetworkHub()

chainID := flow.Emulator
Expand Down Expand Up @@ -533,7 +531,6 @@ func mockCollectionEngineToReturnCollections(t *testing.T, collectionNode *testm

// Test the receipt will be sent to multiple verification nodes
func TestBroadcastToMultipleVerificationNodes(t *testing.T) {
unittest.SkipUnless(t, unittest.TEST_TODO, "kvstore: temporary broken")
hub := stub.NewNetworkHub()

chainID := flow.Emulator
Expand Down
46 changes: 23 additions & 23 deletions engine/testutil/mock/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,29 @@ type GenericNode struct {
Cancel context.CancelFunc
Errs <-chan error

Log zerolog.Logger
Metrics *metrics.NoopCollector
Tracer module.Tracer
PublicDB *badger.DB
SecretsDB *badger.DB
Headers storage.Headers
Guarantees storage.Guarantees
Seals storage.Seals
Payloads storage.Payloads
Blocks storage.Blocks
QuorumCertificates storage.QuorumCertificates
Results storage.ExecutionResults
Setups storage.EpochSetups
EpochCommits storage.EpochCommits
ProtocolStateSnapshots storage.ProtocolState
ProtocolKVStore storage.ProtocolKVStore
State protocol.ParticipantState
Index storage.Index
Me module.Local
Net *stub.Network
DBDir string
ChainID flow.ChainID
ProtocolEvents *events.Distributor
Log zerolog.Logger
Metrics *metrics.NoopCollector
Tracer module.Tracer
PublicDB *badger.DB
SecretsDB *badger.DB
Headers storage.Headers
Guarantees storage.Guarantees
Seals storage.Seals
Payloads storage.Payloads
Blocks storage.Blocks
QuorumCertificates storage.QuorumCertificates
Results storage.ExecutionResults
Setups storage.EpochSetups
EpochCommits storage.EpochCommits
EpochProtocolState storage.ProtocolState
ProtocolKVStore storage.ProtocolKVStore
State protocol.ParticipantState
Index storage.Index
Me module.Local
Net *stub.Network
DBDir string
ChainID flow.ChainID
ProtocolEvents *events.Distributor
}

func (g *GenericNode) Done() {
Expand Down
Loading

0 comments on commit 83ef8ad

Please sign in to comment.