Skip to content

Commit

Permalink
Test fallout from std::comm rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed Dec 17, 2013
1 parent 529e268 commit 39a6c9d
Show file tree
Hide file tree
Showing 23 changed files with 140 additions and 123 deletions.
46 changes: 19 additions & 27 deletions doc/tutorial-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,16 @@ receiving messages. Pipes are low-level communication building-blocks and so
come in a variety of forms, each one appropriate for a different use case. In
what follows, we cover the most commonly used varieties.

The simplest way to create a pipe is to use the `comm::stream`
The simplest way to create a pipe is to use `Chan::new`
function to create a `(Port, Chan)` pair. In Rust parlance, a *channel*
is a sending endpoint of a pipe, and a *port* is the receiving
endpoint. Consider the following example of calculating two results
concurrently:

~~~~
# use std::task::spawn;
# use std::comm::{stream, Port, Chan};
let (port, chan): (Port<int>, Chan<int>) = stream();
let (port, chan): (Port<int>, Chan<int>) = Chan::new();
do spawn || {
let result = some_expensive_computation();
Expand All @@ -150,8 +149,7 @@ stream for sending and receiving integers (the left-hand side of the `let`,
a tuple into its component parts).

~~~~
# use std::comm::{stream, Chan, Port};
let (port, chan): (Port<int>, Chan<int>) = stream();
let (port, chan): (Port<int>, Chan<int>) = Chan::new();
~~~~

The child task will use the channel to send data to the parent task,
Expand All @@ -160,9 +158,8 @@ spawns the child task.

~~~~
# use std::task::spawn;
# use std::comm::stream;
# fn some_expensive_computation() -> int { 42 }
# let (port, chan) = stream();
# let (port, chan) = Chan::new();
do spawn || {
let result = some_expensive_computation();
chan.send(result);
Expand All @@ -180,25 +177,23 @@ computation, then waits for the child's result to arrive on the
port:

~~~~
# use std::comm::{stream};
# fn some_other_expensive_computation() {}
# let (port, chan) = stream::<int>();
# let (port, chan) = Chan::<int>::new();
# chan.send(0);
some_other_expensive_computation();
let result = port.recv();
~~~~

The `Port` and `Chan` pair created by `stream` enables efficient communication
between a single sender and a single receiver, but multiple senders cannot use
a single `Chan`, and multiple receivers cannot use a single `Port`. What if our
example needed to compute multiple results across a number of tasks? The
following program is ill-typed:
The `Port` and `Chan` pair created by `Chan::new` enables efficient
communication between a single sender and a single receiver, but multiple
senders cannot use a single `Chan`, and multiple receivers cannot use a single
`Port`. What if our example needed to compute multiple results across a number
of tasks? The following program is ill-typed:

~~~ {.xfail-test}
# use std::task::{spawn};
# use std::comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
let (port, chan) = stream();
let (port, chan) = Chan::new();
do spawn {
chan.send(some_expensive_computation());
Expand All @@ -216,10 +211,8 @@ Instead we can use a `SharedChan`, a type that allows a single

~~~
# use std::task::spawn;
# use std::comm::{stream, SharedChan};
let (port, chan) = stream();
let chan = SharedChan::new(chan);
let (port, chan) = SharedChan::new();
for init_val in range(0u, 3) {
// Create a new channel handle to distribute to the child task
Expand All @@ -238,23 +231,22 @@ Here we transfer ownership of the channel into a new `SharedChan` value. Like
as an *affine* or *linear* type). Unlike with `Chan`, though, the programmer
may duplicate a `SharedChan`, with the `clone()` method. A cloned
`SharedChan` produces a new handle to the same channel, allowing multiple
tasks to send data to a single port. Between `spawn`, `stream` and
tasks to send data to a single port. Between `spawn`, `Chan` and
`SharedChan`, we have enough tools to implement many useful concurrency
patterns.

Note that the above `SharedChan` example is somewhat contrived since
you could also simply use three `stream` pairs, but it serves to
you could also simply use three `Chan` pairs, but it serves to
illustrate the point. For reference, written with multiple streams, it
might look like the example below.

~~~
# use std::task::spawn;
# use std::comm::stream;
# use std::vec;
// Create a vector of ports, one for each child task
let ports = vec::from_fn(3, |init_val| {
let (port, chan) = stream();
let (port, chan) = Chan::new();
do spawn {
chan.send(some_expensive_computation(init_val));
}
Expand Down Expand Up @@ -341,7 +333,7 @@ fn main() {
let numbers_arc = Arc::new(numbers);
for num in range(1u, 10) {
let (port, chan) = stream();
let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
do spawn {
Expand Down Expand Up @@ -370,7 +362,7 @@ and a clone of it is sent to each task
# use std::rand;
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc = Arc::new(numbers);
# let (port, chan) = stream();
# let (port, chan) = Chan::new();
chan.send(numbers_arc.clone());
~~~
copying only the wrapper and not its contents.
Expand All @@ -382,7 +374,7 @@ Each task recovers the underlying data by
# use std::rand;
# let numbers=vec::from_fn(1000000, |_| rand::random::<f64>());
# let numbers_arc=Arc::new(numbers);
# let (port, chan) = stream();
# let (port, chan) = Chan::new();
# chan.send(numbers_arc.clone());
# let local_arc : Arc<~[f64]> = port.recv();
let task_numbers = local_arc.get();
Expand Down Expand Up @@ -499,7 +491,7 @@ Here is the code for the parent task:
# }
# fn main() {
let (from_child, to_child) = DuplexStream();
let (from_child, to_child) = DuplexStream::new();
do spawn {
stringifier(&to_child);
Expand Down
3 changes: 1 addition & 2 deletions src/libextra/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,8 @@ mod tests {
})
}

let mut c = Some(c);
arc.access_cond(|state, cond| {
c.take_unwrawp().send(());
c.send(());
assert!(!*state);
while !*state {
cond.wait();
Expand Down
2 changes: 1 addition & 1 deletion src/libextra/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,6 @@ mod tests {
let mi = m2.clone();
// spawn sibling task
do task::spawn { // linked
let mut c = Some(c);
mi.lock_cond(|cond| {
c.send(()); // tell sibling to go ahead
(|| {
Expand Down Expand Up @@ -994,6 +993,7 @@ mod tests {
})
}
#[test]
#[ignore(reason = "linked failure?")]
fn test_mutex_different_conds() {
let result = do task::try {
let m = Mutex::new_with_condvars(2);
Expand Down
39 changes: 19 additions & 20 deletions src/librustuv/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ impl Drop for UdpWatcher {

#[cfg(test)]
mod test {
use std::comm::oneshot;
use std::rt::test::*;
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
RtioUdpSocket};
Expand Down Expand Up @@ -689,7 +688,7 @@ mod test {
#[test]
fn listen_ip4() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
Expand Down Expand Up @@ -725,7 +724,7 @@ mod test {
#[test]
fn listen_ip6() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let addr = next_test_ip6();
do spawn {
Expand Down Expand Up @@ -761,7 +760,7 @@ mod test {
#[test]
fn udp_recv_ip4() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let client = next_test_ip4();
let server = next_test_ip4();
Expand Down Expand Up @@ -793,7 +792,7 @@ mod test {
#[test]
fn udp_recv_ip6() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let client = next_test_ip6();
let server = next_test_ip6();
Expand Down Expand Up @@ -828,7 +827,7 @@ mod test {
use std::rt::rtio::*;
let addr = next_test_ip4();
static MAX: uint = 5000;
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
do spawn {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
Expand Down Expand Up @@ -865,7 +864,7 @@ mod test {
fn test_udp_twice() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
do spawn {
let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
Expand Down Expand Up @@ -896,8 +895,8 @@ mod test {
let client_in_addr = next_test_ip4();
static MAX: uint = 500_000;
let (p1, c1) = oneshot();
let (p2, c2) = oneshot();
let (p1, c1) = Chan::new();
let (p2, c2) = Chan::new();
do spawn {
let l = local_loop();
Expand Down Expand Up @@ -953,12 +952,12 @@ mod test {
#[test]
fn test_read_and_block() {
let addr = next_test_ip4();
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
do spawn {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let (port2, chan2) = stream();
let (port2, chan2) = Chan::new();
chan.send(port2);
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
Expand Down Expand Up @@ -1026,7 +1025,7 @@ mod test {
// thread, close itself, and then come back to the last thread.
#[test]
fn test_homing_closes_correctly() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
do task::spawn_sched(task::SingleThreaded) {
let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
Expand All @@ -1048,9 +1047,9 @@ mod test {
use std::rt::sched::{Shutdown, TaskFromFriend};
use std::rt::sleeper_list::SleeperList;
use std::rt::task::Task;
use std::rt::task::UnwindResult;
use std::rt::thread::Thread;
use std::rt::deque::BufferPool;
use std::task::TaskResult;
use std::unstable::run_in_bare_thread;
use uvio::UvEventLoop;
Expand All @@ -1072,12 +1071,12 @@ mod test {
let handle2 = sched2.make_handle();
let tasksFriendHandle = sched2.make_handle();
let on_exit: proc(UnwindResult) = proc(exit_status) {
let on_exit: proc(TaskResult) = proc(exit_status) {
let mut handle1 = handle1;
let mut handle2 = handle2;
handle1.send(Shutdown);
handle2.send(Shutdown);
assert!(exit_status.is_success());
assert!(exit_status.is_ok());
};
unsafe fn local_io() -> &'static mut IoFactory {
Expand Down Expand Up @@ -1148,7 +1147,7 @@ mod test {
#[should_fail] #[test]
fn tcp_stream_fail_cleanup() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
Expand All @@ -1172,7 +1171,7 @@ mod test {
#[should_fail] #[test]
fn udp_fail_other_task() {
let addr = next_test_ip4();
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
// force the handle to be created on a different scheduler, failure in
// the original task will force a homing operation back to this
Expand All @@ -1190,7 +1189,7 @@ mod test {
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure1() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
Expand All @@ -1208,7 +1207,7 @@ mod test {
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure2() {
let (port, chan) = oneshot();
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
Expand All @@ -1229,7 +1228,7 @@ mod test {
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure3() {
let (port, chan) = stream();
let (port, chan) = Chan::new();
let addr = next_test_ip4();

do spawn {
Expand Down
5 changes: 2 additions & 3 deletions src/librustuv/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ impl HomingIO for PipeAcceptor {

#[cfg(test)]
mod tests {
use std::comm::oneshot;
use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
use std::rt::test::next_test_unix;

Expand Down Expand Up @@ -274,7 +273,7 @@ mod tests {
fn connect() {
let path = next_test_unix();
let path2 = path.clone();
let (port, chan) = oneshot();
let (port, chan) = Chan::new();

do spawn {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
Expand All @@ -298,7 +297,7 @@ mod tests {
fn connect_fail() {
let path = next_test_unix();
let path2 = path.clone();
let (port, chan) = oneshot();
let (port, chan) = Chan::new();

do spawn {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
Expand Down
4 changes: 1 addition & 3 deletions src/librustuv/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,11 @@ mod test {
use super::*;
use super::super::local_loop;
use std::io::signal;
use std::comm::{SharedChan, stream};

#[test]
fn closing_channel_during_drop_doesnt_kill_everything() {
// see issue #10375, relates to timers as well.
let (port, chan) = stream();
let chan = SharedChan::new(chan);
let (port, chan) = SharedChan::new();
let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
chan);

Expand Down
Loading

17 comments on commit 39a6c9d

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from brson
at alexcrichton@39a6c9d

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging alexcrichton/rust/spsc-queue = 39a6c9d into auto

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alexcrichton/rust/spsc-queue = 39a6c9d merged ok, testing candidate = 1804c3ff

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from brson
at alexcrichton@39a6c9d

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging alexcrichton/rust/spsc-queue = 39a6c9d into auto

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alexcrichton/rust/spsc-queue = 39a6c9d merged ok, testing candidate = f12c2099

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from brson
at alexcrichton@39a6c9d

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging alexcrichton/rust/spsc-queue = 39a6c9d into auto

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alexcrichton/rust/spsc-queue = 39a6c9d merged ok, testing candidate = ae807b75

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saw approval from brson
at alexcrichton@39a6c9d

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging alexcrichton/rust/spsc-queue = 39a6c9d into auto

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alexcrichton/rust/spsc-queue = 39a6c9d merged ok, testing candidate = 47c9a35

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bors
Copy link
Contributor

@bors bors commented on 39a6c9d Dec 17, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fast-forwarding master to auto = 47c9a35

Please sign in to comment.