Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrumentation overhaul, fixes #91 #92

Merged
merged 33 commits into from
Jan 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e0b41b3
added instrumentation to progress tracking
anonion0 Apr 27, 2016
8fc9e88
added 'application' logging channel
anonion0 Jun 21, 2016
68502cf
logging for timely_communication
anonion0 Jun 22, 2016
7541cb9
redesigned logging infrastructure
utaal Feb 16, 2017
cd87a26
use EventWriter/EventReader
utaal Aug 10, 2017
ed362f1
explain dropping the to_tcp_socket communication logging thread
utaal Aug 10, 2017
0d237f3
captured streams has implicit default cap
utaal Aug 15, 2017
0783bfd
new instrumentation
utaal Aug 16, 2017
fcd3baa
fix warnings
utaal Dec 6, 2017
cb45ab8
various test fixes
utaal Dec 6, 2017
3e6d96c
fix warnings
utaal Dec 6, 2017
461f862
api and doc for execute
utaal Dec 6, 2017
2059472
construct an EventPusher per sender thread
utaal Dec 6, 2017
9e199c7
pagerank restored
utaal Dec 6, 2017
d1a805a
better docs, no warnings, and to_shared_tcp_socket
utaal Dec 6, 2017
dff2ea7
logging: Derive Abomonation
Dec 15, 2017
137374e
Bring back EventLink
Dec 15, 2017
2e262e3
Minor space cleanup
Dec 15, 2017
7870d40
examples: Receiver cleanup
Dec 15, 2017
3248259
Simplify design, only support a single subscriber at this level
utaal Dec 20, 2017
146a882
Add EventPusherTee, remove LogManager
utaal Dec 20, 2017
3f30400
Support rendezvous by exposing setup to EventPusher constructors
utaal Dec 20, 2017
81ece3a
use Option for BufferingLogger
utaal Dec 20, 2017
0fe9577
avoid copy in BufferingLogger
utaal Dec 20, 2017
eed258f
guarded logging
utaal Dec 20, 2017
d3fce80
fix panic, reduce overhead when logging is disabled
utaal Jan 4, 2018
a79c493
remove unnecessary clone in BatchLogger
utaal Jan 4, 2018
c0ea60e
re-enable and fix docstring tests
utaal Jan 4, 2018
acff9ce
flush logs after dataflow construction
utaal Jan 5, 2018
84d33f5
fix remaining doc todos
utaal Jan 5, 2018
651cb5f
move BufferingLogger to communication
utaal Jan 5, 2018
504a6b6
remove logging crate
utaal Jan 5, 2018
3ff69e9
revert perf regression in capture/event
utaal Jan 8, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ license = "MIT"

[features]
default=[]
logging=[]

[dependencies]
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
abomonation_derive = { git = "https://github.com/mystor/abomonation_derive.git" }
timely_sort="0.1.6"
timely_communication = { git = "https://github.com/frankmcsherry/timely-dataflow/" }
timely_communication = { version = "0.1.8", path = "./communication" }
byteorder="0.4.2"
time="0.1.34"

[dev-dependencies]
Expand Down
8 changes: 5 additions & 3 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ repository = "https://github.com/frankmcsherry/timely-dataflow.git"
keywords = ["timely", "dataflow"]
license = "MIT"

[dependencies.abomonation]
git="https://github.com/frankmcsherry/abomonation"
[features]
default=[] # ["logging"]

[dependencies]
#abomonation="0.4.4"
byteorder="0.4.2"
getopts="0.2.14"
time="0.1.34"
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
abomonation_derive = { git = "https://github.com/mystor/abomonation_derive.git" }

[profile.release]
opt-level = 3
Expand Down
57 changes: 47 additions & 10 deletions communication/src/allocator/binary.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::mpsc::{Sender, Receiver, channel};
use std::sync::Arc;

use {Allocate, Data, Push, Pull, Serialize};
use allocator::Process;
Expand All @@ -13,7 +14,8 @@ pub struct Binary {

// for loading up state in the networking threads.
pub readers: Vec<Sender<((usize, usize), Sender<Vec<u8>>)>>,
pub senders: Vec<Sender<(MessageHeader, Vec<u8>)>>
pub senders: Vec<Sender<(MessageHeader, Vec<u8>)>>,
pub log_sender: Arc<Fn(::logging::CommsSetup)->::logging::CommsLogger+Send+Sync>,
}

