Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor prepare proposal to efficiently fill the square #224

Merged
merged 25 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
136d617
refactor prepare proposal to efficiently fill the square
evan-forbes Feb 25, 2022
bb49e54
fix tests by sorting messages
evan-forbes Feb 26, 2022
1da4870
estimate square size and move erasure step to the app
evan-forbes Feb 28, 2022
e6a3585
fix prepare and process proposal tests
evan-forbes Mar 1, 2022
888d7a6
fix tests
evan-forbes Mar 1, 2022
063142f
linter
evan-forbes Mar 2, 2022
c49feca
use tagged version of celestia-core instead of commit
evan-forbes May 10, 2022
c4b7d60
clean up todos and remove panic
evan-forbes May 10, 2022
9e99916
update docs
evan-forbes May 10, 2022
8050630
fix remaining test
evan-forbes May 10, 2022
ec36e5d
linter
evan-forbes May 10, 2022
35fcff5
rename after rebase
evan-forbes May 10, 2022
266a1fb
WriteSquare -> splitshares
evan-forbes May 10, 2022
26861b9
WriteSquare -> splitshares
evan-forbes May 10, 2022
4e642c9
use correct function name for documentation
evan-forbes May 10, 2022
160c8dc
wording
evan-forbes May 11, 2022
3fbc45e
log errors better
evan-forbes May 12, 2022
d7be2b3
fix comment
evan-forbes May 12, 2022
ac75bde
remove ISRs from the share splitter
evan-forbes May 12, 2022
9dc8d12
improve comment
evan-forbes May 12, 2022
ff5a678
typo
evan-forbes May 12, 2022
c77ea6e
use a clearer method of calculating the delim length
evan-forbes May 12, 2022
a17beb0
use a switch statement instead of confusing if statement flow
evan-forbes May 12, 2022
efdf2d6
Merge branch 'evan/prepare-prop-refactor' of https://github.com/celes…
evan-forbes May 12, 2022
a063a95
fix wording
evan-forbes May 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 79 additions & 87 deletions app/prepare_proposal.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,91 @@
package app

import (
"bytes"
"crypto/sha256"
"sort"
"math"

"github.com/celestiaorg/celestia-app/x/payment/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/pkg/consts"
"github.com/tendermint/tendermint/pkg/da"
core "github.com/tendermint/tendermint/proto/tendermint/types"
coretypes "github.com/tendermint/tendermint/types"
)

// PreprocessTxs fullfills the celestia-core version of the ACBI interface, by
// performing basic validation for the incoming txs, and by cleanly separating
// share messages from transactions
// PrepareProposal fullfills the celestia-core version of the ABCI interface by
// preparing the proposal block data. The square size is determined by first
// estimating it via the size of the passed block data. Then the included
// MsgWirePayForData messages are malleated into MsgPayForData messages by
// separating the message and transaction that pays for that message. Lastly,
// this method generates the data root for the proposal block and passes it the
// blockdata.
func (app *App) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
squareSize := app.SquareSize()
shareCounter := uint64(0)
var shareMsgs []*core.Message
var processedTxs [][]byte
for _, rawTx := range req.BlockData.Txs {
squareSize := app.estimateSquareSize(req.BlockData)

dataSquare, data := SplitShares(app.txConfig, squareSize, req.BlockData)

eds, err := da.ExtendShares(squareSize, dataSquare)
if err != nil {
app.Logger().Error(
"failure to erasure the data square while creating a proposal block",
"error",
err.Error(),
)
panic(err)
}

dah := da.NewDataAvailabilityHeader(eds)
data.Hash = dah.Hash()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. So this is not done in core at all anymore, right? I'm wondering if we should move some of the code that is currently in core to this repo actually 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this, but atm I think its best to keep it there. This is mainly because it makes it easier to test things. This could be a good goal for our next upgrade tho.

data.OriginalSquareSize = squareSize

return abci.ResponsePrepareProposal{
BlockData: data,
}
}

// estimateSquareSize returns an estimate of the needed square size to fit the
// provided block data. It assumes that every malleatable tx has a viable commit
// for whatever square size that we end up picking.
func (app *App) estimateSquareSize(data *core.Data) uint64 {
txBytes := 0
for _, tx := range data.Txs {
txBytes += len(tx) + delimLen(uint64(len(tx)))
}
txShareEstimate := txBytes / consts.TxShareSize
if txBytes > 0 {
txShareEstimate++ // add one to round up
}

evdBytes := 0
for _, evd := range data.Evidence.Evidence {
evdBytes += evd.Size() + delimLen(uint64(evd.Size()))
}
evdShareEstimate := evdBytes / consts.TxShareSize
if evdBytes > 0 {
evdShareEstimate++ // add one to round up
}
adlerjohn marked this conversation as resolved.
Show resolved Hide resolved

msgShareEstimate := estimateMsgShares(app.txConfig, data.Txs)

totalShareEstimate := txShareEstimate + evdShareEstimate + msgShareEstimate

estimatedSize := types.NextPowerOf2(uint64(math.Sqrt(float64(totalShareEstimate))))

switch {
case estimatedSize > consts.MaxSquareSize:
return consts.MaxSquareSize
case estimatedSize < consts.MinSquareSize:
return consts.MinSquareSize
default:
return estimatedSize
}
}

