Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move block allocation into message queue #140

Merged
merged 1 commit into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, allocator, peerManager)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
Expand Down
13 changes: 11 additions & 2 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type MessageNetwork interface {
}

type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleasePeerMemory(p peer.ID) error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}
Expand Down Expand Up @@ -80,8 +81,16 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Alloc
}
}

// BuildMessage allows you to work modify the next message that is sent in the queue
func (mq *MessageQueue) BuildMessage(size uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
// AllocateAndBuildMessage allows you to work modify the next message that is sent in the queue.
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
func (mq *MessageQueue) AllocateAndBuildMessage(size uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
if size > 0 {
select {
case <-mq.allocator.AllocateBlockMemory(mq.p, size):
case <-mq.ctx.Done():
return
}
}
if mq.buildMessage(size, buildMessageFn, notifees) {
mq.signalWork()
}
Expand Down
77 changes: 67 additions & 10 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestStartupAndShutdown(t *testing.T) {
root := testutil.GenerateCids(1)[0]

waitGroup.Add(1)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})

Expand Down Expand Up @@ -116,7 +116,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {

// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
waitGroup.Wait()
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestProcessingNotification(t *testing.T) {
status := graphsync.RequestCompletedFull
expectedTopic := "testTopic"
notifee, verifier := testutil.NewTestNotifee(expectedTopic, 5)
messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddResponseCode(responseID, status)
b.AddExtensionData(responseID, extension)
}, []notifications.Notifee{notifee})
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestDedupingMessages(t *testing.T) {
selector := ssb.Matcher().Node()
root := testutil.GenerateCids(1)[0]

messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
}, []notifications.Notifee{})
// wait for send attempt
Expand All @@ -233,7 +233,7 @@ func TestDedupingMessages(t *testing.T) {
selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
root3 := testutil.GenerateCids(1)[0]

messageQueue.BuildMessage(0, func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(0, func(b *gsmsg.Builder) {
b.AddRequest(gsmsg.NewRequest(id2, root2, selector2, priority2))
b.AddRequest(gsmsg.NewRequest(id3, root3, selector3, priority3))
}, []notifications.Notifee{})
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestDedupingMessages(t *testing.T) {
}
}

func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
func TestSendsVeryLargeBlocksResponses(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand All @@ -288,7 +288,7 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {

// generate large blocks before proceeding
blks := testutil.GenerateBlocksOfSize(5, 1000000)
messageQueue.BuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})
waitGroup.Wait()
Expand All @@ -300,13 +300,13 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
require.True(t, blks[0].Cid().Equals(msgBlks[0].Cid()))

// Send 3 very large blocks
messageQueue.BuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[2].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[2])
}, []notifications.Notifee{})
messageQueue.BuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
messageQueue.AllocateAndBuildMessage(uint64(len(blks[3].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[3])
}, []notifications.Notifee{})

Expand All @@ -325,3 +325,60 @@ func TestResponseAssemblerSendsVeryLargeBlocksResponses(t *testing.T) {
require.Len(t, msgBlks, 1, "number of blks in first message was not 1")
require.True(t, blks[3].Cid().Equals(msgBlks[0].Cid()))
}

func TestSendsResponsesMemoryPressure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

p := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage, 0)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

// use allocator with very small limit
allocator := allocator2.NewAllocator(1000, 1000)

messageQueue := New(ctx, p, messageNetwork, allocator)
messageQueue.Startup()
waitGroup.Add(1)

// start sending block that exceeds memory limit
blks := testutil.GenerateBlocksOfSize(2, 999)
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[0])
}, []notifications.Notifee{})

finishes := make(chan string, 2)
go func() {
// attempt to send second block. Should block until memory is released
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *gsmsg.Builder) {
b.AddBlock(blks[1])
}, []notifications.Notifee{})
finishes <- "sent message"
}()

// assert transaction does not complete within 200ms because it is waiting on memory
ctx2, cancel2 := context.WithTimeout(ctx, 200*time.Millisecond)
select {
case <-finishes:
t.Fatal("transaction failed to wait on memory")
case <-ctx2.Done():
}

// Allow first message to complete sending
<-messagesSent

