Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix closing batch + logs #2348

Merged
merged 7 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (
EventID_FinalizerRestart EventID = "FINALIZER RESTART"
// EventID_FinalizerBreakEvenGasPriceBigDifference is triggered when the finalizer recalculates the break even gas price and detects a big difference
EventID_FinalizerBreakEvenGasPriceBigDifference EventID = "FINALIZER BREAK EVEN GAS PRICE BIG DIFFERENCE"
// EventID_SynchonizerRestart is triggered when the Synchonizer restarts
EventID_SynchonizerRestart EventID = "SYNCHRONIZER RESTART"
// EventID_SynchronizerRestart is triggered when the Synchonizer restarts
EventID_SynchronizerRestart EventID = "SYNCHRONIZER RESTART"
// Source_Node is the source of the event
Source_Node Source = "node"

Expand Down
70 changes: 57 additions & 13 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,8 +1251,22 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx

// Find txs to be processed and included in the trusted state
if *s.trustedState.lastStateRoot == batches[1].StateRoot {
// Delete txs that were stored before restart. We need to reprocess all txs because the intermediary stateRoot is only store in memory
ARR552 marked this conversation as resolved.
Show resolved Hide resolved
err := s.state.ResetTrustedState(s.ctx, uint64(trustedBatch.Number)-1, dbTx)
if err != nil {
log.Error("error resetting trusted state. Error: ", err)
return nil, nil, err
}
// All txs need to be processed
request.Transactions = trustedBatchL2Data
// Reopen batch
err = s.openBatch(trustedBatch, dbTx)
if err != nil {
log.Error("error openning batch. Error: ", err)
return nil, nil, err
}
request.GlobalExitRoot = trustedBatch.GlobalExitRoot
request.Transactions = trustedBatchL2Data
} else {
// Only new txs need to be processed
storedTxs, syncedTxs, _, syncedEfficiencyPercentages, err := s.decodeTxs(trustedBatchL2Data, batches)
Expand Down Expand Up @@ -1290,7 +1304,30 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx
log.Errorf("error closing batch %d", trustedBatch.Number)
return nil, nil, err
} else {
log.Warnf("CASE 02: the batch [%d] looks like were not close but in STATE was closed", trustedBatch.Number)
log.Warnf("CASE 02: the batch [%d] was already closed", trustedBatch.Number)
log.Info("batches[0].BatchNumber: ", batches[0].BatchNumber)
log.Info("batches[0].AccInputHash: ", batches[0].AccInputHash)
log.Info("batches[0].StateRoot: ", batches[0].StateRoot)
log.Info("batches[0].LocalExitRoot: ", batches[0].LocalExitRoot)
log.Info("batches[0].GlobalExitRoot: ", batches[0].GlobalExitRoot)
log.Info("batches[0].Coinbase: ", batches[0].Coinbase)
log.Info("batches[0].ForcedBatchNum: ", batches[0].ForcedBatchNum)
log.Info("####################################")
log.Info("batches[1].BatchNumber: ", batches[1].BatchNumber)
log.Info("batches[1].AccInputHash: ", batches[1].AccInputHash)
log.Info("batches[1].StateRoot: ", batches[1].StateRoot)
log.Info("batches[1].LocalExitRoot: ", batches[1].LocalExitRoot)
log.Info("batches[1].GlobalExitRoot: ", batches[1].GlobalExitRoot)
log.Info("batches[1].Coinbase: ", batches[1].Coinbase)
log.Info("batches[1].ForcedBatchNum: ", batches[1].ForcedBatchNum)
log.Info("###############################")
log.Info("trustedBatch.BatchNumber: ", trustedBatch.Number)
log.Info("trustedBatch.AccInputHash: ", trustedBatch.AccInputHash)
log.Info("trustedBatch.StateRoot: ", trustedBatch.StateRoot)
log.Info("trustedBatch.LocalExitRoot: ", trustedBatch.LocalExitRoot)
log.Info("trustedBatch.GlobalExitRoot: ", trustedBatch.GlobalExitRoot)
log.Info("trustedBatch.Coinbase: ", trustedBatch.Coinbase)
log.Info("trustedBatch.ForcedBatchNum: ", trustedBatch.ForcedBatchNumber)
}
}
batches[0].AccInputHash = trustedBatch.AccInputHash
Expand Down Expand Up @@ -1345,9 +1382,16 @@ func (s *ClientSynchronizer) processTrustedBatch(trustedBatch *types.Batch, dbTx
log.Errorf("error closing batch %d", trustedBatch.Number)
return nil, nil, err
} else {
log.Warnf("CASE 01: the batch [%d] looks like were not close but in STATE was closed", trustedBatch.Number)
log.Warnf("CASE 01: batch [%d] was already closed", trustedBatch.Number)
}
}
log.Info("Batch closed right after processing some tx")
if batches[0] != nil {
log.Debug("Updating batches[0] values...")
batches[0].AccInputHash = trustedBatch.AccInputHash
batches[0].StateRoot = trustedBatch.StateRoot
batches[0].LocalExitRoot = trustedBatch.LocalExitRoot
}
}

