From 2665262e5868b440dc7216c05f6c261ac7586ebd Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 12 Feb 2021 14:00:58 +0100 Subject: [PATCH 1/2] fix: better error message on complete --- errors.go | 3 -- impl/events.go | 8 +++-- impl/responding_test.go | 4 +-- transport.go | 6 ++-- transport/graphsync/graphsync.go | 51 +++++++++++++++++++++++---- transport/graphsync/graphsync_test.go | 4 +-- 6 files changed, 56 insertions(+), 20 deletions(-) diff --git a/errors.go b/errors.go index be3b5462..f50bcbfc 100644 --- a/errors.go +++ b/errors.go @@ -25,9 +25,6 @@ const ErrPause = errorType("pause channel") // use to resume the channel const ErrResume = errorType("resume channel") -// ErrIncomplete indicates a channel did not finish transferring data successfully -const ErrIncomplete = errorType("incomplete response") - // ErrRejected indicates a request was not accepted const ErrRejected = errorType("response rejected") diff --git a/impl/events.go b/impl/events.go index ef285ac0..b9f8e9b6 100644 --- a/impl/events.go +++ b/impl/events.go @@ -286,8 +286,8 @@ func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.C return nil } -func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) error { - if success { +func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error { + if completeErr == nil { if chid.Initiator != m.peerID { msg, err := m.completeMessage(chid) if err != nil { @@ -316,7 +316,9 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) } // send an error, but only if we haven't already errored for some reason if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed { - return m.channels.Error(chid, datatransfer.ErrIncomplete) + err := xerrors.Errorf("data transfer channel %s failed to transfer data: %w", completeErr) + log.Warnf(err.Error()) + return m.channels.Error(chid, err) } return nil } diff --git a/impl/responding_test.go b/impl/responding_test.go index d0ed67d3..bde23473 100644 --- a/impl/responding_test.go +++ b/impl/responding_test.go @@ -471,7 +471,7 @@ func TestDataTransferResponding(t *testing.T) { verify: func(t *testing.T, h *receiverHarness) { _, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest) require.NoError(t, err) - err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), true) + err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), nil) require.NoError(t, err) require.Len(t, h.network.SentMessages, 1) response, ok := h.network.SentMessages[0].Message.(datatransfer.Response) @@ -507,7 +507,7 @@ func TestDataTransferResponding(t *testing.T) { verify: func(t *testing.T, h *receiverHarness) { _, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest) require.NoError(t, err) - err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), false) + err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), xerrors.Errorf("err")) require.NoError(t, err) }, }, diff --git a/transport.go b/transport.go index 6abe412a..9f9812c9 100644 --- a/transport.go +++ b/transport.go @@ -49,9 +49,9 @@ type EventsHandler interface { // - err == ErrPause - pause this request (only for new requests) // - err == ErrResume - resume this request (only for update requests) OnRequestReceived(chid ChannelID, msg Request) (Response, error) - // OnResponseCompleted is called when we finish sending data for the given channel ID - // Error returns are logged but otherwise have not effect - OnChannelCompleted(chid ChannelID, success bool) error + // OnChannelCompleted is called when we finish transferring data for the given channel ID + // Error returns are logged but otherwise have no effect + OnChannelCompleted(chid ChannelID, err error) error // OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out. // Error returns are logged but otherwise have no effect diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 15f1bdc7..7f702289 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -165,7 +165,12 @@ func (t *Transport) executeGsRequest(ctx context.Context, internalCtx context.Co } log.Debugf("finished executing graphsync request for channel %s", channelID) - err := t.events.OnChannelCompleted(channelID, lastError == nil) + + var completeErr error + if lastError != nil { + completeErr = xerrors.Errorf("graphsync request failed to complete: %w", lastError) + } + err := t.events.OnChannelCompleted(channelID, completeErr) if err != nil { log.Error(err) } @@ -533,13 +538,45 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req return } - if status != graphsync.RequestCancelled { - success := status == graphsync.RequestCompletedFull - err := t.events.OnChannelCompleted(chid, success) - if err != nil { - log.Error(err) - } + if status == graphsync.RequestCancelled { + return + } + + var completeErr error + if status != graphsync.RequestCompletedFull { + statusStr := gsResponseStatusCodeString(status) + completeErr = xerrors.Errorf("graphsync response to peer %s did not complete: response status code %s", p, statusStr) + } + err := t.events.OnChannelCompleted(chid, completeErr) + if err != nil { + log.Error(err) + } +} + +// Remove this map once this PR lands: https://github.com/ipfs/go-graphsync/pull/148 +var gsResponseStatusCodes = map[graphsync.ResponseStatusCode]string{ + graphsync.RequestAcknowledged: "RequestAcknowledged", + graphsync.AdditionalPeers: "AdditionalPeers", + graphsync.NotEnoughGas: "NotEnoughGas", + graphsync.OtherProtocol: "OtherProtocol", + graphsync.PartialResponse: "PartialResponse", + graphsync.RequestPaused: "RequestPaused", + graphsync.RequestCompletedFull: "RequestCompletedFull", + graphsync.RequestCompletedPartial: "RequestCompletedPartial", + graphsync.RequestRejected: "RequestRejected", + graphsync.RequestFailedBusy: "RequestFailedBusy", + graphsync.RequestFailedUnknown: "RequestFailedUnknown", + graphsync.RequestFailedLegal: "RequestFailedLegal", + graphsync.RequestFailedContentNotFound: "RequestFailedContentNotFound", + graphsync.RequestCancelled: "RequestCancelled", +} + +func gsResponseStatusCodeString(code graphsync.ResponseStatusCode) string { + str, ok := gsResponseStatusCodes[code] + if ok { + return str } + return gsResponseStatusCodes[graphsync.RequestFailedUnknown] } func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncKey) { diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 8a13415d..c477b2db 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -1041,9 +1041,9 @@ func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response d return err } -func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, success bool) error { +func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error { fe.OnChannelCompletedCalled = true - fe.ChannelCompletedSuccess = success + fe.ChannelCompletedSuccess = completeErr == nil return fe.OnChannelCompletedErr } From 8b2f6fcf8861fcd192fc0f66c4d32fb1f3d9b1e0 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 12 Feb 2021 15:52:42 +0100 Subject: [PATCH 2/2] fix: on complete error message --- impl/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impl/events.go b/impl/events.go index b9f8e9b6..4d799029 100644 --- a/impl/events.go +++ b/impl/events.go @@ -316,7 +316,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, completeErr er } // send an error, but only if we haven't already errored for some reason if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed { - err := xerrors.Errorf("data transfer channel %s failed to transfer data: %w", completeErr) + err := xerrors.Errorf("data transfer channel %s failed to transfer data: %w", chid, completeErr) log.Warnf(err.Error()) return m.channels.Error(chid, err) }