Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tasks block until TaskMinConfirmations have been met #184

Merged
merged 12 commits into from
Mar 23, 2018
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ You can configure your node's behavior by setting environment variables which ca
ETH_URL Default: ws://localhost:8546
ETH_CHAIN_ID Default: 0
ETH_GAS_BUMP_THRESHOLD Default: 12
ETH_MIN_CONFIRMATIONS Default: 12
TX_MIN_CONFIRMATIONS Default: 12
TASK_MIN_CONFIRMATIONS Default: 6
ETH_GAS_BUMP_WEI Default: 5000000000 (5 gwei)
ETH_GAS_PRICE_DEFAULT Default: 20000000000 (20 gwei)

Expand Down
50 changes: 31 additions & 19 deletions adapters/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,57 +24,69 @@ type Bridge struct {
// If the Perform is resumed with a pending RunResult, the RunResult is marked
// not pending and the RunResult is returned.
func (ba *Bridge) Perform(input models.RunResult, _ *store.Store) models.RunResult {
if input.Pending {
if input.Status.Errored() {
return input
} else if input.Status.PendingExternal() {
return markNotPending(input)
}
return ba.handleNewRun(input)
}

func markNotPending(input models.RunResult) models.RunResult {
input.Pending = false
input.Status = models.RunStatusInProgress
return input
}

func (ba *Bridge) handleNewRun(input models.RunResult) models.RunResult {
in, err := json.Marshal(&bridgePayload{input})
b, err := postToExternalAdapter(ba.URL.String(), input)
if err != nil {
return baRunResultError(input, "marshaling request body", err)
return baRunResultError(input, "post to external adapter", err)
}

resp, err := http.Post(ba.URL.String(), "application/json", bytes.NewBuffer(in))
var brr models.BridgeRunResult
err = json.Unmarshal(b, &brr)
if err != nil {
return baRunResultError(input, "POST request", err)
return baRunResultError(input, "unmarshaling JSON", err)
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
b, _ := ioutil.ReadAll(resp.Body)
err = fmt.Errorf("%v %v", resp.StatusCode, string(b))
return baRunResultError(input, "POST response", err)
rr, err := input.Merge(brr.RunResult)
if err != nil {
return baRunResultError(rr, "Unable to merge received payload", err)
}

b, err := ioutil.ReadAll(resp.Body)
return rr
}

func postToExternalAdapter(url string, input models.RunResult) ([]byte, error) {
in, err := json.Marshal(&bridgeOutgoing{input})
if err != nil {
return baRunResultError(input, "reading response body", err)
return nil, fmt.Errorf("marshaling request body: %v", err)
}

rr := models.RunResult{}
err = json.Unmarshal(b, &rr)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(in))
if err != nil {
return baRunResultError(input, "unmarshaling JSON", err)
return nil, fmt.Errorf("POST request: %v", err)
}
return rr
defer resp.Body.Close()

if resp.StatusCode >= 400 {
b, _ := ioutil.ReadAll(resp.Body)
err = fmt.Errorf("%v %v", resp.StatusCode, string(b))
return nil, fmt.Errorf("POST response: %v", err)
}

return ioutil.ReadAll(resp.Body)
}

func baRunResultError(in models.RunResult, str string, err error) models.RunResult {
return in.WithError(fmt.Errorf("ExternalBridge %v: %v", str, err))
}

type bridgePayload struct {
type bridgeOutgoing struct {
models.RunResult
}

func (bp bridgePayload) MarshalJSON() ([]byte, error) {
func (bp bridgeOutgoing) MarshalJSON() ([]byte, error) {
anon := struct {
JobRunID string `json:"id"`
Data models.JSON `json:"data"`
Expand Down
48 changes: 24 additions & 24 deletions adapters/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,24 @@ import (
"github.com/smartcontractkit/chainlink/store/models"
"github.com/smartcontractkit/chainlink/utils"
"github.com/stretchr/testify/assert"
null "gopkg.in/guregu/null.v3"
)

func TestBridge_Perform_FromUnstarted(t *testing.T) {
func TestBridge_Perform_fromUnstarted(t *testing.T) {
cases := []struct {
name string
status int
want string
wantExists bool
wantErrored bool
wantPending bool
response string
}{
{"success", 200, "purchased", true, false, false, `{"data":{"value": "purchased"}}`},
{"run error", 200, "", false, true, false, `{"error": "overload", "data": {}}`},
{"server error", 400, "lot 49", true, true, false, `bad request`},
{"server error", 500, "lot 49", true, true, false, `big error`},
{"JSON parse error", 200, "lot 49", true, true, false, `}`},
{"pending response", 200, "", false, false, true, `{"pending":true}`},
{"success", 200, "purchased", false, false, `{"data":{"value": "purchased"}}`},
{"run error", 200, "lot 49", true, false, `{"error": "overload", "data": {}}`},
{"server error", 400, "lot 49", true, false, `bad request`},
{"server error", 500, "lot 49", true, false, `big error`},
{"JSON parse error", 200, "lot 49", true, false, `}`},
{"pending response", 200, "lot 49", false, true, `{"pending":true}`},
{"unsetting value", 200, "", false, false, `{"data":{"value":null}}`},
}

store, cleanup := cltest.NewStore()
Expand All @@ -53,22 +52,23 @@ func TestBridge_Perform_FromUnstarted(t *testing.T) {
result = eb.Perform(result, store)
val, _ := result.Get("value")
assert.Equal(t, test.want, val.String())
assert.Equal(t, test.wantExists, val.Exists())
assert.Equal(t, test.wantErrored, result.HasError())
assert.Equal(t, test.wantPending, result.Pending)
assert.Equal(t, test.wantPending, result.Status.PendingExternal())
})
}
}

func TestBridge_Perform_FromPending(t *testing.T) {
func TestBridge_Perform_resuming(t *testing.T) {
t.Parallel()
cases := []struct {
name string
input string
errorMessage null.String
want string
name string
input string
status models.RunStatus
want string
wantStatus models.RunStatus
}{
{"basic", `{"value":"100","old":"remains"}`, cltest.NullString(nil), `{"value":"100","old":"remains"}`},
{"with error", `{"value":"100","old":"remains"}`, cltest.NullString("Big error!"), `{"value":"100","old":"remains"}`},
{"from pending", `{"value":"100","old":"remains"}`, models.RunStatusPendingExternal, `{"value":"100","old":"remains"}`, models.RunStatusInProgress},
{"from errored", `{"value":"100","old":"remains"}`, models.RunStatusErrored, `{"value":"100","old":"remains"}`, models.RunStatusErrored},
}

store, cleanup := cltest.NewStore()
Expand All @@ -79,18 +79,18 @@ func TestBridge_Perform_FromPending(t *testing.T) {

for _, test := range cases {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
input := models.RunResult{
Data: cltest.JSONFromString(test.input),
ErrorMessage: test.errorMessage,
Pending: true,
Data: cltest.JSONFromString(test.input),
Status: test.status,
}

result := ba.Perform(input, store)

assert.Equal(t, test.want, result.Data.String())
assert.Equal(t, test.errorMessage, result.ErrorMessage)
assert.Equal(t, false, result.Pending)
assert.Equal(t, test.wantStatus, result.Status)
if test.wantStatus.Errored() {
assert.Equal(t, input, result)
}
})
}
}
4 changes: 2 additions & 2 deletions adapters/eth_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type EthTx struct {
// is not currently pending. Then it confirms the transaction was confirmed on
// the blockchain.
func (etx *EthTx) Perform(input models.RunResult, store *store.Store) models.RunResult {
if !input.Pending {
if !input.Status.PendingExternal() {
return createTxRunResult(etx, input, store)
} else {
return ensureTxRunResult(input, store)
Expand Down Expand Up @@ -67,7 +67,7 @@ func ensureTxRunResult(input models.RunResult, store *store.Store) models.RunRes
if err != nil {
return input.WithError(err)
} else if !confirmed {
return input.MarkPending()
return input.MarkPendingExternal()
}
return input.WithValue(hash.String())
}
16 changes: 8 additions & 8 deletions adapters/eth_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) {
hash := cltest.NewHash()
sentAt := uint64(23456)
confirmed := sentAt + 1
safe := confirmed + config.EthMinConfirmations
safe := confirmed + config.TxMinConfirmations
ethMock.Register("eth_sendRawTransaction", hash,
func(_ interface{}, data ...interface{}) error {
rlp := data[0].([]interface{})[0].(string)
Expand Down Expand Up @@ -87,12 +87,12 @@ func TestEthTxAdapter_Perform_FromPending(t *testing.T) {
assert.Nil(t, err)
adapter := adapters.EthTx{}
sentResult := cltest.RunResultWithValue(a.Hash.String())
input := sentResult.MarkPending()
input := sentResult.MarkPendingExternal()

output := adapter.Perform(input, store)

assert.False(t, output.HasError())
assert.True(t, output.Pending)
assert.True(t, output.Status.PendingExternal())
assert.Nil(t, store.One("ID", tx.ID, tx))
attempts, _ := store.AttemptsFor(tx.ID)
assert.Equal(t, 1, len(attempts))
Expand Down Expand Up @@ -121,12 +121,12 @@ func TestEthTxAdapter_Perform_FromPendingBumpGas(t *testing.T) {
assert.Nil(t, err)
adapter := adapters.EthTx{}
sentResult := cltest.RunResultWithValue(a.Hash.String())
input := sentResult.MarkPending()
input := sentResult.MarkPendingExternal()

output := adapter.Perform(input, store)

assert.False(t, output.HasError())
assert.True(t, output.Pending)
assert.True(t, output.Status.PendingExternal())
assert.Nil(t, store.One("ID", tx.ID, tx))
attempts, _ := store.AttemptsFor(tx.ID)
assert.Equal(t, 2, len(attempts))
Expand All @@ -150,7 +150,7 @@ func TestEthTxAdapter_Perform_FromPendingConfirm(t *testing.T) {
Hash: cltest.NewHash(),
BlockNumber: cltest.BigHexInt(sentAt),
})
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.EthMinConfirmations))
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.TxMinConfirmations))

tx := cltest.NewTx(cltest.NewAddress(), sentAt)
assert.Nil(t, store.Save(tx))
Expand All @@ -159,13 +159,13 @@ func TestEthTxAdapter_Perform_FromPendingConfirm(t *testing.T) {
a3, _ := store.AddAttempt(tx, tx.EthTx(big.NewInt(3)), sentAt+2)
adapter := adapters.EthTx{}
sentResult := cltest.RunResultWithValue(a3.Hash.String())
input := sentResult.MarkPending()
input := sentResult.MarkPendingExternal()

assert.False(t, tx.Confirmed)

output := adapter.Perform(input, store)

assert.False(t, output.Pending)
assert.False(t, output.Status.PendingExternal())
assert.False(t, output.HasError())

assert.Nil(t, store.One("ID", tx.ID, tx))
Expand Down
4 changes: 2 additions & 2 deletions adapters/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestHttpGet_Perform(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, test.want, val)
assert.Equal(t, test.wantErrored, result.HasError())
assert.Equal(t, false, result.Pending)
assert.Equal(t, false, result.Status.PendingExternal())
})
}
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestHttpPost_Perform(t *testing.T) {
assert.Equal(t, test.want, val.String())
assert.Equal(t, true, val.Exists())
assert.Equal(t, test.wantErrored, result.HasError())
assert.Equal(t, false, result.Pending)
assert.Equal(t, false, result.Status.PendingExternal())
})
}
}
4 changes: 2 additions & 2 deletions adapters/no_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (noa *NoOp) Perform(input models.RunResult, _ *store.Store) models.RunResul
type NoOpPend struct{}

// Perform on this adapter type returns an empty RunResult with an
// added field for the status to indicate the task is Pending
// added field for the status to indicate the task is Pending.
func (noa *NoOpPend) Perform(input models.RunResult, _ *store.Store) models.RunResult {
return input.MarkPending()
return input.MarkPendingExternal()
}
2 changes: 1 addition & 1 deletion cmd/renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (rt RendererTable) renderJobRuns(j presenters.JobSpec) error {
for _, jr := range j.Runs {
table.Append([]string{
jr.ID,
jr.Status,
string(jr.Status),
utils.ISO8601UTC(jr.CreatedAt),
utils.NullISO8601UTC(jr.CompletedAt),
jr.Result.Data.String(),
Expand Down
2 changes: 1 addition & 1 deletion internal/bin/cldev
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export LOG_LEVEL=debug
export ROOT=./internal/devnet
export ETH_URL=ws://localhost:18546
export ETH_CHAIN_ID=17
export ETH_MIN_CONFIRMATIONS=2
export TX_MIN_CONFIRMATIONS=2

LDFLAGS="-X github.com/smartcontractkit/chainlink/store.Sha=`git rev-parse HEAD`"

Expand Down
Loading