diff --git a/.gitignore b/.gitignore index e4760a698c..c505323c66 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ /services/horizon/captive-core /services/horizon/horizon /services/horizon/stellar-horizon +/bucket-cache .vscode .idea debug diff --git a/clients/horizonclient/client.go b/clients/horizonclient/client.go index 0b51237bdd..0c580de771 100644 --- a/clients/horizonclient/client.go +++ b/clients/horizonclient/client.go @@ -440,6 +440,44 @@ func (c *Client) OperationDetail(id string) (ops operations.Operation, err error return ops, nil } +// validateFeeBumpTx checks if the inner transaction has a memo or not and converts the transaction object to +// base64 string. +func (c *Client) validateFeeBumpTx(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (string, error) { + var err error + if inner := transaction.InnerTransaction(); !opts.SkipMemoRequiredCheck && inner.Memo() == nil { + err = c.checkMemoRequired(inner) + if err != nil { + return "", err + } + } + + txeBase64, err := transaction.Base64() + if err != nil { + err = errors.Wrap(err, "Unable to convert transaction object to base64 string") + return "", err + } + return txeBase64, nil +} + +// validateTx checks if the transaction has a memo or not and converts the transaction object to +// base64 string. +func (c *Client) validateTx(transaction *txnbuild.Transaction, opts SubmitTxOpts) (string, error) { + var err error + if !opts.SkipMemoRequiredCheck && transaction.Memo() == nil { + err = c.checkMemoRequired(transaction) + if err != nil { + return "", err + } + } + + txeBase64, err := transaction.Base64() + if err != nil { + err = errors.Wrap(err, "Unable to convert transaction object to base64 string") + return "", err + } + return txeBase64, nil +} + // SubmitTransactionXDR submits a transaction represented as a base64 XDR string to the network. err can be either error object or horizon.Error object. // See https://developers.stellar.org/api/resources/transactions/post/ func (c *Client) SubmitTransactionXDR(transactionXdr string) (tx hProtocol.Transaction, @@ -469,16 +507,8 @@ func (c *Client) SubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransacti func (c *Client) SubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (tx hProtocol.Transaction, err error) { // only check if memo is required if skip is false and the inner transaction // doesn't have a memo. - if inner := transaction.InnerTransaction(); !opts.SkipMemoRequiredCheck && inner.Memo() == nil { - err = c.checkMemoRequired(inner) - if err != nil { - return - } - } - - txeBase64, err := transaction.Base64() + txeBase64, err := c.validateFeeBumpTx(transaction, opts) if err != nil { - err = errors.Wrap(err, "Unable to convert transaction object to base64 string") return } @@ -505,20 +535,68 @@ func (c *Client) SubmitTransaction(transaction *txnbuild.Transaction) (tx hProto func (c *Client) SubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (tx hProtocol.Transaction, err error) { // only check if memo is required if skip is false and the transaction // doesn't have a memo. - if !opts.SkipMemoRequiredCheck && transaction.Memo() == nil { - err = c.checkMemoRequired(transaction) - if err != nil { - return - } + txeBase64, err := c.validateTx(transaction, opts) + if err != nil { + return } - txeBase64, err := transaction.Base64() + return c.SubmitTransactionXDR(txeBase64) +} + +// AsyncSubmitTransactionXDR submits a base64 XDR transaction using the transactions_async endpoint. err can be either error object or horizon.Error object. +func (c *Client) AsyncSubmitTransactionXDR(transactionXdr string) (txResp hProtocol.AsyncTransactionSubmissionResponse, + err error) { + request := submitRequest{endpoint: "transactions_async", transactionXdr: transactionXdr} + err = c.sendRequest(request, &txResp) + return +} + +// AsyncSubmitFeeBumpTransaction submits an async fee bump transaction to the network. err can be either an +// error object or a horizon.Error object. +// +// This function will always check if the destination account requires a memo in the transaction as +// defined in SEP0029: https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0029.md +// +// If you want to skip this check, use SubmitTransactionWithOptions. +func (c *Client) AsyncSubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) { + return c.AsyncSubmitFeeBumpTransactionWithOptions(transaction, SubmitTxOpts{}) +} + +// AsyncSubmitFeeBumpTransactionWithOptions submits an async fee bump transaction to the network, allowing +// you to pass SubmitTxOpts. err can be either an error object or a horizon.Error object. +func (c *Client) AsyncSubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) { + // only check if memo is required if skip is false and the inner transaction + // doesn't have a memo. + txeBase64, err := c.validateFeeBumpTx(transaction, opts) if err != nil { - err = errors.Wrap(err, "Unable to convert transaction object to base64 string") return } - return c.SubmitTransactionXDR(txeBase64) + return c.AsyncSubmitTransactionXDR(txeBase64) +} + +// AsyncSubmitTransaction submits an async transaction to the network. err can be either an +// error object or a horizon.Error object. +// +// This function will always check if the destination account requires a memo in the transaction as +// defined in SEP0029: https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0029.md +// +// If you want to skip this check, use SubmitTransactionWithOptions. +func (c *Client) AsyncSubmitTransaction(transaction *txnbuild.Transaction) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) { + return c.AsyncSubmitTransactionWithOptions(transaction, SubmitTxOpts{}) +} + +// AsyncSubmitTransactionWithOptions submits an async transaction to the network, allowing +// you to pass SubmitTxOpts. err can be either an error object or a horizon.Error object. +func (c *Client) AsyncSubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) { + // only check if memo is required if skip is false and the transaction + // doesn't have a memo. + txeBase64, err := c.validateTx(transaction, opts) + if err != nil { + return + } + + return c.AsyncSubmitTransactionXDR(txeBase64) } // Transactions returns stellar transactions (https://developers.stellar.org/api/resources/transactions/list/) diff --git a/clients/horizonclient/internal.go b/clients/horizonclient/internal.go index 123788caa9..9dc6052263 100644 --- a/clients/horizonclient/internal.go +++ b/clients/horizonclient/internal.go @@ -27,7 +27,10 @@ func decodeResponse(resp *http.Response, object interface{}, horizonUrl string, } setCurrentServerTime(u.Hostname(), resp.Header["Date"], clock) - if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { + // While this part of code assumes that any error < 200 or error >= 300 is a Horizon problem, it is not + // true for the response from /transactions_async endpoint which does give these codes for certain responses + // from core. + if !(resp.StatusCode >= 200 && resp.StatusCode < 300) && (resp.Request == nil || resp.Request.URL == nil || resp.Request.URL.Path != "/transactions_async") { horizonError := &Error{ Response: resp, } diff --git a/clients/horizonclient/main.go b/clients/horizonclient/main.go index c7fa40f03c..a7bf923ac9 100644 --- a/clients/horizonclient/main.go +++ b/clients/horizonclient/main.go @@ -194,6 +194,11 @@ type ClientInterface interface { SubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (hProtocol.Transaction, error) SubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (hProtocol.Transaction, error) SubmitTransaction(transaction *txnbuild.Transaction) (hProtocol.Transaction, error) + AsyncSubmitTransactionXDR(transactionXdr string) (hProtocol.AsyncTransactionSubmissionResponse, error) + AsyncSubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error) + AsyncSubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error) + AsyncSubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (hProtocol.AsyncTransactionSubmissionResponse, error) + AsyncSubmitTransaction(transaction *txnbuild.Transaction) (hProtocol.AsyncTransactionSubmissionResponse, error) Transactions(request TransactionRequest) (hProtocol.TransactionsPage, error) TransactionDetail(txHash string) (hProtocol.Transaction, error) OrderBook(request OrderBookRequest) (hProtocol.OrderBookSummary, error) diff --git a/clients/horizonclient/mocks.go b/clients/horizonclient/mocks.go index fbf6fe5b66..92c766dd54 100644 --- a/clients/horizonclient/mocks.go +++ b/clients/horizonclient/mocks.go @@ -121,6 +121,36 @@ func (m *MockClient) SubmitTransactionWithOptions(transaction *txnbuild.Transact return a.Get(0).(hProtocol.Transaction), a.Error(1) } +// AsyncSubmitTransactionXDR is a mocking method +func (m *MockClient) AsyncSubmitTransactionXDR(transactionXdr string) (hProtocol.AsyncTransactionSubmissionResponse, error) { + a := m.Called(transactionXdr) + return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1) +} + +// AsyncSubmitFeeBumpTransaction is a mocking method +func (m *MockClient) AsyncSubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (hProtocol.AsyncTransactionSubmissionResponse, error) { + a := m.Called(transaction) + return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1) +} + +// AsyncSubmitTransaction is a mocking method +func (m *MockClient) AsyncSubmitTransaction(transaction *txnbuild.Transaction) (hProtocol.AsyncTransactionSubmissionResponse, error) { + a := m.Called(transaction) + return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1) +} + +// AsyncSubmitFeeBumpTransactionWithOptions is a mocking method +func (m *MockClient) AsyncSubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error) { + a := m.Called(transaction, opts) + return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1) +} + +// AsyncSubmitTransactionWithOptions is a mocking method +func (m *MockClient) AsyncSubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error) { + a := m.Called(transaction, opts) + return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1) +} + // Transactions is a mocking method func (m *MockClient) Transactions(request TransactionRequest) (hProtocol.TransactionsPage, error) { a := m.Called(request) diff --git a/clients/stellarcore/metrics_client.go b/clients/stellarcore/metrics_client.go new file mode 100644 index 0000000000..2353905af7 --- /dev/null +++ b/clients/stellarcore/metrics_client.go @@ -0,0 +1,70 @@ +package stellarcore + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + + proto "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/xdr" +) + +var envelopeTypeToLabel = map[xdr.EnvelopeType]string{ + xdr.EnvelopeTypeEnvelopeTypeTxV0: "v0", + xdr.EnvelopeTypeEnvelopeTypeTx: "v1", + xdr.EnvelopeTypeEnvelopeTypeTxFeeBump: "fee_bump", +} + +type ClientWithMetrics struct { + coreClient Client + + // submissionDuration exposes timing metrics about the rate and latency of + // submissions to stellar-core + submissionDuration *prometheus.SummaryVec +} + +func (c ClientWithMetrics) SubmitTx(ctx context.Context, rawTx string) (*proto.TXResponse, error) { + var envelope xdr.TransactionEnvelope + err := xdr.SafeUnmarshalBase64(rawTx, &envelope) + if err != nil { + return &proto.TXResponse{}, err + } + + startTime := time.Now() + response, err := c.coreClient.SubmitTransaction(ctx, rawTx) + duration := time.Since(startTime).Seconds() + + label := prometheus.Labels{} + if err != nil { + label["status"] = "request_error" + } else if response.IsException() { + label["status"] = "exception" + } else { + label["status"] = response.Status + } + + label["envelope_type"] = envelopeTypeToLabel[envelope.Type] + c.submissionDuration.With(label).Observe(duration) + + return response, err +} + +func NewClientWithMetrics(client Client, registry *prometheus.Registry, prometheusSubsystem string) ClientWithMetrics { + submissionDuration := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", + Subsystem: prometheusSubsystem, + Name: "submission_duration_seconds", + Help: "submission durations to Stellar-Core, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"status", "envelope_type"}) + + registry.MustRegister( + submissionDuration, + ) + + return ClientWithMetrics{ + coreClient: client, + submissionDuration: submissionDuration, + } +} diff --git a/protocols/horizon/main.go b/protocols/horizon/main.go index bdec98ba0b..dd9cc7814f 100644 --- a/protocols/horizon/main.go +++ b/protocols/horizon/main.go @@ -8,10 +8,12 @@ import ( "fmt" "math" "math/big" + "net/http" "strconv" "time" "github.com/stellar/go/protocols/horizon/base" + proto "github.com/stellar/go/protocols/stellarcore" "github.com/stellar/go/strkey" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/render/hal" @@ -29,6 +31,13 @@ var KeyTypeNames = map[strkey.VersionByte]string{ strkey.VersionByteSignedPayload: "ed25519_signed_payload", } +var coreStatusToHTTPStatus = map[string]int{ + proto.TXStatusPending: http.StatusCreated, + proto.TXStatusDuplicate: http.StatusConflict, + proto.TXStatusTryAgainLater: http.StatusServiceUnavailable, + proto.TXStatusError: http.StatusBadRequest, +} + // Account is the summary of an account type Account struct { Links struct { @@ -567,6 +576,26 @@ type InnerTransaction struct { MaxFee int64 `json:"max_fee,string"` } +// AsyncTransactionSubmissionResponse represents the response returned by Horizon +// when using the transaction-async endpoint. +type AsyncTransactionSubmissionResponse struct { + // ErrorResultXDR is present only if Status is equal to proto.TXStatusError. + // ErrorResultXDR is a TransactionResult xdr string which contains details on why + // the transaction could not be accepted by stellar-core. + ErrorResultXDR string `json:"errorResultXdr,omitempty"` + // TxStatus represents the status of the transaction submission returned by stellar-core. + // It can be one of: proto.TXStatusPending, proto.TXStatusDuplicate, + // proto.TXStatusTryAgainLater, or proto.TXStatusError. + TxStatus string `json:"tx_status"` + // Hash is a hash of the transaction which can be used to look up whether + // the transaction was included in the ledger. + Hash string `json:"hash"` +} + +func (response AsyncTransactionSubmissionResponse) GetStatus() int { + return coreStatusToHTTPStatus[response.TxStatus] +} + // MarshalJSON implements a custom marshaler for Transaction. // The memo field should be omitted if and only if the // memo_type is "none". diff --git a/services/horizon/internal/actions/submit_transaction.go b/services/horizon/internal/actions/submit_transaction.go index 314caf32a5..00049a9172 100644 --- a/services/horizon/internal/actions/submit_transaction.go +++ b/services/horizon/internal/actions/submit_transaction.go @@ -7,12 +7,13 @@ import ( "net/http" "github.com/stellar/go/network" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/protocols/stellarcore" hProblem "github.com/stellar/go/services/horizon/internal/render/problem" "github.com/stellar/go/services/horizon/internal/resourceadapter" "github.com/stellar/go/services/horizon/internal/txsub" - "github.com/stellar/go/support/errors" "github.com/stellar/go/support/render/hal" "github.com/stellar/go/support/render/problem" "github.com/stellar/go/xdr" @@ -37,7 +38,7 @@ type envelopeInfo struct { parsed xdr.TransactionEnvelope } -func (handler SubmitTransactionHandler) extractEnvelopeInfo(raw string, passphrase string) (envelopeInfo, error) { +func extractEnvelopeInfo(raw string, passphrase string) (envelopeInfo, error) { result := envelopeInfo{raw: raw} err := xdr.SafeUnmarshalBase64(raw, &result.parsed) if err != nil { @@ -60,7 +61,7 @@ func (handler SubmitTransactionHandler) extractEnvelopeInfo(raw string, passphra return result, nil } -func (handler SubmitTransactionHandler) validateBodyType(r *http.Request) error { +func validateBodyType(r *http.Request) error { c := r.Header.Get("Content-Type") if c == "" { return nil @@ -139,7 +140,7 @@ func (handler SubmitTransactionHandler) response(r *http.Request, info envelopeI } func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Request) (interface{}, error) { - if err := handler.validateBodyType(r); err != nil { + if err := validateBodyType(r); err != nil { return nil, err } @@ -147,7 +148,7 @@ func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Requ return nil, &problem.P{ Type: "transaction_submission_disabled", Title: "Transaction Submission Disabled", - Status: http.StatusMethodNotAllowed, + Status: http.StatusForbidden, Detail: "Transaction submission has been disabled for Horizon. " + "To enable it again, remove env variable DISABLE_TX_SUB.", Extras: map[string]interface{}{}, @@ -159,7 +160,7 @@ func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Requ return nil, err } - info, err := handler.extractEnvelopeInfo(raw, handler.NetworkPassphrase) + info, err := extractEnvelopeInfo(raw, handler.NetworkPassphrase) if err != nil { return nil, &problem.P{ Type: "transaction_malformed", diff --git a/services/horizon/internal/actions/submit_transaction_async.go b/services/horizon/internal/actions/submit_transaction_async.go new file mode 100644 index 0000000000..0cce31b5f6 --- /dev/null +++ b/services/horizon/internal/actions/submit_transaction_async.go @@ -0,0 +1,138 @@ +package actions + +import ( + "context" + "net/http" + + "github.com/stellar/go/protocols/horizon" + proto "github.com/stellar/go/protocols/stellarcore" + hProblem "github.com/stellar/go/services/horizon/internal/render/problem" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/render/problem" +) + +type coreClient interface { + SubmitTx(ctx context.Context, rawTx string) (resp *proto.TXResponse, err error) +} + +type AsyncSubmitTransactionHandler struct { + NetworkPassphrase string + DisableTxSub bool + ClientWithMetrics coreClient + CoreStateGetter +} + +func (handler AsyncSubmitTransactionHandler) GetResource(_ HeaderWriter, r *http.Request) (interface{}, error) { + // TODO: Move the problem responses to a separate file as constants or a function. + logger := log.Ctx(r.Context()) + + if err := validateBodyType(r); err != nil { + return nil, err + } + + raw, err := getString(r, "tx") + if err != nil { + return nil, err + } + + if handler.DisableTxSub { + return nil, &problem.P{ + Type: "transaction_submission_disabled", + Title: "Transaction Submission Disabled", + Status: http.StatusForbidden, + Detail: "Transaction submission has been disabled for Horizon. " + + "To enable it again, remove env variable DISABLE_TX_SUB.", + Extras: map[string]interface{}{ + "envelope_xdr": raw, + }, + } + } + + info, err := extractEnvelopeInfo(raw, handler.NetworkPassphrase) + if err != nil { + return nil, &problem.P{ + Type: "transaction_malformed", + Title: "Transaction Malformed", + Status: http.StatusBadRequest, + Detail: "Horizon could not decode the transaction envelope in this " + + "request. A transaction should be an XDR TransactionEnvelope struct " + + "encoded using base64. The envelope read from this request is " + + "echoed in the `extras.envelope_xdr` field of this response for your " + + "convenience.", + Extras: map[string]interface{}{ + "envelope_xdr": raw, + "error": err, + }, + } + } + + coreState := handler.GetCoreState() + if !coreState.Synced { + return nil, hProblem.StaleHistory + } + + resp, err := handler.ClientWithMetrics.SubmitTx(r.Context(), raw) + if err != nil { + return nil, &problem.P{ + Type: "transaction_submission_failed", + Title: "Transaction Submission Failed", + Status: http.StatusInternalServerError, + Detail: "Could not submit transaction to stellar-core. " + + "The `extras.error` field on this response contains further " + + "details. Descriptions of each code can be found at: " + + "https://developers.stellar.org/api/errors/http-status-codes/horizon-specific/transaction-submission-async/transaction_submission_failed", + Extras: map[string]interface{}{ + "envelope_xdr": raw, + "error": err, + }, + } + } + + if resp.IsException() { + logger.WithField("envelope_xdr", raw).WithError(errors.Errorf(resp.Exception)).Error("Transaction submission exception from stellar-core") + return nil, &problem.P{ + Type: "transaction_submission_exception", + Title: "Transaction Submission Exception", + Status: http.StatusInternalServerError, + Detail: "Received exception from stellar-core." + + "The `extras.error` field on this response contains further " + + "details. Descriptions of each code can be found at: " + + "https://developers.stellar.org/api/errors/http-status-codes/horizon-specific/transaction-submission-async/transaction_submission_exception", + Extras: map[string]interface{}{ + "envelope_xdr": raw, + "error": resp.Exception, + }, + } + } + + switch resp.Status { + case proto.TXStatusError, proto.TXStatusPending, proto.TXStatusDuplicate, proto.TXStatusTryAgainLater: + response := horizon.AsyncTransactionSubmissionResponse{ + TxStatus: resp.Status, + Hash: info.hash, + } + + if resp.Status == proto.TXStatusError { + response.ErrorResultXDR = resp.Error + } + + return response, nil + default: + logger.WithField("envelope_xdr", raw).WithError(errors.Errorf(resp.Error)).Error("Received invalid submission status from stellar-core") + return nil, &problem.P{ + Type: "transaction_submission_invalid_status", + Title: "Transaction Submission Invalid Status", + Status: http.StatusInternalServerError, + Detail: "Received invalid status from stellar-core." + + "The `extras.error` field on this response contains further " + + "details. Descriptions of each code can be found at: " + + "https://developers.stellar.org/api/errors/http-status-codes/horizon-specific/transaction-submission-async/transaction_submission_invalid_status", + Extras: map[string]interface{}{ + "envelope_xdr": raw, + "error": resp.Error, + }, + } + } + +} diff --git a/services/horizon/internal/actions/submit_transaction_async_test.go b/services/horizon/internal/actions/submit_transaction_async_test.go new file mode 100644 index 0000000000..258012bc3c --- /dev/null +++ b/services/horizon/internal/actions/submit_transaction_async_test.go @@ -0,0 +1,218 @@ +package actions + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/stretchr/testify/mock" + + "github.com/stellar/go/protocols/horizon" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/network" + proto "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/services/horizon/internal/corestate" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/render/problem" +) + +const ( + TxXDR = "AAAAAAGUcmKO5465JxTSLQOQljwk2SfqAJmZSG6JH6wtqpwhAAABLAAAAAAAAAABAAAAAAAAAAEAAAALaGVsbG8gd29ybGQAAAAAAwAAAAAAAAAAAAAAABbxCy3mLg3hiTqX4VUEEp60pFOrJNxYM1JtxXTwXhY2AAAAAAvrwgAAAAAAAAAAAQAAAAAW8Qst5i4N4Yk6l+FVBBKetKRTqyTcWDNSbcV08F4WNgAAAAAN4Lazj4x61AAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABLaqcIQAAAEBKwqWy3TaOxoGnfm9eUjfTRBvPf34dvDA0Nf+B8z4zBob90UXtuCqmQqwMCyH+okOI3c05br3khkH0yP4kCwcE" + TxHash = "3389e9f0f1a65f19736cacf544c2e825313e8447f569233bb8db39aa607c8889" +) + +type MockClientWithMetrics struct { + mock.Mock +} + +// SubmitTx mocks the SubmitTransaction method +func (m *MockClientWithMetrics) SubmitTx(ctx context.Context, rawTx string) (*proto.TXResponse, error) { + args := m.Called(ctx, rawTx) + return args.Get(0).(*proto.TXResponse), args.Error(1) +} + +func createRequest() *http.Request { + form := url.Values{} + form.Set("tx", TxXDR) + + request, _ := http.NewRequest( + "POST", + "http://localhost:8000/transactions_async", + strings.NewReader(form.Encode()), + ) + request.Header.Add("Content-Type", "application/x-www-form-urlencoded") + return request +} + +func TestAsyncSubmitTransactionHandler_DisabledTxSub(t *testing.T) { + handler := AsyncSubmitTransactionHandler{ + DisableTxSub: true, + } + + request := createRequest() + w := httptest.NewRecorder() + + _, err := handler.GetResource(w, request) + assert.NotNil(t, err) + assert.IsType(t, &problem.P{}, err) + p := err.(*problem.P) + assert.Equal(t, "transaction_submission_disabled", p.Type) + assert.Equal(t, http.StatusForbidden, p.Status) +} + +func TestAsyncSubmitTransactionHandler_MalformedTransaction(t *testing.T) { + handler := AsyncSubmitTransactionHandler{} + + request := createRequest() + w := httptest.NewRecorder() + + _, err := handler.GetResource(w, request) + assert.NotNil(t, err) + assert.IsType(t, &problem.P{}, err) + p := err.(*problem.P) + assert.Equal(t, "transaction_malformed", p.Type) + assert.Equal(t, http.StatusBadRequest, p.Status) +} + +func TestAsyncSubmitTransactionHandler_CoreNotSynced(t *testing.T) { + coreStateGetter := new(coreStateGetterMock) + coreStateGetter.On("GetCoreState").Return(corestate.State{Synced: false}) + handler := AsyncSubmitTransactionHandler{ + CoreStateGetter: coreStateGetter, + NetworkPassphrase: network.PublicNetworkPassphrase, + } + + request := createRequest() + w := httptest.NewRecorder() + + _, err := handler.GetResource(w, request) + assert.NotNil(t, err) + assert.IsType(t, problem.P{}, err) + p := err.(problem.P) + assert.Equal(t, "stale_history", p.Type) + assert.Equal(t, http.StatusServiceUnavailable, p.Status) +} + +func TestAsyncSubmitTransactionHandler_TransactionSubmissionFailed(t *testing.T) { + coreStateGetter := new(coreStateGetterMock) + coreStateGetter.On("GetCoreState").Return(corestate.State{Synced: true}) + + MockClientWithMetrics := &MockClientWithMetrics{} + MockClientWithMetrics.On("SubmitTx", context.Background(), TxXDR).Return(&proto.TXResponse{}, errors.Errorf("submission error")) + + handler := AsyncSubmitTransactionHandler{ + CoreStateGetter: coreStateGetter, + NetworkPassphrase: network.PublicNetworkPassphrase, + ClientWithMetrics: MockClientWithMetrics, + } + + request := createRequest() + w := httptest.NewRecorder() + + _, err := handler.GetResource(w, request) + assert.NotNil(t, err) + assert.IsType(t, &problem.P{}, err) + p := err.(*problem.P) + assert.Equal(t, "transaction_submission_failed", p.Type) + assert.Equal(t, http.StatusInternalServerError, p.Status) +} + +func TestAsyncSubmitTransactionHandler_TransactionSubmissionException(t *testing.T) { + coreStateGetter := new(coreStateGetterMock) + coreStateGetter.On("GetCoreState").Return(corestate.State{Synced: true}) + + MockClientWithMetrics := &MockClientWithMetrics{} + MockClientWithMetrics.On("SubmitTx", context.Background(), TxXDR).Return(&proto.TXResponse{ + Exception: "some-exception", + }, nil) + + handler := AsyncSubmitTransactionHandler{ + CoreStateGetter: coreStateGetter, + NetworkPassphrase: network.PublicNetworkPassphrase, + ClientWithMetrics: MockClientWithMetrics, + } + + request := createRequest() + w := httptest.NewRecorder() + + _, err := handler.GetResource(w, request) + assert.NotNil(t, err) + assert.IsType(t, &problem.P{}, err) + p := err.(*problem.P) + assert.Equal(t, "transaction_submission_exception", p.Type) + assert.Equal(t, http.StatusInternalServerError, p.Status) +} + +func TestAsyncSubmitTransactionHandler_TransactionStatusResponse(t *testing.T) { + coreStateGetter := new(coreStateGetterMock) + coreStateGetter.On("GetCoreState").Return(corestate.State{Synced: true}) + + successCases := []struct { + mockCoreResponse *proto.TXResponse + expectedResponse horizon.AsyncTransactionSubmissionResponse + }{ + { + mockCoreResponse: &proto.TXResponse{ + Exception: "", + Error: "test-error", + Status: proto.TXStatusError, + DiagnosticEvents: "test-diagnostic-events", + }, + expectedResponse: horizon.AsyncTransactionSubmissionResponse{ + ErrorResultXDR: "test-error", + TxStatus: proto.TXStatusError, + Hash: TxHash, + }, + }, + { + mockCoreResponse: &proto.TXResponse{ + Status: proto.TXStatusPending, + }, + expectedResponse: horizon.AsyncTransactionSubmissionResponse{ + TxStatus: proto.TXStatusPending, + Hash: TxHash, + }, + }, + { + mockCoreResponse: &proto.TXResponse{ + Status: proto.TXStatusDuplicate, + }, + expectedResponse: horizon.AsyncTransactionSubmissionResponse{ + TxStatus: proto.TXStatusDuplicate, + Hash: TxHash, + }, + }, + { + mockCoreResponse: &proto.TXResponse{ + Status: proto.TXStatusTryAgainLater, + }, + expectedResponse: horizon.AsyncTransactionSubmissionResponse{ + TxStatus: proto.TXStatusTryAgainLater, + Hash: TxHash, + }, + }, + } + + for _, testCase := range successCases { + MockClientWithMetrics := &MockClientWithMetrics{} + MockClientWithMetrics.On("SubmitTx", context.Background(), TxXDR).Return(testCase.mockCoreResponse, nil) + + handler := AsyncSubmitTransactionHandler{ + NetworkPassphrase: network.PublicNetworkPassphrase, + ClientWithMetrics: MockClientWithMetrics, + CoreStateGetter: coreStateGetter, + } + + request := createRequest() + w := httptest.NewRecorder() + + resp, err := handler.GetResource(w, request) + assert.NoError(t, err) + assert.Equal(t, resp, testCase.expectedResponse) + } +} diff --git a/services/horizon/internal/actions/submit_transaction_test.go b/services/horizon/internal/actions/submit_transaction_test.go index a15ce3bd94..eb1987bdea 100644 --- a/services/horizon/internal/actions/submit_transaction_test.go +++ b/services/horizon/internal/actions/submit_transaction_test.go @@ -178,7 +178,7 @@ func TestDisableTxSubFlagSubmission(t *testing.T) { var p = &problem.P{ Type: "transaction_submission_disabled", Title: "Transaction Submission Disabled", - Status: http.StatusMethodNotAllowed, + Status: http.StatusForbidden, Detail: "Transaction submission has been disabled for Horizon. " + "To enable it again, remove env variable DISABLE_TX_SUB.", Extras: map[string]interface{}{}, diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index 36fc2031e8..843232beba 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -554,6 +554,7 @@ func (a *App) init() error { HorizonVersion: a.horizonVersion, FriendbotURL: a.config.FriendbotURL, DisableTxSub: a.config.DisableTxSub, + StellarCoreURL: a.config.StellarCoreURL, HealthCheck: healthCheck{ session: a.historyQ.SessionInterface, ctx: a.ctx, diff --git a/services/horizon/internal/httpx/handler.go b/services/horizon/internal/httpx/handler.go index e17ecb987d..ade5742566 100644 --- a/services/horizon/internal/httpx/handler.go +++ b/services/horizon/internal/httpx/handler.go @@ -25,6 +25,10 @@ type objectAction interface { ) (interface{}, error) } +type HttpResponse interface { + GetStatus() int +} + type ObjectActionHandler struct { Action objectAction } @@ -41,8 +45,13 @@ func (handler ObjectActionHandler) ServeHTTP( return } - httpjson.Render( + statusCode := http.StatusOK + if httpResponse, ok := response.(HttpResponse); ok { + statusCode = httpResponse.GetStatus() + } + httpjson.RenderStatus( w, + statusCode, response, httpjson.HALJSON, ) diff --git a/services/horizon/internal/httpx/router.go b/services/horizon/internal/httpx/router.go index cd3b6821b0..ba5af85b51 100644 --- a/services/horizon/internal/httpx/router.go +++ b/services/horizon/internal/httpx/router.go @@ -8,6 +8,8 @@ import ( "net/url" "time" + "github.com/stellar/go/clients/stellarcore" + "github.com/go-chi/chi" chimiddleware "github.com/go-chi/chi/middleware" "github.com/prometheus/client_golang/prometheus" @@ -52,6 +54,7 @@ type RouterConfig struct { HealthCheck http.Handler DisableTxSub bool SkipTxMeta bool + StellarCoreURL string } type Router struct { @@ -344,6 +347,17 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate SkipTxMeta: config.SkipTxMeta, }}) + // Async Transaction submission API + r.Method(http.MethodPost, "/transactions_async", ObjectActionHandler{actions.AsyncSubmitTransactionHandler{ + NetworkPassphrase: config.NetworkPassphrase, + DisableTxSub: config.DisableTxSub, + CoreStateGetter: config.CoreGetter, + ClientWithMetrics: stellarcore.NewClientWithMetrics(stellarcore.Client{ + HTTP: http.DefaultClient, + URL: config.StellarCoreURL, + }, config.PrometheusRegistry, "async_txsub"), + }}) + // Network state related endpoints r.Method(http.MethodGet, "/fee_stats", ObjectActionHandler{actions.FeeStatsHandler{}}) @@ -371,6 +385,15 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate w.Header().Set("Content-Type", "application/openapi+yaml") w.Write(p) }) + r.Internal.Get("/transactions_async", func(w http.ResponseWriter, r *http.Request) { + p, err := staticFiles.ReadFile("static/txsub_async_oapi.yaml") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/openapi+yaml") + w.Write(p) + }) r.Internal.Get("/metrics", promhttp.HandlerFor(config.PrometheusRegistry, promhttp.HandlerOpts{}).ServeHTTP) r.Internal.Get("/debug/pprof/heap", pprof.Index) r.Internal.Get("/debug/pprof/profile", pprof.Profile) diff --git a/services/horizon/internal/httpx/static/txsub_async_oapi.yaml b/services/horizon/internal/httpx/static/txsub_async_oapi.yaml new file mode 100644 index 0000000000..f889cf4ec8 --- /dev/null +++ b/services/horizon/internal/httpx/static/txsub_async_oapi.yaml @@ -0,0 +1,168 @@ +openapi: 3.0.0 +info: + title: Stellar Horizon Async Transaction Submission + version: "1.0" +paths: + /transactions_async: + post: + summary: Asynchronously submit a transaction to the Stellar network. + tags: + - Transactions + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + tx: + type: string + description: A base64 transaction XDR string. + required: + - tx + responses: + '201': + description: Transaction has been received by core and is in pending status. + content: + application/json: + schema: + $ref: '#/components/schemas/AsyncTransactionSubmissionResponse' + example: + tx_status: "PENDING" + hash: "6cbb7f714bd08cea7c30cab7818a35c510cbbfc0a6aa06172a1e94146ecf0165" + + '400': + description: Transaction is malformed; transaction submission exception; transaction submission failed; invalid submission status from core; ERROR status from core. + content: + application/json: + schema: + oneOf: + - $ref: '#/components/schemas/AsyncTransactionSubmissionResponse' + - $ref: '#/components/schemas/Problem' + examples: + TransactionMalformedExample: + summary: Transaction Malformed + value: + type: "transaction_malformed" + title: "Transaction Malformed" + status: 400 + detail: "Horizon could not decode the transaction envelope in this request. A transaction should be an XDR TransactionEnvelope struct encoded using base64. The envelope read from this request is echoed in the `extras.envelope_xdr` field of this response for your convenience." + extras: + envelope_xdr: "" + ErrorStatusExample: + summary: ERROR Status from core + value: + errorResultXdr: "AAAAAAAAAGT////7AAAAAA==" + tx_status: "ERROR" + hash: "6cbb7f714bd08cea7c30cab7818a35c510cbbfc0a6aa06172a1e94146ecf0165" + '405': + description: Transaction submission has been disabled for Horizon. + content: + application/json: + schema: + $ref: '#/components/schemas/Problem' + example: + TransactionSubmissionDisabledExample: + summary: Transaction Submission Disabled + value: + type: "transaction_submission_disabled" + title: "Transaction Submission Disabled" + status: 403 + detail: "Transaction submission has been disabled for Horizon. To enable it again, remove env variable DISABLE_TX_SUB." + extras: + envelope_xdr: "" + '409': + description: Transaction is a duplicate of a previously submitted transaction. + content: + application/json: + schema: + $ref: '#/components/schemas/AsyncTransactionSubmissionResponse' + example: + errorResultXdr: "" + tx_status: "DUPLICATE" + hash: "6cbb7f714bd08cea7c30cab7818a35c510cbbfc0a6aa06172a1e94146ecf0165" + '500': + description: Transaction is a duplicate of a previously submitted transaction. + content: + application/json: + schema: + $ref: '#/components/schemas/Problem' + examples: + TransactionFailedExample: + summary: Transaction Submission Failed + value: + type: "transaction_submission_failed" + title: "Transaction Submission Failed" + status: 500 + detail: "Could not submit transaction to stellar-core. The `extras.error` field on this response contains further details. Descriptions of each code can be found at: https://developers.stellar.org/api/errors/http-status-codes/horizon-specific/transaction-submission-async/transaction_submission_failed" + extras: + envelope_xdr: "" + error: "Error details here" + TransactionExceptionExample: + summary: Transaction Submission Exception + value: + type: "transaction_submission_exception" + title: "Transaction Submission Exception" + status: 500 + detail: "Received exception from stellar-core. The `extras.error` field on this response contains further details. Descriptions of each code can be found at: https://developers.stellar.org/api/errors/http-status-codes/horizon-specific/transaction-submission-async/transaction_submission_exception" + extras: + envelope_xdr: "" + error: "Exception details here" + '503': + description: History DB is stale; core is unavailable for transaction submission. + content: + application/json: + schema: + $ref: '#/components/schemas/AsyncTransactionSubmissionResponse' + examples: + HistoryDBStaleExample: + summary: Historical DB Is Too Stale + value: + type: "stale_history" + title: "Historical DB Is Too Stale" + status: 503 + detail: "This horizon instance is configured to reject client requests when it can determine that the history database is lagging too far behind the connected instance of Stellar-Core or read replica. It's also possible that Stellar-Core is out of sync. Please try again later." + extras: + envelope_xdr: "" + TryAgainLaterExample: + summary: TRY_AGAIN_LATER Status from core + value: + tx_status: "TRY_AGAIN_LATER" + hash: "6cbb7f714bd08cea7c30cab7818a35c510cbbfc0a6aa06172a1e94146ecf0165" + + +components: + schemas: + AsyncTransactionSubmissionResponse: + type: object + properties: + errorResultXdr: + type: string + nullable: true + description: TransactionResult XDR string which is present only if the submission status from core is an ERROR. + tx_status: + type: string + enum: ["ERROR", "PENDING", "DUPLICATE", "TRY_AGAIN_LATER"] + description: Status of the transaction submission. + hash: + type: string + description: Hash of the transaction. + Problem: + type: object + properties: + type: + type: string + description: Identifies the problem type. + title: + type: string + description: A short, human-readable summary of the problem type. + status: + type: integer + description: The HTTP status code for this occurrence of the problem. + detail: + type: string + description: A human-readable explanation specific to this occurrence of the problem. + extras: + type: object + additionalProperties: true + description: Additional details that might help the client understand the error(s) that occurred. diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 93580fed54..a221c00682 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -223,7 +223,7 @@ func initWebMetrics(app *App) { func initSubmissionSystem(app *App) { app.submitter = &txsub.System{ Pending: txsub.NewDefaultSubmissionList(), - Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL), + Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL, app.prometheusRegistry), DB: func(ctx context.Context) txsub.HorizonDB { return &history.Q{SessionInterface: app.HorizonSession()} }, diff --git a/services/horizon/internal/integration/txsub_async_test.go b/services/horizon/internal/integration/txsub_async_test.go new file mode 100644 index 0000000000..2d16e4d4ea --- /dev/null +++ b/services/horizon/internal/integration/txsub_async_test.go @@ -0,0 +1,146 @@ +package integration + +import ( + "io" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/clients/horizonclient" + "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/services/horizon/internal/test/integration" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/txnbuild" +) + +func getTransaction(client *horizonclient.Client, hash string) error { + for i := 0; i < 60; i++ { + _, err := client.TransactionDetail(hash) + if err != nil { + time.Sleep(time.Second) + continue + } + + return nil + } + return errors.New("transaction not found") +} + +func TestAsyncTxSub_SuccessfulSubmission(t *testing.T) { + itest := integration.NewTest(t, integration.Config{}) + master := itest.Master() + masterAccount := itest.MasterAccount() + + txParams := txnbuild.TransactionParams{ + BaseFee: txnbuild.MinBaseFee, + SourceAccount: masterAccount, + IncrementSequenceNum: true, + Operations: []txnbuild.Operation{ + &txnbuild.Payment{ + Destination: master.Address(), + Amount: "10", + Asset: txnbuild.NativeAsset{}, + }, + }, + Preconditions: txnbuild.Preconditions{ + TimeBounds: txnbuild.NewInfiniteTimeout(), + LedgerBounds: &txnbuild.LedgerBounds{MinLedger: 0, MaxLedger: 100}, + }, + } + + txResp, err := itest.AsyncSubmitTransaction(master, txParams) + assert.NoError(t, err) + assert.Equal(t, txResp, horizon.AsyncTransactionSubmissionResponse{ + TxStatus: "PENDING", + Hash: "6cbb7f714bd08cea7c30cab7818a35c510cbbfc0a6aa06172a1e94146ecf0165", + }) + + err = getTransaction(itest.Client(), txResp.Hash) + assert.NoError(t, err) +} + +func TestAsyncTxSub_SubmissionError(t *testing.T) { + itest := integration.NewTest(t, integration.Config{}) + master := itest.Master() + masterAccount := itest.MasterAccount() + + txParams := txnbuild.TransactionParams{ + BaseFee: txnbuild.MinBaseFee, + SourceAccount: masterAccount, + IncrementSequenceNum: false, + Operations: []txnbuild.Operation{ + &txnbuild.Payment{ + Destination: master.Address(), + Amount: "10", + Asset: txnbuild.NativeAsset{}, + }, + }, + Preconditions: txnbuild.Preconditions{ + TimeBounds: txnbuild.NewInfiniteTimeout(), + LedgerBounds: &txnbuild.LedgerBounds{MinLedger: 0, MaxLedger: 100}, + }, + } + + txResp, err := itest.AsyncSubmitTransaction(master, txParams) + assert.NoError(t, err) + assert.Equal(t, txResp, horizon.AsyncTransactionSubmissionResponse{ + ErrorResultXDR: "AAAAAAAAAGT////7AAAAAA==", + TxStatus: "ERROR", + Hash: "0684df00f20efd5876f1b8d17bc6d3a68d8b85c06bb41e448815ecaa6307a251", + }) +} + +func TestAsyncTxSub_SubmissionTryAgainLater(t *testing.T) { + itest := integration.NewTest(t, integration.Config{}) + master := itest.Master() + masterAccount := itest.MasterAccount() + + txParams := txnbuild.TransactionParams{ + BaseFee: txnbuild.MinBaseFee, + SourceAccount: masterAccount, + IncrementSequenceNum: true, + Operations: []txnbuild.Operation{ + &txnbuild.Payment{ + Destination: master.Address(), + Amount: "10", + Asset: txnbuild.NativeAsset{}, + }, + }, + Preconditions: txnbuild.Preconditions{ + TimeBounds: txnbuild.NewInfiniteTimeout(), + LedgerBounds: &txnbuild.LedgerBounds{MinLedger: 0, MaxLedger: 100}, + }, + } + + txResp, err := itest.AsyncSubmitTransaction(master, txParams) + assert.NoError(t, err) + assert.Equal(t, txResp, horizon.AsyncTransactionSubmissionResponse{ + ErrorResultXDR: "", + TxStatus: "PENDING", + Hash: "6cbb7f714bd08cea7c30cab7818a35c510cbbfc0a6aa06172a1e94146ecf0165", + }) + + txResp, err = itest.AsyncSubmitTransaction(master, txParams) + assert.NoError(t, err) + assert.Equal(t, txResp, horizon.AsyncTransactionSubmissionResponse{ + ErrorResultXDR: "", + TxStatus: "TRY_AGAIN_LATER", + Hash: "d5eb72a4c1832b89965850fff0bd9bba4b6ca102e7c89099dcaba5e7d7d2e049", + }) +} + +func TestAsyncTxSub_GetOpenAPISpecResponse(t *testing.T) { + itest := integration.NewTest(t, integration.Config{}) + res, err := http.Get(itest.AsyncTxSubOpenAPISpecURL()) + assert.NoError(t, err) + assert.Equal(t, res.StatusCode, 200) + + bytes, err := io.ReadAll(res.Body) + res.Body.Close() + assert.NoError(t, err) + + openAPISpec := string(bytes) + assert.Contains(t, openAPISpec, "openapi: 3.0.0") +} diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index a661c9edf8..d755d00252 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -911,6 +911,11 @@ func (i *Test) MetricsURL() string { return fmt.Sprintf("http://localhost:%d/metrics", i.AdminPort()) } +// AsyncTxSubOpenAPISpecURL returns the URL for getting the openAPI spec yaml for async-txsub endpoint. +func (i *Test) AsyncTxSubOpenAPISpecURL() string { + return fmt.Sprintf("http://localhost:%d/transactions_async", i.AdminPort()) +} + // Master returns a keypair of the network masterKey account. func (i *Test) Master() *keypair.Full { if i.masterKey != nil { @@ -1146,6 +1151,16 @@ func (i *Test) SubmitMultiSigTransaction( return i.Client().SubmitTransaction(tx) } +func (i *Test) AsyncSubmitTransaction( + signer *keypair.Full, txParams txnbuild.TransactionParams, +) (proto.AsyncTransactionSubmissionResponse, error) { + tx, err := i.CreateSignedTransaction([]*keypair.Full{signer}, txParams) + if err != nil { + return proto.AsyncTransactionSubmissionResponse{}, err + } + return i.Client().AsyncSubmitTransaction(tx) +} + func (i *Test) MustSubmitMultiSigTransaction( signers []*keypair.Full, txParams txnbuild.TransactionParams, ) proto.Transaction { diff --git a/services/horizon/internal/txsub/submitter.go b/services/horizon/internal/txsub/submitter.go index 27ce85c87a..694cc3b372 100644 --- a/services/horizon/internal/txsub/submitter.go +++ b/services/horizon/internal/txsub/submitter.go @@ -5,6 +5,8 @@ import ( "net/http" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/clients/stellarcore" proto "github.com/stellar/go/protocols/stellarcore" "github.com/stellar/go/support/errors" @@ -14,12 +16,12 @@ import ( // NewDefaultSubmitter returns a new, simple Submitter implementation // that submits directly to the stellar-core at `url` using the http client // `h`. -func NewDefaultSubmitter(h *http.Client, url string) Submitter { +func NewDefaultSubmitter(h *http.Client, url string, registry *prometheus.Registry) Submitter { return &submitter{ - StellarCore: &stellarcore.Client{ + StellarCore: stellarcore.NewClientWithMetrics(stellarcore.Client{ HTTP: h, URL: url, - }, + }, registry, "txsub"), Log: log.DefaultLogger.WithField("service", "txsub.submitter"), } } @@ -28,13 +30,13 @@ func NewDefaultSubmitter(h *http.Client, url string) Submitter { // submits directly to the configured stellar-core instance using the // configured http client. type submitter struct { - StellarCore *stellarcore.Client + StellarCore stellarcore.ClientWithMetrics Log *log.Entry } // Submit sends the provided envelope to stellar-core and parses the response into // a SubmissionResult -func (sub *submitter) Submit(ctx context.Context, env string) (result SubmissionResult) { +func (sub *submitter) Submit(ctx context.Context, rawTx string) (result SubmissionResult) { start := time.Now() defer func() { result.Duration = time.Since(start) @@ -44,7 +46,7 @@ func (sub *submitter) Submit(ctx context.Context, env string) (result Submission }).Info("Submitter result") }() - cresp, err := sub.StellarCore.SubmitTransaction(ctx, env) + cresp, err := sub.StellarCore.SubmitTx(ctx, rawTx) if err != nil { result.Err = errors.Wrap(err, "failed to submit") return diff --git a/services/horizon/internal/txsub/submitter_test.go b/services/horizon/internal/txsub/submitter_test.go index 4406f46fb8..5662930b5d 100644 --- a/services/horizon/internal/txsub/submitter_test.go +++ b/services/horizon/internal/txsub/submitter_test.go @@ -1,13 +1,19 @@ package txsub import ( - "github.com/stretchr/testify/assert" "net/http" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/services/horizon/internal/test" ) +const ( + TxXDR = "AAAAAAGUcmKO5465JxTSLQOQljwk2SfqAJmZSG6JH6wtqpwhAAABLAAAAAAAAAABAAAAAAAAAAEAAAALaGVsbG8gd29ybGQAAAAAAwAAAAAAAAAAAAAAABbxCy3mLg3hiTqX4VUEEp60pFOrJNxYM1JtxXTwXhY2AAAAAAvrwgAAAAAAAAAAAQAAAAAW8Qst5i4N4Yk6l+FVBBKetKRTqyTcWDNSbcV08F4WNgAAAAAN4Lazj4x61AAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABLaqcIQAAAEBKwqWy3TaOxoGnfm9eUjfTRBvPf34dvDA0Nf+B8z4zBob90UXtuCqmQqwMCyH+okOI3c05br3khkH0yP4kCwcE" +) + func TestDefaultSubmitter(t *testing.T) { ctx := test.Context() // submits to the configured stellar-core instance correctly @@ -17,11 +23,11 @@ func TestDefaultSubmitter(t *testing.T) { }`) defer server.Close() - s := NewDefaultSubmitter(http.DefaultClient, server.URL) - sr := s.Submit(ctx, "hello") + s := NewDefaultSubmitter(http.DefaultClient, server.URL, prometheus.NewRegistry()) + sr := s.Submit(ctx, TxXDR) assert.Nil(t, sr.Err) assert.True(t, sr.Duration > 0) - assert.Equal(t, "hello", server.LastRequest.URL.Query().Get("blob")) + assert.Equal(t, TxXDR, server.LastRequest.URL.Query().Get("blob")) // Succeeds when stellar-core gives the DUPLICATE response. server = test.NewStaticMockServer(`{ @@ -30,41 +36,41 @@ func TestDefaultSubmitter(t *testing.T) { }`) defer server.Close() - s = NewDefaultSubmitter(http.DefaultClient, server.URL) - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, server.URL, prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.Nil(t, sr.Err) // Errors when the stellar-core url is empty - s = NewDefaultSubmitter(http.DefaultClient, "") - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, "", prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.NotNil(t, sr.Err) //errors when the stellar-core url is not parseable - s = NewDefaultSubmitter(http.DefaultClient, "http://Not a url") - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, "http://Not a url", prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.NotNil(t, sr.Err) // errors when the stellar-core url is not reachable - s = NewDefaultSubmitter(http.DefaultClient, "http://127.0.0.1:65535") - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, "http://127.0.0.1:65535", prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.NotNil(t, sr.Err) // errors when the stellar-core returns an unparseable response server = test.NewStaticMockServer(`{`) defer server.Close() - s = NewDefaultSubmitter(http.DefaultClient, server.URL) - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, server.URL, prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.NotNil(t, sr.Err) // errors when the stellar-core returns an exception response server = test.NewStaticMockServer(`{"exception": "Invalid XDR"}`) defer server.Close() - s = NewDefaultSubmitter(http.DefaultClient, server.URL) - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, server.URL, prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.NotNil(t, sr.Err) assert.Contains(t, sr.Err.Error(), "Invalid XDR") @@ -72,8 +78,8 @@ func TestDefaultSubmitter(t *testing.T) { server = test.NewStaticMockServer(`{"status": "NOTREAL"}`) defer server.Close() - s = NewDefaultSubmitter(http.DefaultClient, server.URL) - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, server.URL, prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.NotNil(t, sr.Err) assert.Contains(t, sr.Err.Error(), "NOTREAL") @@ -81,8 +87,8 @@ func TestDefaultSubmitter(t *testing.T) { server = test.NewStaticMockServer(`{"status": "ERROR", "error": "1234"}`) defer server.Close() - s = NewDefaultSubmitter(http.DefaultClient, server.URL) - sr = s.Submit(ctx, "hello") + s = NewDefaultSubmitter(http.DefaultClient, server.URL, prometheus.NewRegistry()) + sr = s.Submit(ctx, TxXDR) assert.IsType(t, &FailedTransactionError{}, sr.Err) ferr := sr.Err.(*FailedTransactionError) assert.Equal(t, "1234", ferr.ResultXDR) diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 31038135f3..2232e61c8d 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -4,11 +4,13 @@ import ( "context" "database/sql" "fmt" - "github.com/stellar/go/services/horizon/internal/ledger" "sync" "time" + "github.com/stellar/go/services/horizon/internal/ledger" + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" @@ -44,10 +46,6 @@ type System struct { LedgerState ledger.StateInterface Metrics struct { - // SubmissionDuration exposes timing metrics about the rate and latency of - // submissions to stellar-core - SubmissionDuration prometheus.Summary - // OpenSubmissionsGauge tracks the count of "open" submissions (i.e. // submissions whose transactions haven't been confirmed successful or failed OpenSubmissionsGauge prometheus.Gauge @@ -59,30 +57,14 @@ type System struct { // SuccessfulSubmissionsCounter tracks the rate of successful transactions that // have been submitted to this process SuccessfulSubmissionsCounter prometheus.Counter - - // V0TransactionsCounter tracks the rate of v0 transaction envelopes that - // have been submitted to this process - V0TransactionsCounter prometheus.Counter - - // V1TransactionsCounter tracks the rate of v1 transaction envelopes that - // have been submitted to this process - V1TransactionsCounter prometheus.Counter - - // FeeBumpTransactionsCounter tracks the rate of fee bump transaction envelopes that - // have been submitted to this process - FeeBumpTransactionsCounter prometheus.Counter } } // RegisterMetrics registers the prometheus metrics func (sys *System) RegisterMetrics(registry *prometheus.Registry) { - registry.MustRegister(sys.Metrics.SubmissionDuration) registry.MustRegister(sys.Metrics.OpenSubmissionsGauge) registry.MustRegister(sys.Metrics.FailedSubmissionsCounter) registry.MustRegister(sys.Metrics.SuccessfulSubmissionsCounter) - registry.MustRegister(sys.Metrics.V0TransactionsCounter) - registry.MustRegister(sys.Metrics.V1TransactionsCounter) - registry.MustRegister(sys.Metrics.FeeBumpTransactionsCounter) } // Submit submits the provided base64 encoded transaction envelope to the @@ -130,7 +112,6 @@ func (sys *System) Submit( } sr := sys.submitOnce(ctx, rawTx) - sys.updateTransactionTypeMetrics(envelope) if sr.Err != nil { // any error other than "txBAD_SEQ" is a failure @@ -222,12 +203,10 @@ func (sys *System) deriveTxSubError(ctx context.Context) error { // Submit submits the provided base64 encoded transaction envelope to the // network using this submission system. -func (sys *System) submitOnce(ctx context.Context, env string) SubmissionResult { +func (sys *System) submitOnce(ctx context.Context, rawTx string) SubmissionResult { // submit to stellar-core - sr := sys.Submitter.Submit(ctx, env) - sys.Metrics.SubmissionDuration.Observe(float64(sr.Duration.Seconds())) + sr := sys.Submitter.Submit(ctx, rawTx) - // if received or duplicate, add to the open submissions list if sr.Err == nil { sys.Metrics.SuccessfulSubmissionsCounter.Inc() } else { @@ -237,17 +216,6 @@ func (sys *System) submitOnce(ctx context.Context, env string) SubmissionResult return sr } -func (sys *System) updateTransactionTypeMetrics(envelope xdr.TransactionEnvelope) { - switch envelope.Type { - case xdr.EnvelopeTypeEnvelopeTypeTxV0: - sys.Metrics.V0TransactionsCounter.Inc() - case xdr.EnvelopeTypeEnvelopeTypeTx: - sys.Metrics.V1TransactionsCounter.Inc() - case xdr.EnvelopeTypeEnvelopeTypeTxFeeBump: - sys.Metrics.FeeBumpTransactionsCounter.Inc() - } -} - // setTickInProgress sets `tickInProgress` to `true` if it's // `false`. Returns `true` if `tickInProgress` has been switched // to `true` inside this method and `Tick()` should continue. @@ -360,11 +328,6 @@ func (sys *System) Init() { sys.initializer.Do(func() { sys.Log = log.DefaultLogger.WithField("service", "txsub.System") - sys.Metrics.SubmissionDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "txsub", Name: "submission_duration_seconds", - Help: "submission durations to Stellar-Core, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) sys.Metrics.FailedSubmissionsCounter = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "horizon", Subsystem: "txsub", Name: "failed", }) @@ -374,15 +337,6 @@ func (sys *System) Init() { sys.Metrics.OpenSubmissionsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "horizon", Subsystem: "txsub", Name: "open", }) - sys.Metrics.V0TransactionsCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "horizon", Subsystem: "txsub", Name: "v0", - }) - sys.Metrics.V1TransactionsCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "horizon", Subsystem: "txsub", Name: "v1", - }) - sys.Metrics.FeeBumpTransactionsCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "horizon", Subsystem: "txsub", Name: "feebump", - }) sys.accountSeqPollInterval = time.Second diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index b4a36fb522..fbaa353c1b 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -250,7 +250,6 @@ func (suite *SystemTestSuite) TestSubmit_NotFoundError() { assert.True(suite.T(), suite.submitter.WasSubmittedTo) assert.Equal(suite.T(), float64(0), getMetricValue(suite.system.Metrics.SuccessfulSubmissionsCounter).GetCounter().GetValue()) assert.Equal(suite.T(), float64(1), getMetricValue(suite.system.Metrics.FailedSubmissionsCounter).GetCounter().GetValue()) - assert.Equal(suite.T(), uint64(1), getMetricValue(suite.system.Metrics.SubmissionDuration).GetSummary().GetSampleCount()) } // If the error is bad_seq and the result at the transaction's sequence number is for the same hash, return result. @@ -408,7 +407,6 @@ func (suite *SystemTestSuite) TestSubmit_OpenTransactionList() { assert.Equal(suite.T(), suite.successTx.Transaction.TransactionHash, pending[0]) assert.Equal(suite.T(), float64(1), getMetricValue(suite.system.Metrics.SuccessfulSubmissionsCounter).GetCounter().GetValue()) assert.Equal(suite.T(), float64(0), getMetricValue(suite.system.Metrics.FailedSubmissionsCounter).GetCounter().GetValue()) - assert.Equal(suite.T(), uint64(1), getMetricValue(suite.system.Metrics.SubmissionDuration).GetSummary().GetSampleCount()) } // Tick should be a no-op if there are no open submissions.