Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): Implement time based handle transaction #1942

Merged
merged 18 commits into from
Nov 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const (
// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

// defaultTxnBatchSize is the default size for the transaction batch
defaultTxnBatchSize = 100
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
)

Expand Down Expand Up @@ -98,7 +97,7 @@ type Config struct {

batchSize int // internal option

// SlotDuration is slot duration to produce block in milliseconds
// SlotDuration is slot duration to produce block
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
SlotDuration time.Duration
}

Expand Down
2 changes: 1 addition & 1 deletion dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
}
err := st.Close()
if err != nil {
logger.Tracef("Failed to close stream", "protocol", pID, "error", err)
logger.Tracef("Failed to close stream for protocol %s: %s", pID, err)
}
}
}
Expand Down
79 changes: 42 additions & 37 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var (
_ NotificationsMessage = &transactionHandshake{}
)

// txnBatchChTimeout is the timeout for adding a transaction to the batch processing channel
const txnBatchChTimeout = time.Millisecond * 200

// TransactionMessage is a network message that is sent to notify of new transactions entering the network
type TransactionMessage struct {
Extrinsics []types.Extrinsic
Expand Down Expand Up @@ -107,58 +110,60 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) {
return &transactionHandshake{}, nil
}

func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler {
go func() {
protocolID := s.host.protocolID + transactionsID
ticker := time.NewTicker(s.cfg.SlotDuration)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
timeOut := time.NewTimer(s.cfg.SlotDuration / 3)
var completed bool
for !completed {
select {
case <-timeOut.C:
completed = true
case txnMsg := <-txnBatchCh:
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}

if s.noGossip || !propagate {
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
func (s *Service) startTxnBatchProcessing(txnBatchCh chan *BatchMessage) {
protocolID := s.host.protocolID + transactionsID
ticker := time.NewTicker(s.cfg.SlotDuration)
defer ticker.Stop()

for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
timeOut := time.NewTimer(s.cfg.SlotDuration / 3)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
var completed bool
for !completed {
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-timeOut.C:
completed = true
case txnMsg := <-txnBatchCh:
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}

if s.noGossip || !propagate {
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
}
}
}
}()
}
}

func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler {
go s.startTxnBatchProcessing(txnBatchCh)

return func(peer peer.ID, msg NotificationsMessage) {
data := &BatchMessage{
msg: msg,
peer: peer,
}

timeOut := time.NewTimer(time.Millisecond * 200)
timer := time.NewTimer(txnBatchChTimeout)

select {
case txnBatchCh <- data:
if !timeOut.Stop() {
<-timeOut.C
if !timer.Stop() {
<-timer.C
}
case <-timeOut.C:
logger.Debugf("transaction message not included into batch", "peer", peer.String(), "msg", msg.String())
case <-timer.C:
logger.Debugf("transaction message %s for peer %s not included into batch", msg, peer)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi

slotDuration, err := stateSrvc.Epoch.GetSlotDuration()
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot get slot duration: %w", err)
}

// network service configuation
Expand Down