Skip to content

Commit

Permalink
refactor prepare proposal to efficiently fill the square (#224)
Browse files Browse the repository at this point in the history
* refactor prepare proposal to efficiently fill the square

* fix tests by sorting messages

* estimate square size and move erasure step to the app

* fix prepare and process proposal tests

* fix tests

* linter

* use tagged version of celestia-core instead of commit

* clean up todos and remove panic

* update docs

* fix remaining test

* linter

* rename after rebase

* WriteSquare -> splitshares

Co-authored-by: CHAMI Rachid <[email protected]>

* WriteSquare -> splitshares

Co-authored-by: CHAMI Rachid <[email protected]>

* use correct function name for documentation

Co-authored-by: CHAMI Rachid <[email protected]>

* wording

Co-authored-by: Ismail Khoffi <[email protected]>

* log errors better

* fix comment

* remove ISRs from the share splitter

* improve comment

* typo

Co-authored-by: John Adler <[email protected]>

* use a clearer method of calculating the delim length

Co-authored-by: John Adler <[email protected]>

* use a switch statement instead of confusing if statement flow

Co-authored-by: John Adler <[email protected]>

* fix wording

Co-authored-by: Ismail Khoffi <[email protected]>

Co-authored-by: CHAMI Rachid <[email protected]>
Co-authored-by: Ismail Khoffi <[email protected]>
Co-authored-by: John Adler <[email protected]>
  • Loading branch information
4 people authored May 16, 2022
1 parent bf8bb10 commit 6080046
Show file tree
Hide file tree
Showing 15 changed files with 617 additions and 171 deletions.
166 changes: 79 additions & 87 deletions app/prepare_proposal.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,91 @@
package app

import (
"bytes"
"crypto/sha256"
"sort"
"math"

"github.com/celestiaorg/celestia-app/x/payment/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/pkg/consts"
"github.com/tendermint/tendermint/pkg/da"
core "github.com/tendermint/tendermint/proto/tendermint/types"
coretypes "github.com/tendermint/tendermint/types"
)

// PreprocessTxs fullfills the celestia-core version of the ACBI interface, by
// performing basic validation for the incoming txs, and by cleanly separating
// share messages from transactions
// PrepareProposal fullfills the celestia-core version of the ABCI interface by
// preparing the proposal block data. The square size is determined by first
// estimating it via the size of the passed block data. Then the included
// MsgWirePayForData messages are malleated into MsgPayForData messages by
// separating the message and transaction that pays for that message. Lastly,
// this method generates the data root for the proposal block and passes it the
// blockdata.
func (app *App) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
squareSize := app.SquareSize()
shareCounter := uint64(0)
var shareMsgs []*core.Message
var processedTxs [][]byte
for _, rawTx := range req.BlockData.Txs {
squareSize := app.estimateSquareSize(req.BlockData)

dataSquare, data := SplitShares(app.txConfig, squareSize, req.BlockData)

eds, err := da.ExtendShares(squareSize, dataSquare)
if err != nil {
app.Logger().Error(
"failure to erasure the data square while creating a proposal block",
"error",
err.Error(),
)
panic(err)
}

dah := da.NewDataAvailabilityHeader(eds)
data.Hash = dah.Hash()
data.OriginalSquareSize = squareSize

return abci.ResponsePrepareProposal{
BlockData: data,
}
}

// estimateSquareSize returns an estimate of the needed square size to fit the
// provided block data. It assumes that every malleatable tx has a viable commit
// for whatever square size that we end up picking.
func (app *App) estimateSquareSize(data *core.Data) uint64 {
txBytes := 0
for _, tx := range data.Txs {
txBytes += len(tx) + delimLen(uint64(len(tx)))
}
txShareEstimate := txBytes / consts.TxShareSize
if txBytes > 0 {
txShareEstimate++ // add one to round up
}

evdBytes := 0
for _, evd := range data.Evidence.Evidence {
evdBytes += evd.Size() + delimLen(uint64(evd.Size()))
}
evdShareEstimate := evdBytes / consts.TxShareSize
if evdBytes > 0 {
evdShareEstimate++ // add one to round up
}

msgShareEstimate := estimateMsgShares(app.txConfig, data.Txs)

totalShareEstimate := txShareEstimate + evdShareEstimate + msgShareEstimate

estimatedSize := types.NextPowerOf2(uint64(math.Sqrt(float64(totalShareEstimate))))

switch {
case estimatedSize > consts.MaxSquareSize:
return consts.MaxSquareSize
case estimatedSize < consts.MinSquareSize:
return consts.MinSquareSize
default:
return estimatedSize
}
}

func estimateMsgShares(txConf client.TxConfig, txs [][]byte) int {
msgShares := uint64(0)
for _, rawTx := range txs {
// decode the Tx
tx, err := app.txConfig.TxDecoder()(rawTx)
tx, err := txConf.TxDecoder()(rawTx)
if err != nil {
continue
}
Expand All @@ -34,14 +95,12 @@ func (app *App) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePr
continue
}

// don't process the tx if the transaction doesn't contain a
// MsgPayForData sdk.Msg
// skip txs that don't contain messages
if !hasWirePayForData(authTx) {
processedTxs = append(processedTxs, rawTx)
continue
}

// only support transactions that contain a single sdk.Msg
// only support malleated transactions that contain a single sdk.Msg
if len(authTx.GetMsgs()) != 1 {
continue
}
Expand All @@ -52,76 +111,9 @@ func (app *App) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePr
continue
}

// run basic validation on the transaction
err = authTx.ValidateBasic()
if err != nil {
continue
}
msgShares += (wireMsg.MessageSize / consts.MsgShareSize) + 1 // plus one to round up

// parse wire message and create a single message
coreMsg, unsignedPFD, sig, err := types.ProcessWirePayForData(wireMsg, app.SquareSize())
if err != nil {
continue
}

// create the signed PayForData using the fees, gas limit, and sequence from
// the original transaction, along with the appropriate signature.
signedTx, err := types.BuildPayForDataTxFromWireTx(authTx, app.txConfig.NewTxBuilder(), sig, unsignedPFD)
if err != nil {
app.Logger().Error("failure to create signed PayForData", err)
continue
}

// increment the share counter by the number of shares taken by the message
sharesTaken := uint64(len(coreMsg.Data) / types.ShareSize)
shareCounter += sharesTaken

// if there are too many shares stop processing and return the transactions
if shareCounter > squareSize*squareSize {
break
}

rawProcessedTx, err := app.txConfig.TxEncoder()(signedTx)
if err != nil {
continue
}

parentHash := sha256.Sum256(rawTx)
wrappedTx, err := coretypes.WrapMalleatedTx(parentHash[:], rawProcessedTx)
if err != nil {
app.Logger().Error("failure to wrap child transaction with parent hash", "Error:", err)
}

shareMsgs = append(shareMsgs, coreMsg)
processedTxs = append(processedTxs, wrappedTx)
}

