Skip to content

Commit

Permalink
services/horizon + horizonclient: Add new async transaction submissio…
Browse files Browse the repository at this point in the history
…n endpoint (#5188)

* Add new txsub endpoint - 1

* Add new txsub endpoint - 2

* Add new txsub endpoint - 3

* Update Status to TxStatus

* Add unittests for new endpoint

* Create submit_transaction_async_test.go

* Fix goimports

* Rearrange code and remove duplicate code

* Add metrics - 1

* Add metrics - 2

* Add metrics - 3

* Fix failing unittest

* Add new endpoint to go sdk + integration test

* Small changes - 1

* Add openAPI taml

* Address review changes - 1

* Remove private methods from interface

* Use common metrics client for legacy and async txsub

* Fix submitter test

* Update submit_transaction_async.go

* Fix failing test

* Update txsub_async_oapi.yaml

* Update submitter.go

* Interface method change

* Remove duplicate code

* Add test for GET /transactions-async

* Encapsulation - 1

* Change endpoint naming

* Pass interface instead of client

* Remove ClientInterface

* Remove HTTP Status from submission response

* Add logging statements

* Fix failing integration tests

* Fix failing tests - 1

* Add back deleted files

* Remove circular import

* Group metrics into submission duration

* Group metrics into submission duration - 2

* Remove logging statements where not needed

* Change to internal server error

* Use request context logger

* Use interface method for setting http status

* Remove not needed metrics

* Remove version

* add error in extras

* Resolve merge conflicts

* Add TODO for problem response

* Adding and removing logging statements

* Move interface to async handler file

* change httpstatus interface definition

* Add deleted files back

* Revert friendbot change

* Add test for getting pending tx

* Fix failing test

* remove metrics struct and make vars private

* pass only rawTx string

* Move mock to test file

* Make core client private

* Remove UpdateTxSubMetrics func

* Change http status for DISABLE_TX_SUB

* Fix failing unittest

* Revert submitter changes

* Fix failing submitter_test

* Revert import changes

* Revert import changes - 2

* Revert import changes - 3

* Remove integration test function

* Update main.go
  • Loading branch information
aditya1702 authored Apr 26, 2024
1 parent ee9bbbf commit a387ffb
Show file tree
Hide file tree
Showing 22 changed files with 1,001 additions and 106 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/services/horizon/captive-core
/services/horizon/horizon
/services/horizon/stellar-horizon
/bucket-cache
.vscode
.idea
debug
Expand Down
112 changes: 95 additions & 17 deletions clients/horizonclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,44 @@ func (c *Client) OperationDetail(id string) (ops operations.Operation, err error
return ops, nil
}

// validateFeeBumpTx checks if the inner transaction has a memo or not and converts the transaction object to
// base64 string.
func (c *Client) validateFeeBumpTx(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (string, error) {
var err error
if inner := transaction.InnerTransaction(); !opts.SkipMemoRequiredCheck && inner.Memo() == nil {
err = c.checkMemoRequired(inner)
if err != nil {
return "", err
}
}

txeBase64, err := transaction.Base64()
if err != nil {
err = errors.Wrap(err, "Unable to convert transaction object to base64 string")
return "", err
}
return txeBase64, nil
}

// validateTx checks if the transaction has a memo or not and converts the transaction object to
// base64 string.
func (c *Client) validateTx(transaction *txnbuild.Transaction, opts SubmitTxOpts) (string, error) {
var err error
if !opts.SkipMemoRequiredCheck && transaction.Memo() == nil {
err = c.checkMemoRequired(transaction)
if err != nil {
return "", err
}
}

txeBase64, err := transaction.Base64()
if err != nil {
err = errors.Wrap(err, "Unable to convert transaction object to base64 string")
return "", err
}
return txeBase64, nil
}

// SubmitTransactionXDR submits a transaction represented as a base64 XDR string to the network. err can be either error object or horizon.Error object.
// See https://developers.stellar.org/api/resources/transactions/post/
func (c *Client) SubmitTransactionXDR(transactionXdr string) (tx hProtocol.Transaction,
Expand Down Expand Up @@ -469,16 +507,8 @@ func (c *Client) SubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransacti
func (c *Client) SubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (tx hProtocol.Transaction, err error) {
// only check if memo is required if skip is false and the inner transaction
// doesn't have a memo.
if inner := transaction.InnerTransaction(); !opts.SkipMemoRequiredCheck && inner.Memo() == nil {
err = c.checkMemoRequired(inner)
if err != nil {
return
}
}

txeBase64, err := transaction.Base64()
txeBase64, err := c.validateFeeBumpTx(transaction, opts)
if err != nil {
err = errors.Wrap(err, "Unable to convert transaction object to base64 string")
return
}

Expand All @@ -505,20 +535,68 @@ func (c *Client) SubmitTransaction(transaction *txnbuild.Transaction) (tx hProto
func (c *Client) SubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (tx hProtocol.Transaction, err error) {
// only check if memo is required if skip is false and the transaction
// doesn't have a memo.
if !opts.SkipMemoRequiredCheck && transaction.Memo() == nil {
err = c.checkMemoRequired(transaction)
if err != nil {
return
}
txeBase64, err := c.validateTx(transaction, opts)
if err != nil {
return
}

txeBase64, err := transaction.Base64()
return c.SubmitTransactionXDR(txeBase64)
}

// AsyncSubmitTransactionXDR submits a base64 XDR transaction using the transactions_async endpoint. err can be either error object or horizon.Error object.
func (c *Client) AsyncSubmitTransactionXDR(transactionXdr string) (txResp hProtocol.AsyncTransactionSubmissionResponse,
err error) {
request := submitRequest{endpoint: "transactions_async", transactionXdr: transactionXdr}
err = c.sendRequest(request, &txResp)
return
}

// AsyncSubmitFeeBumpTransaction submits an async fee bump transaction to the network. err can be either an
// error object or a horizon.Error object.
//
// This function will always check if the destination account requires a memo in the transaction as
// defined in SEP0029: https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0029.md
//
// If you want to skip this check, use SubmitTransactionWithOptions.
func (c *Client) AsyncSubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) {
return c.AsyncSubmitFeeBumpTransactionWithOptions(transaction, SubmitTxOpts{})
}

// AsyncSubmitFeeBumpTransactionWithOptions submits an async fee bump transaction to the network, allowing
// you to pass SubmitTxOpts. err can be either an error object or a horizon.Error object.
func (c *Client) AsyncSubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) {
// only check if memo is required if skip is false and the inner transaction
// doesn't have a memo.
txeBase64, err := c.validateFeeBumpTx(transaction, opts)
if err != nil {
err = errors.Wrap(err, "Unable to convert transaction object to base64 string")
return
}

return c.SubmitTransactionXDR(txeBase64)
return c.AsyncSubmitTransactionXDR(txeBase64)
}

// AsyncSubmitTransaction submits an async transaction to the network. err can be either an
// error object or a horizon.Error object.
//
// This function will always check if the destination account requires a memo in the transaction as
// defined in SEP0029: https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0029.md
//
// If you want to skip this check, use SubmitTransactionWithOptions.
func (c *Client) AsyncSubmitTransaction(transaction *txnbuild.Transaction) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) {
return c.AsyncSubmitTransactionWithOptions(transaction, SubmitTxOpts{})
}

// AsyncSubmitTransactionWithOptions submits an async transaction to the network, allowing
// you to pass SubmitTxOpts. err can be either an error object or a horizon.Error object.
func (c *Client) AsyncSubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (txResp hProtocol.AsyncTransactionSubmissionResponse, err error) {
// only check if memo is required if skip is false and the transaction
// doesn't have a memo.
txeBase64, err := c.validateTx(transaction, opts)
if err != nil {
return
}

return c.AsyncSubmitTransactionXDR(txeBase64)
}

// Transactions returns stellar transactions (https://developers.stellar.org/api/resources/transactions/list/)
Expand Down
5 changes: 4 additions & 1 deletion clients/horizonclient/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func decodeResponse(resp *http.Response, object interface{}, horizonUrl string,
}
setCurrentServerTime(u.Hostname(), resp.Header["Date"], clock)

if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
// While this part of code assumes that any error < 200 or error >= 300 is a Horizon problem, it is not
// true for the response from /transactions_async endpoint which does give these codes for certain responses
// from core.
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) && (resp.Request == nil || resp.Request.URL == nil || resp.Request.URL.Path != "/transactions_async") {
horizonError := &Error{
Response: resp,
}
Expand Down
5 changes: 5 additions & 0 deletions clients/horizonclient/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ type ClientInterface interface {
SubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (hProtocol.Transaction, error)
SubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (hProtocol.Transaction, error)
SubmitTransaction(transaction *txnbuild.Transaction) (hProtocol.Transaction, error)
AsyncSubmitTransactionXDR(transactionXdr string) (hProtocol.AsyncTransactionSubmissionResponse, error)
AsyncSubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error)
AsyncSubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error)
AsyncSubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (hProtocol.AsyncTransactionSubmissionResponse, error)
AsyncSubmitTransaction(transaction *txnbuild.Transaction) (hProtocol.AsyncTransactionSubmissionResponse, error)
Transactions(request TransactionRequest) (hProtocol.TransactionsPage, error)
TransactionDetail(txHash string) (hProtocol.Transaction, error)
OrderBook(request OrderBookRequest) (hProtocol.OrderBookSummary, error)
Expand Down
30 changes: 30 additions & 0 deletions clients/horizonclient/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,36 @@ func (m *MockClient) SubmitTransactionWithOptions(transaction *txnbuild.Transact
return a.Get(0).(hProtocol.Transaction), a.Error(1)
}

// AsyncSubmitTransactionXDR is a mocking method
func (m *MockClient) AsyncSubmitTransactionXDR(transactionXdr string) (hProtocol.AsyncTransactionSubmissionResponse, error) {
a := m.Called(transactionXdr)
return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1)
}

// AsyncSubmitFeeBumpTransaction is a mocking method
func (m *MockClient) AsyncSubmitFeeBumpTransaction(transaction *txnbuild.FeeBumpTransaction) (hProtocol.AsyncTransactionSubmissionResponse, error) {
a := m.Called(transaction)
return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1)
}

