Skip to content

Commit

Permalink
Merge pull request #6247 from filecoin-project/fix/exchange
Browse files Browse the repository at this point in the history
fix: exchange: allow up to 10k messages per block
  • Loading branch information
LinZexiao authored Dec 11, 2023
2 parents c4c6137 + aa78ab0 commit 3255e76
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 127 deletions.
20 changes: 9 additions & 11 deletions pkg/net/exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package exchange

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -26,6 +25,10 @@ import (

var exchangeClientLogger = logging.Logger("exchange.client")

// Set the max exchange message size to 120MiB. Purely based on gas numbers, we can include ~8MiB of
// messages per block, so I've set this to 120MiB to be _very_ safe.
const maxExchangeMessageSize = (15 * 8) << 20

// client implements exchange.Client, using the libp2p ChainExchange protocol
// as the fetching mechanism.
type client struct {
Expand Down Expand Up @@ -426,20 +429,15 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *excha
}
_ = stream.SetWriteDeadline(time.Time{}) // clear deadline // FIXME: Needs
// its own API (https://github.com/libp2p/go-libp2p/core/issues/162).

// Read response.
_ = stream.SetReadDeadline(time.Time{})

// TODO Note: this will remove once we've completed the go-libp2p migration to
// go-libp2p-core 0.7.0
respBytes, err := io.ReadAll(bufio.NewReader(NewInct(stream, ReadResMinSpeed, ReadResDeadline)))
if err != nil {
return nil, err
if err := stream.CloseWrite(); err != nil {
log.Warnw("CloseWrite err", "error", err)
}

// Read response, limiting the size of the response to maxExchangeMessageSize as we allow a
// lot of messages (10k+) but they'll mostly be quite small.
var res exchange.Response
err = cborutil.ReadCborRPC(
bytes.NewReader(respBytes),
bufio.NewReader(io.LimitReader(NewInct(stream, ReadResMinSpeed, ReadResDeadline), maxExchangeMessageSize)),
// bufio.NewReader(NewInct(stream, ReadResMinSpeed, ReadResDeadline)),
&res)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion venus-devtool/cborgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
types: []interface{}{
exchange.Request{},
exchange.Response{},
exchange.CompactedMessages{},
exchange.CompactedMessagesCBOR{},
exchange.BSTipSet{},
},
},
Expand Down
140 changes: 25 additions & 115 deletions venus-shared/libp2p/exchange/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions venus-shared/libp2p/exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type BSTipSet struct {
// FIXME: The logic to decompress this structure should belong
//
// to itself, not to the consumer.
//
// NOTE: Max messages is: BlockMessageLimit (10k) * MaxTipsetSize (15) = 150k
type CompactedMessages struct {
Bls []*types.Message
BlsIncludes [][]uint64
Expand Down
Loading

0 comments on commit 3255e76

Please sign in to comment.