// sort messages lexigraphically
sort.Slice(shareMsgs, func(i, j int) bool {
return bytes.Compare(shareMsgs[i].NamespaceId, shareMsgs[j].NamespaceId) < 0
})

return abci.ResponsePrepareProposal{
BlockData: &core.Data{
Txs: processedTxs,
Evidence: req.BlockData.Evidence,
Messages: core.Messages{MessagesList: shareMsgs},
},
}
}

func hasWirePayForData(tx sdk.Tx) bool {
for _, msg := range tx.GetMsgs() {
msgName := sdk.MsgTypeURL(msg)
if msgName == types.URLMsgWirePayForData {
return true
}
}
return false
}

// SquareSize returns the current square size. Currently, the square size is
// hardcoded. todo(evan): don't hardcode the square size
func (app *App) SquareSize() uint64 {
return consts.MaxSquareSize
return int(msgShares)
}
49 changes: 48 additions & 1 deletion app/process_proposal.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package app

import (
"bytes"

"github.com/celestiaorg/celestia-app/x/payment/types"
sdk "github.com/cosmos/cosmos-sdk/types"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/pkg/da"
coretypes "github.com/tendermint/tendermint/types"
)

const (
Expand Down Expand Up @@ -59,7 +63,7 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
// iterate through all of the messages and ensure that a PFD with the exact
// commitment exists
for _, msg := range req.BlockData.Messages.MessagesList {
commit, err := types.CreateCommitment(app.SquareSize(), msg.NamespaceId, msg.Data)
commit, err := types.CreateCommitment(req.BlockData.OriginalSquareSize, msg.NamespaceId, msg.Data)
if err != nil {
app.Logger().Error(
rejectedPropBlockLog,
Expand All @@ -82,6 +86,49 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
}
}

data, err := coretypes.DataFromProto(req.BlockData)
if err != nil {
app.Logger().Error(rejectedPropBlockLog, "reason", "failure to unmarshal block data:", "error", err)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

shares, _, err := data.ComputeShares(req.BlockData.OriginalSquareSize)
if err != nil {
app.Logger().Error(rejectedPropBlockLog, "reason", "failure to compute shares from block data:", "error", err)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

eds, err := da.ExtendShares(req.BlockData.OriginalSquareSize, shares.RawShares())
if err != nil {
app.Logger().Error(
rejectedPropBlockLog,
"reason",
"failure to erasure the data square",
"error",
err,
)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

dah := da.NewDataAvailabilityHeader(eds)

if !bytes.Equal(dah.Hash(), req.Header.DataHash) {
app.Logger().Error(
rejectedPropBlockLog,
"reason",
"proposed data root differs from calculated data root",
)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_ACCEPT,
}
Expand Down
Loading

0 comments on commit 6080046

Please sign in to comment.