diff --git a/token/services/network/orion/approval.go b/token/services/network/orion/approval.go index e7b71ab66..587b8603d 100644 --- a/token/services/network/orion/approval.go +++ b/token/services/network/orion/approval.go @@ -60,6 +60,8 @@ func NewRequestApprovalView( } func (r *RequestApprovalView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("approval_request_view") + defer span.End() sm, err := r.DBManager.GetSessionManager(r.Network) if err != nil { return nil, errors.WithMessagef(err, "failed getting session manager for network [%s]", r.Network) @@ -75,17 +77,22 @@ func (r *RequestApprovalView) Call(context view.Context) (interface{}, error) { TxID: r.TxID, Request: r.RequestRaw, } - if err := session.Send(request); err != nil { + span.AddEvent("send_approval_request") + if err := session.SendWithContext(context.Context(), request); err != nil { return nil, errors.Wrapf(err, "failed to send request to custodian [%s]", sm.CustodianID) } response := &ApprovalResponse{} + span.AddEvent("receive_approval_response") if err := session.ReceiveWithTimeout(response, 30*time.Second); err != nil { + span.RecordError(err) return nil, errors.Wrapf(err, "failed to receive response from custodian [%s]", sm.CustodianID) } + span.AddEvent("read_tx_envelope") env := sm.Orion.TransactionManager().NewEnvelope() if err := env.FromBytes(response.Envelope); err != nil { return nil, errors.Wrap(err, "failed to unmarshal transaction") } + span.AddEvent("return_tx_envelope") return env, nil } @@ -94,8 +101,11 @@ type RequestApprovalResponderView struct { } func (r *RequestApprovalResponderView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("approval_respond_view") + defer span.End() // receive request session := session2.JSON(context) + span.AddEvent("receive_approval_request") request := &ApprovalRequest{} if err := session.Receive(request); err != nil { return nil, errors.Wrapf(err, "failed to receive request") @@ -106,13 +116,16 @@ func (r *RequestApprovalResponderView) Call(context view.Context) (interface{}, if err != nil { return nil, errors.Wrapf(err, "failed to process request") } - if err := session.Send(&ApprovalResponse{Envelope: txRaw}); err != nil { + span.AddEvent("send_approval_response") + if err := session.SendWithContext(context.Context(), &ApprovalResponse{Envelope: txRaw}); err != nil { return nil, errors.Wrapf(err, "failed to send response") } return nil, nil } func (r *RequestApprovalResponderView) process(context view.Context, request *ApprovalRequest) ([]byte, error) { + span := context.StartSpan("approval_request_process") + defer span.End() ds, err := driver.GetTokenDriverService(context) if err != nil { return nil, errors.Wrapf(err, "failed to get token driver") @@ -121,6 +134,7 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap if err != nil { return nil, errors.Wrapf(err, "failed to get session manager for network [%s]", request.Network) } + span.AddEvent("fetch_public_params") pp, err := sm.PublicParameters(ds, request.Namespace) if err != nil { return nil, errors.Wrapf(err, "failed to get public parameters for network [%s]", request.Network) @@ -135,6 +149,7 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap numRetries := 5 sleepDuration := 1 * time.Second for i := 0; i < numRetries; i++ { + span.AddEvent("try_validate") envelopeRaw, retry, err := r.validate(context, request, validator) if err != nil { if !retry { @@ -143,6 +158,7 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap } logger.Errorf("failed to commit transaction [%s], retry [%d]", err, i) // was the transaction committed, by any chance? + span.AddEvent("fetch_tx_status") status, err := txStatusFetcher.process(context, &TxStatusRequest{ Network: request.Network, Namespace: request.Namespace, @@ -173,6 +189,8 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap } func (r *RequestApprovalResponderView) validate(context view.Context, request *ApprovalRequest, validator driver.Validator) ([]byte, bool, error) { + span := context.StartSpan("tx_request_validation") + defer span.End() sm, err := r.dbManager.GetSessionManager(request.Network) if err != nil { return nil, true, errors.Wrapf(err, "failed to get session manager for network [%s]", request.Network) @@ -185,6 +203,7 @@ func (r *RequestApprovalResponderView) validate(context view.Context, request *A if err != nil { return nil, true, errors.Wrapf(err, "failed to get query executor for orion network [%s]", request.Network) } + span.AddEvent("validate_request") actions, attributes, err := token.NewValidator(validator).UnmarshallAndVerifyWithMetadata( context.Context(), &LedgerWrapper{qe: qe}, @@ -212,6 +231,7 @@ func (r *RequestApprovalResponderView) validate(context view.Context, request *A return nil, false, errors.Wrapf(err, "failed to write action") } } + span.AddEvent("commit_token_request") err = t.CommitTokenRequest(attributes[common.TokenRequestToSign], true) if err != nil { return nil, false, errors.Wrapf(err, "failed to commit token request") diff --git a/token/services/network/orion/broadcast.go b/token/services/network/orion/broadcast.go index 24b351145..ba6ac92d7 100644 --- a/token/services/network/orion/broadcast.go +++ b/token/services/network/orion/broadcast.go @@ -40,6 +40,8 @@ func NewBroadcastView(dbManager *DBManager, network string, blob interface{}) *B } func (r *BroadcastView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("broadcast_view") + defer span.End() sm, err := r.DBManager.GetSessionManager(r.Network) if err != nil { return nil, errors.Wrapf(err, "failed getting session manager for network [%s]", r.Network) @@ -67,10 +69,12 @@ func (r *BroadcastView) Call(context view.Context) (interface{}, error) { Network: r.Network, Blob: blob, } - if err := session.Send(request); err != nil { + span.AddEvent("send_broadcast_request") + if err := session.SendWithContext(context.Context(), request); err != nil { return nil, errors.Wrapf(err, "failed to send request to custodian [%s]", custodian) } response := &BroadcastResponse{} + span.AddEvent("receive_broadcast_response") if err := session.ReceiveWithTimeout(response, 30*time.Second); err != nil { return nil, errors.Wrapf(err, "failed to receive response from custodian [%s]", custodian) } @@ -85,9 +89,12 @@ type BroadcastResponderView struct { } func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("broadcast_responder_view") + defer span.End() // receive request session := session2.JSON(context) request := &BroadcastRequest{} + span.AddEvent("receive_request") if err := session.Receive(request); err != nil { return nil, errors.Wrapf(err, "failed to receive request") } @@ -105,7 +112,9 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error) numRetries := 5 sleepDuration := 1 * time.Second for i := 0; i < numRetries; i++ { + span.AddEvent("try_broadcast") if _, txID, err2 := r.broadcast(context, sm, request); err2 != nil { + span.RecordError(err2) logger.Errorf("failed to broadcast to [%s], txID [%s] with err [%s], retry [%d]", sm.CustodianID, txID, err2, i) if strings.Contains(err2.Error(), "is not valid") { err = err2 @@ -114,6 +123,7 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error) if len(txID) != 0 { // was the transaction committed, by any chance? logger.Errorf("check transaction [%s] status on [%s], retry [%d]", txID, sm.CustodianID, i) + span.AddEvent("fetch_tx_status") status, err := txStatusFetcher.process(context, &TxStatusRequest{ Network: request.Network, TxID: txID, @@ -145,7 +155,7 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error) if !done { broadcastError = fmt.Sprintf("failed to broadcast to [%s] with err [%s]", sm.CustodianID, err) } - if err := session.Send(&BroadcastResponse{Err: broadcastError}); err != nil { + if err := session.SendWithContext(context.Context(), &BroadcastResponse{Err: broadcastError}); err != nil { return nil, errors.Wrapf(err, "failed to send response") } return nil, nil diff --git a/token/services/network/orion/txstatus.go b/token/services/network/orion/txstatus.go index 6b9345881..c2b430b04 100644 --- a/token/services/network/orion/txstatus.go +++ b/token/services/network/orion/txstatus.go @@ -44,6 +44,8 @@ func NewRequestTxStatusView(network string, namespace string, txID string, dbMan } func (r *RequestTxStatusView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("request_tx_status_view") + defer span.End() sm, err := r.dbManager.GetSessionManager(r.Network) if err != nil { return nil, errors.WithMessagef(err, "failed getting session manager for network [%s]", r.Network) @@ -59,10 +61,12 @@ func (r *RequestTxStatusView) Call(context view.Context) (interface{}, error) { Namespace: r.Namespace, TxID: r.TxID, } - if err := session.Send(request); err != nil { + span.AddEvent("send_tx_status_request") + if err := session.SendWithContext(context.Context(), request); err != nil { return nil, errors.Wrapf(err, "failed to send request to custodian [%s]", custodian) } response := &TxStatusResponse{} + span.AddEvent("receive_tx_status_response") if err := session.Receive(response); err != nil { return nil, errors.Wrapf(err, "failed to receive response from custodian [%s]", custodian) } @@ -74,19 +78,24 @@ type RequestTxStatusResponderView struct { } func (r *RequestTxStatusResponderView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("request_tx_status_responder_view") + defer span.End() // receive request session := session2.JSON(context) request := &TxStatusRequest{} + span.AddEvent("receive_tx_status_request") if err := session.Receive(request); err != nil { return nil, errors.Wrapf(err, "failed to receive request") } logger.Debugf("request: %+v", request) + span.AddEvent("process_tx_status_request") response, err := r.process(context, request) if err != nil { return nil, errors.Wrapf(err, "failed to process request") } - if err := session.Send(response); err != nil { + span.AddEvent("send_tx_status_response") + if err := session.SendWithContext(context.Context(), response); err != nil { return nil, errors.Wrapf(err, "failed to send response") } return nil, nil diff --git a/token/services/ttx/endorse.go b/token/services/ttx/endorse.go index 62af02bb2..e4a4081a5 100644 --- a/token/services/ttx/endorse.go +++ b/token/services/ttx/endorse.go @@ -275,7 +275,7 @@ func (c *CollectEndorsementsView) signRemote(context view.Context, party view.Id if err != nil { return nil, err } - err = session.Send(signatureRequestRaw) + err = session.SendWithContext(context.Context(), signatureRequestRaw) if err != nil { return nil, errors.Wrap(err, "failed sending transaction content") } @@ -564,7 +564,7 @@ func (c *CollectEndorsementsView) distributeEnv(context view.Context, env *netwo // Wait to receive a content back ch := session.Receive() // Send the content - err = session.Send(txRaw) + err = session.SendWithContext(context.Context(), txRaw) if err != nil { return errors.Wrap(err, "failed sending transaction content") } @@ -631,6 +631,8 @@ func NewReceiveTransactionView(network string) *ReceiveTransactionView { } func (f *ReceiveTransactionView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("receive_tx_view") + defer span.End() // Wait to receive a transaction back ch := context.Session().Receive() @@ -639,6 +641,7 @@ func (f *ReceiveTransactionView) Call(context view.Context) (interface{}, error) select { case msg := <-ch: + span.AddEvent("receive_tx") if msg.Status == view.ERROR { return nil, errors.New(string(msg.Payload)) } @@ -655,7 +658,9 @@ func (f *ReceiveTransactionView) Call(context view.Context) (interface{}, error) } return tx, nil case <-timeout.C: - return nil, errors.New("timeout reached") + err := errors.New("timeout reached") + span.RecordError(err) + return nil, err } } @@ -760,7 +765,7 @@ func (s *EndorseView) Call(context view.Context) (interface{}, error) { if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("Send back signature [%s][%s]", signatureRequest.Signer, hash.Hashable(sigma)) } - err = session.Send(sigma) + err = session.SendWithContext(context.Context(), sigma) if err != nil { return nil, errors.Wrapf(err, "failed sending signature back") } @@ -792,7 +797,7 @@ func (s *EndorseView) Call(context view.Context) (interface{}, error) { if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("ack response: [%s] from [%s]", hash.Hashable(sigma), view2.GetIdentityProvider(context).DefaultIdentity()) } - if err := session.Send(sigma); err != nil { + if err := session.SendWithContext(context.Context(), sigma); err != nil { return nil, errors.WithMessage(err, "failed sending ack") } diff --git a/token/services/ttx/recipients.go b/token/services/ttx/recipients.go index 1e5a81b9e..f40ba37b5 100644 --- a/token/services/ttx/recipients.go +++ b/token/services/ttx/recipients.go @@ -102,6 +102,8 @@ func RequestRecipientIdentity(context view.Context, recipient view.Identity, opt } func (f *RequestRecipientIdentityView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("request_recipient_identity_view") + defer span.End() w := token.GetManagementService(context, token.WithTMSID(f.TMSID)).WalletManager().OwnerWallet(f.Other) if isSameNode := w != nil; !isSameNode { @@ -114,6 +116,8 @@ func (f *RequestRecipientIdentityView) Call(context view.Context) (interface{}, } func (f *RequestRecipientIdentityView) callWithRecipientData(context view.Context) (interface{}, error) { + span := context.StartSpan("other_recipient_identity_request") + defer span.End() if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("request recipient [%s] is not registered", f.Other) } @@ -132,11 +136,13 @@ func (f *RequestRecipientIdentityView) callWithRecipientData(context view.Contex if err != nil { return nil, errors.Wrapf(err, "failed marshalling recipient request") } - err = session.Send(rrRaw) + span.AddEvent("send_identity_request") + err = session.SendWithContext(context.Context(), rrRaw) if err != nil { return nil, errors.Wrapf(err, "failed to send recipient request") } + span.AddEvent("receive_identity_response") // Wait to receive a view identity ch := session.Receive() var payload []byte @@ -146,12 +152,15 @@ func (f *RequestRecipientIdentityView) callWithRecipientData(context view.Contex select { case msg := <-ch: + span.AddEvent("receive_message") if msg.Status == view.ERROR { return nil, errors.New(string(msg.Payload)) } payload = msg.Payload case <-timeout.C: - return nil, errors.New("timeout reached") + err := errors.New("timeout reached") + span.RecordError(err) + return nil, err } recipientData, err := RecipientDataFromBytes(payload) @@ -160,6 +169,7 @@ func (f *RequestRecipientIdentityView) callWithRecipientData(context view.Contex return nil, errors.Wrapf(err, "failed to unmarshal recipient data") } wm := token.GetManagementService(context, token.WithTMSID(f.TMSID)).WalletManager() + span.AddEvent("register_recipient_identity") if err := wm.RegisterRecipientIdentity(recipientData); err != nil { logger.Errorf("failed to register recipient identity: [%s]", err) return nil, errors.Wrapf(err, "failed to register recipient identity") @@ -169,6 +179,7 @@ func (f *RequestRecipientIdentityView) callWithRecipientData(context view.Contex if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("update endpoint resolver for [%s], bind to [%s]", recipientData.Identity, f.Other) } + span.AddEvent("bind_identity") if err := view2.GetEndpointService(context).Bind(f.Other, recipientData.Identity); err != nil { if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("failed binding [%s] to [%s]", recipientData.Identity, f.Other) @@ -205,12 +216,15 @@ func RespondRequestRecipientIdentityUsingWallet(context view.Context, wallet str } func (s *RespondRequestRecipientIdentityView) Call(context view.Context) (interface{}, error) { + span := context.StartSpan("request_recipient_identity_respond_view") + defer span.End() session, payload, err := session2.ReadFirstMessage(context) if err != nil { logger.Errorf("failed to read first message: [%s]", err) return nil, errors.Wrapf(err, "failed to read first message") } + span.AddEvent("received_first_message") recipientRequest := &RecipientRequest{} if err := recipientRequest.FromBytes(payload); err != nil { logger.Errorf("failed to unmarshal recipient request: [%s][%s]", payload, err) @@ -244,6 +258,7 @@ func (s *RespondRequestRecipientIdentityView) Call(context view.Context) (interf } // TODO: check the other values too } else { + span.AddEvent("generate_identity") // otherwise generate one fresh recipientIdentity, err = w.GetRecipientIdentity() if err != nil { @@ -273,7 +288,8 @@ func (s *RespondRequestRecipientIdentityView) Call(context view.Context) (interf } // Step 3: send the public key back to the invoker - err = session.Send(recipientDataRaw) + span.AddEvent("send_recipient_identity_response") + err = session.SendWithContext(context.Context(), recipientDataRaw) if err != nil { logger.Errorf("failed to send recipient data: [%s]", err) return nil, errors.Wrapf(err, "failed to send recipient data") @@ -284,6 +300,7 @@ func (s *RespondRequestRecipientIdentityView) Call(context view.Context) (interf if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("bind me [%s] to [%s]", context.Me(), recipientData) } + span.AddEvent("bind_identity") err = resolver.Bind(context.Me(), recipientIdentity) if err != nil { logger.Errorf("failed binding [%s] to [%s]", context.Me(), recipientData) @@ -370,7 +387,7 @@ func (f *ExchangeRecipientIdentitiesView) Call(context view.Context) (interface{ if err != nil { return nil, err } - if err := session.Send(requestRaw); err != nil { + if err := session.SendWithContext(context.Context(), requestRaw); err != nil { return nil, err } @@ -475,7 +492,7 @@ func (s *RespondExchangeRecipientIdentitiesView) Call(context view.Context) (int return nil, err } - if err := session.Send(recipientDataRaw); err != nil { + if err := session.SendWithContext(context.Context(), recipientDataRaw); err != nil { return nil, err }