// assert message is now queued within 200ms
ctx2, cancel2 = context.WithTimeout(ctx, 200*time.Millisecond)
defer cancel2()
select {
case <-finishes:
cancel2()
case <-ctx2.Done():
t.Fatal("timeout waiting for transaction to complete")
}
}
7 changes: 4 additions & 3 deletions peermanager/peermessagemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// PeerQueue is a process that sends messages to a peer
type PeerQueue interface {
PeerProcess
BuildMessage(blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
AllocateAndBuildMessage(blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// PeerQueueFactory provides a function that will create a PeerQueue.
Expand All @@ -33,7 +33,8 @@ func NewMessageManager(ctx context.Context, createPeerQueue PeerQueueFactory) *P
}

// BuildMessage allows you to modify the next message that is sent for the given peer
func (pmm *PeerMessageManager) BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
func (pmm *PeerMessageManager) AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee) {
pq := pmm.GetProcess(p).(PeerQueue)
pq.BuildMessage(blkSize, buildMessageFn, notifees)
pq.AllocateAndBuildMessage(blkSize, buildMessageFn, notifees)
}
8 changes: 4 additions & 4 deletions peermanager/peermessagemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type fakePeer struct {
messagesSent chan messageSent
}

func (fp *fakePeer) BuildMessage(blkSize uint64, buildMessage func(b *gsmsg.Builder), notifees []notifications.Notifee) {
func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
buildMessage(builder)
message, err := builder.Build()
Expand Down Expand Up @@ -76,14 +76,14 @@ func TestSendingMessagesToPeers(t *testing.T) {
peerManager := NewMessageManager(ctx, peerQueueFactory)

request := gsmsg.NewRequest(id, root, selector, priority)
peerManager.BuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
b.AddRequest(request)
}, []notifications.Notifee{})
peerManager.BuildMessage(tp[1], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[1], 0, func(b *gsmsg.Builder) {
b.AddRequest(request)
}, []notifications.Notifee{})
cancelRequest := gsmsg.CancelRequest(id)
peerManager.BuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
peerManager.AllocateAndBuildMessage(tp[0], 0, func(b *gsmsg.Builder) {
b.AddRequest(cancelRequest)
}, []notifications.Notifee{})

Expand Down
4 changes: 2 additions & 2 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type inProgressRequestStatus struct {

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// AsyncLoader is an interface for loading links asynchronously, returning
Expand Down Expand Up @@ -566,7 +566,7 @@ const requestNetworkError = "request_network_error"
func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
rm.peerHandler.BuildMessage(p, 0, func(builder *gsmsg.Builder) {
rm.peerHandler.AllocateAndBuildMessage(p, 0, func(builder *gsmsg.Builder) {
builder.AddRequest(request)
}, []notifications.Notifee{failNotifee})
}
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type fakePeerHandler struct {
requestRecordChan chan requestRecord
}

func (fph *fakePeerHandler) BuildMessage(p peer.ID, blkSize uint64,
func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
requestBuilder func(b *gsmsg.Builder), notifees []notifications.Notifee) {
builder := gsmsg.NewBuilder(gsmsg.Topic(0))
requestBuilder(builder)
Expand Down
21 changes: 4 additions & 17 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,26 @@ type ResponseBuilder interface {
}

// PeerMessageHandler is an interface that can queue a response for a given peer to go out over the network
// If blkSize > 0, message building may block until enough memory has been freed from the queues to allocate the message.
type PeerMessageHandler interface {
BuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// Allocator is an interface that can manage memory allocated for blocks
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildResponseFn func(*gsmsg.Builder), notifees []notifications.Notifee)
}

// ResponseAssembler manages assembling responses to go out over the network
// in libp2p messages
type ResponseAssembler struct {
*peermanager.PeerManager
allocator Allocator
peerHandler PeerMessageHandler
ctx context.Context
}

// New generates a new ResponseAssembler for sending responses
func New(ctx context.Context, allocator Allocator, peerHandler PeerMessageHandler) *ResponseAssembler {
func New(ctx context.Context, peerHandler PeerMessageHandler) *ResponseAssembler {
return &ResponseAssembler{
PeerManager: peermanager.New(ctx, func(ctx context.Context, p peer.ID) peermanager.PeerHandler {
return newTracker()
}),
ctx: ctx,
allocator: allocator,
peerHandler: peerHandler,
}
}
Expand Down Expand Up @@ -110,14 +104,7 @@ func (ra *ResponseAssembler) execute(p peer.ID, operations []responseOperation,
for _, op := range operations {
size += op.size()
}
if size > 0 {
select {
case <-ra.allocator.AllocateBlockMemory(p, size):
case <-ra.ctx.Done():
return
}
}
ra.peerHandler.BuildMessage(p, size, func(builder *gsmsg.Builder) {
ra.peerHandler.AllocateAndBuildMessage(p, size, func(builder *gsmsg.Builder) {
for _, op := range operations {
op.build(builder)
}
Expand Down
Loading