func estimateMsgShares(txConf client.TxConfig, txs [][]byte) int {
msgShares := uint64(0)
for _, rawTx := range txs {
// decode the Tx
tx, err := app.txConfig.TxDecoder()(rawTx)
tx, err := txConf.TxDecoder()(rawTx)
if err != nil {
continue
}
Expand All @@ -34,14 +95,12 @@ func (app *App) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePr
continue
}

// don't process the tx if the transaction doesn't contain a
// MsgPayForData sdk.Msg
// skip txs that don't contain messages
if !hasWirePayForData(authTx) {
processedTxs = append(processedTxs, rawTx)
continue
}

// only support transactions that contain a single sdk.Msg
// only support malleated transactions that contain a single sdk.Msg
if len(authTx.GetMsgs()) != 1 {
continue
}
Expand All @@ -52,76 +111,9 @@ func (app *App) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePr
continue
}

// run basic validation on the transaction
err = authTx.ValidateBasic()
if err != nil {
continue
}
msgShares += (wireMsg.MessageSize / consts.MsgShareSize) + 1 // plus one to round up

// parse wire message and create a single message
coreMsg, unsignedPFD, sig, err := types.ProcessWirePayForData(wireMsg, app.SquareSize())
if err != nil {
continue
}

// create the signed PayForData using the fees, gas limit, and sequence from
// the original transaction, along with the appropriate signature.
signedTx, err := types.BuildPayForDataTxFromWireTx(authTx, app.txConfig.NewTxBuilder(), sig, unsignedPFD)
if err != nil {
app.Logger().Error("failure to create signed PayForData", err)
continue
}

// increment the share counter by the number of shares taken by the message
sharesTaken := uint64(len(coreMsg.Data) / types.ShareSize)
shareCounter += sharesTaken

// if there are too many shares stop processing and return the transactions
if shareCounter > squareSize*squareSize {
break
}

rawProcessedTx, err := app.txConfig.TxEncoder()(signedTx)
if err != nil {
continue
}

parentHash := sha256.Sum256(rawTx)
wrappedTx, err := coretypes.WrapMalleatedTx(parentHash[:], rawProcessedTx)
if err != nil {
app.Logger().Error("failure to wrap child transaction with parent hash", "Error:", err)
}

shareMsgs = append(shareMsgs, coreMsg)
processedTxs = append(processedTxs, wrappedTx)
}

// sort messages lexigraphically
sort.Slice(shareMsgs, func(i, j int) bool {
return bytes.Compare(shareMsgs[i].NamespaceId, shareMsgs[j].NamespaceId) < 0
})

return abci.ResponsePrepareProposal{
BlockData: &core.Data{
Txs: processedTxs,
Evidence: req.BlockData.Evidence,
Messages: core.Messages{MessagesList: shareMsgs},
},
}
}

func hasWirePayForData(tx sdk.Tx) bool {
for _, msg := range tx.GetMsgs() {
msgName := sdk.MsgTypeURL(msg)
if msgName == types.URLMsgWirePayForData {
return true
}
}
return false
}

// SquareSize returns the current square size. Currently, the square size is
// hardcoded. todo(evan): don't hardcode the square size
func (app *App) SquareSize() uint64 {
return consts.MaxSquareSize
return int(msgShares)
}
49 changes: 48 additions & 1 deletion app/process_proposal.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package app

import (
"bytes"

"github.com/celestiaorg/celestia-app/x/payment/types"
sdk "github.com/cosmos/cosmos-sdk/types"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/pkg/da"
coretypes "github.com/tendermint/tendermint/types"
)

const (
Expand Down Expand Up @@ -59,7 +63,7 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
// iterate through all of the messages and ensure that a PFD with the exact
// commitment exists
for _, msg := range req.BlockData.Messages.MessagesList {
commit, err := types.CreateCommitment(app.SquareSize(), msg.NamespaceId, msg.Data)
commit, err := types.CreateCommitment(req.BlockData.OriginalSquareSize, msg.NamespaceId, msg.Data)
if err != nil {
app.Logger().Error(
rejectedPropBlockLog,
Expand All @@ -82,6 +86,49 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
}
}

data, err := coretypes.DataFromProto(req.BlockData)
if err != nil {
app.Logger().Error(rejectedPropBlockLog, "reason", "failure to unmarshal block data:", "error", err)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

shares, _, err := data.ComputeShares(req.BlockData.OriginalSquareSize)
if err != nil {
app.Logger().Error(rejectedPropBlockLog, "reason", "failure to compute shares from block data:", "error", err)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

eds, err := da.ExtendShares(req.BlockData.OriginalSquareSize, shares.RawShares())
if err != nil {
app.Logger().Error(
rejectedPropBlockLog,
"reason",
"failure to erasure the data square",
"error",
err,
)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

dah := da.NewDataAvailabilityHeader(eds)

if !bytes.Equal(dah.Hash(), req.Header.DataHash) {
app.Logger().Error(
rejectedPropBlockLog,
"reason",
"proposed data root differs from calculated data root",
)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_ACCEPT,
}
Expand Down
Loading