From 6e784198f635bc7038b181b8ab6b339de4f94d45 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Mon, 18 Dec 2023 14:04:41 +0200 Subject: [PATCH] fix(share/byzantine): fix proof collection (#2957) This PR fixes an issue in the proof collection method. Previously, we were sending `len(dah.RowRoots)/2` requests in order to get proofs but there is a use case when the process can get stuck for a particular share(out-of-order case). The new approach sends n requests, where n is the number of non-empty shares received from rsmt2d library, and then waits until `len(dah.RowRoots)/2` will be received. I've also improved error handling as previously we were panicking in case any error appeared. The new approach will rely only on `context.DeadlineExceeded ` which basically means that for some reason we can't get proofs, so the data is not available. --------- Co-authored-by: ramin --- share/eds/byzantine/bad_encoding_test.go | 111 ++++++++++++++++++++--- share/eds/byzantine/byzantine.go | 46 +++++----- 2 files changed, 119 insertions(+), 38 deletions(-) diff --git a/share/eds/byzantine/bad_encoding_test.go b/share/eds/byzantine/bad_encoding_test.go index e7032107ca..adb23d3549 100644 --- a/share/eds/byzantine/bad_encoding_test.go +++ b/share/eds/byzantine/bad_encoding_test.go @@ -2,9 +2,15 @@ package byzantine import ( "context" + "crypto/sha256" + "hash" "testing" "time" + "github.com/ipfs/boxo/blockservice" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + mhcore "github.com/multiformats/go-multihash/core" "github.com/stretchr/testify/require" core "github.com/tendermint/tendermint/types" @@ -35,10 +41,11 @@ func TestBEFP_Validate(t *testing.T) { err = square.Repair(dah.RowRoots, dah.ColumnRoots) require.ErrorAs(t, err, &errRsmt2d) - errByz := NewErrByzantine(ctx, bServ, &dah, errRsmt2d) + byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d) + var errByz *ErrByzantine + require.ErrorAs(t, byzantine, &errByz) befp := CreateBadEncodingProof([]byte("hash"), 0, errByz) - var test = []struct { name string prepareFn func() error @@ -69,7 +76,9 @@ func TestBEFP_Validate(t *testing.T) { Shares: validShares[0:4], }, ) - invalidBefp := CreateBadEncodingProof([]byte("hash"), 0, errInvalidByz) + var errInvalid *ErrByzantine + require.ErrorAs(t, errInvalidByz, &errInvalid) + invalidBefp := CreateBadEncodingProof([]byte("hash"), 0, errInvalid) return invalidBefp.Validate(&header.ExtendedHeader{DAH: &validDah}) }, expectedResult: func(err error) { @@ -198,18 +207,21 @@ func TestIncorrectBadEncodingFraudProof(t *testing.T) { } func TestBEFP_ValidateOutOfOrderShares(t *testing.T) { - // skipping it for now because `malicious` package has a small issue: Constructor does not apply - // passed options, so it's not possible to store shares and thus get proofs for them. - // should be ok once app team will fix it. - t.Skip() - eds := edstest.RandEDS(t, 16) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + t.Cleanup(cancel) + + size := 4 + eds := edstest.RandEDS(t, size) + shares := eds.Flattened() - shares[0], shares[1] = shares[1], shares[0] // corrupting eds - bServ := ipld.NewMemBlockservice() - batchAddr := ipld.NewNmtNodeAdder(context.Background(), bServ, ipld.MaxSizeBatchOption(16*2)) + shares[0], shares[4] = shares[4], shares[0] // corrupting eds + + bServ := newNamespacedBlockService() + batchAddr := ipld.NewNmtNodeAdder(ctx, bServ, ipld.MaxSizeBatchOption(size*2)) + eds, err := rsmt2d.ImportExtendedDataSquare(shares, share.DefaultRSMT2DCodec(), - malicious.NewConstructor(16, nmt.NodeVisitor(batchAddr.Visit)), + malicious.NewConstructor(uint64(size), nmt.NodeVisitor(batchAddr.Visit)), ) require.NoError(t, err, "failure to recompute the extended data square") @@ -223,9 +235,80 @@ func TestBEFP_ValidateOutOfOrderShares(t *testing.T) { err = eds.Repair(dah.RowRoots, dah.ColumnRoots) require.ErrorAs(t, err, &errRsmt2d) - errByz := NewErrByzantine(context.Background(), bServ, &dah, errRsmt2d) + byzantine := NewErrByzantine(ctx, bServ, &dah, errRsmt2d) + var errByz *ErrByzantine + require.ErrorAs(t, byzantine, &errByz) befp := CreateBadEncodingProof([]byte("hash"), 0, errByz) err = befp.Validate(&header.ExtendedHeader{DAH: &dah}) - require.Error(t, err) + require.NoError(t, err) +} + +// namespacedBlockService wraps `BlockService` and extends the verification part +// to avoid returning blocks that has out of order namespaces. +type namespacedBlockService struct { + blockservice.BlockService + // the data structure that is used on the networking level, in order + // to verify the order of the namespaces + prefix *cid.Prefix +} + +func newNamespacedBlockService() *namespacedBlockService { + sha256NamespaceFlagged := uint64(0x7701) + // register the nmt hasher to validate the order of namespaces + mhcore.Register(sha256NamespaceFlagged, func() hash.Hash { + nh := nmt.NewNmtHasher(sha256.New(), share.NamespaceSize, true) + nh.Reset() + return nh + }) + + bs := &namespacedBlockService{} + bs.BlockService = ipld.NewMemBlockservice() + + bs.prefix = &cid.Prefix{ + Version: 1, + Codec: sha256NamespaceFlagged, + MhType: sha256NamespaceFlagged, + // equals to NmtHasher.Size() + MhLength: sha256.New().Size() + 2*share.NamespaceSize, + } + return bs +} + +func (n *namespacedBlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + block, err := n.BlockService.GetBlock(ctx, c) + if err != nil { + return nil, err + } + + _, err = n.prefix.Sum(block.RawData()) + if err != nil { + return nil, err + } + return block, nil +} + +func (n *namespacedBlockService) GetBlocks(ctx context.Context, cids []cid.Cid) <-chan blocks.Block { + blockCh := n.BlockService.GetBlocks(ctx, cids) + resultCh := make(chan blocks.Block) + + go func() { + for { + select { + case <-ctx.Done(): + close(resultCh) + return + case block, ok := <-blockCh: + if !ok { + close(resultCh) + return + } + if _, err := n.prefix.Sum(block.RawData()); err != nil { + continue + } + resultCh <- block + } + } + }() + return resultCh } diff --git a/share/eds/byzantine/byzantine.go b/share/eds/byzantine/byzantine.go index dfdf681f04..e7aec28959 100644 --- a/share/eds/byzantine/byzantine.go +++ b/share/eds/byzantine/byzantine.go @@ -4,11 +4,9 @@ import ( "context" "fmt" - "github.com/ipfs/boxo/blockservice" - "golang.org/x/sync/errgroup" - "github.com/celestiaorg/celestia-app/pkg/da" "github.com/celestiaorg/rsmt2d" + "github.com/ipfs/boxo/blockservice" "github.com/celestiaorg/celestia-node/share/ipld" ) @@ -35,7 +33,7 @@ func NewErrByzantine( bGetter blockservice.BlockGetter, dah *da.DataAvailabilityHeader, errByz *rsmt2d.ErrByzantineData, -) *ErrByzantine { +) error { // changing the order to collect proofs against an orthogonal axis roots := [][][]byte{ dah.ColumnRoots, @@ -43,41 +41,41 @@ func NewErrByzantine( }[errByz.Axis] sharesWithProof := make([]*ShareWithProof, len(errByz.Shares)) - sharesAmount := 0 - errGr, ctx := errgroup.WithContext(ctx) + type result struct { + share *ShareWithProof + index int + } + resultCh := make(chan *result) for index, share := range errByz.Shares { - // skip further shares if we already requested half of them, which is enough to recompute the row - // or col - if sharesAmount == len(dah.RowRoots)/2 { - break - } - if share == nil { continue } - sharesAmount++ index := index - errGr.Go(func() error { + go func() { share, err := getProofsAt( ctx, bGetter, ipld.MustCidFromNamespacedSha256(roots[index]), int(errByz.Index), len(errByz.Shares), ) - sharesWithProof[index] = share - return err - }) + if err != nil { + log.Warn("requesting proof failed", "root", roots[index], "err", err) + return + } + resultCh <- &result{share, index} + }() } - if err := errGr.Wait(); err != nil { - // Fatal as rsmt2d proved that error is byzantine, - // but we cannot properly collect the proof, - // so verification will fail and thus services won't be stopped - // while we still have to stop them. - // TODO(@Wondertan): Find a better way to handle - log.Fatalw("getting proof for ErrByzantine", "err", err) + for i := 0; i < len(dah.RowRoots)/2; i++ { + select { + case t := <-resultCh: + sharesWithProof[t.index] = t.share + case <-ctx.Done(): + return ipld.ErrNodeNotFound + } } + return &ErrByzantine{ Index: uint32(errByz.Index), Shares: sharesWithProof,