diff --git a/core/scripts/chaincli/handler/debug.go b/core/scripts/chaincli/handler/debug.go index fec8c6cd414..0075862d95d 100644 --- a/core/scripts/chaincli/handler/debug.go +++ b/core/scripts/chaincli/handler/debug.go @@ -22,12 +22,17 @@ import ( ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21" + "github.com/smartcontractkit/chainlink/core/scripts/chaincli/config" "github.com/smartcontractkit/chainlink/core/scripts/common" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams" "github.com/smartcontractkit/chainlink/v2/core/utils" bigmath "github.com/smartcontractkit/chainlink/v2/core/utils/big_math" @@ -36,12 +41,7 @@ import ( const ( ConditionTrigger uint8 = iota LogTrigger - - blockNumber = "blockNumber" expectedTypeAndVersion = "KeeperRegistry 2.1.0" - feedIdHex = "feedIdHex" - feedIDs = "feedIDs" - timestamp = "timestamp" ) var packer = encoding.NewAbiPacker() @@ -125,6 +125,8 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { var checkResult iregistry21.CheckUpkeep var blockNum uint64 var performData []byte + var workID [32]byte + var trigger ocr2keepers.Trigger upkeepNeeded := false // check upkeep if triggerType == ConditionTrigger { @@ -177,7 +179,8 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { } // check that tx for this upkeep / tx was not already performed message(fmt.Sprintf("LogTrigger{blockNum: %d, blockHash: %s, txHash: %s, logIndex: %d}", blockNum, receipt.BlockHash.Hex(), txHash, logIndex)) - workID := mustUpkeepWorkID(upkeepID, blockNum, receipt.BlockHash, txHash, logIndex) + trigger = mustAutomationTrigger(txHash, logIndex, blockNum, receipt.BlockHash) + workID = mustUpkeepWorkID(upkeepID, trigger) message(fmt.Sprintf("workID computed: %s", hex.EncodeToString(workID[:]))) hasKey, err := keeperRegistry21.HasDedupKey(latestCallOpts, workID) if err != nil { @@ -229,73 +232,82 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { if checkResult.UpkeepFailureReason != 0 { message(fmt.Sprintf("checkUpkeep failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason)) } + if checkResult.UpkeepFailureReason == uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { - // TODO use the new streams lookup lib - //mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey} - //mercuryConfig := evm.NewMercuryConfig(mc, core.StreamsCompatibleABI) - //lggr, _ := logger.NewLogger() - //blockSub := &blockSubscriber{k.client} - //_ = streams.NewStreamsLookup(packer, mercuryConfig, blockSub, keeperRegistry21, k.rpcClient, lggr) + mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey} + mercuryConfig := evm21.NewMercuryConfig(mc, core.StreamsCompatibleABI) + lggr, _ := logger.NewLogger() + blockSub := &blockSubscriber{k.client} + streams := streams.NewStreamsLookup(packer, mercuryConfig, blockSub, k.rpcClient, keeperRegistry21, lggr) streamsLookupErr, err := packer.DecodeStreamsLookupRequest(checkResult.PerformData) if err == nil { message("upkeep reverted with StreamsLookup") message(fmt.Sprintf("StreamsLookup data: {FeedParamKey: %s, Feeds: %v, TimeParamKey: %s, Time: %d, ExtraData: %s}", streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time.Uint64(), hexutil.Encode(streamsLookupErr.ExtraData))) - if streamsLookupErr.FeedParamKey == feedIdHex && streamsLookupErr.TimeParamKey == blockNumber { + + streamsLookup := &mercury.StreamsLookup{ + StreamsLookupError: &mercury.StreamsLookupError{ + FeedParamKey: streamsLookupErr.FeedParamKey, + Feeds: streamsLookupErr.Feeds, + TimeParamKey: streamsLookupErr.TimeParamKey, + Time: streamsLookupErr.Time, + ExtraData: streamsLookupErr.ExtraData, + }, + UpkeepId: upkeepID, + Block: blockNum, + } + + if streamsLookup.IsMercuryV02() { message("using mercury lookup v0.2") - // handle v0.2 - cfg, err := keeperRegistry21.GetUpkeepPrivilegeConfig(triggerCallOpts, upkeepID) + // check if upkeep is allowed to use mercury v0.2 + _, _, _, allowed, err := streams.AllowedToUseMercury(latestCallOpts, upkeepID) if err != nil { - failUnknown("failed to get upkeep privilege config ", err) - } - allowed := false - if len(cfg) > 0 { - var privilegeConfig streams.UpkeepPrivilegeConfig - if err := json.Unmarshal(cfg, &privilegeConfig); err != nil { - failUnknown("failed to unmarshal privilege config ", err) - } - allowed = privilegeConfig.MercuryEnabled + failUnknown("failed to check if upkeep is allowed to use mercury", err) } if !allowed { resolveIneligible("upkeep reverted with StreamsLookup but is not allowed to access streams") } - } else if streamsLookupErr.FeedParamKey != feedIDs || streamsLookupErr.TimeParamKey != timestamp { + } else if streamsLookup.IsMercuryV03() { // handle v0.3 - resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid") - } else { message("using mercury lookup v0.3") + } else { + resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid") } - streamsLookup := &StreamsLookup{streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time, streamsLookupErr.ExtraData, upkeepID, blockNum} if k.cfg.MercuryLegacyURL == "" || k.cfg.MercuryURL == "" || k.cfg.MercuryID == "" || k.cfg.MercuryKey == "" { failCheckConfig("Mercury configs not set properly, check your MERCURY_LEGACY_URL, MERCURY_URL, MERCURY_ID and MERCURY_KEY", nil) } - handler := NewMercuryLookupHandler(&MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey}, k.rpcClient) - state, failureReason, values, _, err := handler.doMercuryRequest(ctx, streamsLookup) - if failureReason == UpkeepFailureReasonInvalidRevertDataInput { + + // do mercury request + automationCheckResult := mustAutomationCheckResult(upkeepID, checkResult, trigger) + values, err := streams.DoMercuryRequest(ctx, streamsLookup, &automationCheckResult) + + if automationCheckResult.IneligibilityReason == uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) { resolveIneligible("upkeep used invalid revert data") } - if state == InvalidMercuryRequest { + if automationCheckResult.PipelineExecutionState == uint8(mercury.InvalidMercuryRequest) { resolveIneligible("the mercury request data is invalid") } if err != nil { - failCheckConfig("failed to do mercury request ", err) + resolveIneligible("failed to DoMercuryRequest") } - callbackResult, err := keeperRegistry21.CheckCallback(triggerCallOpts, upkeepID, values, streamsLookup.extraData) + + // do checkCallback + err = streams.CheckCallback(ctx, values, streamsLookup, &automationCheckResult) if err != nil { failUnknown("failed to execute mercury callback ", err) } - if callbackResult.UpkeepFailureReason != 0 { - message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason)) + if automationCheckResult.IneligibilityReason != 0 { + message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", automationCheckResult.IneligibilityReason)) } - upkeepNeeded, performData = callbackResult.UpkeepNeeded, callbackResult.PerformData - // do tenderly simulations - rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.extraData) + upkeepNeeded, performData = automationCheckResult.Eligible, automationCheckResult.PerformData + // do tenderly simulations for checkCallback + rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.ExtraData) if err != nil { failUnknown("failed to pack raw checkCallback call", err) } addLink("checkCallback simulation", tenderlySimLink(k.cfg, chainID, blockNum, rawCall, registryAddress)) - rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.extraData) + rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.ExtraData) if err != nil { failUnknown("failed to pack raw checkCallback (direct) call", err) } @@ -317,6 +329,23 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { } } +func mustAutomationCheckResult(upkeepID *big.Int, checkResult iregistry21.CheckUpkeep, trigger ocr2keepers.Trigger) ocr2keepers.CheckResult { + upkeepIdentifier := mustUpkeepIdentifier(upkeepID) + checkResult2 := ocr2keepers.CheckResult{ + Eligible: checkResult.UpkeepNeeded, + IneligibilityReason: checkResult.UpkeepFailureReason, + UpkeepID: upkeepIdentifier, + Trigger: trigger, + WorkID: core.UpkeepWorkID(upkeepIdentifier, trigger), + GasAllocated: 0, + PerformData: checkResult.PerformData, + FastGasWei: checkResult.FastGasWei, + LinkNative: checkResult.LinkNative, + } + + return checkResult2 +} + type blockSubscriber struct { ethClient *ethclient.Client } @@ -370,9 +399,27 @@ func packTriggerData(log *types.Log, blockTime uint64) ([]byte, error) { return b, nil } -func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, txHash [32]byte, logIndex int64) [32]byte { - // TODO - this is a copy of the code in core.UpkeepWorkID - // We should refactor that code to be more easily exported ex not rely on Trigger structs +func mustUpkeepWorkID(upkeepID *big.Int, trigger ocr2keepers.Trigger) [32]byte { + upkeepIdentifier := mustUpkeepIdentifier(upkeepID) + + workID := core.UpkeepWorkID(upkeepIdentifier, trigger) + workIDBytes, err := hex.DecodeString(workID) + if err != nil { + failUnknown("failed to decode workID", err) + } + + var result [32]byte + copy(result[:], workIDBytes[:]) + return result +} + +func mustUpkeepIdentifier(upkeepID *big.Int) ocr2keepers.UpkeepIdentifier { + upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{} + upkeepIdentifier.FromBigInt(upkeepID) + return *upkeepIdentifier +} + +func mustAutomationTrigger(txHash [32]byte, logIndex int64, blockNum uint64, blockHash [32]byte) ocr2keepers.Trigger { trigger := ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ TxHash: txHash, @@ -381,16 +428,7 @@ func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, tx BlockHash: blockHash, }, } - upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{} - upkeepIdentifier.FromBigInt(upkeepID) - workID := core.UpkeepWorkID(*upkeepIdentifier, trigger) - workIDBytes, err := hex.DecodeString(workID) - if err != nil { - failUnknown("failed to decode workID", err) - } - var result [32]byte - copy(result[:], workIDBytes[:]) - return result + return trigger } func message(msg string) { @@ -402,11 +440,11 @@ func warning(msg string) { } func resolveIneligible(msg string) { - exit(fmt.Sprintf("✅ %s: this upkeep is not currently elligible", msg), nil, 0) + exit(fmt.Sprintf("✅ %s: this upkeep is not currently eligible", msg), nil, 0) } func resolveEligible() { - exit("❌ this upkeep is currently elligible", nil, 0) + exit("❌ this upkeep is currently eligible", nil, 0) } func rerun(msg string, err error) { @@ -507,5 +545,3 @@ func tenderlySimLink(cfg *config.Config, chainID int64, blockNumber uint64, inpu } return common.TenderlySimLink(responseJSON.Simulation.Id) } - -// TODO - link to performUpkeep tx if exists diff --git a/core/scripts/chaincli/handler/mercury_lookup_handler.go b/core/scripts/chaincli/handler/mercury_lookup_handler.go deleted file mode 100644 index 1bd4b2e183c..00000000000 --- a/core/scripts/chaincli/handler/mercury_lookup_handler.go +++ /dev/null @@ -1,534 +0,0 @@ -package handler - -import ( - "context" - "crypto/hmac" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "math/big" - "net/http" - "net/url" - "strconv" - "strings" - "time" - - "github.com/avast/retry-go" - ethabi "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/rpc" - "github.com/pkg/errors" -) - -// MercuryLookupHandler is responsible for initiating the calls to the Mercury server -// to determine whether the upkeeps are eligible -type MercuryLookupHandler struct { - credentials *MercuryCredentials - httpClient HttpClient - rpcClient *rpc.Client -} - -func NewMercuryLookupHandler( - credentials *MercuryCredentials, - rpcClient *rpc.Client, -) *MercuryLookupHandler { - return &MercuryLookupHandler{ - credentials: credentials, - httpClient: http.DefaultClient, - rpcClient: rpcClient, - } -} - -type MercuryVersion string - -type StreamsLookup struct { - feedParamKey string - feeds []string - timeParamKey string - time *big.Int - extraData []byte - upkeepId *big.Int - block uint64 -} - -//go:generate mockery --quiet --name HttpClient --output ./mocks/ --case=underscore -type HttpClient interface { - Do(req *http.Request) (*http.Response, error) -} - -type MercuryCredentials struct { - LegacyURL string - URL string - ClientID string - ClientKey string -} - -func (mc *MercuryCredentials) Validate() bool { - return mc.URL != "" && mc.ClientID != "" && mc.ClientKey != "" -} - -type MercuryData struct { - Index int - Error error - Retryable bool - Bytes [][]byte - State PipelineExecutionState -} - -// MercuryV02Response represents a JSON structure used by Mercury v0.2 -type MercuryV02Response struct { - ChainlinkBlob string `json:"chainlinkBlob"` -} - -// MercuryV03Response represents a JSON structure used by Mercury v0.3 -type MercuryV03Response struct { - Reports []MercuryV03Report `json:"reports"` -} - -type MercuryV03Report struct { - FeedID string `json:"feedID"` // feed id in hex encoded - ValidFromTimestamp uint32 `json:"validFromTimestamp"` - ObservationsTimestamp uint32 `json:"observationsTimestamp"` - FullReport string `json:"fullReport"` // the actual hex encoded mercury report of this feed, can be sent to verifier -} - -const ( - // DefaultAllowListExpiration decides how long an upkeep's allow list info will be valid for. - DefaultAllowListExpiration = 20 * time.Minute - // CleanupInterval decides when the expired items in cache will be deleted. - CleanupInterval = 25 * time.Minute -) - -const ( - ApplicationJson = "application/json" - BlockNumber = "blockNumber" // valid for v0.2 - FeedIDs = "feedIDs" // valid for v0.3 - FeedIdHex = "feedIdHex" // valid for v0.2 - HeaderAuthorization = "Authorization" - HeaderContentType = "Content-Type" - HeaderTimestamp = "X-Authorization-Timestamp" - HeaderSignature = "X-Authorization-Signature-SHA256" - HeaderUpkeepId = "X-Authorization-Upkeep-Id" - MercuryPathV2 = "/client?" // only used to access mercury v0.2 server - MercuryBatchPathV3 = "/api/v1/reports/bulk?" // only used to access mercury v0.3 server - RetryDelay = 500 * time.Millisecond - Timestamp = "timestamp" // valid for v0.3 - TotalAttempt = 3 - UserId = "userId" -) - -type UpkeepFailureReason uint8 -type PipelineExecutionState uint8 - -const ( - // upkeep failure onchain reasons - UpkeepFailureReasonNone UpkeepFailureReason = 0 - UpkeepFailureReasonUpkeepCancelled UpkeepFailureReason = 1 - UpkeepFailureReasonUpkeepPaused UpkeepFailureReason = 2 - UpkeepFailureReasonTargetCheckReverted UpkeepFailureReason = 3 - UpkeepFailureReasonUpkeepNotNeeded UpkeepFailureReason = 4 - UpkeepFailureReasonPerformDataExceedsLimit UpkeepFailureReason = 5 - UpkeepFailureReasonInsufficientBalance UpkeepFailureReason = 6 - UpkeepFailureReasonMercuryCallbackReverted UpkeepFailureReason = 7 - UpkeepFailureReasonRevertDataExceedsLimit UpkeepFailureReason = 8 - UpkeepFailureReasonRegistryPaused UpkeepFailureReason = 9 - // leaving a gap here for more onchain failure reasons in the future - // upkeep failure offchain reasons - UpkeepFailureReasonMercuryAccessNotAllowed UpkeepFailureReason = 32 - UpkeepFailureReasonTxHashNoLongerExists UpkeepFailureReason = 33 - UpkeepFailureReasonInvalidRevertDataInput UpkeepFailureReason = 34 - UpkeepFailureReasonSimulationFailed UpkeepFailureReason = 35 - UpkeepFailureReasonTxHashReorged UpkeepFailureReason = 36 - - // pipeline execution error - NoPipelineError PipelineExecutionState = 0 - CheckBlockTooOld PipelineExecutionState = 1 - CheckBlockInvalid PipelineExecutionState = 2 - RpcFlakyFailure PipelineExecutionState = 3 - MercuryFlakyFailure PipelineExecutionState = 4 - PackUnpackDecodeFailed PipelineExecutionState = 5 - MercuryUnmarshalError PipelineExecutionState = 6 - InvalidMercuryRequest PipelineExecutionState = 7 - InvalidMercuryResponse PipelineExecutionState = 8 // this will only happen if Mercury server sends bad responses - UpkeepNotAuthorized PipelineExecutionState = 9 -) - -// UpkeepPrivilegeConfig represents the administrative offchain config for each upkeep. It can be set by s_upkeepPrivilegeManager -// role on the registry. Upkeeps allowed to use Mercury server will have this set to true. -type UpkeepPrivilegeConfig struct { - MercuryEnabled bool `json:"mercuryEnabled"` -} - -// generateHMAC calculates a user HMAC for Mercury server authentication. -func (mlh *MercuryLookupHandler) generateHMAC(method string, path string, body []byte, clientId string, secret string, ts int64) string { - bodyHash := sha256.New() - bodyHash.Write(body) - hashString := fmt.Sprintf("%s %s %s %s %d", - method, - path, - hex.EncodeToString(bodyHash.Sum(nil)), - clientId, - ts) - signedMessage := hmac.New(sha256.New, []byte(secret)) - signedMessage.Write([]byte(hashString)) - userHmac := hex.EncodeToString(signedMessage.Sum(nil)) - return userHmac -} - -// singleFeedRequest sends a v0.2 Mercury request for a single feed report. -func (mlh *MercuryLookupHandler) singleFeedRequest(ctx context.Context, ch chan<- MercuryData, index int, ml *StreamsLookup) { - q := url.Values{ - ml.feedParamKey: {ml.feeds[index]}, - ml.timeParamKey: {ml.time.String()}, - } - mercuryURL := mlh.credentials.LegacyURL - reqUrl := fmt.Sprintf("%s%s%s", mercuryURL, MercuryPathV2, q.Encode()) - // mlh.logger.Debugf("request URL for upkeep %s feed %s: %s", ml.upkeepId.String(), ml.feeds[index], reqUrl) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) - if err != nil { - ch <- MercuryData{Index: index, Error: err, Retryable: false, State: InvalidMercuryRequest} - return - } - - ts := time.Now().UTC().UnixMilli() - signature := mlh.generateHMAC(http.MethodGet, MercuryPathV2+q.Encode(), []byte{}, mlh.credentials.ClientID, mlh.credentials.ClientKey, ts) - req.Header.Set(HeaderContentType, ApplicationJson) - req.Header.Set(HeaderAuthorization, mlh.credentials.ClientID) - req.Header.Set(HeaderTimestamp, strconv.FormatInt(ts, 10)) - req.Header.Set(HeaderSignature, signature) - - // in the case of multiple retries here, use the last attempt's data - state := NoPipelineError - retryable := false - sent := false - retryErr := retry.Do( - func() error { - retryable = false - resp, err1 := mlh.httpClient.Do(req) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup GET request failed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "feed", ml.feeds[index], "error", err1) - retryable = true - state = MercuryFlakyFailure - return err1 - } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - // mlh.logger.Errorf("Encountered error when closing the body of the response in single feed: %s", err) - } - }(resp.Body) - - body, err1 := io.ReadAll(resp.Body) - if err1 != nil { - retryable = false - state = InvalidMercuryResponse - return err1 - } - - if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError { - // mlh.logger.Errorw("StreamsLookup received retryable status code", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "statusCode", resp.StatusCode, "feed", ml.feeds[index]) - retryable = true - state = MercuryFlakyFailure - return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10)) - } else if resp.StatusCode != http.StatusOK { - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("StreamsLookup upkeep %s block %s received status code %d for feed %s", ml.upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index]) - } - - // mlh.logger.Debugf("at block %s upkeep %s received status code %d from mercury v0.2 with BODY=%s", ml.time.String(), ml.upkeepId.String(), resp.StatusCode, hexutil.Encode(body)) - - var m MercuryV02Response - err1 = json.Unmarshal(body, &m) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup failed to unmarshal body to MercuryResponse", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "feed", ml.feeds[index], "error", err1) - retryable = false - state = MercuryUnmarshalError - return err1 - } - blobBytes, err1 := hexutil.Decode(m.ChainlinkBlob) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup failed to decode chainlinkBlob for feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "blob", m.ChainlinkBlob, "feed", ml.feeds[index], "error", err1) - retryable = false - state = InvalidMercuryResponse - return err1 - } - ch <- MercuryData{ - Index: index, - Bytes: [][]byte{blobBytes}, - Retryable: false, - State: NoPipelineError, - } - sent = true - return nil - }, - // only retry when the error is 404 Not Found or 500 Internal Server Error - retry.RetryIf(func(err error) bool { - return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) - }), - retry.Context(ctx), - retry.Delay(RetryDelay), - retry.Attempts(TotalAttempt)) - - if !sent { - md := MercuryData{ - Index: index, - Bytes: [][]byte{}, - Retryable: retryable, - Error: fmt.Errorf("failed to request feed for %s: %w", ml.feeds[index], retryErr), - State: state, - } - ch <- md - } -} - -// multiFeedsRequest sends a Mercury v0.3 request for a multi-feed report -func (mlh *MercuryLookupHandler) multiFeedsRequest(ctx context.Context, ch chan<- MercuryData, ml *StreamsLookup) { - params := fmt.Sprintf("%s=%s&%s=%s", FeedIDs, strings.Join(ml.feeds, ","), Timestamp, ml.time.String()) - reqUrl := fmt.Sprintf("%s%s%s", mlh.credentials.URL, MercuryBatchPathV3, params) - // mlh.logger.Debugf("request URL for upkeep %s userId %s: %s", ml.upkeepId.String(), mlh.credentials.ClientID, reqUrl) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) - if err != nil { - ch <- MercuryData{Index: 0, Error: err, Retryable: false, State: InvalidMercuryRequest} - return - } - - ts := time.Now().UTC().UnixMilli() - signature := mlh.generateHMAC(http.MethodGet, MercuryBatchPathV3+params, []byte{}, mlh.credentials.ClientID, mlh.credentials.ClientKey, ts) - req.Header.Set(HeaderContentType, ApplicationJson) - // username here is often referred to as user id - req.Header.Set(HeaderAuthorization, mlh.credentials.ClientID) - req.Header.Set(HeaderTimestamp, strconv.FormatInt(ts, 10)) - req.Header.Set(HeaderSignature, signature) - // mercury will inspect authorization headers above to make sure this user (in automation's context, this node) is eligible to access mercury - // and if it has an automation role. it will then look at this upkeep id to check if it has access to all the requested feeds. - req.Header.Set(HeaderUpkeepId, ml.upkeepId.String()) - - // in the case of multiple retries here, use the last attempt's data - state := NoPipelineError - retryable := false - sent := false - retryErr := retry.Do( - func() error { - retryable = false - resp, err1 := mlh.httpClient.Do(req) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup GET request fails for multi feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "error", err1) - retryable = true - state = MercuryFlakyFailure - return err1 - } - defer func(Body io.ReadCloser) { - err := Body.Close() - if err != nil { - // mlh.logger.Errorf("Encountered error when closing the body of the response in the multi feed: %s", err) - } - }(resp.Body) - body, err1 := io.ReadAll(resp.Body) - if err1 != nil { - retryable = false - state = InvalidMercuryResponse - return err1 - } - - // mlh.logger.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - if resp.StatusCode == http.StatusUnauthorized { - retryable = false - state = UpkeepNotAuthorized - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } else if resp.StatusCode == http.StatusBadRequest { - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } else if resp.StatusCode == http.StatusInternalServerError { - retryable = true - state = MercuryFlakyFailure - return fmt.Errorf("%d", http.StatusInternalServerError) - } else if resp.StatusCode == 420 { - // in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } else if resp.StatusCode != http.StatusOK { - retryable = false - state = InvalidMercuryRequest - return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) - } - - var response MercuryV03Response - err1 = json.Unmarshal(body, &response) - if err1 != nil { - // mlh.logger.Errorw("StreamsLookup failed to unmarshal body to MercuryResponse for multi feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "error", err1) - retryable = false - state = MercuryUnmarshalError - return err1 - } - // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract - // hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated - if len(response.Reports) != len(ml.feeds) { - // TODO: AUTO-5044: calculate what reports are missing and log a warning - retryable = true - state = MercuryFlakyFailure - return fmt.Errorf("%d", http.StatusNotFound) - } - var reportBytes [][]byte - for _, rsp := range response.Reports { - b, err := hexutil.Decode(rsp.FullReport) - if err != nil { - retryable = false - state = InvalidMercuryResponse - return err - } - reportBytes = append(reportBytes, b) - } - ch <- MercuryData{ - Index: 0, - Bytes: reportBytes, - Retryable: false, - State: NoPipelineError, - } - sent = true - return nil - }, - // only retry when the error is 404 Not Found or 500 Internal Server Error - retry.RetryIf(func(err error) bool { - return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) - }), - retry.Context(ctx), - retry.Delay(RetryDelay), - retry.Attempts(TotalAttempt)) - - if !sent { - md := MercuryData{ - Index: 0, - Bytes: [][]byte{}, - Retryable: retryable, - Error: retryErr, - State: state, - } - ch <- md - } -} - -// doMercuryRequest sends requests to Mercury API to retrieve ChainlinkBlob. -func (mlh *MercuryLookupHandler) doMercuryRequest(ctx context.Context, ml *StreamsLookup) (PipelineExecutionState, UpkeepFailureReason, [][]byte, bool, error) { - var isMercuryV03 bool - resultLen := len(ml.feeds) - ch := make(chan MercuryData, resultLen) - if len(ml.feeds) == 0 { - return NoPipelineError, UpkeepFailureReasonInvalidRevertDataInput, nil, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", ml.feedParamKey, ml.timeParamKey, ml.feeds) - } - if ml.feedParamKey == FeedIdHex && ml.timeParamKey == BlockNumber { - // only v0.2 - for i := range ml.feeds { - go mlh.singleFeedRequest(ctx, ch, i, ml) - } - } else if ml.feedParamKey == FeedIDs && ml.timeParamKey == Timestamp { - // only v0.3 - resultLen = 1 - isMercuryV03 = true - ch = make(chan MercuryData, resultLen) - go mlh.multiFeedsRequest(ctx, ch, ml) - } else { - return NoPipelineError, UpkeepFailureReasonInvalidRevertDataInput, nil, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", ml.feedParamKey, ml.timeParamKey, ml.feeds) - } - - var reqErr error - results := make([][]byte, len(ml.feeds)) - retryable := true - allSuccess := true - // in v0.2, use the last execution error as the state, if no execution errors, state will be no error - state := NoPipelineError - for i := 0; i < resultLen; i++ { - m := <-ch - if m.Error != nil { - if reqErr == nil { - reqErr = errors.New(m.Error.Error()) - } else { - reqErr = errors.New(reqErr.Error() + m.Error.Error()) - } - retryable = retryable && m.Retryable - allSuccess = false - if m.State != NoPipelineError { - state = m.State - } - continue - } - if isMercuryV03 { - results = m.Bytes - } else { - results[m.Index] = m.Bytes[0] - } - } - // only retry when not all successful AND none are not retryable - return state, UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr -} - -// decodeStreamsLookup decodes the revert error StreamsLookup(string feedParamKey, string[] feeds, string timeParamKey, uint256 time, byte[] extraData) -// func (mlh *MercuryLookupHandler) decodeStreamsLookup(data []byte) (*StreamsLookup, error) { -// e := mlh.mercuryConfig.Abi.Errors["StreamsLookup"] -// unpack, err := e.Unpack(data) -// if err != nil { -// return nil, fmt.Errorf("unpack error: %w", err) -// } -// errorParameters := unpack.([]interface{}) - -// return &StreamsLookup{ -// feedParamKey: *abi.ConvertType(errorParameters[0], new(string)).(*string), -// feeds: *abi.ConvertType(errorParameters[1], new([]string)).(*[]string), -// timeParamKey: *abi.ConvertType(errorParameters[2], new(string)).(*string), -// time: *abi.ConvertType(errorParameters[3], new(*big.Int)).(**big.Int), -// extraData: *abi.ConvertType(errorParameters[4], new([]byte)).(*[]byte), -// }, nil -// } - -// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if -// this upkeep is allowed to use Mercury service. -// func (mlh *MercuryLookupHandler) allowedToUseMercury(upkeep models.Upkeep) (bool, error) { -// allowed, ok := mlh.mercuryConfig.AllowListCache.Get(upkeep.Admin.Hex()) -// if ok { -// return allowed.(bool), nil -// } - -// if upkeep.UpkeepPrivilegeConfig == nil { -// return false, fmt.Errorf("the upkeep privilege config was not retrieved for upkeep with ID %s", upkeep.UpkeepID) -// } - -// if len(upkeep.UpkeepPrivilegeConfig) == 0 { -// return false, fmt.Errorf("the upkeep privilege config is empty") -// } - -// var a UpkeepPrivilegeConfig -// err := json.Unmarshal(upkeep.UpkeepPrivilegeConfig, &a) -// if err != nil { -// return false, fmt.Errorf("failed to unmarshal privilege config for upkeep ID %s: %v", upkeep.UpkeepID, err) -// } - -// mlh.mercuryConfig.AllowListCache.Set(upkeep.Admin.Hex(), a.MercuryEnabled, cache.DefaultExpiration) -// return a.MercuryEnabled, nil -// } - -func (mlh *MercuryLookupHandler) CheckCallback(ctx context.Context, values [][]byte, lookup *StreamsLookup, registryABI ethabi.ABI, registryAddress common.Address) (hexutil.Bytes, error) { - payload, err := registryABI.Pack("checkCallback", lookup.upkeepId, values, lookup.extraData) - if err != nil { - return nil, err - } - - var theBytes hexutil.Bytes - args := map[string]interface{}{ - "to": registryAddress.Hex(), - "data": hexutil.Bytes(payload), - } - - // call checkCallback function at the block which OCR3 has agreed upon - err = mlh.rpcClient.CallContext(ctx, &theBytes, "eth_call", args, hexutil.EncodeUint64(lookup.block)) - if err != nil { - return nil, err - } - return theBytes, nil -} diff --git a/core/scripts/go.mod b/core/scripts/go.mod index c7af0541c12..21f00df02de 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -7,7 +7,6 @@ replace github.com/smartcontractkit/chainlink/v2 => ../../ require ( github.com/ava-labs/coreth v0.12.1 - github.com/avast/retry-go v3.0.0+incompatible github.com/docker/docker v24.0.7+incompatible github.com/docker/go-connections v0.4.0 github.com/ethereum/go-ethereum v1.12.0 @@ -19,7 +18,6 @@ require ( github.com/montanaflynn/stats v0.7.1 github.com/olekukonko/tablewriter v0.0.5 github.com/pelletier/go-toml/v2 v2.1.0 - github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.1 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 @@ -283,6 +281,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pressly/goose/v3 v3.16.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 7cea79eb76e..36504924e16 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -152,8 +152,6 @@ github.com/ava-labs/avalanchego v1.10.1 h1:lBeamJ1iNq+p2oKg2nAs+A65m8vhSDjkiTDbw github.com/ava-labs/avalanchego v1.10.1/go.mod h1:ZvSXWlbkUKlbk3BsWx29a+8eVHe/WBsOxh55BSGoeRk= github.com/ava-labs/coreth v0.12.1 h1:EWSkFGHGVUxmu1pnSK/2pdcxaAVHbGspHqO3Ag+i7sA= github.com/ava-labs/coreth v0.12.1/go.mod h1:/5x54QlIKjlPebkdzTA5ic9wXdejbWOnQosztkv9jxo= -github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= -github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index aec23431921..cb9e2dd6752 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -16,7 +16,6 @@ import ( "github.com/patrickmn/go-cache" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - "github.com/smartcontractkit/chainlink-common/pkg/services" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" @@ -92,8 +91,9 @@ func NewStreamsLookup( // Lookup looks through check upkeep results looking for any that need off chain lookup func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult { lookups := map[int]*mercury.StreamsLookup{} - for i, checkResult := range checkResults { - s.buildResult(ctx, i, checkResult, checkResults, lookups) + for _, checkResult := range checkResults { + copyCheckResult := checkResult + s.buildResult(ctx, ©CheckResult, lookups) } var wg sync.WaitGroup @@ -101,7 +101,7 @@ func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckRe wg.Add(1) func(i int, lookup *mercury.StreamsLookup) { s.threadCtrl.Go(func(ctx context.Context) { - s.doLookup(ctx, &wg, lookup, i, checkResults) + s.doLookup(ctx, &wg, lookup, &checkResults[i]) }) }(i, lookup) } @@ -112,7 +112,7 @@ func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckRe } // buildResult checks if the upkeep is allowed by Mercury and builds a streams lookup request from the check result -func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keepers.CheckResult, checkResults []ocr2keepers.CheckResult, lookups map[int]*mercury.StreamsLookup) { +func (s *streams) buildResult(ctx context.Context, checkResult *ocr2keepers.CheckResult, lookups map[int]*mercury.StreamsLookup) { lookupLggr := s.lggr.With("where", "StreamsLookup") if checkResult.IneligibilityReason != uint8(mercury.MercuryUpkeepFailureReasonTargetCheckReverted) { // Streams Lookup only works when upkeep target check reverts @@ -129,7 +129,7 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper // Try to decode the revert error into streams lookup format. User upkeeps can revert with any reason, see if they // tried to call mercury - lookupLggr.Infof("at block %d upkeep %s trying to DecodeStreamsLookupRequest performData=%s", block, upkeepId, hexutil.Encode(checkResults[i].PerformData)) + lookupLggr.Infof("at block %d upkeep %s trying to DecodeStreamsLookupRequest performData=%s", block, upkeepId, hexutil.Encode(checkResult.PerformData)) streamsLookupErr, err := s.packer.DecodeStreamsLookupRequest(checkResult.PerformData) if err != nil { lookupLggr.Debugf("at block %d upkeep %s DecodeStreamsLookupRequest failed: %v", block, upkeepId, err) @@ -139,7 +139,7 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper streamsLookupResponse := &mercury.StreamsLookup{StreamsLookupError: streamsLookupErr} if len(streamsLookupResponse.Feeds) == 0 { - checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) + checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) lookupLggr.Debugf("at block %s upkeep %s has empty feeds array", block, upkeepId) return } @@ -148,21 +148,21 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper if streamsLookupResponse.IsMercuryV02() { // check permission on the registry for mercury v0.2 opts := s.buildCallOpts(ctx, block) - if state, reason, retryable, allowed, err := s.allowedToUseMercury(opts, upkeepId.BigInt()); err != nil { + if state, reason, retryable, allowed, err := s.AllowedToUseMercury(opts, upkeepId.BigInt()); err != nil { lookupLggr.Warnf("at block %s upkeep %s failed to query mercury allow list: %s", block, upkeepId, err) - checkResults[i].PipelineExecutionState = uint8(state) - checkResults[i].IneligibilityReason = uint8(reason) - checkResults[i].Retryable = retryable + checkResult.PipelineExecutionState = uint8(state) + checkResult.IneligibilityReason = uint8(reason) + checkResult.Retryable = retryable return } else if !allowed { lookupLggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId) - checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryAccessNotAllowed) + checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryAccessNotAllowed) return } } else if streamsLookupResponse.IsMercuryVersionUnkown() { // if mercury version cannot be determined, set failure reason lookupLggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId) - checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) + checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) return } @@ -171,71 +171,103 @@ func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keeper // in the revert for mercury v0.2, which is denoted by time in the struct bc starting from v0.3, only timestamp will be supported streamsLookupResponse.Block = uint64(block.Int64()) lookupLggr.Infof("at block %d upkeep %s DecodeStreamsLookupRequest feedKey=%s timeKey=%s feeds=%v time=%s extraData=%s", block, upkeepId, streamsLookupResponse.FeedParamKey, streamsLookupResponse.TimeParamKey, streamsLookupResponse.Feeds, streamsLookupResponse.Time, hexutil.Encode(streamsLookupResponse.ExtraData)) - lookups[i] = streamsLookupResponse + lookups[len(lookups)] = streamsLookupResponse } -func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *mercury.StreamsLookup, i int, checkResults []ocr2keepers.CheckResult) { +func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *mercury.StreamsLookup, checkResult *ocr2keepers.CheckResult) { defer wg.Done() - state, reason, values, retryable, retryInterval, err := mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) - pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) + values, err := s.DoMercuryRequest(ctx, lookup, checkResult) + if err != nil { + s.lggr.Errorf("at block %d upkeep %s requested time %s DoMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + } - if lookup.IsMercuryV02() { - state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) - } else if lookup.IsMercuryV03() { - state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) + if err := s.CheckCallback(ctx, values, lookup, checkResult); err != nil { + s.lggr.Errorf("at block %d upkeep %s requested time %s CheckCallback err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) } +} +func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup, checkResult *ocr2keepers.CheckResult) error { + payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) if err != nil { - s.lggr.Errorf("at block %d upkeep %s requested time %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.Block, lookup.UpkeepId, lookup.Time, retryable, retryInterval, err.Error()) - checkResults[i].Retryable = retryable - checkResults[i].RetryInterval = retryInterval - checkResults[i].PipelineExecutionState = uint8(state) - checkResults[i].IneligibilityReason = uint8(reason) - return + s.lggr.Errorf("at block %d upkeep %s checkCallback packing err: %s", lookup.Block, lookup.UpkeepId, err.Error()) + checkResult.Retryable = false + checkResult.PipelineExecutionState = uint8(mercury.PackUnpackDecodeFailed) + return err } - for j, v := range values { - s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) + var mercuryBytes hexutil.Bytes + args := map[string]interface{}{ + "to": s.registry.Address().Hex(), + "data": hexutil.Bytes(payload), } - state, retryable, mercuryBytes, err := s.checkCallback(ctx, values, lookup) - if err != nil { + // call checkCallback function at the block which OCR3 has agreed upon + if err = s.client.CallContext(ctx, &mercuryBytes, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { s.lggr.Errorf("at block %d upkeep %s checkCallback err: %s", lookup.Block, lookup.UpkeepId, err.Error()) - checkResults[i].Retryable = retryable - checkResults[i].PipelineExecutionState = uint8(state) - return + checkResult.Retryable = true + checkResult.PipelineExecutionState = uint8(mercury.RpcFlakyFailure) + return err } + s.lggr.Infof("at block %d upkeep %s requested time %s checkCallback mercuryBytes: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(mercuryBytes)) unpackCallBackState, needed, performData, failureReason, _, err := s.packer.UnpackCheckCallbackResult(mercuryBytes) if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s UnpackCheckCallbackResult err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) - checkResults[i].PipelineExecutionState = unpackCallBackState - return + checkResult.PipelineExecutionState = unpackCallBackState + return err } if failureReason == uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) { - checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) + checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) s.lggr.Debugf("at block %d upkeep %s requested time %s mercury callback reverts", lookup.Block, lookup.UpkeepId, lookup.Time) - return + return fmt.Errorf("at block %d upkeep %s requested time %s mercury callback reverts", lookup.Block, lookup.UpkeepId, lookup.Time) + } if !needed { - checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonUpkeepNotNeeded) + checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonUpkeepNotNeeded) s.lggr.Debugf("at block %d upkeep %s requested time %s callback reports upkeep not needed", lookup.Block, lookup.UpkeepId, lookup.Time) - return + return fmt.Errorf("at block %d upkeep %s requested time %s callback reports upkeep not needed", lookup.Block, lookup.UpkeepId, lookup.Time) } - checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonNone) - checkResults[i].Eligible = true - checkResults[i].PerformData = performData + checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonNone) + checkResult.Eligible = true + checkResult.PerformData = performData s.lggr.Infof("at block %d upkeep %s requested time %s successful with perform data: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(performData)) + + return nil +} + +func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResult *ocr2keepers.CheckResult) ([][]byte, error) { + state, reason, values, retryable, retryInterval, err := mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) + pluginRetryKey := generatePluginRetryKey(checkResult.WorkID, lookup.Block) + + if lookup.IsMercuryV02() { + state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) + } else if lookup.IsMercuryV03() { + state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) + } + + if err != nil { + s.lggr.Errorf("at block %d upkeep %s requested time %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.Block, lookup.UpkeepId, lookup.Time, retryable, retryInterval, err.Error()) + checkResult.Retryable = retryable + checkResult.RetryInterval = retryInterval + checkResult.PipelineExecutionState = uint8(state) + checkResult.IneligibilityReason = uint8(reason) + return nil, err + } + + for j, v := range values { + s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) + } + return values, nil } -// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if +// AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if // this upkeep is allowed to use Mercury service. -func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state mercury.MercuryUpkeepState, reason mercury.MercuryUpkeepFailureReason, retryable bool, allow bool, err error) { +func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state mercury.MercuryUpkeepState, reason mercury.MercuryUpkeepFailureReason, retryable bool, allow bool, err error) { allowed, ok := s.mercuryConfig.IsUpkeepAllowed(upkeepId.String()) if ok { return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonNone, false, allowed.(bool), nil @@ -255,7 +287,6 @@ func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s "data": hexutil.Bytes(payload), } - // call checkCallback function at the block which OCR3 has agreed upon if err = s.client.CallContext(opts.Context, &resultBytes, "eth_call", args, hexutil.EncodeBig(opts.BlockNumber)); err != nil { return mercury.RpcFlakyFailure, mercury.MercuryUpkeepFailureReasonNone, true, false, fmt.Errorf("failed to get upkeep privilege config: %v", err) } @@ -281,26 +312,6 @@ func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonNone, false, privilegeConfig.MercuryEnabled, nil } -func (s *streams) checkCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup) (mercury.MercuryUpkeepState, bool, hexutil.Bytes, error) { - payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) - if err != nil { - return mercury.PackUnpackDecodeFailed, false, nil, err - } - - var b hexutil.Bytes - args := map[string]interface{}{ - "to": s.registry.Address().Hex(), - "data": hexutil.Bytes(payload), - } - - // call checkCallback function at the block which OCR3 has agreed upon - if err := s.client.CallContext(ctx, &b, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { - return mercury.RpcFlakyFailure, true, nil, err - } - - return mercury.NoPipelineError, false, b, nil -} - func (s *streams) buildCallOpts(ctx context.Context, block *big.Int) *bind.CallOpts { opts := bind.CallOpts{ Context: ctx, diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go index abcc37dca18..2475244b4d0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go @@ -126,6 +126,7 @@ func TestStreams_CheckCallback(t *testing.T) { tests := []struct { name string lookup *mercury.StreamsLookup + input []ocr2keepers.CheckResult values [][]byte statusCode int @@ -153,6 +154,9 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, + input: []ocr2keepers.CheckResult{ + {}, + }, values: values, statusCode: http.StatusOK, callbackResp: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 48, 120, 48, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -185,6 +189,9 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, + input: []ocr2keepers.CheckResult{ + {}, + }, values: values, statusCode: http.StatusOK, callbackResp: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -216,6 +223,9 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, + input: []ocr2keepers.CheckResult{ + {}, + }, values: values, statusCode: http.StatusOK, callbackResp: []byte{}, @@ -255,10 +265,10 @@ func TestStreams_CheckCallback(t *testing.T) { }).Once() s.client = client - state, retryable, _, err := s.checkCallback(testutils.Context(t), tt.values, tt.lookup) - tt.wantErr(t, err, fmt.Sprintf("Error asserion failed: %v", tt.name)) - assert.Equal(t, tt.state, state) - assert.Equal(t, tt.retryable, retryable) + err = s.CheckCallback(testutils.Context(t), tt.values, tt.lookup, &tt.input[0]) + tt.wantErr(t, err, fmt.Sprintf("Error assertion failed: %v", tt.name)) + assert.Equal(t, uint8(tt.state), tt.input[0].PipelineExecutionState) + assert.Equal(t, tt.retryable, tt.input[0].Retryable) }) } } @@ -434,7 +444,7 @@ func TestStreams_AllowedToUseMercury(t *testing.T) { BlockNumber: big.NewInt(10), } - state, reason, retryable, allowed, err := s.allowedToUseMercury(opts, upkeepId) + state, reason, retryable, allowed, err := s.AllowedToUseMercury(opts, upkeepId) assert.Equal(t, tt.err, err) assert.Equal(t, tt.allowed, allowed) assert.Equal(t, tt.state, state)