ipld: move from DAGService to BlockService #730

Merged: 4 commits on May 30, 2022
Changes from all commits
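This PR replaces the heavyweight format.DAGService with the lower-level blockservice.BlockService at every call site, and the test fixtures switch from mdutils.Mock() to mdutils.Bserv() to match. For orientation, here is a minimal sketch of how the two services relate in go-ipfs-style test code; the map-datastore, blockstore, and offline-exchange wiring is an assumption about what the go-merkledag/test helpers do internally, not code taken from this diff.

```go
package example

import (
	"github.com/ipfs/go-blockservice"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	offline "github.com/ipfs/go-ipfs-exchange-offline"
	format "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-merkledag"
)

// newTestBlockService wires an in-memory, offline BlockService:
// a map datastore wrapped in a blockstore, served without a network exchange.
func newTestBlockService() blockservice.BlockService {
	store := blockstore.NewBlockstore(dssync.MutexWrap(datastore.NewMapDatastore()))
	return blockservice.New(store, offline.Exchange(store))
}

// newTestDAGService layers the IPLD DAG abstraction on top of the same
// BlockService, which is the extra layer this PR removes from call sites.
func newTestDAGService() format.DAGService {
	return merkledag.NewDAGService(newTestBlockService())
}
```

With a BlockService in hand, code that still needs a DAGService can derive one locally, which is why the call sites below only have to swap the helper they use.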
40 changes: 20 additions & 20 deletions das/daser_test.go
@@ -6,9 +6,9 @@ import (
"testing"
"time"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
format "github.com/ipfs/go-ipld-format"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -23,10 +23,10 @@ var timeout = time.Second * 15
// the DASer checkpoint is updated to network head.
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)
mockGet, shareServ, sub := createDASerSubcomponents(t, bServ, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
@@ -61,10 +61,10 @@ func TestDASerLifecycle(t *testing.T) {

func TestDASer_Restart(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)
mockGet, shareServ, sub := createDASerSubcomponents(t, bServ, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
@@ -85,10 +85,10 @@ func TestDASer_Restart(t *testing.T) {
require.NoError(t, err)

// reset mockGet, generate 15 "past" headers, building off chain head which is 30
mockGet.generateHeaders(t, dag, 30, 45)
mockGet.generateHeaders(t, bServ, 30, 45)
mockGet.doneCh = make(chan struct{})
// reset dummy subscriber
mockGet.fillSubWithHeaders(t, sub, dag, 45, 60)
mockGet.fillSubWithHeaders(t, sub, bServ, 45, 60)
// manually set mockGet head to trigger stop at 45
mockGet.head = int64(45)

@@ -124,9 +124,9 @@

func TestDASer_catchUp(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

mockGet, shareServ, _ := createDASerSubcomponents(t, dag, 5, 0)
mockGet, shareServ, _ := createDASerSubcomponents(t, bServ, 5, 0)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
@@ -165,9 +165,9 @@ func TestDASer_catchUp(t *testing.T) {
// difference of 1
func TestDASer_catchUp_oneHeader(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
bServ := mdutils.Bserv()

mockGet, shareServ, _ := createDASerSubcomponents(t, dag, 6, 0)
mockGet, shareServ, _ := createDASerSubcomponents(t, bServ, 6, 0)
daser := NewDASer(shareServ, nil, mockGet, ds)

// store checkpoint
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestDASer_catchUp_oneHeader(t *testing.T) {

func TestDASer_catchUp_fails(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
dag := mdutils.Mock()
dag := mdutils.Bserv()

mockGet, _, _ := createDASerSubcomponents(t, dag, 6, 0)
daser := NewDASer(share.NewBrokenAvailability(), nil, mockGet, ds)
@@ -256,21 +256,21 @@ func TestDASer_catchUp_fails(t *testing.T) {
// mockGetter, share.Service, and mock header.Subscriber.
func createDASerSubcomponents(
t *testing.T,
dag format.DAGService,
bServ blockservice.BlockService,
numGetter,
numSub int,
) (*mockGetter, *share.Service, *header.DummySubscriber) {
shareServ := share.NewService(dag, share.NewLightAvailability(dag))
shareServ := share.NewService(bServ, share.NewLightAvailability(bServ))

mockGet := &mockGetter{
headers: make(map[int64]*header.ExtendedHeader),
doneCh: make(chan struct{}),
}

mockGet.generateHeaders(t, dag, 0, numGetter)
mockGet.generateHeaders(t, bServ, 0, numGetter)

sub := new(header.DummySubscriber)
mockGet.fillSubWithHeaders(t, sub, dag, numGetter, numGetter+numSub)
mockGet.fillSubWithHeaders(t, sub, bServ, numGetter, numGetter+numSub)

return mockGet, shareServ, sub
}
@@ -279,15 +279,15 @@ func createDASerSubcomponents(
func (m *mockGetter) fillSubWithHeaders(
t *testing.T,
sub *header.DummySubscriber,
dag format.DAGService,
bServ blockservice.BlockService,
startHeight,
endHeight int,
) {
sub.Headers = make([]*header.ExtendedHeader, endHeight-startHeight)

index := 0
for i := startHeight; i < endHeight; i++ {
dah := share.RandFillDAG(t, 16, dag)
dah := share.RandFillDAG(t, 16, bServ)

randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
@@ -309,9 +309,9 @@ type mockGetter struct {
headers map[int64]*header.ExtendedHeader
}

func (m *mockGetter) generateHeaders(t *testing.T, dag format.DAGService, startHeight, endHeight int) {
func (m *mockGetter) generateHeaders(t *testing.T, bServ blockservice.BlockService, startHeight, endHeight int) {
for i := startHeight; i < endHeight; i++ {
dah := share.RandFillDAG(t, 16, dag)
dah := share.RandFillDAG(t, 16, bServ)

randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
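The test changes above are mechanical: every helper that took a format.DAGService now takes a blockservice.BlockService, and the fixture comes from mdutils.Bserv(). As a reminder of what the narrower interface offers, here is a small round-trip through a BlockService; the signatures assume go-blockservice v0.3 or later (an assumption; older releases take no context argument on AddBlock).

```go
package example

import (
	"context"
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	mdutils "github.com/ipfs/go-merkledag/test"
)

// roundTrip stores a raw block and reads it back by CID, the kind of
// raw block access the share and DAS code builds on.
func roundTrip(ctx context.Context) error {
	bServ := mdutils.Bserv()

	blk := blocks.NewBlock([]byte("hello celestia"))
	if err := bServ.AddBlock(ctx, blk); err != nil {
		return err
	}

	got, err := bServ.GetBlock(ctx, blk.Cid())
	if err != nil {
		return err
	}
	fmt.Printf("stored %s, read back %d bytes\n", got.Cid(), len(got.RawData()))
	return nil
}
```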
6 changes: 3 additions & 3 deletions fraud/bad_encoding_test.go
@@ -14,15 +14,15 @@ import (
)

func TestFraudProofValidation(t *testing.T) {
dag := mdutils.Mock()
bServ := mdutils.Bserv()
eds := ipld.RandEDS(t, 2)

shares := ipld.ExtractEDS(eds)
copy(shares[3][8:], shares[4][8:])
eds, err := ipld.ImportShares(context.Background(), shares, dag)
eds, err := ipld.ImportShares(context.Background(), shares, bServ)
require.NoError(t, err)
da := da.NewDataAvailabilityHeader(eds)
r := ipld.NewRetriever(dag)
r := ipld.NewRetriever(bServ)
_, err = r.Retrieve(context.Background(), &da)
var errByz *ipld.ErrByzantine
require.True(t, errors.As(err, &errByz))
8 changes: 4 additions & 4 deletions header/core/exchange.go
@@ -5,7 +5,7 @@ import (
"context"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-blockservice"
logging "github.com/ipfs/go-log/v2"

tmbytes "github.com/tendermint/tendermint/libs/bytes"
@@ -18,14 +18,14 @@ var log = logging.Logger("header/core")

type Exchange struct {
fetcher *core.BlockFetcher
shareStore format.DAGService
shareStore blockservice.BlockService
construct header.ConstructFn
}

func NewExchange(fetcher *core.BlockFetcher, dag format.DAGService, construct header.ConstructFn) *Exchange {
func NewExchange(fetcher *core.BlockFetcher, bServ blockservice.BlockService, construct header.ConstructFn) *Exchange {
return &Exchange{
fetcher: fetcher,
shareStore: dag,
shareStore: bServ,
construct: construct,
}
}
2 changes: 1 addition & 1 deletion header/core/exchange_test.go
@@ -18,7 +18,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
t.Cleanup(cancel)

fetcher := createCoreFetcher(ctx, t)
store := mdutils.Mock()
store := mdutils.Bserv()

// generate 10 blocks
generateBlocks(t, fetcher)
10 changes: 5 additions & 5 deletions header/core/listener.go
@@ -4,7 +4,7 @@ import (
"context"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-blockservice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/types"

@@ -22,21 +22,21 @@
type Listener struct {
bcast header.Broadcaster
fetcher *core.BlockFetcher
dag format.DAGService
bServ blockservice.BlockService
construct header.ConstructFn
cancel context.CancelFunc
}

func NewListener(
bcast header.Broadcaster,
fetcher *core.BlockFetcher,
dag format.DAGService,
bServ blockservice.BlockService,
construct header.ConstructFn,
) *Listener {
return &Listener{
bcast: bcast,
fetcher: fetcher,
dag: dag,
bServ: bServ,
construct: construct,
}
}
@@ -89,7 +89,7 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) {
return
}

eh, err := cl.construct(ctx, b, comm, vals, cl.dag)
eh, err := cl.construct(ctx, b, comm, vals, cl.bServ)
if err != nil {
log.Errorw("listener: making extended header", "err", err)
return
2 changes: 1 addition & 1 deletion header/core/listener_test.go
@@ -102,5 +102,5 @@ func createListener(
require.NoError(t, err)
})

return NewListener(p2pSub, fetcher, mdutils.Mock(), header.MakeExtendedHeader)
return NewListener(p2pSub, fetcher, mdutils.Bserv(), header.MakeExtendedHeader)
}
6 changes: 3 additions & 3 deletions header/header.go
@@ -5,9 +5,9 @@ import (
"context"
"fmt"

"github.com/ipfs/go-blockservice"
logging "github.com/ipfs/go-log/v2"

format "github.com/ipfs/go-ipld-format"
bts "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/pkg/da"
core "github.com/tendermint/tendermint/types"
@@ -43,15 +43,15 @@ func MakeExtendedHeader(
b *core.Block,
comm *core.Commit,
vals *core.ValidatorSet,
dag format.NodeAdder,
bServ blockservice.BlockService,
) (*ExtendedHeader, error) {
var dah DataAvailabilityHeader
if len(b.Txs) > 0 {
namespacedShares, _, err := b.Data.ComputeShares(b.OriginalSquareSize)
if err != nil {
return nil, err
}
extended, err := ipld.AddShares(ctx, namespacedShares.RawShares(), dag)
extended, err := ipld.AddShares(ctx, namespacedShares.RawShares(), bServ)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion header/header_test.go
@@ -18,7 +18,7 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
_, client := core.StartTestClient(ctx, t)
fetcher := core.NewBlockFetcher(client)

store := mdutils.Mock()
store := mdutils.Bserv()

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
4 changes: 2 additions & 2 deletions header/interface.go
@@ -5,7 +5,7 @@ import (
"errors"
"fmt"

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-blockservice"
pubsub "github.com/libp2p/go-libp2p-pubsub"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
core "github.com/tendermint/tendermint/types"
@@ -17,7 +17,7 @@ type ConstructFn = func(
*core.Block,
*core.Commit,
*core.ValidatorSet,
format.NodeAdder,
blockservice.BlockService,
) (*ExtendedHeader, error)

// Validator aliases a func that validates ExtendedHeader.
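Since ConstructFn now closes over a blockservice.BlockService instead of a format.NodeAdder, anything plugged in as a constructor has to adopt the new shape. A hypothetical no-op constructor satisfying the updated alias might look like the sketch below; the celestia-node import path and the zero-value ExtendedHeader are assumptions for illustration, not part of this PR.

```go
package example

import (
	"context"

	"github.com/ipfs/go-blockservice"
	core "github.com/tendermint/tendermint/types"

	"github.com/celestiaorg/celestia-node/header"
)

// noopConstruct matches the new ConstructFn signature but skips share storage;
// it is only a stand-in for wiring components that never touch real blocks.
var noopConstruct header.ConstructFn = func(
	_ context.Context,
	_ *core.Block,
	_ *core.Commit,
	_ *core.ValidatorSet,
	_ blockservice.BlockService,
) (*header.ExtendedHeader, error) {
	return &header.ExtendedHeader{}, nil
}
```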
20 changes: 14 additions & 6 deletions ipld/add.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math"

"github.com/ipfs/go-blockservice"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/nmt"
@@ -13,15 +14,19 @@
"github.com/tendermint/tendermint/pkg/wrapper"
)

// AddShares erasures and extends shares to IPLD DAG using the provided ipld.NodeAdder.
func AddShares(ctx context.Context, shares []Share, adder ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) {
// AddShares erasures and extends shares to blockservice.BlockService using the provided ipld.NodeAdder.
func AddShares(
ctx context.Context,
shares []Share,
adder blockservice.BlockService,
) (*rsmt2d.ExtendedDataSquare, error) {
if len(shares) == 0 {
return nil, fmt.Errorf("empty data") // empty block is not an empty Data
}
squareSize := int(math.Sqrt(float64(len(shares))))
// create nmt adder wrapping batch adder with calculated size
bs := batchSize(squareSize * 2)
batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, adder, ipld.MaxSizeBatchOption(bs)))
batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs))
// create the nmt wrapper to generate row and col commitments
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize), nmt.NodeVisitor(batchAdder.Visit))
// recompute the eds
@@ -35,15 +40,18 @@ func AddShares(ctx context.Context, shares []Share, adder ipld.NodeAdder) (*rsmt
return eds, batchAdder.Commit()
}

// ImportShares imports flattend chunks of data into Extended Data square and saves it in IPLD DAG
func ImportShares(ctx context.Context, shares [][]byte, na ipld.NodeAdder) (*rsmt2d.ExtendedDataSquare, error) {
// ImportShares imports flattend chunks of data into Extended Data square and saves it in blockservice.BlockService
func ImportShares(
ctx context.Context,
shares [][]byte,
adder blockservice.BlockService) (*rsmt2d.ExtendedDataSquare, error) {
if len(shares) == 0 {
return nil, fmt.Errorf("ipld: importing empty data")
}
squareSize := int(math.Sqrt(float64(len(shares))))
// create nmt adder wrapping batch adder with calculated size
bs := batchSize(squareSize * 2)
batchAdder := NewNmtNodeAdder(ctx, ipld.NewBatch(ctx, na, ipld.MaxSizeBatchOption(bs)))
batchAdder := NewNmtNodeAdder(ctx, adder, ipld.MaxSizeBatchOption(bs))
// create the nmt wrapper to generate row and col commitments
tree := wrapper.NewErasuredNamespacedMerkleTree(uint64(squareSize/2), nmt.NodeVisitor(batchAdder.Visit))
// recompute the eds
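The call-site change here is the interesting one: previously the caller wrapped a NodeAdder in ipld.NewBatch and handed the batch to NewNmtNodeAdder; now the adder receives the BlockService and the batch option directly, so batching becomes an internal detail. One plausible way to bridge the two internally, purely a guess since that file is not part of this diff, is to derive a DAGService from the BlockService and batch on top of it:

```go
package example

import (
	"context"

	"github.com/ipfs/go-blockservice"
	format "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-merkledag"
)

// batchOver turns a BlockService into a batching NodeAdder, which is roughly
// what an adder taking (ctx, BlockService, BatchOption...) would need inside.
func batchOver(ctx context.Context, bServ blockservice.BlockService, opts ...format.BatchOption) *format.Batch {
	// merkledag.NewDAGService adapts the BlockService to format.DAGService,
	// whose NodeAdder half is then wrapped in a size-limited batch.
	return format.NewBatch(ctx, merkledag.NewDAGService(bServ), opts...)
}
```

A caller could then write batchOver(ctx, bServ, format.MaxSizeBatchOption(batchSize(squareSize*2))), mirroring the option the new AddShares and ImportShares code still passes through.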