Skip to content

Commit

Permalink
Added traces for views (hyperledger-labs#710)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios authored Jul 21, 2024
1 parent aa40343 commit 8625bd5
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 16 deletions.
24 changes: 22 additions & 2 deletions token/services/network/orion/approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func NewRequestApprovalView(
}

func (r *RequestApprovalView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("approval_request_view")
defer span.End()
sm, err := r.DBManager.GetSessionManager(r.Network)
if err != nil {
return nil, errors.WithMessagef(err, "failed getting session manager for network [%s]", r.Network)
Expand All @@ -75,17 +77,22 @@ func (r *RequestApprovalView) Call(context view.Context) (interface{}, error) {
TxID: r.TxID,
Request: r.RequestRaw,
}
if err := session.Send(request); err != nil {
span.AddEvent("send_approval_request")
if err := session.SendWithContext(context.Context(), request); err != nil {
return nil, errors.Wrapf(err, "failed to send request to custodian [%s]", sm.CustodianID)
}
response := &ApprovalResponse{}
span.AddEvent("receive_approval_response")
if err := session.ReceiveWithTimeout(response, 30*time.Second); err != nil {
span.RecordError(err)
return nil, errors.Wrapf(err, "failed to receive response from custodian [%s]", sm.CustodianID)
}
span.AddEvent("read_tx_envelope")
env := sm.Orion.TransactionManager().NewEnvelope()
if err := env.FromBytes(response.Envelope); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal transaction")
}
span.AddEvent("return_tx_envelope")
return env, nil
}

Expand All @@ -94,8 +101,11 @@ type RequestApprovalResponderView struct {
}

func (r *RequestApprovalResponderView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("approval_respond_view")
defer span.End()
// receive request
session := session2.JSON(context)
span.AddEvent("receive_approval_request")
request := &ApprovalRequest{}
if err := session.Receive(request); err != nil {
return nil, errors.Wrapf(err, "failed to receive request")
Expand All @@ -106,13 +116,16 @@ func (r *RequestApprovalResponderView) Call(context view.Context) (interface{},
if err != nil {
return nil, errors.Wrapf(err, "failed to process request")
}
if err := session.Send(&ApprovalResponse{Envelope: txRaw}); err != nil {
span.AddEvent("send_approval_response")
if err := session.SendWithContext(context.Context(), &ApprovalResponse{Envelope: txRaw}); err != nil {
return nil, errors.Wrapf(err, "failed to send response")
}
return nil, nil
}

func (r *RequestApprovalResponderView) process(context view.Context, request *ApprovalRequest) ([]byte, error) {
span := context.StartSpan("approval_request_process")
defer span.End()
ds, err := driver.GetTokenDriverService(context)
if err != nil {
return nil, errors.Wrapf(err, "failed to get token driver")
Expand All @@ -121,6 +134,7 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap
if err != nil {
return nil, errors.Wrapf(err, "failed to get session manager for network [%s]", request.Network)
}
span.AddEvent("fetch_public_params")
pp, err := sm.PublicParameters(ds, request.Namespace)
if err != nil {
return nil, errors.Wrapf(err, "failed to get public parameters for network [%s]", request.Network)
Expand All @@ -135,6 +149,7 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap
numRetries := 5
sleepDuration := 1 * time.Second
for i := 0; i < numRetries; i++ {
span.AddEvent("try_validate")
envelopeRaw, retry, err := r.validate(context, request, validator)
if err != nil {
if !retry {
Expand All @@ -143,6 +158,7 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap
}
logger.Errorf("failed to commit transaction [%s], retry [%d]", err, i)
// was the transaction committed, by any chance?
span.AddEvent("fetch_tx_status")
status, err := txStatusFetcher.process(context, &TxStatusRequest{
Network: request.Network,
Namespace: request.Namespace,
Expand Down Expand Up @@ -173,6 +189,8 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap
}

func (r *RequestApprovalResponderView) validate(context view.Context, request *ApprovalRequest, validator driver.Validator) ([]byte, bool, error) {
span := context.StartSpan("tx_request_validation")
defer span.End()
sm, err := r.dbManager.GetSessionManager(request.Network)
if err != nil {
return nil, true, errors.Wrapf(err, "failed to get session manager for network [%s]", request.Network)
Expand All @@ -185,6 +203,7 @@ func (r *RequestApprovalResponderView) validate(context view.Context, request *A
if err != nil {
return nil, true, errors.Wrapf(err, "failed to get query executor for orion network [%s]", request.Network)
}
span.AddEvent("validate_request")
actions, attributes, err := token.NewValidator(validator).UnmarshallAndVerifyWithMetadata(
context.Context(),
&LedgerWrapper{qe: qe},
Expand Down Expand Up @@ -212,6 +231,7 @@ func (r *RequestApprovalResponderView) validate(context view.Context, request *A
return nil, false, errors.Wrapf(err, "failed to write action")
}
}
span.AddEvent("commit_token_request")
err = t.CommitTokenRequest(attributes[common.TokenRequestToSign], true)
if err != nil {
return nil, false, errors.Wrapf(err, "failed to commit token request")
Expand Down
14 changes: 12 additions & 2 deletions token/services/network/orion/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func NewBroadcastView(dbManager *DBManager, network string, blob interface{}) *B
}

func (r *BroadcastView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("broadcast_view")
defer span.End()
sm, err := r.DBManager.GetSessionManager(r.Network)
if err != nil {
return nil, errors.Wrapf(err, "failed getting session manager for network [%s]", r.Network)
Expand Down Expand Up @@ -67,10 +69,12 @@ func (r *BroadcastView) Call(context view.Context) (interface{}, error) {
Network: r.Network,
Blob: blob,
}
if err := session.Send(request); err != nil {
span.AddEvent("send_broadcast_request")
if err := session.SendWithContext(context.Context(), request); err != nil {
return nil, errors.Wrapf(err, "failed to send request to custodian [%s]", custodian)
}
response := &BroadcastResponse{}
span.AddEvent("receive_broadcast_response")
if err := session.ReceiveWithTimeout(response, 30*time.Second); err != nil {
return nil, errors.Wrapf(err, "failed to receive response from custodian [%s]", custodian)
}
Expand All @@ -85,9 +89,12 @@ type BroadcastResponderView struct {
}

func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("broadcast_responder_view")
defer span.End()
// receive request
session := session2.JSON(context)
request := &BroadcastRequest{}
span.AddEvent("receive_request")
if err := session.Receive(request); err != nil {
return nil, errors.Wrapf(err, "failed to receive request")
}
Expand All @@ -105,7 +112,9 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error)
numRetries := 5
sleepDuration := 1 * time.Second
for i := 0; i < numRetries; i++ {
span.AddEvent("try_broadcast")
if _, txID, err2 := r.broadcast(context, sm, request); err2 != nil {
span.RecordError(err2)
logger.Errorf("failed to broadcast to [%s], txID [%s] with err [%s], retry [%d]", sm.CustodianID, txID, err2, i)
if strings.Contains(err2.Error(), "is not valid") {
err = err2
Expand All @@ -114,6 +123,7 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error)
if len(txID) != 0 {
// was the transaction committed, by any chance?
logger.Errorf("check transaction [%s] status on [%s], retry [%d]", txID, sm.CustodianID, i)
span.AddEvent("fetch_tx_status")
status, err := txStatusFetcher.process(context, &TxStatusRequest{
Network: request.Network,
TxID: txID,
Expand Down Expand Up @@ -145,7 +155,7 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error)
if !done {
broadcastError = fmt.Sprintf("failed to broadcast to [%s] with err [%s]", sm.CustodianID, err)
}
if err := session.Send(&BroadcastResponse{Err: broadcastError}); err != nil {
if err := session.SendWithContext(context.Context(), &BroadcastResponse{Err: broadcastError}); err != nil {
return nil, errors.Wrapf(err, "failed to send response")
}
return nil, nil
Expand Down
13 changes: 11 additions & 2 deletions token/services/network/orion/txstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func NewRequestTxStatusView(network string, namespace string, txID string, dbMan
}

func (r *RequestTxStatusView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("request_tx_status_view")
defer span.End()
sm, err := r.dbManager.GetSessionManager(r.Network)
if err != nil {
return nil, errors.WithMessagef(err, "failed getting session manager for network [%s]", r.Network)
Expand All @@ -59,10 +61,12 @@ func (r *RequestTxStatusView) Call(context view.Context) (interface{}, error) {
Namespace: r.Namespace,
TxID: r.TxID,
}
if err := session.Send(request); err != nil {
span.AddEvent("send_tx_status_request")
if err := session.SendWithContext(context.Context(), request); err != nil {
return nil, errors.Wrapf(err, "failed to send request to custodian [%s]", custodian)
}
response := &TxStatusResponse{}
span.AddEvent("receive_tx_status_response")
if err := session.Receive(response); err != nil {
return nil, errors.Wrapf(err, "failed to receive response from custodian [%s]", custodian)
}
Expand All @@ -74,19 +78,24 @@ type RequestTxStatusResponderView struct {
}

func (r *RequestTxStatusResponderView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("request_tx_status_responder_view")
defer span.End()
// receive request
session := session2.JSON(context)
request := &TxStatusRequest{}
span.AddEvent("receive_tx_status_request")
if err := session.Receive(request); err != nil {
return nil, errors.Wrapf(err, "failed to receive request")
}
logger.Debugf("request: %+v", request)

span.AddEvent("process_tx_status_request")
response, err := r.process(context, request)
if err != nil {
return nil, errors.Wrapf(err, "failed to process request")
}
if err := session.Send(response); err != nil {
span.AddEvent("send_tx_status_response")
if err := session.SendWithContext(context.Context(), response); err != nil {
return nil, errors.Wrapf(err, "failed to send response")
}
return nil, nil
Expand Down
15 changes: 10 additions & 5 deletions token/services/ttx/endorse.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (c *CollectEndorsementsView) signRemote(context view.Context, party view.Id
if err != nil {
return nil, err
}
err = session.Send(signatureRequestRaw)
err = session.SendWithContext(context.Context(), signatureRequestRaw)
if err != nil {
return nil, errors.Wrap(err, "failed sending transaction content")
}
Expand Down Expand Up @@ -564,7 +564,7 @@ func (c *CollectEndorsementsView) distributeEnv(context view.Context, env *netwo
// Wait to receive a content back
ch := session.Receive()
// Send the content
err = session.Send(txRaw)
err = session.SendWithContext(context.Context(), txRaw)
if err != nil {
return errors.Wrap(err, "failed sending transaction content")
}
Expand Down Expand Up @@ -631,6 +631,8 @@ func NewReceiveTransactionView(network string) *ReceiveTransactionView {
}

func (f *ReceiveTransactionView) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("receive_tx_view")
defer span.End()
// Wait to receive a transaction back
ch := context.Session().Receive()

Expand All @@ -639,6 +641,7 @@ func (f *ReceiveTransactionView) Call(context view.Context) (interface{}, error)

select {
case msg := <-ch:
span.AddEvent("receive_tx")
if msg.Status == view.ERROR {
return nil, errors.New(string(msg.Payload))
}
Expand All @@ -655,7 +658,9 @@ func (f *ReceiveTransactionView) Call(context view.Context) (interface{}, error)
}
return tx, nil
case <-timeout.C:
return nil, errors.New("timeout reached")
err := errors.New("timeout reached")
span.RecordError(err)
return nil, err
}
}

Expand Down Expand Up @@ -760,7 +765,7 @@ func (s *EndorseView) Call(context view.Context) (interface{}, error) {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Send back signature [%s][%s]", signatureRequest.Signer, hash.Hashable(sigma))
}
err = session.Send(sigma)
err = session.SendWithContext(context.Context(), sigma)
if err != nil {
return nil, errors.Wrapf(err, "failed sending signature back")
}
Expand Down Expand Up @@ -792,7 +797,7 @@ func (s *EndorseView) Call(context view.Context) (interface{}, error) {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("ack response: [%s] from [%s]", hash.Hashable(sigma), view2.GetIdentityProvider(context).DefaultIdentity())
}
if err := session.Send(sigma); err != nil {
if err := session.SendWithContext(context.Context(), sigma); err != nil {
return nil, errors.WithMessage(err, "failed sending ack")
}

Expand Down
Loading

0 comments on commit 8625bd5

Please sign in to comment.