Skip to content

Commit

Permalink
Merge pull request #135 from frankmcsherry/dataplane_mockup
Browse files Browse the repository at this point in the history
Dataplane mockup
  • Loading branch information
frankmcsherry authored Sep 4, 2018
2 parents 63f1567 + 03b44d6 commit e70fcde
Show file tree
Hide file tree
Showing 72 changed files with 2,318 additions and 1,178 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ license = "MIT"
#build = "booktests.rs"

[dependencies]
abomonation = "0.5"
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
abomonation_derive = "0.3"
timely_communication = "0.6"
byteorder="1"
bytes = { path = "./bytes" }
timely_communication = { path = "./communication"}
time="0.1.34"

[dev-dependencies]
Expand Down
155 changes: 127 additions & 28 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! drop(shared2);
//! drop(shared3);
//!
//! if let Ok(bytes) = shared4.try_recover() {
//! if let Ok(bytes) = shared4.try_recover::<Vec<u8>>() {
//! assert_eq!(bytes[200..1024].to_vec(), [1u8;824].to_vec());
//! assert_eq!(bytes[60..100].to_vec(), [2u8;40].to_vec());
//! assert_eq!(bytes[100..200].to_vec(), [3u8;100].to_vec());
Expand All @@ -42,9 +42,10 @@ pub mod rc {

use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::any::Any;

/// A thread-local byte buffer backed by a shared allocation.
pub struct Bytes<B: DerefMut<Target=[u8]>> {
pub struct Bytes {
/// Pointer to the start of this slice (not the allocation).
ptr: *mut u8,
/// Length of this slice.
Expand All @@ -54,20 +55,24 @@ pub mod rc {
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a strong statement about this.
sequestered: Rc<B>,
sequestered: Rc<Box<Any>>,
}

impl<B: DerefMut<Target=[u8]>> Bytes<B> {
impl Bytes {

/// Create a new instance from a byte allocation.
pub fn from(bytes: B) -> Bytes<B> {
pub fn from<B>(bytes: B) -> Bytes where B: DerefMut<Target=[u8]>+'static {

let mut rc = Rc::new(bytes);
let mut boxed = Box::new(bytes) as Box<Any>;

let ptr = boxed.downcast_mut::<B>().unwrap().as_mut_ptr();
let len = boxed.downcast_ref::<B>().unwrap().len();
let sequestered = Rc::new(boxed);

Bytes {
ptr: Rc::get_mut(&mut rc).unwrap().as_mut_ptr(),
len: rc.len(),
sequestered: rc,
ptr,
len,
sequestered,
}
}

Expand All @@ -77,7 +82,7 @@ pub mod rc {
///
/// This method uses an `unsafe` region to advance the pointer by `index`. It first
/// tests `index` against `self.len`, which should ensure that the offset is in-bounds.
pub fn extract_to(&mut self, index: usize) -> Bytes<B> {
pub fn extract_to(&mut self, index: usize) -> Bytes {

assert!(index <= self.len);

Expand All @@ -97,9 +102,9 @@ pub mod rc {
///
/// This method either results in the underlying storage if it is uniquely held, or the
/// input `Bytes` if it is not uniquely held.
pub fn try_recover(self) -> Result<B, Bytes<B>> {
pub fn try_recover<B>(self) -> Result<B, Bytes> where B: DerefMut<Target=[u8]>+'static {
match Rc::try_unwrap(self.sequestered) {
Ok(bytes) => Ok(bytes),
Ok(bytes) => Ok(*bytes.downcast::<B>().unwrap()),
Err(rc) => Err(Bytes {
ptr: self.ptr,
len: self.len,
Expand All @@ -109,14 +114,14 @@ pub mod rc {
}
}

impl<B: DerefMut<Target=[u8]>> Deref for Bytes<B> {
impl Deref for Bytes {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl<B: DerefMut<Target=[u8]>> DerefMut for Bytes<B> {
impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
Expand All @@ -128,9 +133,10 @@ pub mod arc {

use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::any::Any;

/// A thread-safe byte buffer backed by a shared allocation.
pub struct Bytes<B: DerefMut<Target=[u8]>> {
pub struct Bytes {
/// Pointer to the start of this slice (not the allocation).
ptr: *mut u8,
/// Length of this slice.
Expand All @@ -140,20 +146,26 @@ pub mod arc {
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a strong statement about this.
sequestered: Arc<B>,
sequestered: Arc<Box<Any>>,
}

impl<B: DerefMut<Target=[u8]>> Bytes<B> {
unsafe impl Send for Bytes { }

impl Bytes {

/// Create a new instance from a byte allocation.
pub fn from(bytes: B) -> Bytes<B> {
pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {

let mut arc = Arc::new(bytes);
let mut boxed = Box::new(bytes) as Box<Any>;

let ptr = boxed.downcast_mut::<B>().unwrap().as_mut_ptr();
let len = boxed.downcast_ref::<B>().unwrap().len();
let sequestered = Arc::new(boxed);

Bytes {
ptr: Arc::get_mut(&mut arc).unwrap().as_mut_ptr(),
len: arc.len(),
sequestered: arc,
ptr,
len,
sequestered,
}
}

Expand All @@ -163,7 +175,7 @@ pub mod arc {
///
/// This method uses an `unsafe` region to advance the pointer by `index`. It first
/// tests `index` against `self.len`, which should ensure that the offset is in-bounds.
pub fn extract_to(&mut self, index: usize) -> Bytes<B> {
pub fn extract_to(&mut self, index: usize) -> Bytes {

assert!(index <= self.len);

Expand All @@ -183,28 +195,115 @@ pub mod arc {
///
/// This method either results in the underlying storage if it is uniquely held, or the
/// input `Bytes` if it is not uniquely held.
pub fn try_recover(self) -> Result<B, Bytes<B>> {
///
/// #Examples
///
/// ```
/// use bytes::arc::Bytes;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
///
/// drop(shared1);
/// drop(shared2);
/// drop(shared4);
/// let recovered = shared3.try_recover::<Vec<u8>>().ok().expect("recovery failed");
/// assert!(recovered.len() == 1024);
/// ```
pub fn try_recover<B>(self) -> Result<B, Bytes> where B: DerefMut<Target=[u8]>+'static {
// println!("Trying recovery; strong count: {:?}", Arc::strong_count(&self.sequestered));
match Arc::try_unwrap(self.sequestered) {
Ok(bytes) => Ok(bytes),
Ok(bytes) => Ok(*bytes.downcast::<B>().unwrap()),
Err(arc) => Err(Bytes {
ptr: self.ptr,
len: self.len,
sequestered: arc,
}),
}
}

/// Regenerates the Bytes if it is uniquely held.
///
/// If uniquely held, this method recovers the initial pointer and length
/// of the sequestered allocation and re-initialized the Bytes. The return
/// value indicates whether this occurred.
///
/// #Examples
///
/// ```
/// use bytes::arc::Bytes;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
///
/// drop(shared1);
/// drop(shared2);
/// drop(shared4);
/// assert!(shared3.try_regenerate::<Vec<u8>>());
/// assert!(shared3.len() == 1024);
/// ```
pub fn try_regenerate<B>(&mut self) -> bool where B: DerefMut<Target=[u8]>+'static {
if let Some(boxed) = Arc::get_mut(&mut self.sequestered) {
let downcast = boxed.downcast_mut::<B>().expect("Downcast failed");
self.ptr = downcast.as_mut_ptr();
self.len = downcast.len();
true
}
else {
false
}
}

/// Attempts to merge adjacent slices from the same allocation.
///
/// If the merge succeeds then `other.len` is added to `self` and the result is `Ok(())`.
/// If the merge fails self is unmodified and the result is `Err(other)`, returning the
/// bytes supplied as input.
///
/// #Examples
///
/// ```
/// use bytes::arc::Bytes;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
///
/// // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
/// shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
/// shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
/// ```
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
use ::std::sync::Arc;
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.offset(self.len as isize) }, other.ptr) {
self.len += other.len;
Ok(())
}
else {
Err(other)
}
}
}

impl<B: DerefMut<Target=[u8]>> Deref for Bytes<B> {
impl Deref for Bytes {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl<B: DerefMut<Target=[u8]>> DerefMut for Bytes<B> {
impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}
}
}
5 changes: 3 additions & 2 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ default=["arg_parse"]
arg_parse=["getopts"]

[dependencies]
byteorder="1"
#byteorder="1"
getopts={version="0.2.14", optional=true}
time="0.1.34"
abomonation = "0.5"
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
abomonation_derive = "0.3"
bytes = { path = "../bytes" }

[profile.release]
opt-level = 3
Expand Down
12 changes: 10 additions & 2 deletions communication/examples/hello.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
extern crate timely_communication;

use std::ops::Deref;
use timely_communication::Message;

fn main() {

// extract the configuration from user-supplied arguments, initialize the computation.
Expand All @@ -14,18 +17,23 @@ fn main() {

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(format!("hello, {}", i));
senders[i].send(Message::from_typed(format!("hello, {}", i)));
senders[i].done();
}

// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {

allocator.pre_work();

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message);
println!("worker {}: received: <{}>", allocator.index(), message.deref());
received += 1;
}

allocator.post_work();
}

allocator.index()
Expand Down
Loading

0 comments on commit e70fcde

Please sign in to comment.