Skip to content

Commit

Permalink
Merge pull request #27 from alex-semenyuk/main
Browse files Browse the repository at this point in the history
Fix issues reported by go report
  • Loading branch information
denisandreenko authored Dec 12, 2023
2 parents 1094048 + d85cbda commit a2233eb
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 53 deletions.
130 changes: 77 additions & 53 deletions internal/tezos/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,39 +87,15 @@ func (bl *blockListener) listenLoop() {
failCount := 0
gapPotential := true
for {
if failCount > 0 {
if bl.c.doFailureDelay(bl.ctx, failCount) {
log.L(bl.ctx).Debugf("Block listener loop exiting")
return
}
} else {
// Sleep for the polling interval
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return
}
if isSuccessfullyHandled := bl.handleFailCount(failCount); !isSuccessfullyHandled {
return
}

// (re)connect
if mon == nil {
mon = rpc.NewBlockHeaderMonitor()

// register the block monitor with our client
if err := bl.c.client.MonitorBlockHeader(bl.ctx, mon); err != nil {
mon.Close()
mon = nil
if ErrorStatus(err) == 404 {
log.L(bl.ctx).Errorf("monitor: event mode unsupported. %s", err.Error())
} else {
log.L(bl.ctx).Debugf("monitor: %s", err.Error())

<-bl.ctx.Done()
return
}
continue
}
isSuccessfullyReconnected, isGoingToNextBlockHeader := bl.reconnect(mon)
if !isSuccessfullyReconnected {
return
} else if isGoingToNextBlockHeader {
continue
}

// wait for new block headers
Expand All @@ -136,36 +112,84 @@ func (bl *blockListener) listenLoop() {
}

update := &ffcapi.BlockHashEvent{GapPotential: gapPotential}
var notifyPos *list.Element
notifyPos := bl.getNotifyPosition(blockHead)
bl.notifyAndUpdate(notifyPos, update)

// Reset retry count when we have a full successful loop
failCount = 0
gapPotential = false
}
}

candidate := bl.reconcileCanonicalChain(blockHead)
// Check this is the lowest position to notify from
if candidate != nil && (notifyPos == nil || candidate.Value.(*minimalBlockInfo).number < notifyPos.Value.(*minimalBlockInfo).number) {
notifyPos = candidate
func (bl *blockListener) handleFailCount(failCount int) bool {
if failCount > 0 {
if bl.c.doFailureDelay(bl.ctx, failCount) {
log.L(bl.ctx).Debugf("Block listener loop exiting")
return false
}
} else {
// Sleep for the polling interval
select {
case <-time.After(bl.blockPollingInterval):
case <-bl.ctx.Done():
log.L(bl.ctx).Debugf("Block listener loop stopping")
return false
}
}
return true
}

if notifyPos != nil {
// We notify for all hashes from the point of change in the chain onwards
for notifyPos != nil {
update.BlockHashes = append(update.BlockHashes, notifyPos.Value.(*minimalBlockInfo).hash)
notifyPos = notifyPos.Next()
}
func (bl *blockListener) reconnect(mon *rpc.BlockHeaderMonitor) (bool, bool) {
if mon == nil {
mon = rpc.NewBlockHeaderMonitor()

// Take a copy of the consumers in the lock
bl.mux.Lock()
consumers := make([]*blockUpdateConsumer, 0, len(bl.consumers))
for _, c := range bl.consumers {
consumers = append(consumers, c)
// register the block monitor with our client
if err := bl.c.client.MonitorBlockHeader(bl.ctx, mon); err != nil {
mon.Close()
mon = nil
if ErrorStatus(err) == 404 {
log.L(bl.ctx).Errorf("monitor: event mode unsupported. %s", err.Error())
} else {
log.L(bl.ctx).Debugf("monitor: %s", err.Error())

<-bl.ctx.Done()
return true, false
}
bl.mux.Unlock()
return false, true
}
}
return false, false
}

func (bl *blockListener) getNotifyPosition(blockHead *rpc.BlockHeaderLogEntry) *list.Element {
var notifyPos *list.Element

candidate := bl.reconcileCanonicalChain(blockHead)
// Check this is the lowest position to notify from
if candidate != nil && (notifyPos == nil || candidate.Value.(*minimalBlockInfo).number < notifyPos.Value.(*minimalBlockInfo).number) {
notifyPos = candidate
}
return notifyPos
}

// Spin through delivering the block update
bl.dispatchToConsumers(consumers, update)
func (bl *blockListener) notifyAndUpdate(notifyPos *list.Element, update *ffcapi.BlockHashEvent) {
if notifyPos != nil {
// We notify for all hashes from the point of change in the chain onwards
for notifyPos != nil {
update.BlockHashes = append(update.BlockHashes, notifyPos.Value.(*minimalBlockInfo).hash)
notifyPos = notifyPos.Next()
}

// Reset retry count when we have a full successful loop
failCount = 0
gapPotential = false
// Take a copy of the consumers in the lock
bl.mux.Lock()
consumers := make([]*blockUpdateConsumer, 0, len(bl.consumers))
for _, c := range bl.consumers {
consumers = append(consumers, c)
}
bl.mux.Unlock()

// Spin through delivering the block update
bl.dispatchToConsumers(consumers, update)
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/tezos/get_block_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ func TestGetBlockInfoByNumberOK(t *testing.T) {
res, reason, err = c.BlockInfoByNumber(ctx, req) // cached
assert.NoError(t, err)
assert.Equal(t, "BMBeYrMJpLWrqCs7UTcFaUQCeWBqsjCLejX5D8zE8m9syHqHnZg", res.BlockHash)
assert.Equal(t, ffcapi.ErrorReason(""), reason)

req.ExpectedParentHash = "BMWDjzorc6GFb2DnengeB2TRikAENukebRwubnu6ghfZceicmig"
res, reason, err = c.BlockInfoByNumber(ctx, req) // cache miss
assert.NoError(t, err)
assert.Equal(t, "BMBeYrMJpLWrqCs7UTcFaUQCeWBqsjCLejX5D8zE8m9syHqHnZg", res.BlockHash)
assert.Equal(t, ffcapi.ErrorReason(""), reason)
}

func TestGetBlockInfoByNumberBlockNotFoundError(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions internal/tezos/tezos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestConnectorInit(t *testing.T) {
conf.Set(BlockchainRPC, "")
cc, err := NewTezosConnector(context.Background(), conf)
assert.Regexp(t, "FF23051", err)
assert.Nil(t, cc)

conf.Set(BlockchainRPC, "https://ghostnet.ecadinfra.com")
conf.Set(EventsCatchupThreshold, 1)
Expand All @@ -48,14 +49,17 @@ func TestConnectorInit(t *testing.T) {
conf.Set(BlockchainRPC, "wrong rpc")
cc, err = NewTezosConnector(context.Background(), conf)
assert.Regexp(t, "FF23052", err)
assert.Nil(t, cc)

conf.Set(ConfigDataFormat, "map")
conf.Set(BlockCacheSize, "-1")
cc, err = NewTezosConnector(context.Background(), conf)
assert.Regexp(t, "FF23040", err)
assert.Nil(t, cc)

conf.Set(BlockCacheSize, "1")
conf.Set(TxCacheSize, "-1")
cc, err = NewTezosConnector(context.Background(), conf)
assert.Regexp(t, "FF23040", err)
assert.Nil(t, cc)
}

0 comments on commit a2233eb

Please sign in to comment.