Skip to content

Commit

Permalink
make streams lookup modular (#11368)
Browse files Browse the repository at this point in the history
* make streams lookup modular

* polish

* address comment to use pointer instead of array/map

* rebase

* get rid of slice
  • Loading branch information
shileiwill authored Dec 1, 2023
1 parent bb03a45 commit fa0f16a
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 666 deletions.
150 changes: 93 additions & 57 deletions core/scripts/chaincli/handler/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ import (

ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"

evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21"

"github.com/smartcontractkit/chainlink/core/scripts/chaincli/config"
"github.com/smartcontractkit/chainlink/core/scripts/common"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1"
iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams"
"github.com/smartcontractkit/chainlink/v2/core/utils"
bigmath "github.com/smartcontractkit/chainlink/v2/core/utils/big_math"
Expand All @@ -36,12 +41,7 @@ import (
const (
ConditionTrigger uint8 = iota
LogTrigger

blockNumber = "blockNumber"
expectedTypeAndVersion = "KeeperRegistry 2.1.0"
feedIdHex = "feedIdHex"
feedIDs = "feedIDs"
timestamp = "timestamp"
)

var packer = encoding.NewAbiPacker()
Expand Down Expand Up @@ -125,6 +125,8 @@ func (k *Keeper) Debug(ctx context.Context, args []string) {
var checkResult iregistry21.CheckUpkeep
var blockNum uint64
var performData []byte
var workID [32]byte
var trigger ocr2keepers.Trigger
upkeepNeeded := false
// check upkeep
if triggerType == ConditionTrigger {
Expand Down Expand Up @@ -177,7 +179,8 @@ func (k *Keeper) Debug(ctx context.Context, args []string) {
}
// check that tx for this upkeep / tx was not already performed
message(fmt.Sprintf("LogTrigger{blockNum: %d, blockHash: %s, txHash: %s, logIndex: %d}", blockNum, receipt.BlockHash.Hex(), txHash, logIndex))
workID := mustUpkeepWorkID(upkeepID, blockNum, receipt.BlockHash, txHash, logIndex)
trigger = mustAutomationTrigger(txHash, logIndex, blockNum, receipt.BlockHash)
workID = mustUpkeepWorkID(upkeepID, trigger)
message(fmt.Sprintf("workID computed: %s", hex.EncodeToString(workID[:])))
hasKey, err := keeperRegistry21.HasDedupKey(latestCallOpts, workID)
if err != nil {
Expand Down Expand Up @@ -229,73 +232,82 @@ func (k *Keeper) Debug(ctx context.Context, args []string) {
if checkResult.UpkeepFailureReason != 0 {
message(fmt.Sprintf("checkUpkeep failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason))
}

if checkResult.UpkeepFailureReason == uint8(encoding.UpkeepFailureReasonTargetCheckReverted) {
// TODO use the new streams lookup lib
//mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey}
//mercuryConfig := evm.NewMercuryConfig(mc, core.StreamsCompatibleABI)
//lggr, _ := logger.NewLogger()
//blockSub := &blockSubscriber{k.client}
//_ = streams.NewStreamsLookup(packer, mercuryConfig, blockSub, keeperRegistry21, k.rpcClient, lggr)
mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey}
mercuryConfig := evm21.NewMercuryConfig(mc, core.StreamsCompatibleABI)
lggr, _ := logger.NewLogger()
blockSub := &blockSubscriber{k.client}
streams := streams.NewStreamsLookup(packer, mercuryConfig, blockSub, k.rpcClient, keeperRegistry21, lggr)

streamsLookupErr, err := packer.DecodeStreamsLookupRequest(checkResult.PerformData)
if err == nil {
message("upkeep reverted with StreamsLookup")
message(fmt.Sprintf("StreamsLookup data: {FeedParamKey: %s, Feeds: %v, TimeParamKey: %s, Time: %d, ExtraData: %s}", streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time.Uint64(), hexutil.Encode(streamsLookupErr.ExtraData)))
if streamsLookupErr.FeedParamKey == feedIdHex && streamsLookupErr.TimeParamKey == blockNumber {

streamsLookup := &mercury.StreamsLookup{
StreamsLookupError: &mercury.StreamsLookupError{
FeedParamKey: streamsLookupErr.FeedParamKey,
Feeds: streamsLookupErr.Feeds,
TimeParamKey: streamsLookupErr.TimeParamKey,
Time: streamsLookupErr.Time,
ExtraData: streamsLookupErr.ExtraData,
},
UpkeepId: upkeepID,
Block: blockNum,
}

if streamsLookup.IsMercuryV02() {
message("using mercury lookup v0.2")
// handle v0.2
cfg, err := keeperRegistry21.GetUpkeepPrivilegeConfig(triggerCallOpts, upkeepID)
// check if upkeep is allowed to use mercury v0.2
_, _, _, allowed, err := streams.AllowedToUseMercury(latestCallOpts, upkeepID)
if err != nil {
failUnknown("failed to get upkeep privilege config ", err)
}
allowed := false
if len(cfg) > 0 {
var privilegeConfig streams.UpkeepPrivilegeConfig
if err := json.Unmarshal(cfg, &privilegeConfig); err != nil {
failUnknown("failed to unmarshal privilege config ", err)
}
allowed = privilegeConfig.MercuryEnabled
failUnknown("failed to check if upkeep is allowed to use mercury", err)
}
if !allowed {
resolveIneligible("upkeep reverted with StreamsLookup but is not allowed to access streams")
}
} else if streamsLookupErr.FeedParamKey != feedIDs || streamsLookupErr.TimeParamKey != timestamp {
} else if streamsLookup.IsMercuryV03() {
// handle v0.3
resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid")
} else {
message("using mercury lookup v0.3")
} else {
resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid")
}
streamsLookup := &StreamsLookup{streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time, streamsLookupErr.ExtraData, upkeepID, blockNum}

if k.cfg.MercuryLegacyURL == "" || k.cfg.MercuryURL == "" || k.cfg.MercuryID == "" || k.cfg.MercuryKey == "" {
failCheckConfig("Mercury configs not set properly, check your MERCURY_LEGACY_URL, MERCURY_URL, MERCURY_ID and MERCURY_KEY", nil)
}
handler := NewMercuryLookupHandler(&MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey}, k.rpcClient)
state, failureReason, values, _, err := handler.doMercuryRequest(ctx, streamsLookup)
if failureReason == UpkeepFailureReasonInvalidRevertDataInput {

// do mercury request
automationCheckResult := mustAutomationCheckResult(upkeepID, checkResult, trigger)
values, err := streams.DoMercuryRequest(ctx, streamsLookup, &automationCheckResult)

if automationCheckResult.IneligibilityReason == uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) {
resolveIneligible("upkeep used invalid revert data")
}
if state == InvalidMercuryRequest {
if automationCheckResult.PipelineExecutionState == uint8(mercury.InvalidMercuryRequest) {
resolveIneligible("the mercury request data is invalid")
}
if err != nil {
failCheckConfig("failed to do mercury request ", err)
resolveIneligible("failed to DoMercuryRequest")
}
callbackResult, err := keeperRegistry21.CheckCallback(triggerCallOpts, upkeepID, values, streamsLookup.extraData)

// do checkCallback
err = streams.CheckCallback(ctx, values, streamsLookup, &automationCheckResult)
if err != nil {
failUnknown("failed to execute mercury callback ", err)
}
if callbackResult.UpkeepFailureReason != 0 {
message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason))
if automationCheckResult.IneligibilityReason != 0 {
message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", automationCheckResult.IneligibilityReason))
}
upkeepNeeded, performData = callbackResult.UpkeepNeeded, callbackResult.PerformData
// do tenderly simulations
rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.extraData)
upkeepNeeded, performData = automationCheckResult.Eligible, automationCheckResult.PerformData
// do tenderly simulations for checkCallback
rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.ExtraData)
if err != nil {
failUnknown("failed to pack raw checkCallback call", err)
}
addLink("checkCallback simulation", tenderlySimLink(k.cfg, chainID, blockNum, rawCall, registryAddress))
rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.extraData)
rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.ExtraData)
if err != nil {
failUnknown("failed to pack raw checkCallback (direct) call", err)
}
Expand All @@ -317,6 +329,23 @@ func (k *Keeper) Debug(ctx context.Context, args []string) {
}
}

func mustAutomationCheckResult(upkeepID *big.Int, checkResult iregistry21.CheckUpkeep, trigger ocr2keepers.Trigger) ocr2keepers.CheckResult {
upkeepIdentifier := mustUpkeepIdentifier(upkeepID)
checkResult2 := ocr2keepers.CheckResult{
Eligible: checkResult.UpkeepNeeded,
IneligibilityReason: checkResult.UpkeepFailureReason,
UpkeepID: upkeepIdentifier,
Trigger: trigger,
WorkID: core.UpkeepWorkID(upkeepIdentifier, trigger),
GasAllocated: 0,
PerformData: checkResult.PerformData,
FastGasWei: checkResult.FastGasWei,
LinkNative: checkResult.LinkNative,
}

return checkResult2
}

type blockSubscriber struct {
ethClient *ethclient.Client
}
Expand Down Expand Up @@ -370,9 +399,27 @@ func packTriggerData(log *types.Log, blockTime uint64) ([]byte, error) {
return b, nil
}

func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, txHash [32]byte, logIndex int64) [32]byte {
// TODO - this is a copy of the code in core.UpkeepWorkID
// We should refactor that code to be more easily exported ex not rely on Trigger structs
func mustUpkeepWorkID(upkeepID *big.Int, trigger ocr2keepers.Trigger) [32]byte {
upkeepIdentifier := mustUpkeepIdentifier(upkeepID)

workID := core.UpkeepWorkID(upkeepIdentifier, trigger)
workIDBytes, err := hex.DecodeString(workID)
if err != nil {
failUnknown("failed to decode workID", err)
}

var result [32]byte
copy(result[:], workIDBytes[:])
return result
}

func mustUpkeepIdentifier(upkeepID *big.Int) ocr2keepers.UpkeepIdentifier {
upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{}
upkeepIdentifier.FromBigInt(upkeepID)
return *upkeepIdentifier
}

func mustAutomationTrigger(txHash [32]byte, logIndex int64, blockNum uint64, blockHash [32]byte) ocr2keepers.Trigger {
trigger := ocr2keepers.Trigger{
LogTriggerExtension: &ocr2keepers.LogTriggerExtension{
TxHash: txHash,
Expand All @@ -381,16 +428,7 @@ func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, tx
BlockHash: blockHash,
},
}
upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{}
upkeepIdentifier.FromBigInt(upkeepID)
workID := core.UpkeepWorkID(*upkeepIdentifier, trigger)
workIDBytes, err := hex.DecodeString(workID)
if err != nil {
failUnknown("failed to decode workID", err)
}
var result [32]byte
copy(result[:], workIDBytes[:])
return result
return trigger
}

func message(msg string) {
Expand All @@ -402,11 +440,11 @@ func warning(msg string) {
}

func resolveIneligible(msg string) {
exit(fmt.Sprintf("✅ %s: this upkeep is not currently elligible", msg), nil, 0)
exit(fmt.Sprintf("✅ %s: this upkeep is not currently eligible", msg), nil, 0)
}

func resolveEligible() {
exit("❌ this upkeep is currently elligible", nil, 0)
exit("❌ this upkeep is currently eligible", nil, 0)
}

func rerun(msg string, err error) {
Expand Down Expand Up @@ -507,5 +545,3 @@ func tenderlySimLink(cfg *config.Config, chainID int64, blockNumber uint64, inpu
}
return common.TenderlySimLink(responseJSON.Simulation.Id)
}

// TODO - link to performUpkeep tx if exists
Loading

0 comments on commit fa0f16a

Please sign in to comment.