Skip to content

Commit

Permalink
da: replace go-cnc with celestia-openrpc (cosmos#986)
Browse files Browse the repository at this point in the history
## Overview

This PR replaces `go-cnc` with `celestia-openrpc` for the client. Fixes
cosmos#979

## Checklist

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords
  • Loading branch information
tuxcanfly authored Jul 6, 2023
1 parent 9a80ef7 commit db5390c
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 279 deletions.
111 changes: 77 additions & 34 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package celestia

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"time"

"cosmossdk.io/math"
"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"

"github.com/celestiaorg/go-cnc"

openrpc "github.com/rollkit/celestia-openrpc"
"github.com/rollkit/celestia-openrpc/types/blob"
"github.com/rollkit/celestia-openrpc/types/share"

openrpcns "github.com/rollkit/celestia-openrpc/types/namespace"
"github.com/rollkit/rollkit/da"
"github.com/rollkit/rollkit/log"
"github.com/rollkit/rollkit/types"
Expand All @@ -22,30 +25,32 @@ import (

// DataAvailabilityLayerClient use celestia-node public API.
type DataAvailabilityLayerClient struct {
_ *openrpc.Client
client *cnc.Client
rpc *openrpc.Client

namespaceID types.NamespaceID
config Config
logger log.Logger
namespace openrpcns.Namespace
config Config
logger log.Logger
}

var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
var _ da.BlockRetriever = &DataAvailabilityLayerClient{}

// Config stores Celestia DALC configuration parameters.
type Config struct {
BaseURL string `json:"base_url"`
Timeout time.Duration `json:"timeout"`
Fee int64 `json:"fee"`
GasLimit uint64 `json:"gas_limit"`
AuthToken string `json:"auth_token"`
BaseURL string `json:"base_url"`
Timeout time.Duration `json:"timeout"`
Fee int64 `json:"fee"`
GasLimit uint64 `json:"gas_limit"`
}

// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(
namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger,
) error {
c.namespaceID = namespaceID
func (c *DataAvailabilityLayerClient) Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error {
namespace, err := share.NewBlobNamespaceV0(namespaceID[:])
if err != nil {
return err
}
c.namespace = namespace.ToAppNamespace()
c.logger = logger

if len(config) > 0 {
Expand All @@ -59,7 +64,7 @@ func (c *DataAvailabilityLayerClient) Init(
func (c *DataAvailabilityLayerClient) Start() error {
c.logger.Info("starting Celestia Data Availability Layer Client", "baseURL", c.config.BaseURL)
var err error
c.client, err = cnc.NewClient(c.config.BaseURL, cnc.WithTimeout(c.config.Timeout))
c.rpc, err = openrpc.NewClient(context.Background(), c.config.BaseURL, c.config.AuthToken)
return err
}

Expand All @@ -71,7 +76,17 @@ func (c *DataAvailabilityLayerClient) Stop() error {

// SubmitBlock submits a block to DA layer.
func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
blob, err := block.MarshalBinary()
data, err := block.MarshalBinary()
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}

blockBlob, err := blob.NewBlobV0(c.namespace.Bytes(), data)
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{
Expand All @@ -81,8 +96,9 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
}
}

txResponse, err := c.client.SubmitPFB(ctx, c.namespaceID, blob, c.config.Fee, c.config.GasLimit)
blobs := []*blob.Blob{blockBlob}

txResponse, err := c.rpc.State.SubmitPayForBlob(ctx, math.NewInt(c.config.Fee), c.config.GasLimit, blobs)
if err != nil {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{
Expand All @@ -92,6 +108,10 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
}
}

c.logger.Debug("successfully submitted PayForBlob transaction",
"fee", c.config.Fee, "gasLimit", c.config.GasLimit,
"daHeight", txResponse.Height, "daTxHash", txResponse.TxHash)

if txResponse.Code != 0 {
return da.ResultSubmitBlock{
BaseResult: da.BaseResult{
Expand All @@ -111,34 +131,56 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
}

// CheckBlockAvailability queries DA layer to check data availability of block at given height.
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(
ctx context.Context, dataLayerHeight uint64,
) da.ResultCheckBlock {
shares, err := c.client.NamespacedShares(ctx, c.namespaceID, dataLayerHeight)
code := dataRequestErrorToStatus(err)
if code != da.StatusSuccess {
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(ctx context.Context, dataLayerHeight uint64) da.ResultCheckBlock {
header, err := c.rpc.Header.GetByHeight(ctx, dataLayerHeight)
if err != nil {
return da.ResultCheckBlock{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}
if header.DAH == nil {
return da.ResultCheckBlock{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
DAHeight: dataLayerHeight,
},
DataAvailable: false,
}
}
err = c.rpc.Share.SharesAvailable(ctx, header.DAH)
if err != nil {
if strings.Contains(err.Error(), share.ErrNotAvailable.Error()) {
return da.ResultCheckBlock{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
DAHeight: dataLayerHeight,
},
DataAvailable: false,
}
}
return da.ResultCheckBlock{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}

return da.ResultCheckBlock{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
DAHeight: dataLayerHeight,
},
DataAvailable: len(shares) > 0,
DataAvailable: true,
}
}

// RetrieveBlocks gets a batch of blocks from DA layer.
func (c *DataAvailabilityLayerClient) RetrieveBlocks(
ctx context.Context, dataLayerHeight uint64,
) da.ResultRetrieveBlocks {
data, err := c.client.NamespacedData(ctx, c.namespaceID, dataLayerHeight)
func (c *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, dataLayerHeight uint64) da.ResultRetrieveBlocks {
c.logger.Debug("trying to retrieve blob using Blob.GetAll", "daHeight", dataLayerHeight, "namespace", hex.EncodeToString(c.namespace.Bytes()))
blobs, err := c.rpc.Blob.GetAll(ctx, dataLayerHeight, []share.Namespace{c.namespace.Bytes()})
status := dataRequestErrorToStatus(err)
if status != da.StatusSuccess {
return da.ResultRetrieveBlocks{
Expand All @@ -149,10 +191,10 @@ func (c *DataAvailabilityLayerClient) RetrieveBlocks(
}
}

blocks := make([]*types.Block, len(data))
for i, msg := range data {
blocks := make([]*types.Block, len(blobs))
for i, blob := range blobs {
var block pb.Block
err = proto.Unmarshal(msg, &block)
err = proto.Unmarshal(blob.Data, &block)
if err != nil {
c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
continue
Expand Down Expand Up @@ -187,7 +229,8 @@ func dataRequestErrorToStatus(err error) da.StatusCode {
strings.Contains(err.Error(), da.ErrNamespaceNotFound.Error()):
return da.StatusSuccess
case strings.Contains(err.Error(), da.ErrDataNotFound.Error()),
strings.Contains(err.Error(), da.ErrEDSNotFound.Error()):
strings.Contains(err.Error(), da.ErrEDSNotFound.Error()),
strings.Contains(err.Error(), da.ErrBlobNotFound.Error()):
return da.StatusNotFound
default:
return da.StatusError
Expand Down
93 changes: 0 additions & 93 deletions da/celestia/mock/messages.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,8 @@
package mock

import (
"bytes"
"encoding/binary"
)

// This code is extracted from celestia-app. It's here to build shares from messages (serialized blocks).
// TODO(tzdybal): if we stop using `/namespaced_shares` we can get rid of this file.

const (
shareSize = 256
namespaceSize = 8
msgShareSize = shareSize - namespaceSize
)

// splitMessage breaks the data in a message into the minimum number of
// namespaced shares
func splitMessage(rawData []byte, nid []byte) []NamespacedShare {
shares := make([]NamespacedShare, 0)
firstRawShare := append(append(
make([]byte, 0, shareSize),
nid...),
rawData[:msgShareSize]...,
)
shares = append(shares, NamespacedShare{firstRawShare, nid})
rawData = rawData[msgShareSize:]
for len(rawData) > 0 {
shareSizeOrLen := min(msgShareSize, len(rawData))
rawShare := append(append(
make([]byte, 0, shareSize),
nid...),
rawData[:shareSizeOrLen]...,
)
paddedShare := zeroPadIfNecessary(rawShare, shareSize)
share := NamespacedShare{paddedShare, nid}
shares = append(shares, share)
rawData = rawData[shareSizeOrLen:]
}
return shares
}

// Share contains the raw share data without the corresponding namespace.
type Share []byte

Expand All @@ -48,59 +11,3 @@ type NamespacedShare struct {
Share
ID []byte
}

func min(a, b int) int {
if a <= b {
return a
}
return b
}

func zeroPadIfNecessary(share []byte, width int) []byte {
oldLen := len(share)
if oldLen < width {
missingBytes := width - oldLen
padByte := []byte{0}
padding := bytes.Repeat(padByte, missingBytes)
share = append(share, padding...)
return share
}
return share
}

// marshalDelimited marshals the raw data (excluding the namespace) of this
// message and prefixes it with the length of that encoding.
func marshalDelimited(data []byte) ([]byte, error) {
lenBuf := make([]byte, binary.MaxVarintLen64)
length := uint64(len(data))
n := binary.PutUvarint(lenBuf, length)
return append(lenBuf[:n], data...), nil
}

// appendToShares appends raw data as shares.
// Used to build shares from blocks/messages.
func appendToShares(shares []NamespacedShare, nid []byte, rawData []byte) []NamespacedShare {
if len(rawData) <= msgShareSize {
rawShare := append(append(
make([]byte, 0, len(nid)+len(rawData)),
nid...),
rawData...,
)
paddedShare := zeroPadIfNecessary(rawShare, shareSize)
share := NamespacedShare{paddedShare, nid}
shares = append(shares, share)
} else { // len(rawData) > msgShareSize
shares = append(shares, splitMessage(rawData, nid)...)
}
return shares
}

type namespacedSharesResponse struct {
Shares []Share `json:"shares"`
Height uint64 `json:"height"`
}

type namespacedDataResponse struct {
Data [][]byte `json:"data"`
Height uint64 `json:"height"`
}
Loading

0 comments on commit db5390c

Please sign in to comment.