Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataplane mockup #135

Merged
merged 60 commits into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
a0bf351
Merge branch 'bytes' into dataplane_mockup
frankmcsherry Feb 20, 2018
33b40a3
dataplane mockp
frankmcsherry Feb 20, 2018
0eae052
Merge branch 'master' into dataplane_mockup
frankmcsherry Feb 20, 2018
96748ca
ByteExchange added
frankmcsherry Feb 22, 2018
8330cb8
don't panic on shutdown returning empty buffers
frankmcsherry Feb 22, 2018
666a710
updates
frankmcsherry Feb 22, 2018
58d9f70
Merge branch 'master' into dataplane_mockup
frankmcsherry Feb 22, 2018
2dcfe1a
tracking master
frankmcsherry Mar 2, 2018
72290be
Merge branch 'master' into dataplane_mockup
frankmcsherry Mar 14, 2018
4826c63
merge
frankmcsherry May 7, 2018
ecb6bbf
Merge branch 'master' into dataplane_mockup
frankmcsherry May 7, 2018
f895f28
default size
frankmcsherry May 7, 2018
f7574fe
remove vestigial drain
frankmcsherry May 10, 2018
2850a71
new trait
frankmcsherry May 11, 2018
2bc8ab8
bytes update
frankmcsherry May 14, 2018
7e1929b
communication update
frankmcsherry May 15, 2018
f0bf2fd
bytes oriented
frankmcsherry May 15, 2018
20064dd
communication revamp
frankmcsherry May 28, 2018
4eeb0d3
merge
frankmcsherry May 28, 2018
8030b45
use Message::push_at
frankmcsherry May 28, 2018
0a9c7d6
no clue
frankmcsherry May 29, 2018
802bc40
merge master
frankmcsherry Jun 7, 2018
7f5aefd
Merge branch 'master' into dataplane_mockup
frankmcsherry Jun 7, 2018
35ed728
Merge branch 'master' into dataplane_mockup
frankmcsherry Jun 12, 2018
b314103
merge, examples fixed
frankmcsherry Jun 12, 2018
dcf1595
merge
frankmcsherry Jun 12, 2018
9fd21fd
Merge branch 'master' into dataplane_mockup
frankmcsherry Jun 21, 2018
9e80451
Merge branch 'master' into dataplane_mockup
frankmcsherry Jun 26, 2018
8875539
re-enable logging
frankmcsherry Jun 26, 2018
e832f76
reintroduce broadcast
frankmcsherry Jun 26, 2018
42c47e9
push more
frankmcsherry Jun 27, 2018
1e31738
re-enable execution filtering
frankmcsherry Jun 28, 2018
0ccc5f7
re-introduce thread process
frankmcsherry Jun 28, 2018
e0cd029
merge
frankmcsherry Jul 4, 2018
d585791
Merge branch 'master' into dataplane_mockup
frankmcsherry Jul 9, 2018
3c2f4ec
Merge branch 'master' into dataplane_mockup
frankmcsherry Jul 25, 2018
667bcc6
new tcp beginnings
frankmcsherry Jul 26, 2018
ae34b8f
compiling checkpoint
frankmcsherry Jul 27, 2018
1ae27f1
reorganization
frankmcsherry Jul 27, 2018
b09e61b
hello example works
frankmcsherry Jul 31, 2018
b39dff2
seems to work
frankmcsherry Jul 31, 2018
6cfff5a
working, better perf
frankmcsherry Aug 2, 2018
372752e
more reliable performance
frankmcsherry Aug 3, 2018
62629f6
unknown
frankmcsherry Aug 13, 2018
3392352
end with empty message
frankmcsherry Aug 13, 2018
faf245c
re-export communication
frankmcsherry Aug 23, 2018
8f29c3f
oops
frankmcsherry Aug 23, 2018
45b98f1
zerocopy process allocator
frankmcsherry Aug 24, 2018
20c2e38
tidy
frankmcsherry Aug 25, 2018
b37bb3e
sizing bug in bytesslab
frankmcsherry Aug 25, 2018
06a36eb
sizing bug in bytesslab
frankmcsherry Aug 25, 2018
afb5b04
use prior default process allocator
frankmcsherry Aug 27, 2018
8124a46
documenting, reorganization, perf bugs
frankmcsherry Aug 28, 2018
83a3806
choice of inequality is important
frankmcsherry Aug 28, 2018
db5a856
stop eager posting
frankmcsherry Aug 29, 2018
400665c
add try_merge functionality
frankmcsherry Aug 30, 2018
aa7dab4
merge_queue
frankmcsherry Aug 30, 2018
a002ffa
drop in-progress on resize
frankmcsherry Aug 31, 2018
88cfb72
panic propagation
frankmcsherry Sep 3, 2018
03b44d6
public fns, process serialization
frankmcsherry Sep 4, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ license = "MIT"
[dependencies]
abomonation = "0.5"
abomonation_derive = "0.3"
timely_communication = "0.5"
#timely_communication = "0.5"
timely_communication = { path = "./communication"}
byteorder="1"
time="0.1.34"
bytes = { path = "./bytes" }

