Parallel activity send with published #4559
Conversation
You should allow the number of parallel tasks to be modified by an env var, so that hosts can increase it as they grow.
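For illustration, reading such a limit could look like the sketch below; the variable name `LEMMY_PARALLEL_SENDS` and the default of 5 are assumptions for this example, not an existing Lemmy setting.

```rust
use std::env;

// Sketch: read the parallel-send limit from a hypothetical env var,
// falling back to a hardcoded default of 5 (the in-flight limit
// mentioned elsewhere in this PR).
fn parallel_sends_limit() -> usize {
    env::var("LEMMY_PARALLEL_SENDS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(5)
}

fn main() {
    // Without the variable set, the default applies.
    println!("parallel sends: {}", parallel_sends_limit());
}
```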
This is pretty great, but I do have some major concerns; see my comments. I've only looked at the receiving side so far, not the sending parallelism.
I'm also now realizing that doing this on the receiving queue is more complex than I thought, and maybe it does make sense to reconsider doing it fully on the sender's side, similar to (though not exactly like) what @db0 said, since that's the side that has a persistent queue (compared to any in-memory queue we might add).
crates/apub/src/http/inbox.rs
Outdated
```rust
/// Queue of incoming activities, ordered by oldest published first
static ACTIVITY_QUEUE: Lazy<Arc<RwLock<BinaryHeap<InboxActivity>>>> =
  Lazy::new(|| Arc::new(RwLock::new(BinaryHeap::new())));
```
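Note that the doc comment promises oldest-first ordering while Rust's `BinaryHeap` is a max-heap, so the element type's `Ord` has to compare `published` in reverse. A minimal, self-contained sketch of that inversion (simplified, hypothetical fields; not the PR's actual struct):

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Hypothetical, simplified stand-in for the PR's `InboxActivity`,
// reduced to the fields needed to show the ordering.
struct InboxActivity {
    published: u64, // seconds since epoch, for illustration
    bytes: Vec<u8>,
}

// `BinaryHeap` pops the *greatest* element, so `Ord` is reversed
// here to make it pop the oldest `published` first, as the doc
// comment requires.
impl Ord for InboxActivity {
    fn cmp(&self, other: &Self) -> Ordering {
        other.published.cmp(&self.published)
    }
}
impl PartialOrd for InboxActivity {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for InboxActivity {
    fn eq(&self, other: &Self) -> bool {
        self.published == other.published
    }
}
impl Eq for InboxActivity {}

fn main() {
    let mut heap = BinaryHeap::new();
    for p in [30, 10, 20] {
        heap.push(InboxActivity { published: p, bytes: vec![] });
    }
    assert_eq!(heap.pop().unwrap().published, 10); // oldest first
}
```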
There's an issue here with missing backpressure: the activity sending system intentionally has backpressure, in that when you can't process activities quickly enough, your responses get slower, which makes the sender send slower.
Since this heap is unbounded and the HTTP server immediately responds with Ok, regardless of whether the event has been processed, this means that if you are 1M events behind with this change the receiving queue will start receiving events at full speed until all million activities are in RAM here, regardless of how fast you're actually processing them.
This will both cause OOM issues and lose all of those events (they only exist in memory), which makes the queue unreliable.
There needs to be backpressure of some form - for example, the heap insert should wait until there's less than a limited amount of items in the queue.
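The "wait until there's less than a limited amount of items" idea is exactly what a bounded queue gives you. A std-only sketch using `sync_channel`, whose fixed capacity produces this kind of backpressure (capacity 2 is just for illustration):

```rust
use std::sync::mpsc::sync_channel;

fn main() {
    // Sketch of the missing backpressure: a bounded queue makes the
    // producer (the HTTP handler) block or fail once the consumer
    // falls behind, instead of buffering unboundedly in RAM.
    let (tx, rx) = sync_channel::<u64>(2); // capacity 2 instead of unbounded

    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // Queue full: a third non-blocking send is rejected. A blocking
    // `send` would instead wait, slowing the sender down, which is
    // exactly the backpressure the unbounded BinaryHeap loses.
    assert!(tx.try_send(3).is_err());

    assert_eq!(rx.recv().unwrap(), 1);
    // After the consumer drains one item, there is room again.
    assert!(tx.try_send(3).is_ok());
}
```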
That's true. Then the solution would be to have a hashmap with one heap per instance, and if the heap already contains > 5 activities for this origin instance, sleep in the HTTP handler until it decreases and then insert.
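A sketch of that per-origin bound, with illustrative names (`try_enqueue`, and a heap of raw `published` timestamps standing in for full activities); the HTTP handler would sleep and retry while this returns false:

```rust
use std::collections::{BinaryHeap, HashMap};

const MAX_PER_INSTANCE: usize = 5;

// One bounded heap per sending instance. Returns false when the
// origin's heap is full, so the caller can sleep and retry, which
// propagates backpressure to that sender only.
fn try_enqueue(
    queues: &mut HashMap<String, BinaryHeap<u64>>,
    origin: &str,
    published: u64,
) -> bool {
    let heap = queues.entry(origin.to_string()).or_default();
    if heap.len() >= MAX_PER_INSTANCE {
        return false; // caller sleeps until the worker drains this heap
    }
    heap.push(published);
    true
}

fn main() {
    let mut queues = HashMap::new();
    for i in 0..5 {
        assert!(try_enqueue(&mut queues, "lemmy.ml", i));
    }
    // A sixth activity from the same origin is held back...
    assert!(!try_enqueue(&mut queues, "lemmy.ml", 5));
    // ...but other origins are unaffected.
    assert!(try_enqueue(&mut queues, "lemmy.world", 0));
}
```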
```rust
      }
    })
  })
  .collect();
```
mhh. This is very similar to what I imagined, but it's different in a major way that might need consideration.
The main thing is that you only have a single receive queue but within that queue do fully parallel processing of activities (with delay). I had imagined one fully sequential queue per remote instance, with each queue being processed in parallel.
A single queue has the problem that you still don't guarantee the dependencies are processed in order. Even with the delay, let's say you have these events in the queue:
- T-1.01s: user N upvote post X
- T-1.00s: user N downvote post X
The activities are both received correctly in the queue, and regardless of receive order have been priority-ordered by the timestamp. But the problem is that both can be taken by different `available_parallelism` workers at the same time (the 1s delay has been met, so both are up for processing). That means the downvote might be processed before the upvote and then be wrongly overwritten.
With my suggestion both would be processed sequentially which means this problem doesn't happen.
The advantage of your method is that it also solves the case of a slow Rust-to-database connection (like db0 had), because the internal latency doesn't matter. But the delay alone doesn't really solve the sequentiality problem, so the result can be wrong.
Sequential per-instance queues still solve #4529: the roundtrip between the instances is not part of the latency; only the internal latency remains.
I'm not sure what the best way forward is though. I do see now looking at this code that per-instance receive queues would be fairly complex.
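The per-instance sequential alternative can be sketched as a partitioning step: group activities by origin and keep each group ordered by `published`, so that each group can be drained by its own worker without reordering dependent events. Names and fields below are illustrative, not the PR's types:

```rust
use std::collections::HashMap;

struct Activity {
    origin: String,
    published: u64,
    vote: i8, // +1 upvote, -1 downvote (illustrative payload)
}

// Group activities by sending instance and sort each group by
// `published`. Each group is then processed strictly in order by
// one worker, while different groups run in parallel, so the
// upvote/downvote pair from the example can never be reordered.
fn partition(activities: Vec<Activity>) -> HashMap<String, Vec<Activity>> {
    let mut map: HashMap<String, Vec<Activity>> = HashMap::new();
    for a in activities {
        map.entry(a.origin.clone()).or_default().push(a);
    }
    for group in map.values_mut() {
        group.sort_by_key(|a| a.published);
    }
    map
}

fn main() {
    let acts = vec![
        // Received out of order: the newer downvote arrives first.
        Activity { origin: "a.example".into(), published: 101, vote: -1 },
        Activity { origin: "a.example".into(), published: 100, vote: 1 },
    ];
    let map = partition(acts);
    // Processed sequentially per origin, the downvote is applied last.
    assert_eq!(map["a.example"].last().unwrap().vote, -1);
}
```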
I have a different concern. Consider a cluster of X lemmy servers; this cannot be handled in process if order matters and lemmy is intended to scale beyond a single process.
This is something that would normally be handled by an out of process job queue like kafka, sidekiq, rabbitmq, etc.
On the web server side, you push the job into the message queue, respond 200, and move on. Then a (potential) cluster of workers pops jobs off the queue in the background.
Those workers can then either post back to the sender (in the case of lemmy, the sending instance) "I've done the work you wanted", or the sender can be given a ticket to check back in with periodically (if there's truly an order that needs to be enforced on the sending side).
I think that latter strategy (where the sender paces itself based on status updates from the receiver) is out based on what I'm reading as we're working within the constraints of ActivityPub which means your queue needs to be smart enough to block where appropriate and not take the wrong jobs (correct me if I'm wrong -- I'm just now jumping into the lemmy code and ActivityPub after experiencing federation issues from timeouts being hit with a rather low load on the server).
I think at a minimum this would need to use PostgreSQL (without adding software to the lemmy stack) to implement a job queue table that various lemmy servers could pull from when they're not otherwise busy to handle background work. That could also be used to resolve your (@phiresky) concern about multiple jobs being taken at the same time by tracking dependence, and not flagging the latter job for execution until the earlier job is done.
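A rough sketch of such a Postgres-backed job queue, assuming a hypothetical `federation_job` table (illustrative names, not Lemmy's actual schema); `FOR UPDATE SKIP LOCKED` is the standard Postgres mechanism that lets a cluster of workers pull from one table without taking the same job twice:

```sql
-- Hypothetical job table; names and columns are illustrative.
CREATE TABLE federation_job (
  id bigserial PRIMARY KEY,
  origin text NOT NULL,
  payload jsonb NOT NULL,
  published timestamptz NOT NULL,
  done boolean NOT NULL DEFAULT false
);

-- Each worker claims the oldest unfinished job. SKIP LOCKED makes
-- rows already claimed by another worker be passed over instead of
-- blocking, so multiple lemmy processes can share the queue.
SELECT id, payload
FROM federation_job
WHERE NOT done
ORDER BY published
LIMIT 1
FOR UPDATE SKIP LOCKED;
```

Dependence tracking (don't run job B until job A is done) would need an extra column or ordering constraint on top of this; the snippet only shows the basic claim step.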
I'm not sure how practical this is but, it seems like the ideal solution would be to make this eventually consistent by using the timestamp not for ordering but to make accept/reject judgements.
- T-1.01s: user N upvote post X
- T-1.00s: user N downvote post X
In that case, you receive the first event; it's the most recent state for user N on post X, so you take that as the new current state for user N. Then you see the second event, which happened in the past, and upon processing disregard it as irrelevant.
That can also open the door to a bunch of different potential race condition situations between complex endpoints and database state though (where e.g., two threads are working on changes affecting the same posts/likes/etc at the same time). So, that would probably have to be done selectively based on the "type"(?) of activity.
It might be helpful if such a thing could be done for e.g., likes as they're the most likely to be high frequency requests I'd imagine.
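The accept/reject idea boils down to last-write-wins on the `published` timestamp. A tiny sketch with hypothetical types:

```rust
// Sketch of the accept/reject idea: keep the latest-known state per
// (actor, post) and drop any incoming event that is older than it.
// Types and names are illustrative.
#[derive(Clone, Copy, PartialEq, Debug)]
struct VoteState {
    published: u64,
    score: i8,
}

// Apply `incoming` only if it is newer than the current state;
// otherwise the stale event is disregarded as irrelevant.
fn apply_if_newer(current: Option<VoteState>, incoming: VoteState) -> Option<VoteState> {
    match current {
        Some(c) if c.published >= incoming.published => Some(c), // stale: ignore
        _ => Some(incoming),
    }
}

fn main() {
    // The newer downvote arrives first, the older upvote second.
    let s = apply_if_newer(None, VoteState { published: 101, score: -1 });
    let s = apply_if_newer(s, VoteState { published: 100, score: 1 });
    // The late-arriving older upvote does not overwrite the downvote.
    assert_eq!(s.unwrap().score, -1);
}
```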
Race conditions shouldn't be a problem if old events are ignored by doing the upsert with a `WHERE` clause that compares the existing row's timestamp. `UPDATE` will redo the comparison if needed after locking the row.
https://www.postgresql.org/docs/current/transaction-iso.html
> Race conditions shouldn't be a problem if old events are ignored by doing the upsert with a `WHERE` clause that compares the existing row's timestamp. `UPDATE` will redo the comparison if needed after locking the row. https://www.postgresql.org/docs/current/transaction-iso.html
This is true for anything that's represented via a single row. The concern would be more about anything that triggers a complex update that involves multiple tables and/or rows. Perhaps that isn't much of a concern currently though.
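For the single-row case, the guarded upsert could look roughly like this (illustrative table and column names, not necessarily Lemmy's actual schema):

```sql
-- Sketch of the timestamp-guarded upsert. The WHERE clause turns the
-- UPDATE into a no-op when the stored row is already newer, and
-- Postgres re-evaluates the condition after locking the row, so
-- concurrent writers cannot apply a stale vote.
INSERT INTO post_like (post_id, person_id, score, published)
VALUES ($1, $2, $3, $4)
ON CONFLICT (post_id, person_id) DO UPDATE
SET score = EXCLUDED.score,
    published = EXCLUDED.published
WHERE post_like.published < EXCLUDED.published;
```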
I just realized that using timestamps to reject older events will require some tables to be changed since removal timestamps are not stored for many things. This should be done after #4459 is merged so there's no conflict (or maybe in the same PR).
Your comment on multiple processes is a good point why an in memory queue is probably a bad idea. I think it's possible to solve this whole thing only on the outgoing side without requiring any further changes with what I've mentioned in point 9 of #4529 (comment)
If we process incoming activities sequentially then we will run into the same problem again sooner or later, then not caused by high ping but because one instance is sending more activities than the other one can handle in the timeframe. So parallel processing is definitely necessary, also to utilize all available server resources when catching up after an instance has fallen behind.
Parallel processing does already happen automatically when the sender sends requests simultaneously (for the same reason lemmy is able to handle more than one HTTP request at once). So it is fixable on the sender side only, even if individual HTTP responses are "slow".
```rust
// TODO: could sleep based on remaining time until head activity reaches 1s
// or simply use `WORK_FINISHED_RECHECK_DELAY` from lemmy_federate
sleep(Duration::from_millis(100)).await;
// TODO: need cancel? lemmy seems to shutdown just fine
```
Lemmy shutdown works fine because you're using `tokio::spawn`. Tokio spawn is nice, but when you stop the process (with any signal), all spawned tasks immediately stop being polled, which means they can suddenly stop at any await point. This is not good for this queue, because then every event still in the queue is essentially lost and any in-flight handling will be broken.
I think this needs to use a non-spawn method of starting the tasks. It's a bit more complex, because we need to make sure that (a) no new items are added to the queue during shutdown and (b) the queue is emptied before the process exits (since it's in memory). You should be able to use the CancellableTask I implemented for the federation. Use that outside of the loop (not instead of it), and mostly ignore the cancellation token you receive: we don't want to cancel this task except when the queue is empty. That way the task will only allow exiting the process once the queue is fully emptied.
Here's some pseudo code
```
CancellableTask.spawn(|cancel| {
    // this lambda only exits when the queue is completely empty
    while (queue not empty) {
        if (oldest queue item is less than a second old) {
            sleep(100); continue;
        }
        take item and process it;
    }
    if (cancel.cancelled) { return; }
    else sleep(100); // need to sleep because the task is restarted immediately otherwise
})
```
Then, in addition, in the queue push handler (the shared_inbox function) you need to reject the HTTP request with a 50x if the process is shutting down, for example by making the binary heap optional and erasing it once the process is in the shutdown phase. Actually, I guess this is similar to that ActivityChannel::retrieve_activity thing we have? There's an Option and a shutdown() method there, but it's only used for local activities. Maybe the overlap can be used as an advantage.
I'll work on a draft implementation of point (9) of #4529 (comment)
I just (finally) looked at the sending side of this PR. I see there's a lot of locking added so that the in-flight requests can be kept track of; plus the last_successful_id problem isn't solved yet.
I think the sending can instead be solved without any additional locking or state sharing between tasks by using a stream with `.buffered(N)`. The main task would use either the stream! macro (it's very useful) or an mpsc channel with size 10 to send activities (with ReceiverStream). The stream would be mapped to futures of the HTTP result, and polled using `while let Some(done) = stream.next().await` to update the federation_queue_state (last_successful_id).
Just realized one more issue with both my above suggestion and this PR: during failures we also need to keep track of the oldest (failing) activity and store its updated attempt count in the database. My above-mentioned method solves keeping track of the oldest successful send, but not the oldest failing send. It's also not easy with how you implemented it. Not sure yet what a good solution would be.
It's no problem if we send the same activity multiple times, as duplicates get ignored by the receiving side. So we only need to store the last successful one, and retry from there after restart. At runtime we keep an in-memory list of successful activities, and once eg
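Whichever sending design is chosen, the tricky part of last_successful_id with parallel sends is that completions arrive out of order, and the pointer may only advance across a contiguous prefix of completed ids. A std-only sketch of that bookkeeping (names illustrative):

```rust
use std::collections::BTreeSet;

// Advance `last_successful` across the contiguous prefix of
// completed ids. Ids completed out of order stay parked in `done`
// until the gap before them closes, so a crash never skips an
// unsent activity on restart.
fn advance(last_successful: &mut i64, done: &mut BTreeSet<i64>) {
    while done.remove(&(*last_successful + 1)) {
        *last_successful += 1;
    }
}

fn main() {
    let mut last = 10;
    let mut done = BTreeSet::new();

    // Sends 12 and 13 finish before 11; the pointer must not move
    // yet, otherwise a restart would skip activity 11.
    done.insert(12);
    done.insert(13);
    advance(&mut last, &mut done);
    assert_eq!(last, 10);

    // Once 11 completes, the whole prefix is committed at once.
    done.insert(11);
    advance(&mut last, &mut done);
    assert_eq!(last, 13);
}
```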
```rust
instance_queue.push(InboxActivity {
  request_parts,
  bytes,
  published,
```
This won't work well if `published` is in the future, e.g. due to out-of-sync clocks. The activity should probably be handled immediately in that case.
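A sketch of that fix: clamp a future `published` to the current time, so the activity becomes due immediately instead of waiting for the remote clock to catch up (illustrative helper, not part of the PR):

```rust
use std::time::{Duration, SystemTime};

// If the sender's clock is ahead of ours, `published` lies in the
// future; treat the activity as published "now" so it is processed
// after the normal 1s delay rather than delayed indefinitely.
fn effective_published(published: SystemTime, now: SystemTime) -> SystemTime {
    if published > now { now } else { published }
}

fn main() {
    let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000);
    let future = now + Duration::from_secs(60);
    let past = now - Duration::from_secs(60);
    assert_eq!(effective_published(future, now), now); // clamped
    assert_eq!(effective_published(past, now), past); // unchanged
}
```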
Closing this as we will use a different approach.
This change consists of three main parts:
- Add a `published` timestamp to outgoing activities. Very simple, same way we add the context field.
- Send activities in parallel, with a limit on the number of in-flight requests per instance.
- Order incoming activities by `published` timestamp. Activities are only processed after one second has passed, so that delayed activities can be handled in the correct order.

Federation tests are already passing. I tested the parallel sending part manually by creating many posts in api tests, adding some sleep next to activity send, and watching the number of inflight requests, which grew to the limit of 5 as expected. For receiving there is a unit test checking that the order is correct. More tests would be good, but I'm not sure how to add them without major code changes; suggestions welcome.
It still needs some cleanup to get rid of all the unwraps.
Requires LemmyNet/activitypub-federation-rust#101