Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incoming Block Hooks #68

Merged
merged 7 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ type IncomingResponseHookActions interface {
UpdateRequestWithExtensions(...ExtensionData)
}

// IncomingBlockHookActions are actions that incoming block hook can take
// to change the execution of a request
type IncomingBlockHookActions interface {
TerminateWithError(error)
UpdateRequestWithExtensions(...ExtensionData)
}

// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
// change execution of the response
type RequestUpdatedHookActions interface {
Expand All @@ -197,9 +204,18 @@ type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions Inco

// OnIncomingResponseHook is a hook that runs each time a new response is received.
// It receives the peer that sent the response and all data about the response.
// If it returns an error processing is halted and the original request is cancelled.
// It receives an interface for customizing how we handle the ongoing execution of the request
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData, hookActions IncomingResponseHookActions)

// OnIncomingBlockHook is a hook that runs each time a new block is validated as
// part of the response, regardless of whether it came locally or over the network
// It receives that sent the response, the most recent response, a link for the block received,
// and the size of the block received
// The difference between BlockSize & BlockSizeOnWire can be used to determine
// where the block came from (Local vs remote)
// It receives an interface for customizing how we handle the ongoing execution of the request
type OnIncomingBlockHook func(p peer.ID, responseData ResponseData, blockData BlockData, hookActions IncomingBlockHookActions)

// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request
// It receives the peer we're sending a request to and all the data aobut the request
// It receives an interface for customizing how we handle executing this request
Expand Down Expand Up @@ -237,6 +253,9 @@ type GraphExchange interface {
// RegisterIncomingResponseHook adds a hook that runs when a response is received
RegisterIncomingResponseHook(OnIncomingResponseHook) UnregisterHookFunc

// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
RegisterIncomingBlockHook(OnIncomingBlockHook) UnregisterHookFunc

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc

Expand Down
10 changes: 9 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type GraphSync struct {
completedResponseListeners *responderhooks.CompletedResponseListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -75,7 +76,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
asyncLoader := asyncloader.New(ctx, loader, storer)
incomingResponseHooks := requestorhooks.NewResponseHooks()
outgoingRequestHooks := requestorhooks.NewRequestHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks)
incomingBlockHooks := requestorhooks.NewBlockHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
Expand All @@ -102,6 +104,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
completedResponseListeners: completedResponseListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
Expand Down Expand Up @@ -169,6 +172,11 @@ func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResp
return gs.completedResponseListeners.Register(listener)
}

// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
return gs.incomingBlockHooks.Register(hook)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
Expand Down
61 changes: 61 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,67 @@ func TestPauseResumeViaUpdate(t *testing.T) {
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var receivedReponseData []byte
var receivedUpdateData []byte
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

stopPoint := 50
blocksReceived := 0
requestor.RegisterIncomingBlockHook(func(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
blocksReceived++
if response.Status() == graphsync.RequestPaused && blocksReceived == stopPoint {
var has bool
receivedReponseData, has = response.Extension(td.extensionName)
if has {
hookActions.UpdateRequestWithExtensions(td.extensionUpdate)
}
}
})

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
blocksSent := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
_, has := requestData.Extension(td.extensionName)
if has {
blocksSent++
if blocksSent == stopPoint {
hookActions.SendExtensionData(td.extensionResponse)
hookActions.PauseResponse()
}
} else {
hookActions.TerminateWithError(errors.New("should have sent extension"))
}
})
responder.RegisterRequestUpdatedHook(func(p peer.ID, request graphsync.RequestData, update graphsync.RequestData, hookActions graphsync.RequestUpdatedHookActions) {
var has bool
receivedUpdateData, has = update.Extension(td.extensionName)
if has {
hookActions.UnpauseResponse()
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data")
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
14 changes: 11 additions & 3 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.R

unverifiedBlockStore := unverifiedblockstore.New(storer)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) {
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data == nil && err == nil {
Expand All @@ -326,11 +326,19 @@ func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.R
if stream != nil && loadErr == nil {
localData, loadErr := ioutil.ReadAll(stream)
if loadErr == nil && localData != nil {
return localData, nil
return types.AsyncLoadResult{
Data: localData,
Err: nil,
Local: true,
}
}
}
}
return data, err
return types.AsyncLoadResult{
Data: data,
Err: err,
Local: false,
}
})

return responseCache, loadAttemptQueue
Expand Down
18 changes: 5 additions & 13 deletions requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ func NewLoadRequest(requestID graphsync.RequestID,
}

// LoadAttempter attempts to load a link to an array of bytes
// it has three results:
// bytes present, error nil = success
// bytes nil, error present = error
// bytes nil, error nil = did not load, but try again later
type LoadAttempter func(graphsync.RequestID, ipld.Link) ([]byte, error)
// and returns an async load result
type LoadAttempter func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult

// LoadAttemptQueue attempts to load using the load attempter, and then can
// place requests on a retry queue
Expand All @@ -48,14 +45,9 @@ func New(loadAttempter LoadAttempter) *LoadAttemptQueue {
// AttemptLoad attempts to loads the given load request, and if retry is true
// it saves the loadrequest for retrying later
func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) {
response, err := laq.loadAttempter(lr.requestID, lr.link)
if err != nil {
lr.resultChan <- types.AsyncLoadResult{Data: nil, Err: err}
close(lr.resultChan)
return
}
if response != nil {
lr.resultChan <- types.AsyncLoadResult{Data: response, Err: nil}
response := laq.loadAttempter(lr.requestID, lr.link)
if response.Err != nil || response.Data != nil {
lr.resultChan <- response
close(lr.resultChan)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
callCount++
return testutil.RandomBytes(100), nil
return types.AsyncLoadResult{
Data: testutil.RandomBytes(100),
}
}
loadAttemptQueue := New(loadAttempter)

Expand All @@ -45,9 +47,11 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
callCount++
return nil, fmt.Errorf("something went wrong")
return types.AsyncLoadResult{
Err: fmt.Errorf("something went wrong"),
}
}
loadAttemptQueue := New(loadAttempter)

Expand All @@ -69,13 +73,15 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return types.AsyncLoadResult{
Data: result,
}
}

loadAttemptQueue := New(loadAttempter)
Expand All @@ -99,14 +105,16 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return types.AsyncLoadResult{
Data: result,
}
}
loadAttemptQueue := New(loadAttempter)

Expand All @@ -132,14 +140,16 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(graphsync.RequestID, ipld.Link) ([]byte, error) {
loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return types.AsyncLoadResult{
Data: result,
}
}
loadAttemptQueue := New(loadAttempter)

Expand Down
44 changes: 44 additions & 0 deletions requestmanager/hooks/blockhooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package hooks

import (
"github.com/hannahhoward/go-pubsub"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
)

// IncomingBlockHooks is a set of incoming block hooks that can be processed
type IncomingBlockHooks struct {
pubSub *pubsub.PubSub
}

type internalBlockHookEvent struct {
p peer.ID
response graphsync.ResponseData
block graphsync.BlockData
rha *updateHookActions
}

func blockHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalBlockHookEvent)
hook := subscriberFn.(graphsync.OnIncomingBlockHook)
hook(ie.p, ie.response, ie.block, ie.rha)
return ie.rha.err
}

// NewBlockHooks returns a new list of incoming request hooks
func NewBlockHooks() *IncomingBlockHooks {
return &IncomingBlockHooks{pubSub: pubsub.New(blockHookDispatcher)}
}

// Register registers an extension to process incoming responses
func (ibh *IncomingBlockHooks) Register(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(ibh.pubSub.Subscribe(hook))
}

// ProcessBlockHooks runs response hooks against an incoming response
func (ibh *IncomingBlockHooks) ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) UpdateResult {
rha := &updateHookActions{}
_ = ibh.pubSub.Publish(internalBlockHookEvent{p, response, block, rha})
return rha.result()
}
Loading