impl Binary {
Expand All @@ -24,12 +26,12 @@ impl Binary {
impl Allocate for Binary {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T:Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>) {
fn allocate<T:Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
let mut pushers: Vec<Box<Push<T>>> = Vec::new();

// we'll need process-local channels as well (no self-loop binary connection in this design; perhaps should allow)
let inner_peers = self.inner.peers();
let (inner_sends, inner_recv) = self.inner.allocate();
let (inner_sends, inner_recv, _) = self.inner.allocate();

// prep a pushable for each endpoint, multiplied by inner_peers
for index in 0..self.readers.len() {
Expand All @@ -44,8 +46,14 @@ impl Allocate for Binary {
source: self.index,
target: target_index,
length: 0,
seqno: 0,
};
pushers.push(Box::new(Pusher::new(header, self.senders[index].clone())));
let logger = (self.log_sender)(::logging::CommsSetup {
process: self.index,
sender: true,
remote: Some(target_index),
});
pushers.push(Box::new(Pusher::new(header, self.senders[index].clone(), logger)));
}
}

Expand All @@ -60,38 +68,54 @@ impl Allocate for Binary {
reader.send(((self.index, self.allocated), send.clone())).unwrap();
}

let pullable = Box::new(Puller::new(inner_recv, recv));
let logger = (self.log_sender)(::logging::CommsSetup {
process: self.index,
sender: false,
remote: None,
});
let pullable = Box::new(Puller::new(inner_recv, recv, logger));

self.allocated += 1;

return (pushers, pullable);
return (pushers, pullable, Some(self.allocated - 1));
}
}

struct Pusher<T> {
header: MessageHeader,
sender: Sender<(MessageHeader, Vec<u8>)>, // targets for each remote destination
phantom: ::std::marker::PhantomData<T>,
log_sender: ::logging::CommsLogger,
}

