Skip to content

Commit

Permalink
Implement InboundTracker; minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Oct 31, 2024
1 parent 6ef5c57 commit 1a9d185
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 60 deletions.
100 changes: 85 additions & 15 deletions zetaclient/chains/ton/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,37 @@ import (
)

const (
// MaxTransactionsPerTick is the maximum number of transactions to process on a ticker
MaxTransactionsPerTick = 100
// maximum number of transactions to process on a ticker
maxTransactionsPerTick = 100
logSampleRate = 10
)

// watchInbound watches for new txs to Gateway's account.
func (ob *Observer) watchInbound(ctx context.Context) error {
return ob.inboundTicker(ctx, "WatchInbound", ob.observeGateway)
}

func (ob *Observer) watchInboundTracker(ctx context.Context) error {
return ob.inboundTicker(ctx, "WatchInboundTracker", ob.processInboundTrackers)
}

func (ob *Observer) inboundTicker(ctx context.Context, taskName string, taskFunc func(context.Context) error) error {
app, err := zctx.FromContext(ctx)
if err != nil {
return err
}

var (
initialInterval = ticker.DurationFromUint64Seconds(ob.ChainParams().InboundTicker)
sampledLogger = ob.Logger().Inbound.Sample(&zerolog.BasicSampler{N: 10})
)

ob.Logger().Inbound.Info().Msgf("WatchInbound started")
initialInterval := ticker.DurationFromUint64Seconds(ob.ChainParams().InboundTicker)
sampledLogger := ob.Logger().Inbound.Sample(&zerolog.BasicSampler{N: logSampleRate})

task := func(ctx context.Context, t *ticker.Ticker) error {
if !app.IsInboundObservationEnabled() {
sampledLogger.Info().Msg("WatchInbound: inbound observation is disabled")
sampledLogger.Info().Msgf("%s: inbound observation is disabled", taskName)
return nil
}

if err := ob.observeGateway(ctx); err != nil {
ob.Logger().Inbound.Err(err).Msg("WatchInbound: observeGateway error")
if err := taskFunc(ctx); err != nil {
ob.Logger().Inbound.Err(err).Msgf("%s failed", taskName)
}

newInterval := ticker.DurationFromUint64Seconds(ob.ChainParams().InboundTicker)
Expand All @@ -58,7 +63,7 @@ func (ob *Observer) watchInbound(ctx context.Context) error {
initialInterval,
task,
ticker.WithStopChan(ob.StopChannel()),
ticker.WithLogger(ob.Logger().Inbound, "WatchInbound"),
ticker.WithLogger(ob.Logger().Inbound, taskName),
)
}

Expand Down Expand Up @@ -86,11 +91,11 @@ func (ob *Observer) observeGateway(ctx context.Context) error {
case len(txs) == 0:
// noop
return nil
case len(txs) > MaxTransactionsPerTick:
case len(txs) > maxTransactionsPerTick:
ob.Logger().Inbound.Info().
Msgf("observeGateway: got %d transactions. Taking first %d", len(txs), MaxTransactionsPerTick)
Msgf("observeGateway: got %d transactions. Taking first %d", len(txs), maxTransactionsPerTick)

txs = txs[:MaxTransactionsPerTick]
txs = txs[:maxTransactionsPerTick]
default:
ob.Logger().Inbound.Info().Msgf("observeGateway: got %d transactions", len(txs))
}
Expand Down Expand Up @@ -156,6 +161,63 @@ func (ob *Observer) observeGateway(ctx context.Context) error {
return nil
}

// processInboundTrackers handles adhoc trackers that were somehow missed by
func (ob *Observer) processInboundTrackers(ctx context.Context) error {
trackers, err := ob.ZetacoreClient().GetInboundTrackersForChain(ctx, ob.Chain().ChainId)
if err != nil {
return errors.Wrap(err, "unable to get inbound trackers")
}

// noop
if len(trackers) == 0 {
return nil
}

gatewayAccountID := ob.gateway.AccountID()

// a single error should not block other trackers
for _, tracker := range trackers {
txHash := tracker.TxHash

lt, hash, err := liteapi.TransactionHashFromString(txHash)
if err != nil {
ob.logSkippedTracker(txHash, "unable_to_parse_hash", err)
continue
}

raw, err := ob.client.GetTransaction(ctx, gatewayAccountID, lt, hash)
if err != nil {
ob.logSkippedTracker(txHash, "unable_to_get_tx", err)
continue
}

tx, err := ob.gateway.ParseTransaction(raw)

switch {
case errors.Is(err, toncontracts.ErrParse) || errors.Is(err, toncontracts.ErrUnknownOp):
ob.logSkippedTracker(txHash, "unrelated_tx", err)
continue
case err != nil:
// should not happen
ob.logSkippedTracker(txHash, "unexpected_error", err)
continue
case tx.ExitCode != 0:
ob.logSkippedTracker(txHash, "failed_tx", nil)
continue
case tx.IsOutbound():
ob.logSkippedTracker(txHash, "outbound_tx", nil)
continue
}

if _, err := ob.voteInbound(ctx, tx); err != nil {
ob.logSkippedTracker(txHash, "vote_failed", err)
continue
}
}

return nil
}

// Sends PostVoteInbound to zetacore
func (ob *Observer) voteInbound(ctx context.Context, tx *toncontracts.Transaction) (string, error) {
// noop
Expand Down Expand Up @@ -283,6 +345,14 @@ func (ob *Observer) setLastScannedTX(tx *toncontracts.Transaction) {
Msg("setLastScannedTX: WriteLastTxScannedToDB")
}

func (ob *Observer) logSkippedTracker(hash string, reason string, err error) {
ob.Logger().Inbound.Warn().
Str("transaction.hash", hash).
Str("skip_reason", reason).
Err(err).
Msg("Skipping tracker")
}

func txLogFields(tx *toncontracts.Transaction) map[string]any {
return map[string]any{
"transaction.hash": liteapi.TransactionToHashString(tx.Transaction),
Expand Down
Loading

0 comments on commit 1a9d185

Please sign in to comment.