diff --git a/README.md b/README.md index 168b709ac7a..884e611fcd8 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,8 @@ You can configure your node's behavior by setting environment variables which ca ETH_URL Default: ws://localhost:8546 ETH_CHAIN_ID Default: 0 ETH_GAS_BUMP_THRESHOLD Default: 12 - ETH_MIN_CONFIRMATIONS Default: 12 + TX_MIN_CONFIRMATIONS Default: 12 + TASK_MIN_CONFIRMATIONS Default: 6 ETH_GAS_BUMP_WEI Default: 5000000000 (5 gwei) ETH_GAS_PRICE_DEFAULT Default: 20000000000 (20 gwei) diff --git a/adapters/bridge.go b/adapters/bridge.go index 48450f294cd..bc25d28c96f 100644 --- a/adapters/bridge.go +++ b/adapters/bridge.go @@ -24,57 +24,69 @@ type Bridge struct { // If the Perform is resumed with a pending RunResult, the RunResult is marked // not pending and the RunResult is returned. func (ba *Bridge) Perform(input models.RunResult, _ *store.Store) models.RunResult { - if input.Pending { + if input.Status.Errored() { + return input + } else if input.Status.PendingExternal() { return markNotPending(input) } return ba.handleNewRun(input) } func markNotPending(input models.RunResult) models.RunResult { - input.Pending = false + input.Status = models.RunStatusInProgress return input } func (ba *Bridge) handleNewRun(input models.RunResult) models.RunResult { - in, err := json.Marshal(&bridgePayload{input}) + b, err := postToExternalAdapter(ba.URL.String(), input) if err != nil { - return baRunResultError(input, "marshaling request body", err) + return baRunResultError(input, "post to external adapter", err) } - resp, err := http.Post(ba.URL.String(), "application/json", bytes.NewBuffer(in)) + var brr models.BridgeRunResult + err = json.Unmarshal(b, &brr) if err != nil { - return baRunResultError(input, "POST request", err) + return baRunResultError(input, "unmarshaling JSON", err) } - defer resp.Body.Close() - if resp.StatusCode >= 400 { - b, _ := ioutil.ReadAll(resp.Body) - err = fmt.Errorf("%v %v", resp.StatusCode, string(b)) - return baRunResultError(input, "POST response", err) + rr, err := input.Merge(brr.RunResult) + if err != nil { + return baRunResultError(rr, "Unable to merge received payload", err) } - b, err := ioutil.ReadAll(resp.Body) + return rr +} + +func postToExternalAdapter(url string, input models.RunResult) ([]byte, error) { + in, err := json.Marshal(&bridgeOutgoing{input}) if err != nil { - return baRunResultError(input, "reading response body", err) + return nil, fmt.Errorf("marshaling request body: %v", err) } - rr := models.RunResult{} - err = json.Unmarshal(b, &rr) + resp, err := http.Post(url, "application/json", bytes.NewBuffer(in)) if err != nil { - return baRunResultError(input, "unmarshaling JSON", err) + return nil, fmt.Errorf("POST request: %v", err) } - return rr + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + b, _ := ioutil.ReadAll(resp.Body) + err = fmt.Errorf("%v %v", resp.StatusCode, string(b)) + return nil, fmt.Errorf("POST response: %v", err) + } + + return ioutil.ReadAll(resp.Body) } func baRunResultError(in models.RunResult, str string, err error) models.RunResult { return in.WithError(fmt.Errorf("ExternalBridge %v: %v", str, err)) } -type bridgePayload struct { +type bridgeOutgoing struct { models.RunResult } -func (bp bridgePayload) MarshalJSON() ([]byte, error) { +func (bp bridgeOutgoing) MarshalJSON() ([]byte, error) { anon := struct { JobRunID string `json:"id"` Data models.JSON `json:"data"` diff --git a/adapters/bridge_test.go b/adapters/bridge_test.go index 67746f25316..ebe8db747be 100644 --- a/adapters/bridge_test.go +++ b/adapters/bridge_test.go @@ -9,25 +9,24 @@ import ( "github.com/smartcontractkit/chainlink/store/models" "github.com/smartcontractkit/chainlink/utils" "github.com/stretchr/testify/assert" - null "gopkg.in/guregu/null.v3" ) -func TestBridge_Perform_FromUnstarted(t *testing.T) { +func TestBridge_Perform_fromUnstarted(t *testing.T) { cases := []struct { name string status int want string - wantExists bool wantErrored bool wantPending bool response string }{ - {"success", 200, "purchased", true, false, false, `{"data":{"value": "purchased"}}`}, - {"run error", 200, "", false, true, false, `{"error": "overload", "data": {}}`}, - {"server error", 400, "lot 49", true, true, false, `bad request`}, - {"server error", 500, "lot 49", true, true, false, `big error`}, - {"JSON parse error", 200, "lot 49", true, true, false, `}`}, - {"pending response", 200, "", false, false, true, `{"pending":true}`}, + {"success", 200, "purchased", false, false, `{"data":{"value": "purchased"}}`}, + {"run error", 200, "lot 49", true, false, `{"error": "overload", "data": {}}`}, + {"server error", 400, "lot 49", true, false, `bad request`}, + {"server error", 500, "lot 49", true, false, `big error`}, + {"JSON parse error", 200, "lot 49", true, false, `}`}, + {"pending response", 200, "lot 49", false, true, `{"pending":true}`}, + {"unsetting value", 200, "", false, false, `{"data":{"value":null}}`}, } store, cleanup := cltest.NewStore() @@ -53,22 +52,23 @@ func TestBridge_Perform_FromUnstarted(t *testing.T) { result = eb.Perform(result, store) val, _ := result.Get("value") assert.Equal(t, test.want, val.String()) - assert.Equal(t, test.wantExists, val.Exists()) assert.Equal(t, test.wantErrored, result.HasError()) - assert.Equal(t, test.wantPending, result.Pending) + assert.Equal(t, test.wantPending, result.Status.PendingExternal()) }) } } -func TestBridge_Perform_FromPending(t *testing.T) { +func TestBridge_Perform_resuming(t *testing.T) { + t.Parallel() cases := []struct { - name string - input string - errorMessage null.String - want string + name string + input string + status models.RunStatus + want string + wantStatus models.RunStatus }{ - {"basic", `{"value":"100","old":"remains"}`, cltest.NullString(nil), `{"value":"100","old":"remains"}`}, - {"with error", `{"value":"100","old":"remains"}`, cltest.NullString("Big error!"), `{"value":"100","old":"remains"}`}, + {"from pending", `{"value":"100","old":"remains"}`, models.RunStatusPendingExternal, `{"value":"100","old":"remains"}`, models.RunStatusInProgress}, + {"from errored", `{"value":"100","old":"remains"}`, models.RunStatusErrored, `{"value":"100","old":"remains"}`, models.RunStatusErrored}, } store, cleanup := cltest.NewStore() @@ -79,18 +79,18 @@ func TestBridge_Perform_FromPending(t *testing.T) { for _, test := range cases { t.Run(test.name, func(t *testing.T) { - t.Parallel() input := models.RunResult{ - Data: cltest.JSONFromString(test.input), - ErrorMessage: test.errorMessage, - Pending: true, + Data: cltest.JSONFromString(test.input), + Status: test.status, } result := ba.Perform(input, store) assert.Equal(t, test.want, result.Data.String()) - assert.Equal(t, test.errorMessage, result.ErrorMessage) - assert.Equal(t, false, result.Pending) + assert.Equal(t, test.wantStatus, result.Status) + if test.wantStatus.Errored() { + assert.Equal(t, input, result) + } }) } } diff --git a/adapters/eth_tx.go b/adapters/eth_tx.go index 178ba17fcd0..3176ea67033 100644 --- a/adapters/eth_tx.go +++ b/adapters/eth_tx.go @@ -20,7 +20,7 @@ type EthTx struct { // is not currently pending. Then it confirms the transaction was confirmed on // the blockchain. func (etx *EthTx) Perform(input models.RunResult, store *store.Store) models.RunResult { - if !input.Pending { + if !input.Status.PendingExternal() { return createTxRunResult(etx, input, store) } else { return ensureTxRunResult(input, store) @@ -67,7 +67,7 @@ func ensureTxRunResult(input models.RunResult, store *store.Store) models.RunRes if err != nil { return input.WithError(err) } else if !confirmed { - return input.MarkPending() + return input.MarkPendingExternal() } return input.WithValue(hash.String()) } diff --git a/adapters/eth_tx_test.go b/adapters/eth_tx_test.go index b320626d849..c282f625295 100644 --- a/adapters/eth_tx_test.go +++ b/adapters/eth_tx_test.go @@ -31,7 +31,7 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) { hash := cltest.NewHash() sentAt := uint64(23456) confirmed := sentAt + 1 - safe := confirmed + config.EthMinConfirmations + safe := confirmed + config.TxMinConfirmations ethMock.Register("eth_sendRawTransaction", hash, func(_ interface{}, data ...interface{}) error { rlp := data[0].([]interface{})[0].(string) @@ -87,12 +87,12 @@ func TestEthTxAdapter_Perform_FromPending(t *testing.T) { assert.Nil(t, err) adapter := adapters.EthTx{} sentResult := cltest.RunResultWithValue(a.Hash.String()) - input := sentResult.MarkPending() + input := sentResult.MarkPendingExternal() output := adapter.Perform(input, store) assert.False(t, output.HasError()) - assert.True(t, output.Pending) + assert.True(t, output.Status.PendingExternal()) assert.Nil(t, store.One("ID", tx.ID, tx)) attempts, _ := store.AttemptsFor(tx.ID) assert.Equal(t, 1, len(attempts)) @@ -121,12 +121,12 @@ func TestEthTxAdapter_Perform_FromPendingBumpGas(t *testing.T) { assert.Nil(t, err) adapter := adapters.EthTx{} sentResult := cltest.RunResultWithValue(a.Hash.String()) - input := sentResult.MarkPending() + input := sentResult.MarkPendingExternal() output := adapter.Perform(input, store) assert.False(t, output.HasError()) - assert.True(t, output.Pending) + assert.True(t, output.Status.PendingExternal()) assert.Nil(t, store.One("ID", tx.ID, tx)) attempts, _ := store.AttemptsFor(tx.ID) assert.Equal(t, 2, len(attempts)) @@ -150,7 +150,7 @@ func TestEthTxAdapter_Perform_FromPendingConfirm(t *testing.T) { Hash: cltest.NewHash(), BlockNumber: cltest.BigHexInt(sentAt), }) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.EthMinConfirmations)) + ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.TxMinConfirmations)) tx := cltest.NewTx(cltest.NewAddress(), sentAt) assert.Nil(t, store.Save(tx)) @@ -159,13 +159,13 @@ func TestEthTxAdapter_Perform_FromPendingConfirm(t *testing.T) { a3, _ := store.AddAttempt(tx, tx.EthTx(big.NewInt(3)), sentAt+2) adapter := adapters.EthTx{} sentResult := cltest.RunResultWithValue(a3.Hash.String()) - input := sentResult.MarkPending() + input := sentResult.MarkPendingExternal() assert.False(t, tx.Confirmed) output := adapter.Perform(input, store) - assert.False(t, output.Pending) + assert.False(t, output.Status.PendingExternal()) assert.False(t, output.HasError()) assert.Nil(t, store.One("ID", tx.ID, tx)) diff --git a/adapters/http_test.go b/adapters/http_test.go index 9b0573f5505..5c70ece4256 100644 --- a/adapters/http_test.go +++ b/adapters/http_test.go @@ -60,7 +60,7 @@ func TestHttpGet_Perform(t *testing.T) { assert.Nil(t, err) assert.Equal(t, test.want, val) assert.Equal(t, test.wantErrored, result.HasError()) - assert.Equal(t, false, result.Pending) + assert.Equal(t, false, result.Status.PendingExternal()) }) } } @@ -98,7 +98,7 @@ func TestHttpPost_Perform(t *testing.T) { assert.Equal(t, test.want, val.String()) assert.Equal(t, true, val.Exists()) assert.Equal(t, test.wantErrored, result.HasError()) - assert.Equal(t, false, result.Pending) + assert.Equal(t, false, result.Status.PendingExternal()) }) } } diff --git a/adapters/no_op.go b/adapters/no_op.go index 7306114e2fa..8fd8df52a55 100644 --- a/adapters/no_op.go +++ b/adapters/no_op.go @@ -17,7 +17,7 @@ func (noa *NoOp) Perform(input models.RunResult, _ *store.Store) models.RunResul type NoOpPend struct{} // Perform on this adapter type returns an empty RunResult with an -// added field for the status to indicate the task is Pending +// added field for the status to indicate the task is Pending. func (noa *NoOpPend) Perform(input models.RunResult, _ *store.Store) models.RunResult { - return input.MarkPending() + return input.MarkPendingExternal() } diff --git a/cmd/renderer.go b/cmd/renderer.go index af4463feebe..9deb197d4da 100644 --- a/cmd/renderer.go +++ b/cmd/renderer.go @@ -152,7 +152,7 @@ func (rt RendererTable) renderJobRuns(j presenters.JobSpec) error { for _, jr := range j.Runs { table.Append([]string{ jr.ID, - jr.Status, + string(jr.Status), utils.ISO8601UTC(jr.CreatedAt), utils.NullISO8601UTC(jr.CompletedAt), jr.Result.Data.String(), diff --git a/internal/bin/cldev b/internal/bin/cldev index 765fa8d5ef8..5a1bfbd68e9 100755 --- a/internal/bin/cldev +++ b/internal/bin/cldev @@ -11,7 +11,7 @@ export LOG_LEVEL=debug export ROOT=./internal/devnet export ETH_URL=ws://localhost:18546 export ETH_CHAIN_ID=17 -export ETH_MIN_CONFIRMATIONS=2 +export TX_MIN_CONFIRMATIONS=2 LDFLAGS="-X github.com/smartcontractkit/chainlink/store.Sha=`git rev-parse HEAD`" diff --git a/internal/cltest/cltest.go b/internal/cltest/cltest.go index a6df0f542de..3d3356c6159 100644 --- a/internal/cltest/cltest.go +++ b/internal/cltest/cltest.go @@ -63,15 +63,16 @@ func NewConfigWithWSServer(wsserver *httptest.Server) *TestConfig { rootdir := path.Join(RootDir, fmt.Sprintf("%d-%d", time.Now().UnixNano(), count)) config := TestConfig{ Config: store.Config{ - LogLevel: store.LogLevel{zapcore.DebugLevel}, - RootDir: rootdir, - BasicAuthUsername: Username, - BasicAuthPassword: Password, - ChainID: 3, - EthMinConfirmations: 6, - EthGasBumpWei: *big.NewInt(5000000000), - EthGasBumpThreshold: 3, - EthGasPriceDefault: *big.NewInt(20000000000), + LogLevel: store.LogLevel{zapcore.DebugLevel}, + RootDir: rootdir, + BasicAuthUsername: Username, + BasicAuthPassword: Password, + ChainID: 3, + TxMinConfirmations: 6, + TaskMinConfirmations: 0, + EthGasBumpWei: *big.NewInt(5000000000), + EthGasBumpThreshold: 3, + EthGasPriceDefault: *big.NewInt(20000000000), }, } config.SetEthereumServer(wsserver) @@ -392,7 +393,7 @@ func WaitForJobRunToComplete( store *store.Store, jr models.JobRun, ) models.JobRun { - return WaitForJobRunStatus(t, store, jr, models.StatusCompleted) + return WaitForJobRunStatus(t, store, jr, models.RunStatusCompleted) } func WaitForJobRunToPend( @@ -400,17 +401,25 @@ func WaitForJobRunToPend( store *store.Store, jr models.JobRun, ) models.JobRun { - return WaitForJobRunStatus(t, store, jr, models.StatusPending) + return WaitForJobRunStatus(t, store, jr, models.RunStatusPendingExternal) +} + +func WaitForJobRunToBlock( + t *testing.T, + store *store.Store, + jr models.JobRun, +) models.JobRun { + return WaitForJobRunStatus(t, store, jr, models.RunStatusPendingConfirmations) } func WaitForJobRunStatus( t *testing.T, store *store.Store, jr models.JobRun, - status string, + status models.RunStatus, ) models.JobRun { t.Helper() - gomega.NewGomegaWithT(t).Eventually(func() string { + gomega.NewGomegaWithT(t).Eventually(func() models.RunStatus { assert.Nil(t, store.One("ID", jr.ID, &jr)) return jr.Status }).Should(gomega.Equal(status)) @@ -476,8 +485,18 @@ func ParseNullableTime(s string) null.Time { return NullableTime(ParseISO8601(s)) } -func IndexableBlockNumber(n int64) *models.IndexableBlockNumber { - return models.NewIndexableBlockNumber(big.NewInt(n)) +func IndexableBlockNumber(val interface{}) *models.IndexableBlockNumber { + switch val.(type) { + case int: + return models.NewIndexableBlockNumber(big.NewInt(int64(val.(int)))) + case uint64: + return models.NewIndexableBlockNumber(big.NewInt(int64(val.(uint64)))) + case int64: + return models.NewIndexableBlockNumber(big.NewInt(val.(int64))) + default: + logger.Panicf("Could not convert %v of type %T to IndexableBlockNumber", val, val) + return nil + } } func mustNotErr(err error) { diff --git a/internal/cltest/fixtures.go b/internal/cltest/fixtures.go index 47bd45af2fc..494d33a4acf 100644 --- a/internal/cltest/fixtures.go +++ b/internal/cltest/fixtures.go @@ -173,10 +173,11 @@ func JSONFromString(body string, args ...interface{}) models.JSON { return j } -func NewRunLog(jobID string, addr common.Address, json string) ethtypes.Log { +func NewRunLog(jobID string, addr common.Address, blk int, json string) ethtypes.Log { return ethtypes.Log{ - Address: addr, - Data: StringToRunLogData(json), + Address: addr, + BlockNumber: uint64(blk), + Data: StringToRunLogData(json), Topics: []common.Hash{ services.RunLogTopic, common.StringToHash("requestID"), @@ -199,6 +200,11 @@ func BigHexInt(val interface{}) hexutil.Big { } } +func NewBigHexInt(val interface{}) *hexutil.Big { + rval := BigHexInt(val) + return &rval +} + func RunResultWithValue(val string) models.RunResult { data := models.JSON{} data, err := data.Add("value", val) @@ -211,14 +217,15 @@ func RunResultWithValue(val string) models.RunResult { func RunResultWithError(err error) models.RunResult { return models.RunResult{ + Status: models.RunStatusErrored, ErrorMessage: null.StringFrom(err.Error()), } } -func MarkJobRunPending(jr models.JobRun, i int) models.JobRun { - jr.Status = models.StatusPending - jr.Result.Pending = true - jr.TaskRuns[i].Status = models.StatusPending - jr.TaskRuns[i].Result.Pending = true +func MarkJobRunPendingExternal(jr models.JobRun, i int) models.JobRun { + jr.Status = models.RunStatusPendingExternal + jr.Result.Status = models.RunStatusPendingExternal + jr.TaskRuns[i].Status = models.RunStatusPendingExternal + jr.TaskRuns[i].Result.Status = models.RunStatusPendingExternal return jr } diff --git a/internal/fixtures/web/runlog_random_number_job.json b/internal/fixtures/web/runlog_noop_job.json similarity index 56% rename from internal/fixtures/web/runlog_random_number_job.json rename to internal/fixtures/web/runlog_noop_job.json index 3b994da9170..e9962e2f2ce 100644 --- a/internal/fixtures/web/runlog_random_number_job.json +++ b/internal/fixtures/web/runlog_noop_job.json @@ -1,4 +1,4 @@ { "initiators": [{"type": "runLog"}], - "tasks": [{"type": "HttpGet"}] + "tasks": [{"type": "NoOp"}] } diff --git a/internal/fixtures/web/uint256_job.json b/internal/fixtures/web/uint256_job.json index 4f54ef0c7ff..421ab7fd1df 100644 --- a/internal/fixtures/web/uint256_job.json +++ b/internal/fixtures/web/uint256_job.json @@ -1,8 +1,6 @@ { "initiators": [{ "type": "web" }], "tasks": [ - { "type": "HttpGet", "url": "https://bitstamp.net/api/ticker/" }, - { "type": "JsonParse", "path": ["last"] }, { "type": "Multiply", "times": 100 }, { "type": "EthUint256" } ] diff --git a/internal/fixtures/web/uint256_string_times_job.json b/internal/fixtures/web/uint256_string_times_job.json deleted file mode 100644 index 0cd66bcdc49..00000000000 --- a/internal/fixtures/web/uint256_string_times_job.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "initiators": [{ "type": "web" }], - "tasks": [ - { "type": "HttpGet", "url": "https://bitstamp.net/api/ticker/" }, - { "type": "JsonParse", "path": ["last"] }, - { "type": "Multiply", "times": "100" }, - { "type": "EthUint256" } - ] -} diff --git a/services/ethereum_listener.go b/services/ethereum_listener.go index cb819d3efa3..acfe56a7610 100644 --- a/services/ethereum_listener.go +++ b/services/ethereum_listener.go @@ -76,13 +76,13 @@ func (el *EthereumListener) Disconnect() { } // OnNewHead resumes all pending job runs based on the new head activity. -func (el *EthereumListener) OnNewHead(_ *models.BlockHeader) { +func (el *EthereumListener) OnNewHead(head *models.BlockHeader) { pendingRuns, err := el.Store.PendingJobRuns() if err != nil { logger.Error(err.Error()) } for _, jr := range pendingRuns { - if _, err := ExecuteRun(jr, el.Store, models.RunResult{}); err != nil { + if _, err := ExecuteRunAtBlock(jr, el.Store, models.RunResult{}, head.ToIndexableBlockNumber()); err != nil { logger.Error(err.Error()) } } @@ -262,7 +262,7 @@ func (ht *HeadTracker) updateBlockHeader() { return } - bn := header.IndexableBlockNumber() + bn := header.ToIndexableBlockNumber() if bn.GreaterThan(ht.LastRecord()) { logger.Debug("Fast forwarding to block header ", bn.FriendlyString()) ht.Save(bn) @@ -274,7 +274,7 @@ func (ht *HeadTracker) listenToNewHeads() { logger.Debug("Tracking logs from last block ", ht.number.FriendlyString(), " with hash ", ht.number.Hash.String()) } for header := range ht.headers { - number := header.IndexableBlockNumber() + number := header.ToIndexableBlockNumber() logger.Debugw(fmt.Sprintf("Received header %v", number.FriendlyString()), "hash", header.Hash()) if err := ht.Save(number); err != nil { logger.Error(err.Error()) diff --git a/services/job_runner.go b/services/job_runner.go index 80c21312617..08beb665068 100644 --- a/services/job_runner.go +++ b/services/job_runner.go @@ -2,13 +2,11 @@ package services import ( "fmt" - "time" "github.com/smartcontractkit/chainlink/adapters" "github.com/smartcontractkit/chainlink/logger" "github.com/smartcontractkit/chainlink/store" "github.com/smartcontractkit/chainlink/store/models" - null "gopkg.in/guregu/null.v3" ) // BeginRun creates a new run if the job is valid and starts the job. @@ -17,12 +15,24 @@ func BeginRun( initr models.Initiator, input models.RunResult, store *store.Store, +) (models.JobRun, error) { + return BeginRunAtBlock(job, initr, input, store, nil) +} + +// BeginRunAtBlock builds and executes a new run if the job is valid with the block number +// to determine if tasks should be resumed. +func BeginRunAtBlock( + job models.JobSpec, + initr models.Initiator, + input models.RunResult, + store *store.Store, + bn *models.IndexableBlockNumber, ) (models.JobRun, error) { run, err := BuildRun(job, initr, store) if err != nil { return models.JobRun{}, err } - return ExecuteRun(run, store, input) + return ExecuteRunAtBlock(run, store, input, bn) } // BuildRun checks to ensure the given job has not started or ended before @@ -45,83 +55,93 @@ func BuildRun(job models.JobSpec, i models.Initiator, store *store.Store) (model // ExecuteRun starts the job and executes task runs within that job in the // order defined in the run for as long as they do not return errors. Results // are saved in the store (db). -func ExecuteRun(run models.JobRun, store *store.Store, input models.RunResult) (models.JobRun, error) { - run.Status = models.StatusInProgress - if err := store.Save(&run); err != nil { - return run, wrapError(run, err) +func ExecuteRun(jr models.JobRun, store *store.Store, overrides models.RunResult) (models.JobRun, error) { + return ExecuteRunAtBlock(jr, store, overrides, nil) +} + +func ExecuteRunAtBlock( + jr models.JobRun, + store *store.Store, + overrides models.RunResult, + bn *models.IndexableBlockNumber, +) (models.JobRun, error) { + jr.Status = models.RunStatusInProgress + if err := store.Save(&jr); err != nil { + return jr, wrapError(jr, err) } - logger.Infow("Starting job", run.ForLogger()...) - unfinished := run.UnfinishedTaskRuns() - offset := len(run.TaskRuns) - len(unfinished) - prevRun := unfinished[0] + jr, err := store.SaveCreationHeight(jr, bn) + if err != nil { + return jr, wrapError(jr, err) + } + logger.Infow("Starting job", jr.ForLogger()...) + unfinished := jr.UnfinishedTaskRuns() + offset := len(jr.TaskRuns) - len(unfinished) + latestRun := unfinished[0] - merged, err := prevRun.Result.Merge(input) + merged, err := latestRun.Result.Merge(overrides) if err != nil { - return run, wrapError(run, err) + return jr, wrapError(jr, err) } - prevRun.Result = merged + latestRun.Result = merged for i, taskRunTemplate := range unfinished { - taskRun, err := taskRunTemplate.MergeTaskParams(input.Data) + taskRun, err := taskRunTemplate.MergeTaskParams(overrides.Data) if err != nil { - return run, wrapError(run, err) - } - prevRun = startTask(taskRun, prevRun.Result, store) - logger.Debugw("Produced task run", "tr", prevRun) - run.TaskRuns[i+offset] = prevRun - if err := store.Save(&run); err != nil { - return run, wrapError(run, err) + return jr, wrapError(jr, err) } - if prevRun.Result.Pending { - logger.Infow(fmt.Sprintf("Task %v pending", taskRun.Task.Type), taskRun.ForLogger("task", i, "result", prevRun.Result)...) - break + latestRun = markCompleted(startTask(jr, taskRun, latestRun.Result, bn, store)) + jr.TaskRuns[i+offset] = latestRun + logTaskResult(latestRun, taskRun, i) + + if err := store.Save(&jr); err != nil { + return jr, wrapError(jr, err) } - logger.Infow(fmt.Sprintf("Task %v finished", taskRun.Task.Type), taskRun.ForLogger("task", i, "result", prevRun.Result)...) - if prevRun.Result.HasError() { + if !latestRun.Status.Runnable() { break } } - run.Result = prevRun.Result - if run.Result.HasError() { - run.Status = models.StatusErrored - } else if run.Result.Pending { - run.Status = models.StatusPending - } else { - run.Status = models.StatusCompleted - run.CompletedAt = null.Time{Time: time.Now(), Valid: true} - } + jr = jr.ApplyResult(latestRun.Result) + logger.Infow("Finished current job run execution", jr.ForLogger()...) + return jr, wrapError(jr, store.Save(&jr)) +} - logger.Infow("Finished current job run execution", run.ForLogger()...) - return run, wrapError(run, store.Save(&run)) +func logTaskResult(lr models.TaskRun, tr models.TaskRun, i int) { + logger.Debugw("Produced task run", "taskRun", lr) + logger.Debugw(fmt.Sprintf("Task %v %v", tr.Task.Type, tr.Result.Status), tr.ForLogger("task", i, "result", lr.Result)...) +} + +func markCompleted(tr models.TaskRun) models.TaskRun { + if tr.Status.Runnable() { + return tr.MarkCompleted() + } + return tr } func startTask( - run models.TaskRun, + jr models.JobRun, + tr models.TaskRun, input models.RunResult, + bn *models.IndexableBlockNumber, store *store.Store, ) models.TaskRun { - run.Status = models.StatusInProgress - adapter, err := adapters.For(run.Task, store) - if err != nil { - run.Status = models.StatusErrored - run.Result.SetError(err) - return run + if !jr.Runnable(bn, store.Config.TaskMinConfirmations) { + return tr.MarkPendingConfirmations() } - run.Result = adapter.Perform(input, store) - if run.Result.HasError() { - run.Status = models.StatusErrored - } else if run.Result.Pending { - run.Status = models.StatusPending - } else { - run.Status = models.StatusCompleted + tr.Status = models.RunStatusInProgress + adapter, err := adapters.For(tr.Task, store) + + if err != nil { + tr.Status = models.RunStatusErrored + tr.Result.SetError(err) + return tr } - return run + return tr.ApplyResult(adapter.Perform(input, store)) } func wrapError(run models.JobRun, err error) error { diff --git a/services/job_runner_test.go b/services/job_runner_test.go index d58a33dc6b2..fb2c0aee522 100644 --- a/services/job_runner_test.go +++ b/services/job_runner_test.go @@ -19,19 +19,14 @@ func TestJobRunner_ExecuteRun(t *testing.T) { name string input string runResult string - wantStatus string + wantStatus models.RunStatus wantData string }{ - {"success", `{}`, `{"data":{"value":"100"}}`, - models.StatusCompleted, `{"value":"100"}`}, - {"errored", `{}`, `{"error":"too much"}`, - models.StatusErrored, `{}`}, - {"errored with a value", `{}`, `{"error":"too much", "data":{"value":"99"}}`, - models.StatusErrored, `{"value":"99"}`}, - {"overriding bridge type params", `{"data":{"url":"hack"},"url":"hack"}`, `{"data":{"value":"100"}}`, - models.StatusCompleted, `{"value":"100"}`}, - {"type parameter does not override", `{"data":{"type":"0"},"type":"0"}`, `{"data":{"value":"100"}}`, - models.StatusCompleted, `{"value":"100"}`}, + {"success", `{}`, `{"data":{"value":"100"}}`, models.RunStatusCompleted, `{"value":"100"}`}, + {"errored", `{}`, `{"error":"too much"}`, models.RunStatusErrored, `{}`}, + {"errored with a value", `{}`, `{"error":"too much", "data":{"value":"99"}}`, models.RunStatusErrored, `{"value":"99"}`}, + {"overriding bridge type params", `{"url":"hack"}`, `{"data":{"value":"100"}}`, models.RunStatusCompleted, `{"value":"100","url":"hack"}`}, + {"type parameter does not override", `{"type":"0"}`, `{"data":{"value":"100"}}`, models.RunStatusCompleted, `{"value":"100","type":"0"}`}, } store, cleanup := cltest.NewStore() @@ -65,21 +60,44 @@ func TestJobRunner_ExecuteRun(t *testing.T) { store.One("ID", run.ID, &run) assert.Equal(t, test.wantStatus, run.Status) - assert.Equal(t, test.wantData, run.Result.Data.String()) + assert.JSONEq(t, test.wantData, run.Result.Data.String()) tr1 := run.TaskRuns[0] assert.Equal(t, test.wantStatus, tr1.Status) - assert.Equal(t, test.wantData, tr1.Result.Data.String()) + assert.JSONEq(t, test.wantData, tr1.Result.Data.String()) - if test.wantStatus == models.StatusCompleted { + if test.wantStatus == models.RunStatusCompleted { tr2 := run.TaskRuns[1] - assert.Equal(t, test.wantData, tr2.Result.Data.String()) + assert.JSONEq(t, test.wantData, tr2.Result.Data.String()) assert.True(t, run.CompletedAt.Valid) } }) } } +func TestExecuteRun_TransitionToPendingConfirmations(t *testing.T) { + t.Parallel() + store, cleanup := cltest.NewStore() + defer cleanup() + store.Config.TaskMinConfirmations = 6 + + job, initr := cltest.NewJobWithLogInitiator() + job.Tasks = []models.TaskSpec{cltest.NewTask("NoOp")} + + zero := cltest.IndexableBlockNumber(0) + run := job.NewRun(initr) + run, err := services.ExecuteRunAtBlock(run, store, models.RunResult{}, zero) + assert.Nil(t, err) + + store.One("ID", run.ID, &run) + assert.Equal(t, models.RunStatusPendingConfirmations, run.Status) + + trigger := cltest.IndexableBlockNumber(store.Config.TaskMinConfirmations) + run, err = services.ExecuteRunAtBlock(run, store, models.RunResult{}, trigger) + assert.Nil(t, err) + assert.Equal(t, models.RunStatusCompleted, run.Status) +} + func TestJobRunner_ExecuteRun_TransitionToPending(t *testing.T) { t.Parallel() store, cleanup := cltest.NewStore() @@ -93,7 +111,7 @@ func TestJobRunner_ExecuteRun_TransitionToPending(t *testing.T) { assert.Nil(t, err) store.One("ID", run.ID, &run) - assert.Equal(t, models.StatusPending, run.Status) + assert.Equal(t, models.RunStatusPendingExternal, run.Status) } func TestJobRunner_BeginRun(t *testing.T) { diff --git a/services/subscription.go b/services/subscription.go index 809ad7fd178..8e75f7030c6 100644 --- a/services/subscription.go +++ b/services/subscription.go @@ -224,7 +224,7 @@ func receiveEthLog(le RPCLogEvent) { func runJob(le RPCLogEvent, data models.JSON, initr models.Initiator) { input := models.RunResult{Data: data} - if _, err := BeginRun(le.Job, initr, input, le.store); err != nil { + if _, err := BeginRunAtBlock(le.Job, initr, input, le.store, le.ToIndexableBlockNumber()); err != nil { logger.Errorw(err.Error(), le.ForLogger()...) } } @@ -256,6 +256,12 @@ func (le RPCLogEvent) ToDebug() { logger.Debugw(msg, le.ForLogger()...) } +func (le RPCLogEvent) ToIndexableBlockNumber() *models.IndexableBlockNumber { + num := new(big.Int) + num.SetUint64(le.Log.BlockNumber) + return models.NewIndexableBlockNumber(num, le.Log.BlockHash) +} + // ValidateRunLog returns whether or not the contained log is a RunLog, // a specific Chainlink event trigger from smart contracts. func (le RPCLogEvent) ValidateRunLog() bool { diff --git a/store/config.go b/store/config.go index 8dc1d35e522..55ad4faaaf3 100644 --- a/store/config.go +++ b/store/config.go @@ -17,19 +17,20 @@ import ( // Config holds parameters used by the application which can be overridden // by setting environment variables. type Config struct { - LogLevel LogLevel `env:"LOG_LEVEL" envDefault:"info"` - RootDir string `env:"ROOT" envDefault:"~/.chainlink"` - Port string `env:"PORT" envDefault:"6688"` - BasicAuthUsername string `env:"USERNAME" envDefault:"chainlink"` - BasicAuthPassword string `env:"PASSWORD" envDefault:"twochains"` - EthereumURL string `env:"ETH_URL" envDefault:"ws://localhost:8546"` - ChainID uint64 `env:"ETH_CHAIN_ID" envDefault:"0"` - ClientNodeURL string `env:"CLIENT_NODE_URL" envDefault:"http://localhost:6688"` - EthMinConfirmations uint64 `env:"ETH_MIN_CONFIRMATIONS" envDefault:"12"` - EthGasBumpThreshold uint64 `env:"ETH_GAS_BUMP_THRESHOLD" envDefault:"12"` - EthGasBumpWei big.Int `env:"ETH_GAS_BUMP_WEI" envDefault:"5000000000"` - EthGasPriceDefault big.Int `env:"ETH_GAS_PRICE_DEFAULT" envDefault:"20000000000"` - LinkContractAddress string `env:"LINK_CONTRACT_ADDRESS" envDefault:"0x514910771AF9Ca656af840dff83E8264EcF986CA"` + LogLevel LogLevel `env:"LOG_LEVEL" envDefault:"info"` + RootDir string `env:"ROOT" envDefault:"~/.chainlink"` + Port string `env:"PORT" envDefault:"6688"` + BasicAuthUsername string `env:"USERNAME" envDefault:"chainlink"` + BasicAuthPassword string `env:"PASSWORD" envDefault:"twochains"` + EthereumURL string `env:"ETH_URL" envDefault:"ws://localhost:8546"` + ChainID uint64 `env:"ETH_CHAIN_ID" envDefault:"0"` + ClientNodeURL string `env:"CLIENT_NODE_URL" envDefault:"http://localhost:6688"` + TxMinConfirmations uint64 `env:"TX_MIN_CONFIRMATIONS" envDefault:"12"` + TaskMinConfirmations uint64 `env:"TASK_MIN_CONFIRMATIONS" envDefault:"0"` + EthGasBumpThreshold uint64 `env:"ETH_GAS_BUMP_THRESHOLD" envDefault:"12"` + EthGasBumpWei big.Int `env:"ETH_GAS_BUMP_WEI" envDefault:"5000000000"` + EthGasPriceDefault big.Int `env:"ETH_GAS_PRICE_DEFAULT" envDefault:"20000000000"` + LinkContractAddress string `env:"LINK_CONTRACT_ADDRESS" envDefault:"0x514910771AF9Ca656af840dff83E8264EcF986CA"` } // NewConfig returns the config with the environment variables set to their diff --git a/store/models/common.go b/store/models/common.go index f6cae6f2855..1d3a4a4764e 100644 --- a/store/models/common.go +++ b/store/models/common.go @@ -12,6 +12,54 @@ import ( "github.com/tidwall/gjson" ) +type RunStatus string + +const ( + // RunStatusUnstarted is the default state of any run status. + RunStatusUnstarted = RunStatus("") + // RunStatusInProgress is used for when a run is actively being executed. + RunStatusInProgress = RunStatus("in progress") + // RunStatusPendingConfirmations is used for when a run is awaiting for block confirmations. + RunStatusPendingConfirmations = RunStatus("pending_confirmations") + // RunStatusPendingExternal is used for when a run is waiting on the completion + // of another event. + RunStatusPendingExternal = RunStatus("pending_external") + // RunStatusErrored is used for when a run has errored and will not complete. + RunStatusErrored = RunStatus("errored") + // RunStatusCompleted is used for when a run has successfully completed execution. + RunStatusCompleted = RunStatus("completed") +) + +// Pending returns true if the status is pending. +func (s RunStatus) PendingExternal() bool { + return s == RunStatusPendingExternal +} + +// PendingConfirmations returns true if the status is pending. +func (s RunStatus) PendingConfirmations() bool { + return s == RunStatusPendingConfirmations +} + +// Completed returns true if the status is RunStatusCompleted. +func (s RunStatus) Completed() bool { + return s == RunStatusCompleted +} + +// Errored returns true if the status is RunStatusErrored. +func (s RunStatus) Errored() bool { + return s == RunStatusErrored +} + +// Pending returns true if the status is pending external or confirmations. +func (s RunStatus) Pending() bool { + return s.PendingExternal() || s.PendingConfirmations() +} + +// Runnable returns true if the status is ready to be run. +func (s RunStatus) Runnable() bool { + return !s.Errored() && !s.Pending() +} + // JSON stores the json types string, number, bool, and null. // Arrays and Objects are returned as their raw json types. type JSON struct { diff --git a/store/models/eth.go b/store/models/eth.go index be95087ccea..209c58e1200 100644 --- a/store/models/eth.go +++ b/store/models/eth.go @@ -126,7 +126,7 @@ func (h BlockHeader) Hash() common.Hash { return h.ParityHash } -func (h BlockHeader) IndexableBlockNumber() *IndexableBlockNumber { +func (h BlockHeader) ToIndexableBlockNumber() *IndexableBlockNumber { return NewIndexableBlockNumber(h.Number.ToInt(), h.Hash()) } diff --git a/store/models/job_spec.go b/store/models/job_spec.go index 722fe740c33..8209dfc0ac0 100644 --- a/store/models/job_spec.go +++ b/store/models/job_spec.go @@ -11,18 +11,6 @@ import ( null "gopkg.in/guregu/null.v3" ) -const ( - // StatusInProgress is used for when a run is actively being executed. - StatusInProgress = "in progress" - // StatusPending is used for when a run is waiting on the completion - // of another event. - StatusPending = "pending" - // StatusErrored is used for when a run has errored and will not complete. - StatusErrored = "errored" - // StatusCompleted is used for when a run has successfully completed execution. - StatusCompleted = "completed" -) - // JobSpec is the definition for all the work to be carried out by the node // for a given contract. It contains the Initiators, Tasks (which are the // individual steps to be carried out), StartAt, EndAt, and CreatedAt fields. diff --git a/store/models/orm.go b/store/models/orm.go index 33bb10c6001..934df3f1da0 100644 --- a/store/models/orm.go +++ b/store/models/orm.go @@ -110,10 +110,25 @@ func (orm *ORM) SaveJob(job *JobSpec) error { return tx.Commit() } +func (orm *ORM) SaveCreationHeight(jr JobRun, bn *IndexableBlockNumber) (JobRun, error) { + if jr.CreationHeight != nil || bn == nil { + return jr, nil + } + + dup := bn.Number + jr.CreationHeight = &dup + return jr, orm.Save(&jr) +} + // PendingJobRuns returns the JobRuns which have a status of "pending". func (orm *ORM) PendingJobRuns() ([]JobRun, error) { runs := []JobRun{} - err := orm.Where("Status", StatusPending, &runs) + statuses := []RunStatus{RunStatusPendingExternal, RunStatusPendingConfirmations} + err := orm.Select(q.In("Status", statuses)).Find(&runs) + if err == storm.ErrNotFound { + return []JobRun{}, nil + } + return runs, err } diff --git a/store/models/orm_test.go b/store/models/orm_test.go index 7603342dc08..14f012ab434 100644 --- a/store/models/orm_test.go +++ b/store/models/orm_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/smartcontractkit/chainlink/internal/cltest" "github.com/smartcontractkit/chainlink/store/models" "github.com/stretchr/testify/assert" @@ -64,9 +65,13 @@ func TestPendingJobRuns(t *testing.T) { assert.Nil(t, store.Save(&npr)) pr := j.NewRun(i) - pr.Status = models.StatusPending + pr.Status = models.RunStatusPendingExternal assert.Nil(t, store.Save(&pr)) + br := j.NewRun(i) + br.Status = models.RunStatusPendingConfirmations + assert.Nil(t, store.Save(&br)) + pending, err := store.PendingJobRuns() assert.Nil(t, err) pendingIDs := []string{} @@ -75,6 +80,7 @@ func TestPendingJobRuns(t *testing.T) { } assert.Contains(t, pendingIDs, pr.ID) + assert.Contains(t, pendingIDs, br.ID) assert.NotContains(t, pendingIDs, npr.ID) } @@ -140,3 +146,40 @@ func TestBridgeTypeFor(t *testing.T) { }) } } + +func TestORM_SaveCreationHeight(t *testing.T) { + t.Parallel() + + store, cleanup := cltest.NewStore() + defer cleanup() + + job, initr := cltest.NewJobWithWebInitiator() + cases := []struct { + name string + creationHeight *big.Int + parameterHeight *big.Int + wantHeight *big.Int + }{ + {"unset", nil, big.NewInt(2), big.NewInt(2)}, + {"set", big.NewInt(1), big.NewInt(2), big.NewInt(1)}, + {"unset and nil", nil, nil, nil}, + } + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + jr := job.NewRun(initr) + if test.creationHeight != nil { + ch := hexutil.Big(*test.creationHeight) + jr.CreationHeight = &ch + } + assert.Nil(t, store.Save(&jr)) + + bn := models.NewIndexableBlockNumber(test.parameterHeight) + result, err := store.SaveCreationHeight(jr, bn) + + assert.Nil(t, err) + assert.Equal(t, test.wantHeight, result.CreationHeight.ToInt()) + assert.Nil(t, store.One("ID", jr.ID, &jr)) + assert.Equal(t, test.wantHeight, jr.CreationHeight.ToInt()) + }) + } +} diff --git a/store/models/run.go b/store/models/run.go index fe973bc9085..fb7de48511a 100644 --- a/store/models/run.go +++ b/store/models/run.go @@ -1,9 +1,12 @@ package models import ( + "encoding/json" "fmt" + "math/big" "time" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/tidwall/gjson" null "gopkg.in/guregu/null.v3" ) @@ -11,14 +14,15 @@ import ( // JobRun tracks the status of a job by holding its TaskRuns and the // Result of each Run. type JobRun struct { - ID string `json:"id" storm:"id,unique"` - JobID string `json:"jobId" storm:"index"` - Status string `json:"status" storm:"index"` - Result RunResult `json:"result" storm:"inline"` - TaskRuns []TaskRun `json:"taskRuns" storm:"inline"` - CreatedAt time.Time `json:"createdAt" storm:"index"` - CompletedAt null.Time `json:"completedAt"` - Initiator Initiator `json:"initiator"` + ID string `json:"id" storm:"id,unique"` + JobID string `json:"jobId" storm:"index"` + Result RunResult `json:"result" storm:"inline"` + Status RunStatus `json:"status" storm:"index"` + TaskRuns []TaskRun `json:"taskRuns" storm:"inline"` + CreatedAt time.Time `json:"createdAt" storm:"index"` + CompletedAt null.Time `json:"completedAt"` + Initiator Initiator `json:"initiator"` + CreationHeight *hexutil.Big `json:"creationHeight"` } // ForLogger formats the JobRun for a common formatting in the log. @@ -41,9 +45,9 @@ func (jr JobRun) ForLogger(kvs ...interface{}) []interface{} { func (jr JobRun) UnfinishedTaskRuns() []TaskRun { unfinished := jr.TaskRuns for _, tr := range jr.TaskRuns { - if tr.Completed() { + if tr.Status.Completed() { unfinished = unfinished[1:] - } else if tr.Errored() { + } else if tr.Status.Errored() { return []TaskRun{} } else { return unfinished @@ -58,23 +62,41 @@ func (jr JobRun) NextTaskRun() TaskRun { return jr.UnfinishedTaskRuns()[0] } +func (jr JobRun) Runnable(bn *IndexableBlockNumber, minConfs uint64) bool { + if jr.CreationHeight == nil || bn == nil { + return true + } + + diff := new(big.Int).Sub(bn.ToInt(), jr.CreationHeight.ToInt()) + min := new(big.Int).SetUint64(minConfs) + return diff.Cmp(min) >= 0 +} + +func (jr JobRun) ApplyResult(result RunResult) JobRun { + jr.Result = result + jr.Status = result.Status + if jr.Status.Completed() { + jr.CompletedAt = null.Time{Time: time.Now(), Valid: true} + } + return jr +} + +// MarkCompleted sets the JobRun's status to completed and records the +// completed at time. +func (jr JobRun) MarkCompleted() JobRun { + jr.Status = RunStatusCompleted + jr.Result.Status = RunStatusCompleted + jr.CompletedAt = null.Time{Time: time.Now(), Valid: true} + return jr +} + // TaskRun stores the Task and represents the status of the // Task to be ran. type TaskRun struct { - Task TaskSpec `json:"task"` ID string `json:"id" storm:"id,unique"` - Status string `json:"status"` Result RunResult `json:"result"` -} - -// Completed returns true if the TaskRun status is StatusCompleted. -func (tr TaskRun) Completed() bool { - return tr.Status == StatusCompleted -} - -// Errored returns true if the TaskRun status is StatusErrored. -func (tr TaskRun) Errored() bool { - return tr.Status == StatusErrored + Status RunStatus `json:"status"` + Task TaskSpec `json:"task"` } // String returns info on the TaskRun as "ID,Type,Status,Result". @@ -109,39 +131,59 @@ func (tr TaskRun) MergeTaskParams(j JSON) (TaskRun, error) { return tr, nil } +func (tr TaskRun) ApplyResult(result RunResult) TaskRun { + tr.Result = result + tr.Status = result.Status + return tr +} + +// MarkCompleted marks the task's status as completed. +func (tr TaskRun) MarkCompleted() TaskRun { + tr.Status = RunStatusCompleted + tr.Result.Status = RunStatusCompleted + return tr +} + +// MarkPendingConfirmations marks the task's status as blocked. +func (tr TaskRun) MarkPendingConfirmations() TaskRun { + tr.Status = RunStatusPendingConfirmations + tr.Result.Status = RunStatusPendingConfirmations + return tr +} + // RunResult keeps track of the outcome of a TaskRun. It stores // the Data and ErrorMessage, if any of either, and contains // a Pending field to track the status. type RunResult struct { JobRunID string `json:"jobRunId"` Data JSON `json:"data"` + Status RunStatus `json:"status"` ErrorMessage null.String `json:"error"` - Pending bool `json:"pending"` } // WithValue returns a copy of the RunResult, overriding the "value" field of -// Data and setting Pending to false. +// Data and setting the status to completed. func (rr RunResult) WithValue(val string) RunResult { data, err := rr.Data.Add("value", val) if err != nil { return rr.WithError(err) } - rr.Pending = false + rr.Status = RunStatusCompleted rr.Data = data return rr } // WithValue returns a copy of the RunResult, setting the error field -// and setting Pending to false. +// and setting the status to in progress. func (rr RunResult) WithError(err error) RunResult { rr.ErrorMessage = null.StringFrom(err.Error()) - rr.Pending = false + rr.Status = RunStatusErrored return rr } -// MarkPending returns a copy of RunResult but with Pending set to true. -func (rr RunResult) MarkPending() RunResult { - rr.Pending = true +// MarkPendingExternal returns a copy of RunResult but with status set to pending. +func (rr RunResult) MarkPendingExternal() RunResult { + rr.Status = RunStatusPendingExternal return rr } @@ -210,8 +252,35 @@ func (rr RunResult) Merge(in RunResult) (RunResult, error) { if len(in.JobRunID) == 0 { in.JobRunID = rr.JobRunID } - if in.Pending || rr.Pending { - in.Pending = true + if in.Status.Errored() || rr.Status.Errored() { + in.Status = RunStatusErrored + } else if in.Status.PendingExternal() || rr.Status.PendingExternal() { + in = in.MarkPendingExternal() } return in, nil } + +// BridgeRunResult handles the parsing of RunResults from external adapters. +type BridgeRunResult struct { + RunResult + ExternalPending bool `json:"pending"` +} + +// UnmarshalJSON parses the given input and updates the BridgeRunResult in the +// external adapter format. +func (brr *BridgeRunResult) UnmarshalJSON(input []byte) error { + type biAlias BridgeRunResult + var anon biAlias + err := json.Unmarshal(input, &anon) + *brr = BridgeRunResult(anon) + + if brr.Status.Errored() || brr.HasError() { + brr.Status = RunStatusErrored + } else if brr.ExternalPending || brr.Status.PendingExternal() { + brr.Status = RunStatusPendingExternal + } else { + brr.Status = RunStatusCompleted + } + + return err +} diff --git a/store/models/run_test.go b/store/models/run_test.go index 1de04a449e4..0ce27aeb535 100644 --- a/store/models/run_test.go +++ b/store/models/run_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/smartcontractkit/chainlink/internal/cltest" "github.com/smartcontractkit/chainlink/services" "github.com/smartcontractkit/chainlink/store/models" @@ -52,6 +53,36 @@ func TestJobRun_UnfinishedTaskRuns(t *testing.T) { assert.Equal(t, jr.TaskRuns[1:], jr.UnfinishedTaskRuns()) } +func TestJobRun_Runnable(t *testing.T) { + t.Parallel() + + job, initr := cltest.NewJobWithLogInitiator() + tests := []struct { + name string + creationHeight *hexutil.Big + blockNumber *models.IndexableBlockNumber + minimumConfirmations uint64 + want bool + }{ + {"unset nil 0", nil, nil, 0, true}, + {"1 nil 0", cltest.NewBigHexInt(1), nil, 0, true}, + {"1 1 diff 0", cltest.NewBigHexInt(1), cltest.IndexableBlockNumber(1), 0, true}, + {"1 1 diff 1", cltest.NewBigHexInt(1), cltest.IndexableBlockNumber(1), 1, false}, + {"1 2 diff 1", cltest.NewBigHexInt(1), cltest.IndexableBlockNumber(2), 1, true}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + jr := job.NewRun(initr) + if test.creationHeight != nil { + jr.CreationHeight = test.creationHeight + } + + assert.Equal(t, test.want, jr.Runnable(test.blockNumber, test.minimumConfirmations)) + }) + } +} + func TestTaskRun_Merge(t *testing.T) { t.Parallel() @@ -121,52 +152,56 @@ func TestRunResult_Value(t *testing.T) { func TestRunResult_Merge(t *testing.T) { t.Parallel() + inProgress := models.RunStatusInProgress + pending := models.RunStatusPendingExternal + errored := models.RunStatusErrored + nullString := cltest.NullString(nil) jrID := utils.NewBytes32ID() tests := []struct { name string originalData string originalError null.String - originalPending bool + originalStatus models.RunStatus originalJRID string inData string inError null.String - inPending bool + inStatus models.RunStatus inJRID string wantData string wantErrorMessage null.String - wantPending bool + wantStatus models.RunStatus wantJRID string wantErrored bool }{ {"merging data", - `{"value":"old&busted","unique":"1"}`, nullString, false, jrID, - `{"value":"newHotness","and":"!"}`, nullString, false, jrID, - `{"value":"newHotness","unique":"1","and":"!"}`, nullString, false, jrID, false}, + `{"value":"old&busted","unique":"1"}`, nullString, inProgress, jrID, + `{"value":"newHotness","and":"!"}`, nullString, inProgress, jrID, + `{"value":"newHotness","unique":"1","and":"!"}`, nullString, inProgress, jrID, false}, {"original error throws", - `{"value":"old"}`, cltest.NullString("old problem"), false, jrID, - `{}`, nullString, false, jrID, - `{"value":"old"}`, cltest.NullString("old problem"), false, jrID, true}, + `{"value":"old"}`, cltest.NullString("old problem"), errored, jrID, + `{}`, nullString, inProgress, jrID, + `{"value":"old"}`, cltest.NullString("old problem"), errored, jrID, true}, {"error override", - `{"value":"old"}`, nullString, false, jrID, - `{}`, cltest.NullString("new problem"), false, jrID, - `{"value":"old"}`, cltest.NullString("new problem"), false, jrID, false}, + `{"value":"old"}`, nullString, inProgress, jrID, + `{}`, cltest.NullString("new problem"), errored, jrID, + `{"value":"old"}`, cltest.NullString("new problem"), errored, jrID, false}, {"original job run ID", - `{"value":"old"}`, nullString, false, jrID, - `{}`, nullString, false, "", - `{"value":"old"}`, nullString, false, jrID, false}, + `{"value":"old"}`, nullString, inProgress, jrID, + `{}`, nullString, inProgress, "", + `{"value":"old"}`, nullString, inProgress, jrID, false}, {"job run ID override", - `{"value":"old"}`, nullString, false, utils.NewBytes32ID(), - `{}`, nullString, false, jrID, - `{"value":"old"}`, nullString, false, jrID, false}, + `{"value":"old"}`, nullString, inProgress, utils.NewBytes32ID(), + `{}`, nullString, inProgress, jrID, + `{"value":"old"}`, nullString, inProgress, jrID, false}, {"original pending", - `{"value":"old"}`, nullString, true, jrID, - `{}`, nullString, false, jrID, - `{"value":"old"}`, nullString, true, jrID, false}, + `{"value":"old"}`, nullString, pending, jrID, + `{}`, nullString, inProgress, jrID, + `{"value":"old"}`, nullString, pending, jrID, false}, {"pending override", - `{"value":"old"}`, nullString, false, jrID, - `{}`, nullString, true, jrID, - `{"value":"old"}`, nullString, true, jrID, false}, + `{"value":"old"}`, nullString, inProgress, jrID, + `{}`, nullString, pending, jrID, + `{"value":"old"}`, nullString, pending, jrID, false}, } for _, test := range tests { @@ -175,13 +210,13 @@ func TestRunResult_Merge(t *testing.T) { Data: models.JSON{gjson.Parse(test.originalData)}, ErrorMessage: test.originalError, JobRunID: test.originalJRID, - Pending: test.originalPending, + Status: test.originalStatus, } in := models.RunResult{ Data: cltest.JSONFromString(test.inData), ErrorMessage: test.inError, JobRunID: test.inJRID, - Pending: test.inPending, + Status: test.inStatus, } merged, err := original.Merge(in) assert.Equal(t, test.wantErrored, err != nil) @@ -189,17 +224,17 @@ func TestRunResult_Merge(t *testing.T) { assert.JSONEq(t, test.originalData, original.Data.String()) assert.Equal(t, test.originalError, original.ErrorMessage) assert.Equal(t, test.originalJRID, original.JobRunID) - assert.Equal(t, test.originalPending, original.Pending) + assert.Equal(t, test.originalStatus, original.Status) assert.JSONEq(t, test.inData, in.Data.String()) assert.Equal(t, test.inError, in.ErrorMessage) assert.Equal(t, test.inJRID, in.JobRunID) - assert.Equal(t, test.inPending, in.Pending) + assert.Equal(t, test.inStatus, in.Status) assert.JSONEq(t, test.wantData, merged.Data.String()) assert.Equal(t, test.wantErrorMessage, merged.ErrorMessage) assert.Equal(t, test.wantJRID, merged.JobRunID) - assert.Equal(t, test.wantPending, merged.Pending) + assert.Equal(t, test.wantStatus, merged.Status) }) } } diff --git a/store/tx_manager.go b/store/tx_manager.go index 1db56f6fe3d..bcb7c001fea 100644 --- a/store/tx_manager.go +++ b/store/tx_manager.go @@ -144,7 +144,7 @@ func (txm *TxManager) handleConfirmed( blkNum uint64, ) (bool, error) { - minConfs := big.NewInt(int64(txm.Config.EthMinConfirmations)) + minConfs := big.NewInt(int64(txm.Config.TxMinConfirmations)) rcptBlkNum := big.Int(rcpt.BlockNumber) safeAt := minConfs.Add(&rcptBlkNum, minConfs) if big.NewInt(int64(blkNum)).Cmp(safeAt) == -1 { diff --git a/store/tx_manager_test.go b/store/tx_manager_test.go index 2be65cc85df..2ae569fe7c8 100644 --- a/store/tx_manager_test.go +++ b/store/tx_manager_test.go @@ -128,7 +128,7 @@ func TestTxManager_EnsureTxConfirmed_WhenSafe(t *testing.T) { Hash: cltest.NewHash(), BlockNumber: cltest.BigHexInt(sentAt), }) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.EthMinConfirmations)) + ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.TxMinConfirmations)) tx := cltest.CreateTxAndAttempt(store, from, sentAt) a := tx.TxAttempt @@ -161,7 +161,7 @@ func TestTxManager_EnsureTxConfirmed_WhenWithConfsButNotSafe(t *testing.T) { Hash: cltest.NewHash(), BlockNumber: cltest.BigHexInt(sentAt), }) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.EthMinConfirmations-1)) + ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.TxMinConfirmations-1)) tx := cltest.CreateTxAndAttempt(store, from, sentAt) a := tx.TxAttempt diff --git a/web/integration_test.go b/web/integration_test.go index 90b3a1bd2ef..9ed01e9179f 100644 --- a/web/integration_test.go +++ b/web/integration_test.go @@ -57,7 +57,7 @@ func TestIntegration_HelloWorld(t *testing.T) { hash := common.HexToHash("0xb7862c896a6ba2711bccc0410184e46d793ea83b3e05470f1d359ea276d16bb5") sentAt := uint64(23456) confirmed := sentAt + config.EthGasBumpThreshold + 1 - safe := confirmed + config.EthMinConfirmations + safe := confirmed + config.TxMinConfirmations eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) eth.Register("eth_sendRawTransaction", hash) @@ -150,30 +150,35 @@ func TestIntegration_EthLog(t *testing.T) { } func TestIntegration_RunLog(t *testing.T) { - app, cleanup := cltest.NewApplication() + config, _ := cltest.NewConfig() + config.TaskMinConfirmations = 6 + app, cleanup := cltest.NewApplicationWithConfig(config) defer cleanup() eth := app.MockEthClient() logs := make(chan types.Log, 1) eth.RegisterSubscription("logs", logs) + newHeads := eth.RegisterNewHeads() app.Start() - gock.EnableNetworking() - defer cltest.CloseGock(t) - gock.New("https://etherprice.com"). - Get("/api"). - Reply(200). - JSON(`{}`) - - j := cltest.FixtureCreateJobViaWeb(t, app, "../internal/fixtures/web/runlog_random_number_job.json") + j := cltest.FixtureCreateJobViaWeb(t, app, "../internal/fixtures/web/runlog_noop_job.json") var initr models.Initiator app.Store.One("JobID", j.ID, &initr) assert.Equal(t, models.InitiatorRunLog, initr.Type) - logs <- cltest.NewRunLog(j.ID, cltest.NewAddress(), `{"url":"https://etherprice.com/api"}`) - + logBlockNumber := 1 + logs <- cltest.NewRunLog(j.ID, cltest.NewAddress(), logBlockNumber, `{}`) cltest.WaitForRuns(t, j, app.Store, 1) + + runs, err := app.Store.JobRunsFor(j.ID) + assert.Nil(t, err) + jr := runs[0] + cltest.WaitForJobRunToBlock(t, app.Store, jr) + + safeNumber := logBlockNumber + int(app.Store.Config.TaskMinConfirmations) + newHeads <- models.BlockHeader{Number: cltest.BigHexInt(safeNumber)} + cltest.WaitForJobRunToComplete(t, app.Store, jr) } func TestIntegration_EndAt(t *testing.T) { @@ -275,7 +280,7 @@ func TestIntegration_ExternalAdapter_Pending(t *testing.T) { jr = cltest.WaitForJobRunToPend(t, app.Store, jr) tr := jr.TaskRuns[0] - assert.Equal(t, models.StatusPending, tr.Status) + assert.Equal(t, models.RunStatusPendingExternal, tr.Status) val, err := tr.Result.Value() assert.NotNil(t, err) assert.Equal(t, "", val) @@ -283,7 +288,7 @@ func TestIntegration_ExternalAdapter_Pending(t *testing.T) { jr = cltest.UpdateJobRunViaWeb(t, app, jr, `{"data":{"value":"100"}}`) jr = cltest.WaitForJobRunToComplete(t, app.Store, jr) tr = jr.TaskRuns[0] - assert.Equal(t, models.StatusCompleted, tr.Status) + assert.Equal(t, models.RunStatusCompleted, tr.Status) val, err = tr.Result.Value() assert.Nil(t, err) assert.Equal(t, "100", val) @@ -324,42 +329,12 @@ func TestIntegration_WeiWatchers(t *testing.T) { } func TestIntegration_MultiplierUint256(t *testing.T) { - gock.EnableNetworking() - defer cltest.CloseGock(t) - app, cleanup := cltest.NewApplication() defer cleanup() app.Start() - gock.New("https://bitstamp.net"). - Get("/api/ticker"). - Reply(200). - JSON(`{"last": "10221.30"}`) - j := cltest.FixtureCreateJobViaWeb(t, app, "../internal/fixtures/web/uint256_job.json") - jr := cltest.CreateJobRunViaWeb(t, app, j) - jr = cltest.WaitForJobRunToComplete(t, app.Store, jr) - - val, err := jr.Result.Value() - assert.Nil(t, err) - assert.Equal(t, "0x00000000000000000000000000000000000000000000000000000000000f98b2", val) -} - -func TestIntegration_MultiplierUint256String(t *testing.T) { - gock.EnableNetworking() - defer cltest.CloseGock(t) - - app, cleanup := cltest.NewApplication() - defer cleanup() - app.Start() - - gock.New("https://bitstamp.net"). - Get("/api/ticker"). - Reply(200). - JSON(`{"last": "10221.30"}`) - - j := cltest.FixtureCreateJobViaWeb(t, app, "../internal/fixtures/web/uint256_string_times_job.json") - jr := cltest.CreateJobRunViaWeb(t, app, j) + jr := cltest.CreateJobRunViaWeb(t, app, j, `{"value":"10221.30"}`) jr = cltest.WaitForJobRunToComplete(t, app.Store, jr) val, err := jr.Result.Value() diff --git a/web/job_runs_controller.go b/web/job_runs_controller.go index 23fe4159824..f6909538ba7 100644 --- a/web/job_runs_controller.go +++ b/web/job_runs_controller.go @@ -76,7 +76,7 @@ func getRunData(c *gin.Context) (models.JSON, error) { // "/runs/:RunID" func (jrc *JobRunsController) Update(c *gin.Context) { id := c.Param("RunID") - var rr models.RunResult + var brr models.BridgeRunResult if jr, err := jrc.App.Store.FindJobRun(id); err == storm.ErrNotFound { c.JSON(404, gin.H{ "errors": []string{"Job Run not found"}, @@ -85,16 +85,16 @@ func (jrc *JobRunsController) Update(c *gin.Context) { c.JSON(500, gin.H{ "errors": []string{err.Error()}, }) - } else if !jr.Result.Pending { + } else if !jr.Result.Status.PendingExternal() { c.JSON(405, gin.H{ "errors": []string{"Cannot resume a job run that isn't pending"}, }) - } else if err := c.ShouldBindJSON(&rr); err != nil { + } else if err := c.ShouldBindJSON(&brr); err != nil { c.JSON(500, gin.H{ "errors": []string{err.Error()}, }) } else { - executeRun(jr, jrc.App.Store, rr) + executeRun(jr, jrc.App.Store, brr.RunResult) c.JSON(200, gin.H{"id": jr.ID}) } } diff --git a/web/job_runs_controller_test.go b/web/job_runs_controller_test.go index 05aa9da3ea3..97f7bd40ca2 100644 --- a/web/job_runs_controller_test.go +++ b/web/job_runs_controller_test.go @@ -141,7 +141,7 @@ func TestJobRunsController_Update_Success(t *testing.T) { j, initr := cltest.NewJobWithWebInitiator() j.Tasks = []models.TaskSpec{cltest.NewTask(bt.Name)} assert.Nil(t, app.Store.Save(&j)) - jr := cltest.MarkJobRunPending(j.NewRun(initr), 0) + jr := cltest.MarkJobRunPendingExternal(j.NewRun(initr), 0) assert.Nil(t, app.Store.Save(&jr)) url := app.Server.URL + "/v2/runs/" + jr.ID @@ -186,7 +186,7 @@ func TestJobRunsController_Update_WithError(t *testing.T) { j, initr := cltest.NewJobWithWebInitiator() j.Tasks = []models.TaskSpec{cltest.NewTask(bt.Name)} assert.Nil(t, app.Store.Save(&j)) - jr := cltest.MarkJobRunPending(j.NewRun(initr), 0) + jr := cltest.MarkJobRunPendingExternal(j.NewRun(initr), 0) assert.Nil(t, app.Store.Save(&jr)) url := app.Server.URL + "/v2/runs/" + jr.ID @@ -196,7 +196,7 @@ func TestJobRunsController_Update_WithError(t *testing.T) { jrID := cltest.ParseCommonJSON(resp.Body).ID assert.Equal(t, jr.ID, jrID) - jr = cltest.WaitForJobRunStatus(t, app.Store, jr, models.StatusErrored) + jr = cltest.WaitForJobRunStatus(t, app.Store, jr, models.RunStatusErrored) val, err := jr.Result.Value() assert.Nil(t, err) assert.Equal(t, "0", val) @@ -212,7 +212,7 @@ func TestJobRunsController_Update_BadInput(t *testing.T) { j, initr := cltest.NewJobWithWebInitiator() j.Tasks = []models.TaskSpec{cltest.NewTask(bt.Name)} assert.Nil(t, app.Store.Save(&j)) - jr := cltest.MarkJobRunPending(j.NewRun(initr), 0) + jr := cltest.MarkJobRunPendingExternal(j.NewRun(initr), 0) assert.Nil(t, app.Store.Save(&jr)) url := app.Server.URL + "/v2/runs/" + jr.ID @@ -220,7 +220,7 @@ func TestJobRunsController_Update_BadInput(t *testing.T) { resp := cltest.BasicAuthPatch(url, "application/json", bytes.NewBufferString(body)) assert.Equal(t, 500, resp.StatusCode, "Response should be successful") assert.Nil(t, app.Store.One("ID", jr.ID, &jr)) - assert.Equal(t, models.StatusPending, jr.Status) + assert.Equal(t, models.RunStatusPendingExternal, jr.Status) } func TestJobRunsController_Update_NotFound(t *testing.T) { @@ -233,7 +233,7 @@ func TestJobRunsController_Update_NotFound(t *testing.T) { j, initr := cltest.NewJobWithWebInitiator() j.Tasks = []models.TaskSpec{cltest.NewTask(bt.Name)} assert.Nil(t, app.Store.Save(&j)) - jr := cltest.MarkJobRunPending(j.NewRun(initr), 0) + jr := cltest.MarkJobRunPendingExternal(j.NewRun(initr), 0) assert.Nil(t, app.Store.Save(&jr)) url := app.Server.URL + "/v2/runs/" + jr.ID + "1" @@ -241,5 +241,5 @@ func TestJobRunsController_Update_NotFound(t *testing.T) { resp := cltest.BasicAuthPatch(url, "application/json", bytes.NewBufferString(body)) assert.Equal(t, 404, resp.StatusCode, "Response should be successful") assert.Nil(t, app.Store.One("ID", jr.ID, &jr)) - assert.Equal(t, models.StatusPending, jr.Status) + assert.Equal(t, models.RunStatusPendingExternal, jr.Status) }