Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify response completion #304

Merged
merged 2 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ const (
Running
// Paused means a request is paused
Paused
// CompletingSend means we have processed a query and are waiting for data to
// go over the network
CompletingSend
)

func (rs RequestState) String() string {
Expand All @@ -355,6 +358,8 @@ func (rs RequestState) String() string {
return "running"
case Paused:
return "paused"
case CompletingSend:
return "completing send"
default:
return "unrecognized request state"
}
Expand Down
2 changes: 0 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager,
outgoingBlockHooks,
requestUpdatedHooks,
requestorCancelledListeners,
responseAssembler,
network.ConnectionManager(),
)
graphSync := &GraphSync{
network: network,
Expand Down
10 changes: 10 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil)
}

// TerminateRequest indicates a request has finished sending data and should no longer be tracked
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
done := make(chan struct{}, 1)
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// PeerState gets current state of the outgoing responses for a given peer
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
response := make(chan peerstate.PeerState)
Expand Down
14 changes: 14 additions & 0 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,17 @@ func (psm *peerStateMessage) handle(rm *ResponseManager) {
case <-rm.ctx.Done():
}
}

type terminateRequestMessage struct {
p peer.ID
requestID graphsync.RequestID
done chan<- struct{}
}

func (trm *terminateRequestMessage) handle(rm *ResponseManager) {
rm.terminateRequest(responseKey{trm.p, trm.requestID})
select {
case <-rm.ctx.Done():
case trm.done <- struct{}{}:
}
}
37 changes: 10 additions & 27 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
Expand Down Expand Up @@ -54,32 +53,26 @@ type ResponseSignals struct {

// QueryExecutor is responsible for performing individual requests by executing their traversals
type QueryExecutor struct {
ctx context.Context
manager Manager
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
responseAssembler ResponseAssembler
connManager network.ConnManager
ctx context.Context
manager Manager
blockHooks BlockHooks
updateHooks UpdateHooks
responseAssembler ResponseAssembler
}

// New creates a new QueryExecutor
func New(ctx context.Context,
manager Manager,
blockHooks BlockHooks,
updateHooks UpdateHooks,
cancelledListeners CancelledListeners,
responseAssembler ResponseAssembler,
connManager network.ConnManager,
) *QueryExecutor {
qm := &QueryExecutor{
blockHooks: blockHooks,
updateHooks: updateHooks,
cancelledListeners: cancelledListeners,
responseAssembler: responseAssembler,
manager: manager,
ctx: ctx,
connManager: connManager,
blockHooks: blockHooks,
updateHooks: updateHooks,
responseAssembler: responseAssembler,
manager: manager,
ctx: ctx,
}
return qm
}
Expand All @@ -106,11 +99,6 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
isCancelled := err != nil && ipldutil.IsContextCancelErr(err)
if isCancelled {
qe.connManager.Unprotect(pid, rt.Request.ID().Tag())
qe.cancelledListeners.NotifyCancelledListeners(pid, rt.Request)
}
qe.manager.FinishTask(task, err)
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
return false
Expand Down Expand Up @@ -286,11 +274,6 @@ type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}

// CancelledListeners is an interface for notifying listeners that requestor cancelled
type CancelledListeners interface {
NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
}

// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
Expand Down
72 changes: 25 additions & 47 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
Expand All @@ -44,7 +43,6 @@ func TestOneBlockTask(t *testing.T) {
notifeeExpect(t, td, 1, td.responseCode)
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
}