// AsyncSubmitTransaction is a mocking method
func (m *MockClient) AsyncSubmitTransaction(transaction *txnbuild.Transaction) (hProtocol.AsyncTransactionSubmissionResponse, error) {
a := m.Called(transaction)
return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1)
}

// AsyncSubmitFeeBumpTransactionWithOptions is a mocking method
func (m *MockClient) AsyncSubmitFeeBumpTransactionWithOptions(transaction *txnbuild.FeeBumpTransaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error) {
a := m.Called(transaction, opts)
return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1)
}

// AsyncSubmitTransactionWithOptions is a mocking method
func (m *MockClient) AsyncSubmitTransactionWithOptions(transaction *txnbuild.Transaction, opts SubmitTxOpts) (hProtocol.AsyncTransactionSubmissionResponse, error) {
a := m.Called(transaction, opts)
return a.Get(0).(hProtocol.AsyncTransactionSubmissionResponse), a.Error(1)
}

// Transactions is a mocking method
func (m *MockClient) Transactions(request TransactionRequest) (hProtocol.TransactionsPage, error) {
a := m.Called(request)
Expand Down
70 changes: 70 additions & 0 deletions clients/stellarcore/metrics_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package stellarcore

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"

proto "github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/xdr"
)

var envelopeTypeToLabel = map[xdr.EnvelopeType]string{
xdr.EnvelopeTypeEnvelopeTypeTxV0: "v0",
xdr.EnvelopeTypeEnvelopeTypeTx: "v1",
xdr.EnvelopeTypeEnvelopeTypeTxFeeBump: "fee_bump",
}

