Limit Push Graphsync requests #243
Conversation
Codecov Report

@@            Coverage Diff             @@
##           master     #243      +/-   ##
==========================================
- Coverage   67.90%   67.88%   -0.02%
==========================================
  Files          24       25       +1
  Lines        3050     3117      +67
==========================================
+ Hits         2071     2116      +45
- Misses        624      640      +16
- Partials      355      361       +6

Continue to review full report at Codecov.
Mostly questions.
@@ -27,6 +28,8 @@ var log = logging.Logger("dt_graphsync")
// cancelled.
const maxGSCancelWait = time.Second

const defaultMaxInProgressPushRequests = 6
Do we plan to bubble this config up to Lotus so we can set it to a much higher number, just like we do for the Graphsync simultaneous transfer requests? The default seems low, just like we've seen with Graphsync.
Yes, that is the intent -- and probably we'll just apply the same MaxSimultaneousTransfers setting.
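For illustration, here is a minimal self-contained sketch of how the limit could be exposed to a caller such as Lotus via a functional option. The option and field names are hypothetical, not this PR's actual API; only the default constant comes from the diff above.

// Sketch only: a functional option for wiring the push-request limit up to
// the caller (e.g. Lotus's MaxSimultaneousTransfers). Option and field
// names are illustrative, not the transport's real API.
package example

const defaultMaxInProgressPushRequests = 6

type Transport struct {
    maxInProgressPushRequests uint64
    // other fields elided
}

type Option func(*Transport)

// WithMaxInProgressPushRequests overrides the default push-request limit.
func WithMaxInProgressPushRequests(n uint64) Option {
    return func(t *Transport) { t.maxInProgressPushRequests = n }
}

// NewTransport applies options over the default configuration.
func NewTransport(opts ...Option) *Transport {
    t := &Transport{maxInProgressPushRequests: defaultMaxInProgressPushRequests}
    for _, o := range opts {
        o(t)
    }
    return t
}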
    return t
}

// Start starts the request queue for incoming push requests that trigger outgoing
// graphsync requests
func (t *Transport) Start(ctx context.Context) {
So, we'll need to call this Start in Lotus after constructing a new Transport, right?
yes
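To make the expected call order concrete, here is a self-contained sketch under assumed names (the constructor signature and queue field are hypothetical, not the transport's real API): construct the transport, then call Start so the workers that drain the push-request queue are running before any requests arrive.

// Illustrative sketch only: Start launches the workers that drain the
// push-request queue, so the host application (e.g. Lotus) must call it
// once after construction.
package main

import (
    "context"
    "fmt"
)

type Transport struct {
    queue   chan func()
    workers int
}

func NewTransport(workers int) *Transport {
    return &Transport{queue: make(chan func(), 16), workers: workers}
}

// Start spins up a fixed pool of workers that process queued push requests.
func (t *Transport) Start(ctx context.Context) {
    for i := 0; i < t.workers; i++ {
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return
                case task := <-t.queue:
                    task()
                }
            }
        }()
    }
}

func main() {
    ctx := context.Background()
    tp := NewTransport(6) // e.g. defaultMaxInProgressPushRequests
    tp.Start(ctx)         // must be called once, before requests are queued

    done := make(chan struct{})
    tp.queue <- func() {
        fmt.Println("processing a deferred push request")
        close(done)
    }
    <-done
}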
@@ -149,19 +171,46 @@ func (t *Transport) OpenChannel(ctx context.Context,
        exts = append(exts, doNotSendExt)
    }

    // Start tracking the data-transfer channel
nit: revert.
resolved
transport/graphsync/graphsync.go (Outdated)
    ch := t.trackDTChannel(channelID)

    // if we initiated this data transfer or the channel is open, run it immediately, otherwise
    // put it in the request queue to rate limit DDOS attacks for push transfers
    if channelID.Initiator == t.peerID || ch.isOpen {
@hannahhoward Why do we need the ch.isOpen condition here?
@dirkmc What does ch.isOpen mean semantically for the data-transfer? What is the difference between two existing channels where one has ch.isOpen=true and the other has ch.isOpen=false?
Once graphsync has called the outgoing requests hook, the channel is open (isOpen is true).
I think the idea here is that if the transfer is restarted, data-transfer will call OpenChannel again. So we don't want to put restarts at the back of the queue (OpenChannel will cancel the existing request and start a new one in its place).
We may need to do a bit more refactoring here.
In the case where a transfer is restarted while the request is still in the queue we probably want to just replace the original request.
You are right. So, if a queued transfer gets restarted, this PR will add a duplicate request to the queue, right?
Yes, for push requests I think it will add a duplicate request to the queue.
we actually have a separate problem -- it will not, ever, even if the request is in progress, due to the way go-peertaskqueue works (merging tasks). honestly, I need to figure this one out a bit more deeply.
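For readers unfamiliar with the merging behavior referenced here, below is a self-contained sketch of topic-keyed task merging. It is illustrative only and does not use go-peertaskqueue's actual API: pushing a task for a topic that is already queued updates the existing entry instead of appending a duplicate.

// Sketch of topic-keyed merging (illustrative, not go-peertaskqueue's real
// API): a second push for an already-queued topic merges into the existing
// entry rather than adding a duplicate task.
package example

type task struct {
    topic    string
    priority int
}

type mergingQueue struct {
    order   []string         // FIFO order of topics
    byTopic map[string]*task // at most one entry per topic
}

func newMergingQueue() *mergingQueue {
    return &mergingQueue{byTopic: make(map[string]*task)}
}

// push merges with an existing task for the same topic, if any.
func (q *mergingQueue) push(t task) {
    if existing, ok := q.byTopic[t.topic]; ok {
        if t.priority > existing.priority {
            existing.priority = t.priority // keep the higher priority
        }
        return // merged: no duplicate entry is added
    }
    q.order = append(q.order, t.topic)
    q.byTopic[t.topic] = &task{topic: t.topic, priority: t.priority}
}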
@@ -906,6 +956,13 @@ func (c *dtChannel) open(ctx context.Context, chid datatransfer.ChannelID, dataS
    // Mark the channel as open and save the Graphsync request key
    c.isOpen = true
    c.gsKey = &gsKey
    if c.startPaused {
        err := c.gs.PauseRequest(c.gsKey.requestID)
@hannahhoward - What is the need for this new code block? Can you please add some docs for this?
if someone calls pause before the request gets to the front of the queue -- that's what it's for.
added comment
@@ -980,6 +1037,11 @@ func (c *dtChannel) pause() error {
    c.lk.Lock()
    defer c.lk.Unlock()

    if c.gsKey == nil {
So, this is to support the case where PauseChannel is called before we've started the graphsync transfer for a channel that's already opened, right?
yes
added comment
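For context, here is a minimal self-contained sketch of the pattern discussed in the two threads above, under the assumption (taken from the diff) that gsKey is only set once the graphsync request is actually opened. The type and callback names are hypothetical, not the actual dtChannel code.

// Sketch only: "pause before the graphsync request exists". pause() records
// the intent when no request key is present yet, and open() applies it once
// the request has actually been issued.
package example

import "sync"

type requestKey struct{ requestID int }

type channel struct {
    lk          sync.Mutex
    gsKey       *requestKey // nil until the graphsync request is opened
    startPaused bool
}

func (c *channel) pause(pauseRequest func(requestID int) error) error {
    c.lk.Lock()
    defer c.lk.Unlock()

    // The request is still waiting in the queue: remember that it should
    // start paused instead of pausing a request that doesn't exist yet.
    if c.gsKey == nil {
        c.startPaused = true
        return nil
    }
    return pauseRequest(c.gsKey.requestID)
}

func (c *channel) open(requestID int, pauseRequest func(requestID int) error) error {
    c.lk.Lock()
    defer c.lk.Unlock()

    c.gsKey = &requestKey{requestID: requestID}
    // Apply a pause that was requested while the request was still queued.
    if c.startPaused {
        return pauseRequest(requestID)
    }
    return nil
}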
impl/restart_integration_test.go (Outdated)
@@ -214,7 +214,7 @@ func TestRestartPush(t *testing.T) {

    // WAIT FOR DATA TRANSFER TO FINISH -> SHOULD WORK NOW
    // we should get 2 completes
-   _, _, err = waitF(10*time.Second, 2)
+   _, _, err = waitF(100000*time.Second, 2)
Looks like maybe a change got added while debugging?
oops will fix
    // Start tracking the data-transfer channel

    // Open a graphsync request to the remote peer
    req, err := ch.open(ctx, channelID, dataSender, root, stor, doNotSendCids, exts)
Suggest passing the ch object, rather than passing channelID here and then calling getDTChannel(channelID) in runDeferredRequest.
what's the value of that? Just seems like putting more data on the queue.
The extra memory is only the size of a pointer - the value is that you don't have to call getDTChannel(channelID) because you just keep a reference to the object. Not a biggie though, I think it's fine as is.
    }
}

func (orq *RequestQueue) processRequestWorker(ctx context.Context) {
In this model there are many threads that all get "woken up" each time a task is added to the queue, and then all try to pop tasks from the queue, but most of the time all but one will get an empty task list.
I wonder if it would be possible to change the model so that there are initially zero go routines running. When a task is added to the queue a new go routine is started to process it. Each time a new go routine is needed it's started, up to some maximum number of go routines. As tasks complete, if the go routine is no longer needed it stops, all the way down to zero go routines.
This would allow us to use the minimum number of required go routines, and we would no longer need the Start() method.
I'm not sure if it's possible, but it would be nice if it is possible.
Maybe it would be. I'd rather ship that as an improvement. The logic here is taken directly from go-graphsync, which makes me more comfortable in terms of introducing a new concurrency element, in that it's at least been battle-tested there.
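For reference, here is a self-contained sketch of the lazily-spawned worker model described above. The dispatcher type and its field names are hypothetical, not part of this PR: workers are started on demand up to a cap and exit when the queue drains, so no up-front Start call is needed.

// Sketch of the suggested model, not the PR's implementation: a dispatcher
// lazily spawns workers (up to maxWorkers) when tasks arrive and lets them
// exit once the queue is empty.
package example

import "sync"

type dispatcher struct {
    mu         sync.Mutex
    tasks      []func()
    maxWorkers int
    running    int
}

func newDispatcher(maxWorkers int) *dispatcher {
    return &dispatcher{maxWorkers: maxWorkers}
}

// Push enqueues a task and starts a new worker if we are under the cap.
func (d *dispatcher) Push(task func()) {
    d.mu.Lock()
    d.tasks = append(d.tasks, task)
    if d.running < d.maxWorkers {
        d.running++
        go d.worker()
    }
    d.mu.Unlock()
}

// worker drains tasks and exits (decrementing running) when none are left.
func (d *dispatcher) worker() {
    for {
        d.mu.Lock()
        if len(d.tasks) == 0 {
            d.running--
            d.mu.Unlock()
            return
        }
        task := d.tasks[0]
        d.tasks = d.tasks[1:]
        d.mu.Unlock()

        task()
    }
}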
I've refactored the behavior around restart requests so that we get safer behavior out of them. They now go in the request queue, but receive a higher priority. Moreover, the merging behavior should ensure that a restart of a request that is still pending in the queue does not add a duplicate entry.
One more thing remains -- should we cancel the existing request prior to queueing the restart? I think this might be important, because if we had several stalled push requests, we might never get the restart request to the top of the queue. I will look into doing this.
@dirkmc @aarshkshah1992 Following @dirkmc's suggestion, I did one more refactor to hide some of the nitty-gritty request details from the request queue. I also made sure we cancel in-progress requests prior to queueing deferred restarts. I'm feeling pretty good about where this is at now.
add a request queue designed to execute push-graphsync requests in a rate limited fashion
integrate request queue into graphsync transport and demonstrate push request rate limiting
put restarts into the queue, ahead of new requests, and don't duplicate for pending
make request queue more abstract by accepting an interface and do cancellations on inprogress requests right away
Force-pushed from b2cc5c7 to 2707216.
    // Process incoming data
    go t.executeGsRequest(req)
    // if we initiated this data transfer or the channel is open, run it immediately, otherwise
    // put it in the request queue to rate limit DDOS attacks for push transfers
nit: fix docs.
    // if we initiated this data transfer or the channel is open, run it immediately, otherwise
    // put it in the request queue to rate limit DDOS attacks for push transfers
    if channelID.Initiator == t.peerID {
        // Start tracking the data-transfer channel
nit: remove this line.
// Open a graphsync request for data to the remote peer
func (c *dtChannel) open(ctx context.Context, chid datatransfer.ChannelID, dataSender peer.ID, root ipld.Link, stor ipld.Node, doNotSendCids []cid.Cid, exts []graphsync.ExtensionData) (*gsReq, error) {
    c.lk.Lock()
@hannahhoward - I think we should also keep the "cancel before a new open" logic in here as well, by making a call to cancelInProgressRequests, just in case we've missed an edge case where a request is already running for a channel and we pull a deferred request for the same channel from the queue. We can ignore the error of that call, but it doesn't hurt to have it to keep things easy to reason about.
@@ -152,14 +179,34 @@ func (t *Transport) OpenChannel(ctx context.Context,
    // Start tracking the data-transfer channel
    ch := t.trackDTChannel(channelID)

    // Open a graphsync request to the remote peer
-   req, err := ch.open(ctx, channelID, dataSender, root, stor, doNotSendCids, exts)
+   wasInProgress, err := ch.cancelInProgressRequests(ctx)
It looks like the call to cancelInProgressRequests may be racy if two processes call OpenChannel at the same time - I'd suggest putting all this logic in a method on dtChannel so that it can take place under a single lock.
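A minimal sketch of the suggested shape follows, with hypothetical names (reopen, inFlight, start) rather than the actual dtChannel code: the cancel-then-open sequence lives in one method so both steps happen under the same lock and two concurrent callers cannot interleave.

// Sketch of the suggested refactor only: cancel any in-progress request and
// record the new one under a single lock, so concurrent OpenChannel calls
// cannot interleave between the two steps.
package example

import (
    "context"
    "sync"
)

type channel struct {
    lk       sync.Mutex
    inFlight context.CancelFunc // cancels the current graphsync request, if any
}

// reopen cancels whatever request is in flight and installs the new one,
// all while holding the channel lock.
func (c *channel) reopen(ctx context.Context, start func(ctx context.Context) error) (wasInProgress bool, err error) {
    c.lk.Lock()
    defer c.lk.Unlock()

    if c.inFlight != nil {
        c.inFlight() // cancel the existing request
        c.inFlight = nil
        wasInProgress = true
    }

    reqCtx, cancel := context.WithCancel(ctx)
    if err := start(reqCtx); err != nil {
        cancel()
        return wasInProgress, err
    }
    c.inFlight = cancel
    return wasInProgress, nil
}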
    if err != nil {
        return nil, err
    }
    c.startPaused = false
Does it make sense to set startPaused to false here? What if there's a restart before unsealing completes?
type deferredRequest struct {
    t         *Transport
    ch        *dtChannel
    channelID datatransfer.ChannelID
nit: dtChannel already has a channelID field.
    // Open a graphsync request to the remote peer
    req, err := dr.ch.open(dr.ctx, dr.channelID, dr.dataSender, dr.root, dr.stor, dr.doNotSendCids, dr.exts)
    if err != nil {
        log.Warnf("error processing request from %s: %s", dr.dataSender, err)
Is there a way we can surface this error to the calling code? My concern is that a data transfer will fail, but the markets code will not notice so it will get stuck in a "transferring" state forever.
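One hedged way to do this is sketched below: have the deferred request carry an error hook that the worker invokes, so a failure while draining the queue is reported back to the transport/events layer instead of only being logged. The open and onError fields are hypothetical additions for illustration, not the PR's actual struct.

// Sketch only, with hypothetical field names: a failure while processing a
// deferred request is surfaced through onError so the channel can be failed
// and cleaned up, rather than staying stuck in a "transferring" state.
package example

import (
    "context"
    "fmt"
)

type deferredRequest struct {
    open    func(ctx context.Context) error // opens the graphsync request
    onError func(err error)                 // surfaces failures to the caller
}

func processDeferredRequest(ctx context.Context, dr *deferredRequest) {
    if err := dr.open(ctx); err != nil {
        // Report the failure instead of dropping it after a log line.
        if dr.onError != nil {
            dr.onError(fmt.Errorf("error processing deferred request: %w", err))
        }
    }
}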
I think we should invest in making this throttling logic live in go-graphsync!
For the sake of completeness and context, here are the big-picture paths we aim to throttle:

1. Storage deal:
   i. Client sends PUSH data-transfer request to provider.
   ii. Provider receives PUSH data-transfer request.
   iii. Provider initiates a go-graphsync request. The provider is the graphsync REQUESTOR.
   iv. Client receives the graphsync request, and acts as the RESPONDER.
2. Retrieval:
   i. Client initiates a PULL data-transfer operation locally.
   ii. data-transfer translates this to a graphsync request. The client is the REQUESTOR.
   iii. Provider receives the graphsync request, and acts as the RESPONDER.
Fortunately, we already have throttling at the graphsync layer in place on the RESPONDER side of things. That is, in 1.iv and 2.iii (client during storage deal, provider during retrieval).
We now want to introduce throttling on the REQUESTOR side of things. This PR introduces it in step 1.ii in the data-transfer layer. But I'm convinced this is better handled at 1.iii and 2.ii in the graphsync layer (provider during storage deal, client during retrieval deal).
- The throttling logic ends up being split across go-data-transfer and go-graphsync, making things even harder to reason about and introducing even more complexity.
- If I understand correctly, this PR still leaves retrieval completely unthrottled on the client side.
- This approach makes it impossible to unify throttling under a single "SimultaneousTransfers" configuration parameter.
- The user now has to think about requestor-side and responder-side throttles individually.
- This is not very useful, as I suspect that in most cases folks will want to apply global throttles (in fact, this was the advertised contract of SimultaneousTransfers on the Lotus side!).
- That said, for advanced use cases it's useful to retain the ability to adjust each side individually.
I would personally prefer not to move this to go-graphsync, at least for now. The underlying design principle is contained in the PR description but not spelled out clearly: graphsync actions initiated by a user should not be rate limited, while those initiated by a remote peer should.

If I am a client and I initiate 100 retrievals at the same time to 100 different miners (i.e. outgoing graphsync requests), presumably I want them to finish as quickly as possible. I probably don't want that rate limited -- if I want to use all my CPU / RAM resources to do it, that's my choice. And, again, this isn't just for retrievals in Filecoin -- presumably Graphsync could be running in IPFS or any Libp2p context.

The Data Transfer Push is a unique use of Graphsync. In this case, the outgoing Graphsync request is NOT initiated by a user but by an automated approval of a data transfer push request from a remote peer. Another peer can monopolize your CPU/RAM resources by sending you lots of data transfer Push requests. A client initiating Pull requests (i.e. a retrieval client) should have the ability to trigger as many requests as possible without being rate limited. On the other hand, a provider responding to incoming Push requests by triggering an outgoing graphsync request should be rate limited. (Arguably, this could be resolved in Filecoin by implementing the code in go-graphsync, since provider and client are always different processes with their own graphsync instances.)

Ultimately, I think Graphsync needs a Push mode -- there's really no reason we should need to have this extra libp2p protocol and roundtrip. At that point it would make sense to do things inside of go-graphsync, because then we can rate limit incoming Push requests while not rate limiting outgoing Pull requests. That's an undertaking. If we want to allocate time to doing this and #244 (sort of a prerequisite), I'm definitely game. In the meantime, this is the road that stays consistent with the underlying design principle: actions initiated by a user should not be rate limited, but actions initiated by a remote peer should.
The main reason why I think this throttling has its home in go-graphsync is that I think of go-graphsync as an autonomous engine that receives commands to transfer data out or in; whether those come from the network or the user is not a concern of the library. Being an autonomous, fully asynchronous data exchange agent means that it needs to look after its own health and progress. Part of that is observing throttling limits imposed by its environment.

I also don't think a low-level composable library like go-graphsync should make assumptions about who and how it's going to be driven or called (by programmatic behaviour or user action). If we place this logic outside of go-graphsync, we are forcing every user to implement the throttling logic outside (I can also imagine go-ipfs or any other "user-driven" application wanting to apply some limit to gracefully handle an avalanche of user requests), so essentially we are obliging all downstream users to re-implement this concern.
What I mean to say is that the original contract of ...