func TestSmallGraphTask(t *testing.T) {
Expand Down Expand Up @@ -83,7 +81,6 @@ func TestSmallGraphTask(t *testing.T) {
notifeeExpect(t, td, 10, td.responseCode) // AddNotifee called on all blocks
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("paused by hook", func(t *testing.T) {
Expand All @@ -98,7 +95,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.pauseCalls)
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("paused by signal", func(t *testing.T) {
Expand All @@ -117,7 +113,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.pauseCalls)
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("partial cancelled by hook", func(t *testing.T) {
Expand All @@ -130,7 +125,6 @@ func TestSmallGraphTask(t *testing.T) {
transactionExpect(t, td, []int{6, 7}, ipldutil.ContextCancelError{}.Error()) // last 2 transactions are ContextCancelled

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.cancelledCalls)
require.Equal(t, 1, td.clearRequestCalls)
})

Expand All @@ -153,7 +147,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, 0, td.clearRequestCalls)
// cancelled by signal doesn't mean we get a cancelled call here
// ErrCancelledByCommand is treated differently to a context cancellation error
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("unknown error by hook", func(t *testing.T) {
Expand All @@ -168,7 +161,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("unknown error by signal", func(t *testing.T) {
Expand All @@ -189,7 +181,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("network error by hook", func(t *testing.T) {
Expand All @@ -204,7 +195,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("network error by signal", func(t *testing.T) {
Expand All @@ -225,7 +215,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("first block wont load", func(t *testing.T) {
Expand All @@ -238,7 +227,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})
}

Expand Down Expand Up @@ -277,34 +265,31 @@ func newRandomBlock(index int64) *blockData {
}

type testData struct {
ctx context.Context
t *testing.T
cancel func()
task *peertask.Task
blockStore map[ipld.Link][]byte
persistence ipld.LinkSystem
manager *fauxManager
responseAssembler *fauxResponseAssembler
responseBuilder *fauxResponseBuilder
connManager *testutil.TestConnManager
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
cancelledListeners *listeners.RequestorCancelledListeners
extensionData []byte
extensionName graphsync.ExtensionName
extension graphsync.ExtensionData
requestID graphsync.RequestID
requestCid cid.Cid
requestSelector datamodel.Node
requests []gsmsg.GraphSyncRequest
signals *ResponseSignals
pauseCalls int
clearRequestCalls int
cancelledCalls int
expectedBlocks []*blockData
responseCode graphsync.ResponseStatusCode
peer peer.ID
subscriber *notifications.TopicDataSubscriber
ctx context.Context
t *testing.T
cancel func()
task *peertask.Task
blockStore map[ipld.Link][]byte
persistence ipld.LinkSystem
manager *fauxManager
responseAssembler *fauxResponseAssembler
responseBuilder *fauxResponseBuilder
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
extensionData []byte
extensionName graphsync.ExtensionName
extension graphsync.ExtensionData
requestID graphsync.RequestID
requestCid cid.Cid
requestSelector datamodel.Node
requests []gsmsg.GraphSyncRequest
signals *ResponseSignals
pauseCalls int
clearRequestCalls int
expectedBlocks []*blockData
responseCode graphsync.ResponseStatusCode
peer peer.ID
subscriber *notifications.TopicDataSubscriber
}

func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, *QueryExecutor) {
Expand All @@ -318,10 +303,8 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.task = &peertask.Task{}
td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task}
td.responseAssembler = &fauxResponseAssembler{}
td.connManager = testutil.NewTestConnManager()
td.blockHooks = hooks.NewBlockHooks()
td.updateHooks = hooks.NewUpdateHooks()
td.cancelledListeners = listeners.NewRequestorCancelledListeners()
td.requestID = graphsync.RequestID(rand.Int31())
td.requestCid, _ = cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
td.requestSelector = basicnode.NewInt(rand.Int63())
Expand Down Expand Up @@ -401,18 +384,13 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.responseAssembler.responseBuilder.pauseCb = func() {
td.pauseCalls++
}
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
td.cancelledCalls++
})

qe := New(
td.ctx,
td.manager,
td.blockHooks,
td.updateHooks,
td.cancelledListeners,
td.responseAssembler,
td.connManager,
)
return td, qe
}
Expand Down
4 changes: 2 additions & 2 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestCancellationQueryInProgress(t *testing.T) {
})
cancelledListenerCalled := make(chan struct{}, 1)
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
td.connManager.RefuteProtected(t, td.p)
cancelledListenerCalled <- struct{}{}
})
responseManager.Startup()
Expand All @@ -105,6 +104,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
close(waitForCancel)

testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
td.connManager.RefuteProtected(t, td.p)

td.assertRequestCleared()
}
Expand Down Expand Up @@ -1138,7 +1138,7 @@ func (td *testData) alternateLoaderResponseManager() *ResponseManager {
}

func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecutor.QueryExecutor {
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.cancelledListeners, td.responseAssembler, td.connManager)
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.responseAssembler)
}

func (td *testData) assertPausedRequest() {
Expand Down
Loading