[SCTP] limit the bytes in the PendingQueue by using a semaphore #367
Conversation
Codecov Report — Base: 59.97% // Head: 59.87% // Decreases project coverage by 0.10%.
Additional details and impacted files:
@@ Coverage Diff @@
## master #367 +/- ##
==========================================
- Coverage 59.97% 59.87% -0.10%
==========================================
Files 504 504
Lines 48070 48000 -70
Branches 12519 12516 -3
==========================================
- Hits 28828 28739 -89
- Misses 10021 10025 +4
- Partials 9221 9236 +15
☔ View full report at Codecov.
Converted to draft because this has some weird behaviour on my branch where I have all the performance PRs merged: it causes the sender to stop printing. Apart from that this works well. Speeds are good (maybe even a little improved, probably because of less pressure on the allocator?), and the backpressure does its thing. The number of packets sent is close to the number of packets received.
Edit: this is even stranger, the receiver also stops printing at some point (way later), but netstat still shows traffic being delivered and the process is still using the same amount of CPU... wth
sctp/src/queue/pending_queue.rs
Outdated
impl PushLimitSemaphore {
    /// Blocks until the credits were successfully taken.
    fn aquire(&self, credits: u64) {
This is ultimately called from an async context, right? It's not good to block in a way that doesn't yield to the tokio runtime. In fact, if we manage to block all threads, it's possible to deadlock the whole runtime, since no forward progress can be made to release any permits.
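To illustrate the hazard being described (this is not code from this PR): on a current-thread tokio runtime, a blocking wait keeps the very task that would unblock it from ever running. The std channel below stands in for a blocking acquire, and the example deliberately never terminates.

```rust
use std::sync::mpsc; // std channel: recv() blocks the OS thread

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, rx) = mpsc::channel::<()>();

    // This task would hand out the "permit" we are waiting for ...
    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    // ... but this blocking recv() never yields to the runtime, so the
    // spawned task is never polled and the send never happens: deadlock.
    rx.recv().unwrap();
    println!("never reached");
}
```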
As far as I can tell the stream presents a sync interface?
Hmmm, nope, you are right. I was fooled because write, write_sctp, and append are sync code, but they are called a lot in async contexts.
So these should just become async and use the tokio semaphore?
Edit: The tokio semaphore does not really provide the interface we need; it only allows taking/releasing one permit at a time.
Edit 2: TIL tokio does not have a CondVar, so building the same thing with async primitives isn't easy.
> The tokio semaphore does not really provide the interface we need, they only allow taking/releasing one permit at a time.

It does? add_permits and acquire_many. acquire_many presumably blocks until there are enough permits, so it can act as a condvar.
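For reference, a minimal sketch of the two calls being mentioned (tokio 1.x), treating one permit as one byte of budget. Note that dropping the returned permit guard hands the permits back, which becomes relevant below.

```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Budget of 1024 "bytes", one permit per byte.
    let budget = Semaphore::new(1024);

    // Waits until 512 permits are available at once.
    let permit = budget.acquire_many(512).await.expect("semaphore closed");
    println!("{} permits left", budget.available_permits());

    // Dropping the guard returns its permits ...
    drop(permit);
    // ... and add_permits can grow the budget from anywhere else.
    budget.add_permits(256);
    println!("{} permits available", budget.available_permits());
}
```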
.... never mind. I was looking at outdated docs. I'll just update to this.
But it will still be a breaking change to the public API, is that ok?
Actually, no, it doesn't have what we need. It releases the permits as soon as the guard is dropped.
We need to acquire capacity in one place and grant capacity in another place. I don't think that is doable with the API tokio's Semaphore presents.
Hmm, you can move the permit guard around, but perhaps that's a fool's errand. Maybe ask in the tokio discord if they have thoughts; otherwise your lock-based approach with a tokio::sync::Notify is appropriate.
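A rough sketch of what such a Notify-based byte budget could look like, assuming a recent tokio; `ByteBudget` and its methods are invented for illustration and are not the code in this PR. The `enable()` call registers the waiter before re-checking the condition, so a release that races with the check isn't missed.

```rust
use std::sync::Mutex;
use tokio::sync::Notify;

/// Hypothetical byte budget built from a lock plus tokio's Notify,
/// roughly the "CondVar replacement" discussed above.
struct ByteBudget {
    limit: usize,
    used: Mutex<usize>,
    freed: Notify,
}

impl ByteBudget {
    fn new(limit: usize) -> Self {
        Self { limit, used: Mutex::new(0), freed: Notify::new() }
    }

    /// Producer side: wait until `len` bytes fit under the limit, then reserve them.
    async fn reserve(&self, len: usize) {
        loop {
            let notified = self.freed.notified();
            tokio::pin!(notified);
            // Register for a wakeup *before* checking, so a release that
            // happens in between is not lost.
            notified.as_mut().enable();

            {
                let mut used = self.used.lock().unwrap();
                if *used + len <= self.limit {
                    *used += len;
                    return;
                }
            }

            notified.await;
        }
    }

    /// Consumer side: give the bytes back once they left the queue.
    fn release(&self, len: usize) {
        *self.used.lock().unwrap() -= len;
        // Wake every waiter; each one re-checks whether it now fits.
        self.freed.notify_waiters();
    }
}
```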
I'll ask on their discord if anyone has a better idea, but yeah, the Notify-based impl would satisfy me too.
I did not see the strange missing prints with the async version, so maybe this was actually caused by blocking something... I'll keep it running for a while and see if it occurs again.
In any case, what are your thoughts on breaking the API? Do you see a way around it?
I don't care much about breaking because we are still pre-1.0.0 and are finding out how these things should work to be optimal.
It is interesting that this change would undo work by @melekes in #344, which was quite recent. That's not necessarily wrong, but it would be good to have @melekes's thoughts too.
I don't mind bringing back async if it's needed. I changed it to sync because there was no reason for it to be async.
…in if the pending queue is full
Edit 3: Thought it might be nice to not make everyone read my ramblings when they don't have to: I misunderstood the AsyncWrite trait's doc language and got confused. You may disregard this comment entirely.

Edit: Gotta go back on this. Looking at how tokio implements this for their TCP socket, they don't seem to care and flush is just a no-op: https://docs.rs/tokio/1.23.0/src/tokio/net/tcp/split.rs.html#374-405. This does NOT seem to match the language in the AsyncWrite trait.

Edit 2: On second thought, I might have misinterpreted the trait's language: flush apparently doesn't need to wait until all data has been received, it just has to make sure it will be received at some point. So as long as the data has been put into the pending queue and the association lives long enough to send all the data, we should be fine...?

-- original --

I think the AsyncWriter implementation was and is broken with respect to the poll_flush and poll_shutdown functions. I'll need your opinion on how to proceed on this. I think it's clear that this is an entirely new can of worms that I'd rather not open in this PR. Secondly, I don't think this PR makes the situation significantly worse than before: poll_flush is now essentially a no-op, but it didn't do the right thing before this PR anyway.

Honestly, I'd recommend just deleting (or disabling) the AsyncWriter impl on the Stream. At least poll_flush() does not do what the trait says it must do (I think the same goes for the implementation for the datachannel). I'm sorry if this sounds harsh, because it looks like a lot of work went into these, but the current stream just doesn't support a correct implementation of poll_flush. I thought about how to fix this, but it definitely isn't easy, because you need to tie the amount of buffered / in-flight bytes to some kind of signalling that needs to be reset if new bytes come in before anyone could see the signal.

To summarize the options in order of my preference:

Sorry that this PR is such a mess with regards to the AsyncWriter impl, I am learning about this stuff as I go.
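For reference, this is roughly the shape tokio's TCP write half takes: poll_flush simply reports readiness, because writes go straight to the kernel and there is no user-space buffer to drain. Below is a minimal sketch of an AsyncWrite impl with the same no-op flush, purely to make the semantics under discussion concrete; `QueueWriter` is made up and is not the Stream in this crate.

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

/// Hypothetical writer that only appends to an in-memory queue.
struct QueueWriter {
    queued: Vec<u8>,
}

impl AsyncWrite for QueueWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // "Writing" only buffers the data; actual transmission happens elsewhere.
        self.get_mut().queued.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // No-op, like tokio's TCP write half: the data is merely queued and
        // there is no way here to wait until the peer has received it.
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // A real implementation would initiate closing the stream here.
        Poll::Ready(Ok(()))
    }
}
```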
I'd recommend being critical about adding more config options, as they tend to grow indefinitely and most users don't change the defaults anyway. In this case, if we know the upper bound of the UDP socket, can't we derive the max queue length from it?
Agreed. I think finding out the upper limit for the UDP sockets is platform dependent. One can get the current send buffer size, but I don't know if this is a good value; it might be too small for optimal throughput. Just playing around with the limit indicates that as little as 128 kB as the upper limit seems enough to sustain maximum throughput. macOS seems to use this value as the send buffer size for TCP according to
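For what it's worth, the per-socket default can be inspected at runtime. A small sketch assuming the socket2 crate; whether the reported value is a useful bound remains platform dependent.

```rust
use socket2::{Domain, Protocol, Socket, Type};

fn main() -> std::io::Result<()> {
    // Create a plain UDP socket and ask the OS for its send buffer size.
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    println!("default SO_SNDBUF: {} bytes", socket.send_buffer_size()?);
    Ok(())
}
```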
Just realized this has a bug with messages that are bigger than the limit: they will never be able to acquire enough permits. But the append function is used with this comment. Edit:
So, this is important.
I think if chunks interleave, the reader won't be able to reconstruct the message and will skip it entirely.
Yeah, the RFC states that the fragments of one message must be in sequential TSNs. Fixed it by serializing the access to the semaphore. This whole thing is kinda wonky now, but I hope my comments in the code explain what's going on.
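A rough sketch of the idea as I read it: hold a lock across both the permit acquisition and the pushes, so the fragments of one message end up contiguous, and forget the permits so they are only handed back once the data is actually sent. The names, the clamping of oversized messages, and other details here are assumptions for illustration, not the exact code in this PR.

```rust
use tokio::sync::{Mutex, Semaphore};

/// Hypothetical bounded queue; names and details are illustrative only.
struct BoundedQueue {
    /// Serializes appenders so the fragments of one message are pushed
    /// back-to-back and get sequential TSNs.
    append_lock: Mutex<()>,
    /// One permit per byte of queued payload.
    budget: Semaphore,
    limit: u32,
    chunks: std::sync::Mutex<Vec<Vec<u8>>>,
}

impl BoundedQueue {
    fn new(limit: u32) -> Self {
        Self {
            append_lock: Mutex::new(()),
            budget: Semaphore::new(limit as usize),
            limit,
            chunks: std::sync::Mutex::new(Vec::new()),
        }
    }

    async fn append(&self, fragments: Vec<Vec<u8>>) {
        let total: usize = fragments.iter().map(|f| f.len()).sum();

        // Hold the lock across both the acquire and the pushes, so another
        // message cannot interleave its fragments with ours.
        let _guard = self.append_lock.lock().await;

        // Assumption: clamp oversized messages to the total capacity so
        // they can still make progress instead of waiting forever.
        let needed = (total as u32).min(self.limit);
        let permits = self
            .budget
            .acquire_many(needed)
            .await
            .expect("semaphore is never closed");
        // Keep the budget reserved after the permit guard is gone; it is
        // handed back via add_permits once the chunks were actually sent.
        permits.forget();

        let mut chunks = self.chunks.lock().unwrap();
        for f in fragments {
            chunks.push(f);
        }
    }

    /// Called by the sending side once `len` bytes have left the queue.
    fn bytes_sent(&self, len: usize) {
        self.budget.add_permits(len);
    }
}
```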
Thanks for your work 👍 While I do agree with the general direction, this PR brings more complexity, which we should try to avoid. A possible solution is to make the inner pending queues non-blocking, which may require some architecture changes. However, I believe we should still try to seek a solution that is easier to maintain and understand for future contributors.
I don't agree. The point of this PR is that the Association might be sending slower than the application is pushing new data into it. If this situation persists for a sustained amount of time, the data will accumulate in the pending queue. This leads to more and more memory being allocated, and in the end the OOM killer will kick in.
Edit: to make my point clearer: at some point the association needs to apply backpressure to the application. Otherwise, somewhere in the system the data will accumulate as described above. There might be other ways / better places to apply this concept, but it is necessary IMO.
Edit 2: one example of these situations is the throughput example that started all of these PRs. If this PR is not merged, the throughput example just sends packets as fast as possible, which overloads my 16 GB of RAM in seconds, causing the OOM killer to kick in.
Co-authored-by: Anton <[email protected]>
Co-authored-by: Anton <[email protected]>
I agree ☝️ To make my point more clear 😄 I'd rather see simple bounded channels / queues than the current version with locks & semaphores. But I guess we can refactor later.
I don't see how that would work, but if it does, it would for sure be better than this weird mutex-semaphore combo.
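For comparison, the bounded-channel idea in its simplest form: tokio's mpsc applies backpressure because send().await waits for a free slot. One caveat is that capacity is counted in messages rather than bytes, so it doesn't map one-to-one onto a byte limit.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity is counted in messages, not bytes.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(16);

    tokio::spawn(async move {
        for i in 0..100u8 {
            // When 16 messages are queued and undelivered, this await
            // suspends the producer: built-in backpressure.
            tx.send(vec![i; 1200]).await.expect("receiver dropped");
        }
    });

    while let Some(chunk) = rx.recv().await {
        // Consuming a message frees one slot for the producer.
        let _ = chunk.len();
    }
}
```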
    let mut ordered_queue = self.ordered_queue.write();
    ordered_queue.push_back(c);
}
drop(sem_lock);
Are you sure we need to drop the lock manually? Why isn't it dropped automatically?
Nope, it's not required, but for functions where the lock is not needed the whole time I like to do it like this. I know it would be dropped and unlocked at the end of the block anyway, but I think this communicates intent better.
If you don't like it, it can be removed.
Especially since the construct with the sem_lock isn't required by any language-level checks to actually push to the queue, I think it is good to make sure the lock lives exactly as long as needed, and no longer.
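A tiny standalone illustration of the pattern (not code from this PR): the explicit drop marks where the critical section is meant to end, instead of relying on end-of-scope.

```rust
use std::sync::Mutex;

fn main() {
    let queue = Mutex::new(Vec::new());

    let mut guard = queue.lock().unwrap();
    guard.push(1u32);
    // End the critical section here rather than at the end of the function,
    // making the intended lock lifetime visible to the reader.
    drop(guard);

    // Work that must not run under the lock goes here.
    expensive_unrelated_work();
}

fn expensive_unrelated_work() {}
```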
// unwrap ok because we never close the semaphore unless we have dropped self
permits.unwrap().forget();
self.append_unlimited(chunks, total_user_data_len);
drop(sem_lock);
ditto
self.queue_len.fetch_add(1, Ordering::SeqCst);
}

drop(sem_lock);
ditto
Co-authored-by: Anton <[email protected]>
Co-authored-by: Anton <[email protected]>
As discussed in #360, the pending queue can grow indefinitely if the sender writes packets faster than the association is able to transmit them.
This PR solves this by enforcing a limit on the pending queue, which blocks the sender until enough space is free.
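From the application's point of view the change amounts to something like the following, with hypothetical names: the write call now suspends while the pending queue is at its byte limit instead of letting the queue grow without bound.

```rust
// Hypothetical sender loop; `BoundedStream` stands in for an SCTP stream
// whose writes await free space in the bounded pending queue.
async fn send_forever(stream: &BoundedStream, payload: &[u8]) {
    loop {
        // Suspends (instead of allocating more and more) while the pending
        // queue is full; resumes once queued chunks have been sent.
        stream.write(payload).await;
    }
}

// Placeholder type so the sketch is self-contained.
struct BoundedStream;
impl BoundedStream {
    async fn write(&self, _buf: &[u8]) { /* append to a byte-budgeted queue */ }
}
```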