log.Infof("Batch %v synchronized", trustedBatch.Number)
Expand Down Expand Up @@ -1459,13 +1503,13 @@ func checkIfSynced(batches []*state.Batch, trustedBatch *types.Batch) bool {
matchCoinbase && matchTimestamp && matchL2Data {
return true
}
log.Info("matchNumber", matchNumber)
log.Info("matchGER", matchGER)
log.Info("matchLER", matchLER)
log.Info("matchSR", matchSR)
log.Info("matchCoinbase", matchCoinbase)
log.Info("matchTimestamp", matchTimestamp)
log.Info("matchL2Data", matchL2Data)
log.Infof("matchNumber %v %d %d", matchNumber, batches[0].BatchNumber, uint64(trustedBatch.Number))
log.Infof("matchGER %v %s %s", matchGER, batches[0].GlobalExitRoot.String(), trustedBatch.GlobalExitRoot.String())
log.Infof("matchLER %v %s %s", matchLER, batches[0].LocalExitRoot.String(), trustedBatch.LocalExitRoot.String())
log.Infof("matchSR %v %s %s", matchSR, batches[0].StateRoot.String(), trustedBatch.StateRoot.String())
log.Infof("matchCoinbase %v %s %s", matchCoinbase, batches[0].Coinbase.String(), trustedBatch.Coinbase.String())
log.Infof("matchTimestamp %v %d %d", matchTimestamp, uint64(batches[0].Timestamp.Unix()), uint64(trustedBatch.Timestamp))
log.Infof("matchL2Data %v", matchL2Data)
return false
}

Expand Down Expand Up @@ -1514,8 +1558,8 @@ func (s *ClientSynchronizer) updateAndCheckProverID(proverID string) {
Source: event.Source_Node,
Component: event.Component_Synchronizer,
Level: event.Level_Critical,
EventID: event.EventID_SynchonizerRestart,
Description: fmt.Sprintf("proverID changed from %s to %s, restarting Synchonizer ", s.proverID, proverID),
EventID: event.EventID_SynchronizerRestart,
Description: fmt.Sprintf("proverID changed from %s to %s, restarting Synchronizer ", s.proverID, proverID),
}

err := s.eventLog.LogEvent(context.Background(), event)
Expand Down Expand Up @@ -1547,7 +1591,7 @@ func (s *ClientSynchronizer) checkFlushID(dbTx pgx.Tx) error {
s.updateAndCheckProverID(proverID)
log.Debugf("storedFlushID (executor reported): %d, latestFlushID (pending): %d", storedFlushID, s.latestFlushID)
if storedFlushID < s.latestFlushID {
log.Infof("Synchornized BLOCKED!: Wating for the flushID to be stored. FlushID to be stored: %d. Latest flushID stored: %d", s.latestFlushID, storedFlushID)
log.Infof("Synchronized BLOCKED!: Wating for the flushID to be stored. FlushID to be stored: %d. Latest flushID stored: %d", s.latestFlushID, storedFlushID)
iteration := 0
start := time.Now()
for storedFlushID < s.latestFlushID {
Expand All @@ -1561,7 +1605,7 @@ func (s *ClientSynchronizer) checkFlushID(dbTx pgx.Tx) error {
}
iteration++
}
log.Infof("Synchornizer resumed, flushID stored: %d", s.latestFlushID)
log.Infof("Synchronizer resumed, flushID stored: %d", s.latestFlushID)
}
log.Infof("Pending Flushid fullfiled: %d, executor have write %d", s.latestFlushID, storedFlushID)
s.latestFlushIDIsFulfilled = true
Expand Down
17 changes: 17 additions & 0 deletions synchronizer/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,23 @@ func expectedCallsForsyncTrustedState(t *testing.T, m *mocks, sync *ClientSynchr
Return(&stateBatchInPermissionLess, nil).
Once()
}

m.State.
On("ResetTrustedState", sync.ctx, batchNumber-1, m.DbTx).
Return(nil).
Once()

processCtx := state.ProcessingContext{
BatchNumber: uint64(batchInTrustedNode.Number),
Coinbase: common.HexToAddress(batchInTrustedNode.Coinbase.String()),
Timestamp: time.Unix(int64(batchInTrustedNode.Timestamp), 0),
GlobalExitRoot: batchInTrustedNode.GlobalExitRoot,
}
m.State.
On("OpenBatch", sync.ctx, processCtx, m.DbTx).
Return(nil).
Once()

m.State.
On("UpdateBatchL2Data", sync.ctx, batchNumber, stateBatchInTrustedNode.BatchL2Data, mock.Anything).
Return(nil).
Expand Down