type ClientWithMetrics struct {
coreClient Client

// submissionDuration exposes timing metrics about the rate and latency of
// submissions to stellar-core
submissionDuration *prometheus.SummaryVec
}

func (c ClientWithMetrics) SubmitTx(ctx context.Context, rawTx string) (*proto.TXResponse, error) {
var envelope xdr.TransactionEnvelope
err := xdr.SafeUnmarshalBase64(rawTx, &envelope)
if err != nil {
return &proto.TXResponse{}, err
}

startTime := time.Now()
response, err := c.coreClient.SubmitTransaction(ctx, rawTx)
duration := time.Since(startTime).Seconds()

label := prometheus.Labels{}
if err != nil {
label["status"] = "request_error"
} else if response.IsException() {
label["status"] = "exception"
} else {
label["status"] = response.Status
}

label["envelope_type"] = envelopeTypeToLabel[envelope.Type]
c.submissionDuration.With(label).Observe(duration)

return response, err
}

func NewClientWithMetrics(client Client, registry *prometheus.Registry, prometheusSubsystem string) ClientWithMetrics {
submissionDuration := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon",
Subsystem: prometheusSubsystem,
Name: "submission_duration_seconds",
Help: "submission durations to Stellar-Core, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"status", "envelope_type"})

registry.MustRegister(
submissionDuration,
)

return ClientWithMetrics{
coreClient: client,
submissionDuration: submissionDuration,
}
}
29 changes: 29 additions & 0 deletions protocols/horizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"fmt"
"math"
"math/big"
"net/http"
"strconv"
"time"