impl<T> Pusher<T> {
pub fn new(header: MessageHeader, sender: Sender<(MessageHeader, Vec<u8>)>) -> Pusher<T> {
pub fn new(header: MessageHeader, sender: Sender<(MessageHeader, Vec<u8>)>, log_sender: ::logging::CommsLogger) -> Pusher<T> {
Pusher {
header: header,
sender: sender,
phantom: ::std::marker::PhantomData,
log_sender: log_sender,
}
}
}

impl<T:Data> Push<T> for Pusher<T> {
#[inline] fn push(&mut self, element: &mut Option<T>) {
if let Some(ref mut element) = *element {
self.log_sender.when_enabled(|l| l.log(::logging::CommsEvent::Serialization(::logging::SerializationEvent {
seq_no: Some(self.header.seqno),
is_start: true,
})));
let mut bytes = Vec::new();
<T as Serialize>::into_bytes(element, &mut bytes);
let mut header = self.header;
header.length = bytes.len();
self.sender.send((header, bytes)).ok(); // TODO : should be unwrap()?
self.log_sender.when_enabled(|l| l.log(::logging::CommsEvent::Serialization(::logging::SerializationEvent {
seq_no: Some(self.header.seqno),
is_start: true,
})));
self.header.seqno += 1;
}
}
}
Expand All @@ -100,21 +124,34 @@ struct Puller<T> {
inner: Box<Pull<T>>, // inner pullable (e.g. intra-process typed queue)
current: Option<T>,
receiver: Receiver<Vec<u8>>, // source of serialized buffers
log_sender: ::logging::CommsLogger,
}
impl<T:Data> Puller<T> {
fn new(inner: Box<Pull<T>>, receiver: Receiver<Vec<u8>>) -> Puller<T> {
Puller { inner: inner, receiver: receiver, current: None }
fn new(inner: Box<Pull<T>>, receiver: Receiver<Vec<u8>>, log_sender: ::logging::CommsLogger) -> Puller<T> {
Puller { inner: inner, receiver: receiver, current: None, log_sender: log_sender }
}
}

impl<T:Data> Pull<T> for Puller<T> {
#[inline]
fn pull(&mut self) -> &mut Option<T> {
let inner = self.inner.pull();
let log_sender = &self.log_sender;
if inner.is_some() { inner }
else {
self.current = self.receiver.try_recv().ok().map(|mut bytes| {
<T as Serialize>::from_bytes(&mut bytes)
log_sender.when_enabled(|l| l.log(
::logging::CommsEvent::Serialization(::logging::SerializationEvent {
seq_no: None,
is_start: true,
})));
let result = <T as Serialize>::from_bytes(&mut bytes);
log_sender.when_enabled(|l| l.log(
::logging::CommsEvent::Serialization(::logging::SerializationEvent {
seq_no: None,
is_start: false,
})));
result
});
&mut self.current
}
Expand Down
6 changes: 4 additions & 2 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Generic {
}
}
/// Constructs several send endpoints and one receive endpoint.
pub fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>) {
pub fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
match self {
&mut Generic::Thread(ref mut t) => t.allocate(),
&mut Generic::Process(ref mut p) => p.allocate(),
Expand All @@ -44,5 +44,7 @@ impl Generic {
impl Allocate for Generic {
fn index(&self) -> usize { self.index() }
fn peers(&self) -> usize { self.peers() }
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>) { self.allocate() }
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
self.allocate()
}
}
2 changes: 1 addition & 1 deletion communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ pub trait Allocate {
/// The number of workers.
fn peers(&self) -> usize;
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>);
fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>);
}
4 changes: 2 additions & 2 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Process {
impl Allocate for Process {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Any+Send+'static>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>) {
fn allocate<T: Any+Send+'static>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {

// ensure exclusive access to shared list of channels
let mut channels = self.channels.lock().ok().expect("mutex error?");
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Allocate for Process {
self.allocated += 1;
let mut temp = Vec::new();
for s in send.into_iter() { temp.push(Box::new(s) as Box<Push<T>>); }
return (temp, Box::new(recv) as Box<Pull<T>>)
return (temp, Box::new(recv) as Box<Pull<T>>, None)
}
else {
panic!("channel already consumed");
Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ pub struct Thread;
impl Allocate for Thread {
fn index(&self) -> usize { 0 }
fn peers(&self) -> usize { 1 }
fn allocate<T: 'static>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>) {
fn allocate<T: 'static>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>, Option<usize>) {
let (pusher, puller) = Thread::new();
(vec![Box::new(pusher)], Box::new(puller))
(vec![Box::new(pusher)], Box::new(puller), None)
}
}

Expand Down
12 changes: 7 additions & 5 deletions communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ impl Configuration {
}
}

fn create_allocators(config: Configuration) -> Result<Vec<Generic>,String> {
fn create_allocators(config: Configuration, logger: Arc<Fn(::logging::CommsSetup)->::logging::CommsLogger+Send+Sync>) -> Result<Vec<Generic>,String> {
match config {
Configuration::Thread => Ok(vec![Generic::Thread(Thread)]),
Configuration::Process(threads) => Ok(Process::new_vector(threads).into_iter().map(|x| Generic::Process(x)).collect()),
Configuration::Cluster(threads, process, addresses, report) => {
if let Ok(stuff) = initialize_networking(addresses, process, threads, report) {
if let Ok(stuff) = initialize_networking(addresses, process, threads, report, logger) {
Ok(stuff.into_iter().map(|x| Generic::Binary(x)).collect())
}
else {
Expand Down Expand Up @@ -146,17 +146,19 @@ fn create_allocators(config: Configuration) -> Result<Vec<Generic>,String> {
/// result: Ok(0)
/// result: Ok(1)
/// ```
pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(config: Configuration, func: F) -> Result<WorkerGuards<T>,String> {
pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(config: Configuration, func: F, log_sender: Arc<Fn(::logging::CommsSetup)->::logging::CommsLogger+Send+Sync>) -> Result<WorkerGuards<T>,String> {

let allocators = try!(create_allocators(config));
let allocators = try!(create_allocators(config, log_sender));
let logic = Arc::new(func);

let mut guards = Vec::new();
for allocator in allocators.into_iter() {
let clone = logic.clone();
guards.push(try!(thread::Builder::new()
.name(format!("worker thread {}", allocator.index()))
.spawn(move || (*clone)(allocator))
.spawn(move || {
(*clone)(allocator)
})
.map_err(|e| format!("{:?}", e))));
}

Expand Down
3 changes: 3 additions & 0 deletions communication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@
extern crate getopts;
extern crate byteorder;
extern crate abomonation;
#[macro_use] extern crate abomonation_derive;
extern crate time;

pub mod allocator;
mod networking;
pub mod initialize;
mod drain;
pub mod logging;

use std::any::Any;
use abomonation::{Abomonation, encode, decode};
Expand Down
Loading