
Dataplane mockup #135

Merged · 60 commits · Sep 4, 2018

Conversation

frankmcsherry
Member

This PR is mostly for entertainment / sharing at the moment. It mocks up a dataplane setting in the process-local context by forcing all channels to serialize data to byte buffers which are then swapped between workers, and read back out the next time around. The goal here is to have a setting in which serialization occurs but without the overheads of communication threads, sockets, the kernel, etc. This should let us get a clearer view of what overheads exist in a dataplane implementation.

This introduces a few interesting concepts, among them:

  1. perhaps the Allocator trait should have functions pre_work(&mut self) and post_work(&mut self) which should be called before and after doing any work (in our case, collecting messages and flushing unsent buffers, respectively).

  2. Allocators may need builders, because they may contain strictly thread-local types (e.g. Rc) which cannot be moved between threads, and therefore should be built only once in-thread.

  3. The new ProcessBinary allocator has channels serialize to a common Vec<u8> which is then "put on the wire" as a large hunk, rather than the Binary allocator which has them serialize into a fresh Vec<u8> each time (barf!).
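Points 1 and 3 can be sketched together. This is a minimal mock, assuming invented names throughout (Allocator here is stripped down to just the two hooks, and MockAllocator and its wire stand-in are illustrative, not the PR's types); only the pre_work/post_work names come from the text.

```rust
// Hypothetical sketch: an Allocator trait with pre_work/post_work hooks,
// and a mock implementation that flushes sends from one common buffer
// as a single large hunk (point 3), rather than a fresh Vec<u8> per message.
trait Allocator {
    /// Called before any worker logic runs; e.g. collect inbound messages.
    fn pre_work(&mut self) {}
    /// Called after worker logic runs; e.g. flush unsent buffers.
    fn post_work(&mut self) {}
}

struct MockAllocator {
    inbound: Vec<Vec<u8>>,  // messages collected in pre_work
    send_buffer: Vec<u8>,   // common buffer channels serialize into
    wire: Vec<Vec<u8>>,     // stand-in for the transport between workers
}

impl Allocator for MockAllocator {
    fn pre_work(&mut self) {
        // Collect whatever the "wire" delivered since last time.
        self.inbound.extend(self.wire.drain(..));
    }
    fn post_work(&mut self) {
        // Flush the common buffer as one large hunk, if non-empty.
        if !self.send_buffer.is_empty() {
            let hunk = std::mem::replace(&mut self.send_buffer, Vec::new());
            self.wire.push(hunk);
        }
    }
}
```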

@JosephWagner mentioned this pull request Feb 22, 2018
@frankmcsherry mentioned this pull request Feb 22, 2018
@frankmcsherry
Member Author

The current version now uses Bytes from the ./bytes crate, a simplified version of the crate of the same name. The intent is that instead of minting owned Vec<u8> allocations from incoming binary data, we hand out disjoint mutable views over the input bytes, which then behave much like owned allocations.

This has a few consequences, one of which is that it is very easy to accidentally hold on to the Bytes instances, and thereby block the reclamation of resources (the underlying shared allocation). This currently has the effect of blocking further transmission, as the optional sends in publish() only occur when all buffers have been returned. Perhaps this should be simplified to a different signaling protocol.
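A minimal sketch of the idea, not the crate's actual implementation: one shared allocation handed out as disjoint views, where reclamation (and hence further transmission) is blocked for as long as any view is still alive. The extract_to name follows the description above; try_reclaim is an invented stand-in for the real reclamation machinery.

```rust
use std::sync::Arc;

// A shared byte allocation, viewed through a [start, end) window.
struct Bytes {
    backing: Arc<Vec<u8>>,
    start: usize,
    end: usize,
}

impl Bytes {
    fn from(data: Vec<u8>) -> Self {
        let end = data.len();
        Bytes { backing: Arc::new(data), start: 0, end }
    }
    /// Split off the first `index` bytes of this view as a disjoint view.
    fn extract_to(&mut self, index: usize) -> Bytes {
        assert!(self.start + index <= self.end);
        let result = Bytes { backing: self.backing.clone(), start: self.start, end: self.start + index };
        self.start += index;
        result
    }
    fn len(&self) -> usize { self.end - self.start }
    /// Recover the allocation; succeeds only once no other views exist.
    fn try_reclaim(self) -> Result<Vec<u8>, Bytes> {
        let Bytes { backing, start, end } = self;
        match Arc::try_unwrap(backing) {
            Ok(vec) => Ok(vec),
            Err(backing) => Err(Bytes { backing, start, end }),
        }
    }
}
```

This makes the hazard in the paragraph above concrete: a forgotten Bytes view keeps the Arc count above one, and try_reclaim keeps failing.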

Performance-wise, the exchange.rs example seems to hit the same throughput on two workers as timely master, where we trade a memory copy from serialization against the extra allocations required in the owned case. There should be cases where each approach is better than the other, and we should probably check out quite a few more cases before merging.

@JosephWagner
Contributor

I'm having a hard time envisioning cases where the old method would perform better than the new method. Maybe, perhaps, in the case where there are only a handful of small messages?

I'm happy to check out more cases -- were you envisioning a synthetic test where we tweak message size and other parameters, or did you mean looking at performance on 'real-world' workloads?

@@ -54,20 +55,24 @@ pub mod rc {
     /// Importantly, this is unavailable for as long as the struct exists, which may
     /// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
     /// enough to make a strong statement about this.
-    sequestered: Rc<B>,
+    sequestered: Rc<Box<Any>>,
Contributor


Why did you decide to switch to Box<Any>?

Member Author


Ah, this was because otherwise the B type pervades the rest of the code, and everything needs to be generic with respect to B, or written for one specific B, and it seemed likely that we might have more than one type (e.g. Vec<u8> for process-local stuff, and maybe something different (Arc<Vec<u8>>?) for networking stuff).

(on the road at the moment, will try to be more helpful soon!)
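The type-erasure rationale can be sketched as follows. This is not the PR's code: backing_len is a hypothetical helper, and it uses the modern dyn Any spelling; the point is only that code handling a sequestered buffer need not be generic over B, since each consumer downcasts to the concrete backing it expects.

```rust
use std::any::Any;
use std::rc::Rc;
use std::sync::Arc;

// With Rc<Box<dyn Any>>, one function can handle several backing types
// without a type parameter, by attempting downcasts in turn.
fn backing_len(sequestered: &Rc<Box<dyn Any>>) -> Option<usize> {
    let any: &dyn Any = &***sequestered;
    if let Some(vec) = any.downcast_ref::<Vec<u8>>() {
        Some(vec.len())                 // process-local backing
    } else if let Some(arc) = any.downcast_ref::<Arc<Vec<u8>>>() {
        Some(arc.len())                 // a networking-style backing
    } else {
        None                            // some backing we don't know about
    }
}
```

The cost is that a failed downcast is only caught at runtime, which is the usual trade against making everything generic over B.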

@frankmcsherry
Member Author

> I'm having a hard time envisioning cases where the old method would perform better than the new method. Maybe, perhaps, in the case where there are only a handful of small messages?

I think the main issue would be that the serialization that happens here isn't (yet) for free, so it turns into an additional copy that the current version avoids. On the other hand, the current version has to screw around with memory a bit more. In principle, the new version could serialize in place rather than writing typed data and then serializing, but very much "in principle" at the moment.

Another case is where for whatever reason one worker wants to send another worker a massive amount of state. For example, @antiguru has a project where he re-balances work between multiple workers, and if they happen to be in the same process this could be very cheap with an owned pointer (just move the pointer) but relatively expensive if everything needs to be serialized.

Now back from traveling, and will attempt to start to think about some of these things. :)

@frankmcsherry
Member Author

The most recent commit is a fairly major revamp of communication, which gives binary serialized data a more prominent role. The intent here is to make it easier (and more transparent) to work with different representations of data. Previously this was done with the Content type, which was backed by either typed or binary data, and between which we tap-danced carefully.

The communication layer now exchanges Message<T> types, where a message contains a TypedOrBinary enum, which looks like so:

enum TypedOrBinary<T> {
    Binary(Abomonated<T, Bytes>),
    Typed(T),
}

The idea is that each thing you receive from (or send to) the communication plane can be either binary or typed, and you need to deal with that.

To help you "deal with that" there is another enum, RefOrMut, which looks like so:

pub enum RefOrMut<'a, T> where T: 'a {
    Ref(&'a T),
    Mut(&'a mut T),
}

This enum allows us to transit references that are either uniquely owned (and suitable for claiming, with something like ::std::mem::swap()) or references that can be read but not otherwise interacted with. The general pattern is that binary data give rise to RefOrMut::Ref instances and typed data give rise to RefOrMut::Mut instances.
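A sketch of how the RefOrMut above might be consumed, assuming illustrative impls (the Deref and swap bodies here are my reconstruction of the described behavior, not the crate's exact code): Deref gives cheap read access either way, while swap follows the pattern the thread describes, claiming Mut data by swapping and copying out of Ref data.

```rust
use std::ops::Deref;

pub enum RefOrMut<'a, T> where T: 'a {
    Ref(&'a T),
    Mut(&'a mut T),
}

// Read access works uniformly for both variants.
impl<'a, T> Deref for RefOrMut<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        match self {
            RefOrMut::Ref(reference) => *reference,
            RefOrMut::Mut(reference) => &**reference,
        }
    }
}

impl<'a, T: Clone> RefOrMut<'a, T> {
    /// Move or copy the contents into `element`, re-using its allocation.
    pub fn swap(self, element: &mut T) {
        match self {
            RefOrMut::Ref(reference) => element.clone_from(reference), // copy path
            RefOrMut::Mut(reference) => std::mem::swap(reference, element), // claim path
        }
    }
}
```

The copy path re-uses element's allocation via clone_from, which is why callers that keep a scratch buffer around avoid per-message allocation.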

This is all relatively new, so there are some ergonomic pain points. One is that the output sessions in operators used to have a give_content method that would accept either typed or serialized data, which made it easy to pass data along through e.g. concat and various enter and leave operations. This no longer exists, intentionally, as part of an effort to avoid "dangling" references to ref-counted binary data; if you want to send something you currently need to turn it into typed data (or somehow take ownership).

@frankmcsherry
Member Author

frankmcsherry commented Sep 4, 2018

We are going to land this. There is one main breaking change, which is that the unary and binary operators provide data as a RefOrMut<Vec<Data>>. This type wraps either owned data or serialized data, and it is your job to extract it!

The previous version of the code had a Content type that implemented DerefMut and performed deserialization there, when you needed mutable access. This is, retrospectively, very distasteful, and you should be able to pull the data out yourself!

The RefOrMut type implements Deref, so you are able to act as if you have a &Vec<Data> without doing any work. However, to get owned access to the contained records you have to do some work.

The most common idiom is to maintain a vector into which you swap the data. For example,

    fn map<D2: Data>(&self, logic: impl Fn(D)->D2+'static) -> Stream<S, D2> {
        let mut vector = Vec::new();
        self.unary(Pipeline, "Map", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                data.swap(&mut vector);
                output.session(&time).give_iterator(vector.drain(..).map(|x| logic(x)));
            });
        })
    }

Here the data.swap(&mut vector) call will either acquire the owned data, if it is indeed owned, or deserialize into vector if it is serialized data. Unlike the prior version of the code, it will not allocate a new vector for you (hooray!).

I think this is the only "problematic" break, although it should be pretty easy to fix. If that is wrong (it is hard to fix, or there are other breaks) drop a note here!