[dev-dependencies]
timely_sort="0.1.6"
Expand Down
67 changes: 40 additions & 27 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! drop(shared2);
//! drop(shared3);
//!
//! if let Ok(bytes) = shared4.try_recover() {
//! if let Ok(bytes) = shared4.try_recover::<Vec<u8>>() {
//! assert_eq!(bytes[200..1024].to_vec(), [1u8;824].to_vec());
//! assert_eq!(bytes[60..100].to_vec(), [2u8;40].to_vec());
//! assert_eq!(bytes[100..200].to_vec(), [3u8;100].to_vec());
Expand All @@ -42,9 +42,10 @@ pub mod rc {

use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::any::Any;

/// A thread-local byte buffer backed by a shared allocation.
pub struct Bytes<B: DerefMut<Target=[u8]>> {
pub struct Bytes {
/// Pointer to the start of this slice (not the allocation).
ptr: *mut u8,
/// Length of this slice.
Expand All @@ -54,20 +55,24 @@ pub mod rc {
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a strong statement about this.
sequestered: Rc<B>,
sequestered: Rc<Box<Any>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to switch to Box<Any>?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this was because otherwise the B type pervades the rest of the code, and everything needs to be generic with respect to B, or written for one specific B, and it seemed likely that we might have more than one type (e.g. Vec<u8> for process-local stuff, and maybe something different (Arc<Vec<u8>>?) for networking stuff).

(on the road at the moment, will try to be more helpful soon!)

}

impl<B: DerefMut<Target=[u8]>> Bytes<B> {
impl Bytes {

/// Create a new instance from a byte allocation.
pub fn from(bytes: B) -> Bytes<B> {
pub fn from<B>(bytes: B) -> Bytes where B: DerefMut<Target=[u8]>+'static {

let mut rc = Rc::new(bytes);
let mut boxed = Box::new(bytes) as Box<Any>;

let ptr = boxed.downcast_mut::<B>().unwrap().as_mut_ptr();
let len = boxed.downcast_ref::<B>().unwrap().len();
let sequestered = Rc::new(boxed);

Bytes {
ptr: Rc::get_mut(&mut rc).unwrap().as_mut_ptr(),
len: rc.len(),
sequestered: rc,
ptr,
len,
sequestered,
}
}

Expand All @@ -77,7 +82,7 @@ pub mod rc {
///
/// This method uses an `unsafe` region to advance the pointer by `index`. It first
/// tests `index` against `self.len`, which should ensure that the offset is in-bounds.
pub fn extract_to(&mut self, index: usize) -> Bytes<B> {
pub fn extract_to(&mut self, index: usize) -> Bytes {

assert!(index <= self.len);

Expand All @@ -97,9 +102,9 @@ pub mod rc {
///
/// This method either results in the underlying storage if it is uniquely held, or the
/// input `Bytes` if it is not uniquely held.
pub fn try_recover(self) -> Result<B, Bytes<B>> {
pub fn try_recover<B>(self) -> Result<B, Bytes> where B: DerefMut<Target=[u8]>+'static {
match Rc::try_unwrap(self.sequestered) {
Ok(bytes) => Ok(bytes),
Ok(bytes) => Ok(*bytes.downcast::<B>().unwrap()),
Err(rc) => Err(Bytes {
ptr: self.ptr,
len: self.len,
Expand All @@ -109,14 +114,14 @@ pub mod rc {
}
}

impl<B: DerefMut<Target=[u8]>> Deref for Bytes<B> {
impl Deref for Bytes {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl<B: DerefMut<Target=[u8]>> DerefMut for Bytes<B> {
impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
Expand All @@ -128,9 +133,10 @@ pub mod arc {

use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::any::Any;

/// A thread-safe byte buffer backed by a shared allocation.
pub struct Bytes<B: DerefMut<Target=[u8]>> {
pub struct Bytes {
/// Pointer to the start of this slice (not the allocation).
ptr: *mut u8,
/// Length of this slice.
Expand All @@ -140,20 +146,26 @@ pub mod arc {
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a strong statement about this.
sequestered: Arc<B>,
sequestered: Arc<Box<Any>>,
}

impl<B: DerefMut<Target=[u8]>> Bytes<B> {
unsafe impl Send for Bytes { }

impl Bytes {

/// Create a new instance from a byte allocation.
pub fn from(bytes: B) -> Bytes<B> {
pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {

let mut boxed = Box::new(bytes) as Box<Any>;

let mut arc = Arc::new(bytes);
let ptr = boxed.downcast_mut::<B>().unwrap().as_mut_ptr();
let len = boxed.downcast_ref::<B>().unwrap().len();
let sequestered = Arc::new(boxed);

Bytes {
ptr: Arc::get_mut(&mut arc).unwrap().as_mut_ptr(),
len: arc.len(),
sequestered: arc,
ptr,
len,
sequestered,
}
}

Expand All @@ -163,7 +175,7 @@ pub mod arc {
///
/// This method uses an `unsafe` region to advance the pointer by `index`. It first
/// tests `index` against `self.len`, which should ensure that the offset is in-bounds.
pub fn extract_to(&mut self, index: usize) -> Bytes<B> {
pub fn extract_to(&mut self, index: usize) -> Bytes {

assert!(index <= self.len);

Expand All @@ -183,9 +195,10 @@ pub mod arc {
///
/// This method either results in the underlying storage if it is uniquely held, or the
/// input `Bytes` if it is not uniquely held.
pub fn try_recover(self) -> Result<B, Bytes<B>> {
pub fn try_recover<B>(self) -> Result<B, Bytes> where B: DerefMut<Target=[u8]>+'static {
// println!("Trying recovery; strong count: {:?}", Arc::strong_count(&self.sequestered));
match Arc::try_unwrap(self.sequestered) {
Ok(bytes) => Ok(bytes),
Ok(bytes) => Ok(*bytes.downcast::<B>().unwrap()),
Err(arc) => Err(Bytes {
ptr: self.ptr,
len: self.len,
Expand All @@ -195,14 +208,14 @@ pub mod arc {
}
}

impl<B: DerefMut<Target=[u8]>> Deref for Bytes<B> {
impl Deref for Bytes {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl<B: DerefMut<Target=[u8]>> DerefMut for Bytes<B> {
impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
Expand Down
1 change: 1 addition & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ getopts={version="0.2.14", optional=true}
time="0.1.34"
abomonation = "0.5"
abomonation_derive = "0.3"
bytes = { path = "../bytes" }

[[examples]]
features = "arg_parse"
Expand Down
5 changes: 5 additions & 0 deletions communication/examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ fn main() {
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {

allocator.pre_work();

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message);
received += 1;
}

allocator.post_work();
}

allocator.index()
Expand Down
6 changes: 4 additions & 2 deletions communication/src/allocator/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ impl<T:Data> Pull<T> for Puller<T> {
let log_sender = &self.log_sender;
if inner.is_some() { inner }
else {
self.current = self.receiver.try_recv().ok().map(|mut bytes| {
self.current = self.receiver.try_recv().ok().map(|bytes| {
log_sender.when_enabled(|l| l.log(
::logging::CommsEvent::Serialization(::logging::SerializationEvent {
seq_no: None,
is_start: true,
})));
let result = <T as Serialize>::from_bytes(&mut bytes);

let bytes = ::bytes::arc::Bytes::from(bytes);
let result = <T as Serialize>::from_bytes(bytes);
log_sender.when_enabled(|l| l.log(
::logging::CommsEvent::Serialization(::logging::SerializationEvent {
seq_no: None,
Expand Down
73 changes: 61 additions & 12 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! for example closures whose type arguments must be specified.

use allocator::{Allocate, Thread, Process, Binary};
use allocator::process_binary::{ProcessBinary, ProcessBinaryBuilder};
use {Push, Pull, Data};

/// Enumerates known implementors of `Allocate`.
Expand All @@ -12,31 +13,52 @@ pub enum Generic {
Thread(Thread),
Process(Process),
Binary(Binary),
ProcessBinary(ProcessBinary<::allocator::process_binary::vec::VecBytesExchange>),
}

impl Generic {
/// The index of the worker out of `(0..self.peers())`.
pub fn index(&self) -> usize {
match *self {
Generic::Thread(ref t) => t.index(),
Generic::Process(ref p) => p.index(),
Generic::Binary(ref b) => b.index(),
match self {
&Generic::Thread(ref t) => t.index(),
&Generic::Process(ref p) => p.index(),
&Generic::Binary(ref b) => b.index(),
&Generic::ProcessBinary(ref pb) => pb.index(),
}
}
/// The number of workers.
pub fn peers(&self) -> usize {
match *self {
Generic::Thread(ref t) => t.peers(),
Generic::Process(ref p) => p.peers(),
Generic::Binary(ref b) => b.peers(),
match self {
&Generic::Thread(ref t) => t.peers(),
&Generic::Process(ref p) => p.peers(),
&Generic::Binary(ref b) => b.peers(),
&Generic::ProcessBinary(ref pb) => pb.peers(),
}
}
/// Constructs several send endpoints and one receive endpoint.
pub fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
match *self {
Generic::Thread(ref mut t) => t.allocate(),
Generic::Process(ref mut p) => p.allocate(),
Generic::Binary(ref mut b) => b.allocate(),
match self {
&mut Generic::Thread(ref mut t) => t.allocate(),
&mut Generic::Process(ref mut p) => p.allocate(),
&mut Generic::Binary(ref mut b) => b.allocate(),
&mut Generic::ProcessBinary(ref mut pb) => pb.allocate(),
}
}

pub fn pre_work(&mut self) {
match self {
&mut Generic::Thread(ref mut t) => t.pre_work(),
&mut Generic::Process(ref mut p) => p.pre_work(),
&mut Generic::Binary(ref mut b) => b.pre_work(),
&mut Generic::ProcessBinary(ref mut pb) => pb.pre_work(),
}
}
pub fn post_work(&mut self) {
match self {
&mut Generic::Thread(ref mut t) => t.post_work(),
&mut Generic::Process(ref mut p) => p.post_work(),
&mut Generic::Binary(ref mut b) => b.post_work(),
&mut Generic::ProcessBinary(ref mut pb) => pb.post_work(),
}
}
}
Expand All @@ -47,4 +69,31 @@ impl Allocate for Generic {
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
self.allocate()
}

fn pre_work(&mut self) { self.pre_work(); }
fn post_work(&mut self) { self.post_work(); }
}


/// Enumerations of constructable implementors of `Allocate`.
///
/// The builder variants are meant to be `Send`, so that they can be moved across threads,
/// whereas the allocator they construct may not. As an example, the `ProcessBinary` type
/// contains `Rc` wrapped state, and so cannot itself be moved across threads.
pub enum GenericBuilder {
Thread(Thread),
Process(Process),
Binary(Binary),
ProcessBinary(ProcessBinaryBuilder<::allocator::process_binary::vec::VecBytesExchange>),
}

impl GenericBuilder {
pub fn build(self) -> Generic {
match self {
GenericBuilder::Thread(t) => Generic::Thread(t),
GenericBuilder::Process(p) => Generic::Process(p),
GenericBuilder::Binary(b) => Generic::Binary(b),
GenericBuilder::ProcessBinary(pb) => Generic::ProcessBinary(pb.build()),
}
}
}
6 changes: 5 additions & 1 deletion communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
pub use self::thread::Thread;
pub use self::process::Process;
pub use self::binary::Binary;
pub use self::generic::Generic;
pub use self::generic::{Generic, GenericBuilder};

pub mod thread;
pub mod process;
pub mod binary;
pub mod generic;
pub mod process_binary;

use {Data, Push, Pull};

Expand All @@ -22,4 +23,7 @@ pub trait Allocate {
fn peers(&self) -> usize;
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>);

fn pre_work(&mut self) { }
fn post_work(&mut self) { }
}
Loading