Skip to content

Commit

Permalink
revise rust concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
greathongtu committed Apr 12, 2024
1 parent ca3b284 commit 9994c33
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 139 deletions.
127 changes: 74 additions & 53 deletions content/posts/rust-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() {
println!("{n}");
}
}).join().unwrap();
let t1 = thread::spawn(f);
let t1 = thread::spawn(f);
println!("Hello from the main thread.");
t1.join().unwrap();
}
Expand All @@ -26,6 +26,8 @@ fn f(){


// scoped thread
// scoped threads can borrow non-static data,
// as the scope guarantees all threads will be joined at the end
use std::thread;
fn main() {
let numbers = vec![1, 2, 3];
Expand All @@ -45,8 +47,10 @@ fn main() {


// interior mutability
// Cell<T> and RefCell<T> types may be mutated through shared references
// Both single threaded

// Cell: single threaded
// Cell implements interior mutability by moving values in and out of the Cell<T>.
// To avoid undefined behavior, it only allows you
// to copy the value out (if T is Copy),
// or replace it with another value as a whole
Expand All @@ -65,45 +69,50 @@ fn main() {
}


// RefCell: single threaded
// RefCell let you use references instead of values
// allow you to borrow its contents
// If you try to borrow it while it is already mutably borrowed it will panic.
// If you try to borrow it while it is already mutably borrowed it will panic.
fn f(v: &RefCell<Vec<i32>>) {
v.borrow_mut().push(1); // We can modify the `Vec` directly.
v.borrow_mut().push(1); // We can modify the `Vec` directly.
}

// When to choose interior mutability
// 1. Introducing mutability 'inside' of something immutable: Rc<RefCell<T>>
// 2. Implementation details of logically-immutable methods.
// 3. Mutating implementations of Clone

// RwLock: concurrent version of RefCell
// unlike a RefCell, it does not panic.

// RwLock & mutex: concurrent version of RefCell
// unlike a RefCell, it does not panic.
// Instead, it blocks the current thread — putting it to sleep when conflicting.
// more complicated version of mutex. Instead of a single lock() method,
// it has a read() and write() method for locking
// more complicated version of mutex. Instead of a single lock() method,
// it has a read() and write() method for locking


// Atomics: concurrent version of Cell
// Atomics: concurrent version of Cell
// Unlike a Cell, they cannot be of arbitrary size.
// no generic Atomic<T> type for any T
// there are only specific atomic types such as AtomicU32 and AtomicPtr<T>.


// UnsafeCell: primitive building block for interior mutability.
// its get() method just gives a raw pointer to the value it wraps
// Usually UnsafeCell is not used directly, but wrapped in Cell or Mutex
// UnsafeCell: primitive building block for interior mutability.
// its get() method just gives a raw pointer to the value it wraps
// Usually UnsafeCell is not used directly, but wrapped in Cell or Mutex


// Send & Sync
// A type is Send if it can be sent to another thread.
// A type is Sync if it can be shared with another thread aka: &T is Send.
// A type is Send if it can be sent to another thread.
// A type is Sync if it can be shared with another thread aka: &T is Send.
// They are auto traits. To opt out, add a field like std::marker::PhantomData<T>.
// PhantomData is treated as a T. not exist at runtime. zero-sized
use std::marker::PhantomData;
use std::marker::PhantomData;
struct X {
handle: i32,
handle: i32,
// Since Cell<()> is not Sync.
_not_sync: PhantomData<Cell<()>>,
}
_not_sync: PhantomData<Cell<()>>,
}
// Raw pointers (*const T and *mut T) are neither Send nor Sync.
// To opt in, impl it urself
// To opt in Send & Sync, impl it urself


// Mutex<T> does not have an unlock() method.
Expand All @@ -117,7 +126,7 @@ fn main() {
thread::scope(|s| {
for _ in 0..10 {
s.spawn(|| {
// MutexGuard. unwrap() relate to lock poisoning.
// MutexGuard. unwrap() relate to lock poisoning.
let mut guard = n.lock().unwrap();
for _ in 0..100 {
// DerefMut trait
Expand All @@ -127,38 +136,42 @@ fn main() {
}
});
// into_inner takes ownership of the mutex,
// nothing else have a reference to the mutex anymore
// nothing else have a reference to the mutex anymore
assert_eq!(n.into_inner().unwrap(), 1000);
}


// Lock Poisoning
// A Mutex gets marked as poisoned when a thread panics while holding the lock.
// Lock Poisoning
// A Mutex gets marked as poisoned when a thread panics while holding the lock.
// When poisoned, the Mutex will no longer be locked,
// calling lock() will result in an Err.


// Lifetime of the MutexGuard:
// Any temporaries produced within a larger expression,
// like the guard returned by lock()
// will be dropped at the end of the statement.
// will be dropped at the end of the statement.
list.lock().unwrap().push(1);

// common pitfall regarding to this:
// item could be borrowing from the list,
// making it necessary to keep the guard around.
// When using match, if let, or while let
// the lock is hold until the end of the block
if let Some(item) = list.lock().unwrap().front() {
process_item(item);
}
// the temporary guard does get dropped before if statement body
// since the condition of a if statement is a plain boolean.
// No reason to extend the lifetime
process_item(item);
}
// This is OK
if list.lock().unwrap().pop() == Some(1) {
do_something();
}
do_something();
}

// This is OK
let item = list.lock().unwrap().pop();
if let Some(item) = item {
process_item(item);
}


// Waiting: Parking and Condition Variables
// Waiting: Parking and Condition Variables
// When data is mutated by multiple threads, there are many
// situations where they would need to wait for some event,
// for some condition about the data to become true.
Expand Down Expand Up @@ -188,7 +201,7 @@ fn main() {
// Producing thread
for i in 0.. {
queue.lock().unwrap().push_back(i);
// wake t up
// wake it up
t.thread().unpark();
thread::sleep(Duration::from_secs(1));
}
Expand All @@ -199,9 +212,10 @@ fn main() {
// spurious wake-ups
// a call to unpark() before the thread parks itself does not get lost.
// Otherwise starve
// but unpark() requests don’t stack up
// but unpark() requests don’t stack up


// Why condition variables:
// if we had multiple consumer threads taking items from the same queue,
// we need Condition Variables
use std::collections::VecDeque;
Expand Down Expand Up @@ -242,43 +256,50 @@ fn main() {
}
// Normally, a Condvar is only ever used together with a single Mutex
// A downside of a Condvar is that it only works when used together with a Mutex.
```




## Atomic Operations
```Rust
// 探讨原子操作之前,需要看内存顺序 memory ordering,
// 每个原子操作都有一个std::sync::atomic::Ordering参数
// Relaxed:Still guarantees consistency on a single atomic variable,
// but does not promise anything about the relative order
// of operations between different variables.
// but does not promise anything about the relative order of operations
// between different variables.

// So two threads might see operations on different variables
// happen in a different order.

// Atomic Load and Store Operations
impl AtomicI32 {
pub fn load(&self, ordering: Ordering) -> i32;
// only takes shared reference
pub fn store(&self, value: i32, ordering: Ordering);
}
pub fn store(&self, value: i32, ordering: Ordering);
}


// std::sync::Once and std::sync::OnceLock
// A cell which can be written to only once

pub struct OnceCell<T> { /* private fields */ }
// thread-safe OnceCell, can be used in statics.
// 用get_or_init获取值
pub struct OnceLock<T>



// Fetch-and-Modify Operations
// could overflow compared to CAS
// Example: Progress Reporting from Multiple Threads
// Example: Progress Reporting from Multiple Threads
// https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/examples/ch2-06-progress-reporting-multiple-threads.rs


// example: Statistics
// the three atomic variables are updated separately,
// the Relaxed memory ordering gives no guarantees about the relative order of operations
// example: Statistics
// the three atomic variables are updated separately,
// the Relaxed memory ordering gives no guarantees about the relative order of operations
// https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/examples/ch2-07-statistics.rs

// example: ID Allocation. could overflow
// https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/examples/ch2-08-id-allocation.rs

Expand All @@ -292,7 +313,7 @@ impl AtomicI32 {
success_order: Ordering,
failure_order: Ordering
) -> Result<i32, i32>;
}
}

// CAS example usage
use std::sync::atomic::AtomicU32;
Expand All @@ -316,7 +337,7 @@ fn main() {
assert_eq!(a.into_inner(), 2);
}
// compare_exchange_weak is more efficiently on some platforms.
// Fetch-Update: for the compare-and-exchange loop pattern.
// Fetch-Update: for the compare-and-exchange loop pattern.

// CAS 的应用场景:生成某种密钥,只要生成一次,之后的就不要再生成了,用已经有的
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -378,7 +399,7 @@ fn main() {
// 也就是只有中间的部分能上下移动
```

## Relaxed Ordering
## Relaxed Ordering
While atomic operations using relaxed memory ordering do not provide any happens-before relationship, they do guarantee a total modification order of each individual atomic variable. This means that all modifications of the same atomic variable happen in an order that is the same from the perspective of every single thread.
在不同的线程看来,对于同一个原子变量的不同修改是相同顺序的
reference code: https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/examples/ch3-04-total-modification-order-2.rs
Expand Down Expand Up @@ -429,4 +450,4 @@ almost never necessary
my_spin_lock: https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/src/ch4_spin_lock/s3_guard.rs


My channels: https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/src/ch5_channels/s6_blocking.rs
My_channels: https://github.com/m-ou-se/rust-atomics-and-locks/blob/main/src/ch5_channels/s6_blocking.rs
2 changes: 1 addition & 1 deletion public/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<link>http://localhost:1313/posts/rust-concurrency/</link>
<pubDate>Sun, 31 Mar 2024 01:39:55 +0800</pubDate>
<guid>http://localhost:1313/posts/rust-concurrency/</guid>
<description>use std::thread; fn main() { // pass a closure let numbers = vec![1, 2, 3]; thread::spawn(move || { for n in numbers { println!(&amp;#34;{n}&amp;#34;); } }).join().unwrap(); let t1 = thread::spawn(f); println!(&amp;#34;Hello from the main thread.&amp;#34;); t1.join().unwrap(); } fn f(){ println!(&amp;#34;Hello from another thread!&amp;#34;); let id = thread::current().id(); println!(&amp;#34;This is my thread id: {id:?}&amp;#34;); } // scoped thread use std::thread; fn main() { let numbers = vec![1, 2, 3]; // argument s of the closure represents the scope.</description>
<description>use std::thread; fn main() { // pass a closure let numbers = vec![1, 2, 3]; thread::spawn(move || { for n in numbers { println!(&amp;#34;{n}&amp;#34;); } }).join().unwrap(); let t1 = thread::spawn(f); println!(&amp;#34;Hello from the main thread.&amp;#34;); t1.join().unwrap(); } fn f(){ println!(&amp;#34;Hello from another thread!&amp;#34;); let id = thread::current().id(); println!(&amp;#34;This is my thread id: {id:?}&amp;#34;); } // scoped thread // scoped threads can borrow non-static data, // as the scope guarantees all threads will be joined at the end use std::thread; fn main() { let numbers = vec!</description>
</item>
<item>
<title>React</title>
Expand Down
4 changes: 2 additions & 2 deletions public/posts/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@

<section class="relative my-10 first-of-type:mt-0 last-of-type:mb-0">

<h2 class="!my-0 pb-1 font-bold !leading-none">Idiomatic_rust</h2>
<h2 class="!my-0 pb-1 font-bold !leading-none">idiomatic rust way</h2>
<time class="text-sm antialiased opacity-60"
>Apr 3, 2024</time
>
<a class="absolute inset-0 text-[0]" href="http://localhost:1313/posts/idiomatic_rust/">Idiomatic_rust</a>
<a class="absolute inset-0 text-[0]" href="http://localhost:1313/posts/idiomatic_rust/">idiomatic rust way</a>
</section>


Expand Down
4 changes: 2 additions & 2 deletions public/posts/index.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<lastBuildDate>Wed, 03 Apr 2024 15:28:46 +0800</lastBuildDate>
<atom:link href="http://localhost:1313/posts/index.xml" rel="self" type="application/rss+xml" />
<item>
<title>Idiomatic_rust</title>
<title>idiomatic rust way</title>
<link>http://localhost:1313/posts/idiomatic_rust/</link>
<pubDate>Wed, 03 Apr 2024 15:28:46 +0800</pubDate>
<guid>http://localhost:1313/posts/idiomatic_rust/</guid>
Expand All @@ -20,7 +20,7 @@
<link>http://localhost:1313/posts/rust-concurrency/</link>
<pubDate>Sun, 31 Mar 2024 01:39:55 +0800</pubDate>
<guid>http://localhost:1313/posts/rust-concurrency/</guid>
<description>use std::thread; fn main() { // pass a closure let numbers = vec![1, 2, 3]; thread::spawn(move || { for n in numbers { println!(&amp;#34;{n}&amp;#34;); } }).join().unwrap(); let t1 = thread::spawn(f); println!(&amp;#34;Hello from the main thread.&amp;#34;); t1.join().unwrap(); } fn f(){ println!(&amp;#34;Hello from another thread!&amp;#34;); let id = thread::current().id(); println!(&amp;#34;This is my thread id: {id:?}&amp;#34;); } // scoped thread use std::thread; fn main() { let numbers = vec![1, 2, 3]; // argument s of the closure represents the scope.</description>
<description>use std::thread; fn main() { // pass a closure let numbers = vec![1, 2, 3]; thread::spawn(move || { for n in numbers { println!(&amp;#34;{n}&amp;#34;); } }).join().unwrap(); let t1 = thread::spawn(f); println!(&amp;#34;Hello from the main thread.&amp;#34;); t1.join().unwrap(); } fn f(){ println!(&amp;#34;Hello from another thread!&amp;#34;); let id = thread::current().id(); println!(&amp;#34;This is my thread id: {id:?}&amp;#34;); } // scoped thread // scoped threads can borrow non-static data, // as the scope guarantees all threads will be joined at the end use std::thread; fn main() { let numbers = vec!</description>
</item>
<item>
<title>React</title>
Expand Down
Loading

0 comments on commit 9994c33

Please sign in to comment.