"github.com/stellar/go/protocols/horizon/base"
proto "github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/strkey"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
Expand All @@ -29,6 +31,13 @@ var KeyTypeNames = map[strkey.VersionByte]string{
strkey.VersionByteSignedPayload: "ed25519_signed_payload",
}

var coreStatusToHTTPStatus = map[string]int{
proto.TXStatusPending: http.StatusCreated,
proto.TXStatusDuplicate: http.StatusConflict,
proto.TXStatusTryAgainLater: http.StatusServiceUnavailable,
proto.TXStatusError: http.StatusBadRequest,
}

// Account is the summary of an account
type Account struct {
Links struct {
Expand Down Expand Up @@ -567,6 +576,26 @@ type InnerTransaction struct {
MaxFee int64 `json:"max_fee,string"`
}

// AsyncTransactionSubmissionResponse represents the response returned by Horizon
// when using the transaction-async endpoint.
type AsyncTransactionSubmissionResponse struct {
// ErrorResultXDR is present only if Status is equal to proto.TXStatusError.
// ErrorResultXDR is a TransactionResult xdr string which contains details on why
// the transaction could not be accepted by stellar-core.
ErrorResultXDR string `json:"errorResultXdr,omitempty"`
// TxStatus represents the status of the transaction submission returned by stellar-core.
// It can be one of: proto.TXStatusPending, proto.TXStatusDuplicate,
// proto.TXStatusTryAgainLater, or proto.TXStatusError.
TxStatus string `json:"tx_status"`
// Hash is a hash of the transaction which can be used to look up whether
// the transaction was included in the ledger.
Hash string `json:"hash"`
}

func (response AsyncTransactionSubmissionResponse) GetStatus() int {
return coreStatusToHTTPStatus[response.TxStatus]
}

// MarshalJSON implements a custom marshaler for Transaction.
// The memo field should be omitted if and only if the
// memo_type is "none".
Expand Down
13 changes: 7 additions & 6 deletions services/horizon/internal/actions/submit_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"net/http"

"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"

"github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/protocols/stellarcore"
hProblem "github.com/stellar/go/services/horizon/internal/render/problem"
"github.com/stellar/go/services/horizon/internal/resourceadapter"
"github.com/stellar/go/services/horizon/internal/txsub"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
"github.com/stellar/go/support/render/problem"
"github.com/stellar/go/xdr"
Expand All @@ -37,7 +38,7 @@ type envelopeInfo struct {
parsed xdr.TransactionEnvelope
}

func (handler SubmitTransactionHandler) extractEnvelopeInfo(raw string, passphrase string) (envelopeInfo, error) {
func extractEnvelopeInfo(raw string, passphrase string) (envelopeInfo, error) {
result := envelopeInfo{raw: raw}
err := xdr.SafeUnmarshalBase64(raw, &result.parsed)
if err != nil {
Expand All @@ -60,7 +61,7 @@ func (handler SubmitTransactionHandler) extractEnvelopeInfo(raw string, passphra
return result, nil
}

func (handler SubmitTransactionHandler) validateBodyType(r *http.Request) error {
func validateBodyType(r *http.Request) error {
c := r.Header.Get("Content-Type")
if c == "" {
return nil
Expand Down Expand Up @@ -139,15 +140,15 @@ func (handler SubmitTransactionHandler) response(r *http.Request, info envelopeI
}

func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Request) (interface{}, error) {
if err := handler.validateBodyType(r); err != nil {
if err := validateBodyType(r); err != nil {
return nil, err
}

if handler.DisableTxSub {
return nil, &problem.P{
Type: "transaction_submission_disabled",
Title: "Transaction Submission Disabled",
Status: http.StatusMethodNotAllowed,
Status: http.StatusForbidden,
Detail: "Transaction submission has been disabled for Horizon. " +
"To enable it again, remove env variable DISABLE_TX_SUB.",
Extras: map[string]interface{}{},
Expand All @@ -159,7 +160,7 @@ func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Requ
return nil, err
}

info, err := handler.extractEnvelopeInfo(raw, handler.NetworkPassphrase)
info, err := extractEnvelopeInfo(raw, handler.NetworkPassphrase)
if err != nil {
return nil, &problem.P{
Type: "transaction_malformed",
Expand Down
Loading

0 comments on commit a387ffb

Please sign in to comment.