Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preliminary introduction of RowSet with supporting changes #427

Closed
wants to merge 11 commits into from
Closed
5 changes: 5 additions & 0 deletions blockchain/msgs_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package blockchain

import (
"context"
"encoding/hex"
"math"
"testing"

"github.com/gogo/protobuf/proto"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -83,6 +85,9 @@ func TestBlockchainMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil, types.Messages{}, nil)
block.Version.Block = 11 // overwrite updated protocol version

_, err := block.RowSet(context.TODO(), mdutils.Mock())
require.NoError(t, err)

bpb, err := block.ToProto()
require.NoError(t, err)

Expand Down
9 changes: 7 additions & 2 deletions blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -302,9 +303,13 @@ func makeTxs(height int64) (txs []types.Tx) {
}

func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), nil,
b := state.MakeBlock(height, makeTxs(height), nil,
nil, types.Messages{}, lastCommit, state.Validators.GetProposer().Address)
return block
_, err := b.RowSet(context.TODO(), mdutils.Mock())
if err != nil {
panic(err)
}
return b
}

type testApp struct {
Expand Down
4 changes: 2 additions & 2 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
// Avoid sending on internalMsgQueue and running consensus state.

// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
block1, blockParts1, _ := cs.createProposalBlock()
polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID, &block1.DataAvailabilityHeader)
p1, err := proposal1.ToProto()
Expand All @@ -399,7 +399,7 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
deliverTxsRange(cs, 0, 1)

// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
block2, blockParts2, _ := cs.createProposalBlock()
polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID, &block2.DataAvailabilityHeader)
p2, err := proposal2.ToProto()
Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func decideProposal(
round int32,
) (proposal *types.Proposal, block *types.Block) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
block, blockParts, _ := cs1.createProposalBlock()
validRound := cs1.ValidRound
chainID := cs1.state.ChainID
cs1.mtx.Unlock()
Expand Down
7 changes: 4 additions & 3 deletions consensus/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lazyledger/lazyledger-core/libs/bits"
tmrand "github.com/lazyledger/lazyledger-core/libs/rand"
"github.com/lazyledger/lazyledger-core/p2p"
"github.com/lazyledger/lazyledger-core/p2p/ipld"
tmcons "github.com/lazyledger/lazyledger-core/proto/tendermint/consensus"
tmproto "github.com/lazyledger/lazyledger-core/proto/tendermint/types"
"github.com/lazyledger/lazyledger-core/types"
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestMsgToProto(t *testing.T) {
pbParts, err := parts.ToProto()
require.NoError(t, err)

roots, err := types.NmtRootsFromBytes([][]byte{tmrand.Bytes(2*consts.NamespaceSize + tmhash.Size)})
roots, err := ipld.NmtRootsFromBytes([][]byte{tmrand.Bytes(2*consts.NamespaceSize + tmhash.Size)})
require.NoError(t, err)
proposal := types.Proposal{
Type: tmproto.ProposalType,
Expand All @@ -58,7 +59,7 @@ func TestMsgToProto(t *testing.T) {
BlockID: bi,
Timestamp: time.Now(),
Signature: tmrand.Bytes(20),
DAHeader: &types.DataAvailabilityHeader{
DAHeader: &ipld.DataAvailabilityHeader{
RowsRoots: roots,
ColumnRoots: roots,
},
Expand Down Expand Up @@ -361,7 +362,7 @@ func TestConsMsgsVectors(t *testing.T) {
BlockID: bi,
Timestamp: date,
Signature: []byte("add_more_exclamation"),
DAHeader: &types.DataAvailabilityHeader{},
DAHeader: &ipld.DataAvailabilityHeader{},
}
pbProposal, err := proposal.ToProto()
require.NoError(t, err)
Expand Down
16 changes: 10 additions & 6 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock, _, _ := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}

Expand Down Expand Up @@ -399,7 +399,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock, _, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}

Expand Down Expand Up @@ -437,7 +437,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock, _, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss := make([]*validatorStub, nVals+1)
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock, _, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss = make([]*validatorStub, nVals+3)
Expand Down Expand Up @@ -1001,8 +1001,7 @@ func makeBlock(state sm.State, lastBlock *types.Block, lastBlockMeta *types.Bloc
lastCommit = types.NewCommit(vote.Height, vote.Round,
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
}

return state.MakeBlock(
block := state.MakeBlock(
height,
[]types.Tx{},
nil,
Expand All @@ -1011,6 +1010,11 @@ func makeBlock(state sm.State, lastBlock *types.Block, lastBlockMeta *types.Bloc
lastCommit,
state.Validators.GetProposer().Address,
)
_, err := block.RowSet(context.TODO(), mdutils.Mock())
if err != nil {
panic(err)
}
return block, block.MakePartSet(types.BlockPartSizeBytes)
}

type badApp struct {
Expand Down
49 changes: 32 additions & 17 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ type State struct {
metrics *Metrics

// context of the recent proposed block
proposalCtx context.Context
proposalCancel context.CancelFunc
provideCtx context.Context
provideCancel context.CancelFunc
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -658,12 +658,15 @@ func (cs *State) updateToState(state sm.State) {
cs.Proposal = nil
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.ProposalBlockRows = nil
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.LockedBlockRows = nil
cs.ValidRound = -1
cs.ValidBlock = nil
cs.ValidBlockParts = nil
cs.ValidBlockRows = nil
cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, height, validators)
cs.CommitRound = -1
cs.LastValidators = state.LastValidators
Expand Down Expand Up @@ -962,6 +965,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
cs.Proposal = nil
cs.ProposalBlock = nil
cs.ProposalBlockParts = nil
cs.ProposalBlockRows = nil
}
cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
Expand Down Expand Up @@ -1078,14 +1082,15 @@ func (cs *State) isProposer(address []byte) bool {
func (cs *State) defaultDecideProposal(height int64, round int32) {
var block *types.Block
var blockParts *types.PartSet
var blockRows *types.RowSet

// Decide on block
if cs.ValidBlock != nil {
// If there is valid block, choose that.
block, blockParts = cs.ValidBlock, cs.ValidBlockParts
block, blockParts, blockRows = cs.ValidBlock, cs.ValidBlockParts, cs.ValidBlockRows
} else {
// Create a new proposal block from state/txs from the mempool.
block, blockParts = cs.createProposalBlock()
block, blockParts, blockRows = cs.createProposalBlock()
if block == nil {
return
}
Expand Down Expand Up @@ -1121,9 +1126,9 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
cs.Logger.Error("enterPropose: Error signing proposal", "height", height, "round", round, "err", err)
}

// cancel ctx for previous proposal block to ensure block putting/providing does not queues up
if cs.proposalCancel != nil {
// FIXME(ismail): below commented out cancel tries to prevent block putting
// cancel ctx for previous proposal block to ensure block providing does not queues up
if cs.provideCancel != nil {
// FIXME(ismail): below commented out cancel tries to prevent block providing
// and providing no to queue up endlessly.
// But in a real network proposers should have enough time in between.
// And even if not, queuing up to a problematic extent will take a lot of time:
Expand All @@ -1139,22 +1144,20 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
// the provide timeout could still be larger than just the time between
// two consecutive proposals.
//
cs.proposalCancel()
cs.provideCancel()
}
cs.proposalCtx, cs.proposalCancel = context.WithCancel(context.TODO())
go func(ctx context.Context) {
cs.Logger.Info("Putting Block to IPFS", "height", block.Height)
err = ipld.PutBlock(ctx, cs.dag, block, cs.croute, cs.Logger)
cs.provideCtx, cs.provideCancel = context.WithCancel(context.TODO())
go func(ctx context.Context, dah *ipld.DataAvailabilityHeader) {
err = ipld.ProvideData(ctx, dah, cs.croute, cs.Logger.With("height", block.Height))
if err != nil {
if errors.Is(err, context.Canceled) {
cs.Logger.Error("Putting Block didn't finish in time and was terminated", "height", block.Height)
cs.Logger.Error("Providing Block didn't finish in time and was terminated", "height", block.Height)
return
}
cs.Logger.Error("Failed to put Block to IPFS", "err", err, "height", block.Height)
cs.Logger.Error("Failed to provide Block to DHT", "err", err, "height", block.Height)
return
}
cs.Logger.Info("Finished putting block to IPFS", "height", block.Height)
}(cs.proposalCtx)
}(cs.provideCtx, blockRows.DAHeader)
}

// Returns true if the proposal block is complete &&
Expand All @@ -1180,7 +1183,7 @@ func (cs *State) isProposalComplete() bool {
//
// NOTE: keep it side-effect free for clarity.
// CONTRACT: cs.privValidator is not nil.
func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.PartSet) {
func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.PartSet, blockRows *types.RowSet) {
if cs.privValidator == nil {
panic("entered createProposalBlock with privValidator being nil")
}
Expand Down Expand Up @@ -1365,6 +1368,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.LockedBlockRows = nil
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil {
cs.Logger.Error("Error publishing event unlock", "err", err)
}
Expand Down Expand Up @@ -1413,6 +1417,7 @@ func (cs *State) enterPrecommit(height int64, round int32) {
if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
cs.ProposalBlockRows = nil
}
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil {
cs.Logger.Error("Error publishing event unlock", "err", err)
Expand Down Expand Up @@ -1487,6 +1492,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) {
logger.Info("Commit is for locked block. Set ProposalBlock=LockedBlock", "blockHash", blockID.Hash)
cs.ProposalBlock = cs.LockedBlock
cs.ProposalBlockParts = cs.LockedBlockParts
cs.ProposalBlockRows = cs.LockedBlockRows
}

// If we don't have the block being committed, set up to get it.
Expand All @@ -1502,6 +1508,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) {
// Set up ProposalBlockParts and keep waiting.
cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
cs.ProposalBlockRows = nil
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil {
cs.Logger.Error("Error publishing valid block", "err", err)
}
Expand Down Expand Up @@ -1864,6 +1871,10 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
}

cs.ProposalBlock = block
cs.ProposalBlockRows, err = block.RowSet(context.TODO(), cs.dag)
if err != nil {
return false, err
}
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil {
Expand All @@ -1880,6 +1891,7 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
cs.ValidBlockRows = cs.ProposalBlockRows
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
Expand Down Expand Up @@ -2046,6 +2058,7 @@ func (cs *State) addVote(
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.LockedBlockRows = nil
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil {
return added, err
}
Expand All @@ -2061,12 +2074,14 @@ func (cs *State) addVote(
cs.ValidRound = vote.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
cs.ValidBlockRows = cs.ProposalBlockRows
} else {
cs.Logger.Info(
"Valid block we don't know about. Set ProposalBlock=nil",
"proposal", cs.ProposalBlock.Hash(), "blockID", blockID.Hash)
// We're getting the wrong block.
cs.ProposalBlock = nil
cs.ProposalBlockRows = nil
}
if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
Expand Down
4 changes: 2 additions & 2 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestStateBadProposal(t *testing.T) {
proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal)
voteCh := subscribe(cs1.eventBus, types.EventQueryVote)

propBlock, _ := cs1.createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock, _, _ := cs1.createProposalBlock() // changeProposer(t, cs1, vs2)

// make the second validator the proposer by incrementing round
round++
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestStateOversizedBlock(t *testing.T) {
timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
voteCh := subscribe(cs1.eventBus, types.EventQueryVote)

propBlock, _ := cs1.createProposalBlock()
propBlock, _, _ := cs1.createProposalBlock()
propBlock.Data.Txs = []types.Tx{tmrand.Bytes(2001)}
propBlock.Header.DataHash = propBlock.DataAvailabilityHeader.Hash()

Expand Down
7 changes: 5 additions & 2 deletions consensus/types/round_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,16 @@ type RoundState struct {
Proposal *types.Proposal `json:"proposal"`
ProposalBlock *types.Block `json:"proposal_block"`
ProposalBlockParts *types.PartSet `json:"proposal_block_parts"`
ProposalBlockRows *types.RowSet `json:"proposal_block_rows"`
LockedRound int32 `json:"locked_round"`
LockedBlock *types.Block `json:"locked_block"`
LockedBlockParts *types.PartSet `json:"locked_block_parts"`
LockedBlockRows *types.RowSet `json:"locked_block_rows"`

// Last known round with POL for non-nil valid block.
ValidRound int32 `json:"valid_round"`
ValidBlock *types.Block `json:"valid_block"` // Last known block of POL mentioned above.
ValidRound int32 `json:"valid_round"`
ValidBlock *types.Block `json:"valid_block"` // Last known block of POL mentioned above.
ValidBlockRows *types.RowSet `json:"valid_block_rows"`

// Last known block parts of POL mentioned above.
ValidBlockParts *types.PartSet `json:"valid_block_parts"`
Expand Down
Loading