显式线程
生成短期线程
本实例使用 crossbeam crate 为并发和并行编程提供了数据结构和函数。Scope::spawn
生成一个新的作用域线程,该线程确保传入 crossbeam::scope
函数的闭包在返回之前终止,这意味着您可以从调用的函数中引用数据。
本实例将数组一分为二,并在不同的线程中并行计算。
fn main() {
let arr = &[1, 25, -4, 10];
let max = find_max(arr);
assert_eq!(max, Some(25));
}
fn find_max(arr: &[i32]) -> Option<i32> {
const THRESHOLD: usize = 2;
if arr.len() <= THRESHOLD {
return arr.iter().cloned().max();
}
let mid = arr.len() / 2;
let (left, right) = arr.split_at(mid);
crossbeam::scope(|s| {
let thread_l = s.spawn(|_| find_max(left));
let thread_r = s.spawn(|_| find_max(right));
let max_l = thread_l.join().unwrap()?;
let max_r = thread_r.join().unwrap()?;
Some(max_l.max(max_r))
}).unwrap()
}
创建并发的数据管道
下面的实例使用 crossbeam 和 crossbeam-channel 两个 crate 创建了一个并行的管道,与 ZeroMQ 指南 中所描述的类似:管道有一个数据源和一个数据接收器,数据在从源到接收器的过程中由两个工作线程并行处理。
我们使用容量由 crossbeam_channel::bounded
分配的有界信道。生产者必须在它自己的线程上,因为它产生的消息比工作线程处理它们的速度快(因为工作线程休眠了半秒)——这意味着生产者将在对 [crossbeam_channel::Sender::send
] 调用时阻塞半秒,直到其中一个工作线程对信道中的数据处理完毕。也请注意,信道中的数据由最先接收它的任何工作线程调用,因此每个消息都传递给单个工作线程,而不是传递给两个工作线程。
通过迭代器 crossbeam_channel::Receiver::iter
方法从信道读取数据,这将会造成阻塞,要么等待新消息,要么直到信道关闭。因为信道是在 crossbeam::scope
范围内创建的,我们必须通过 drop
手动关闭它们,以防止整个程序阻塞工作线程的 for 循环。你可以将对 drop
的调用视作不再发送消息的信号。
extern crate crossbeam;
extern crate crossbeam_channel;
use std::thread;
use std::time::Duration;
use crossbeam_channel::bounded;
fn main() {
let (snd1, rcv1) = bounded(1);
let (snd2, rcv2) = bounded(1);
let n_msgs = 4;
let n_workers = 2;
crossbeam::scope(|s| {
// 生产者线程
s.spawn(|_| {
for i in 0..n_msgs {
snd1.send(i).unwrap();
println!("Source sent {}", i);
}
// 关闭信道 —— 这是退出的必要条件
// for 巡海在工作线程中
drop(snd1);
});
// 由 2 个县城并行处理
for _ in 0..n_workers {
// 从数据源发送数据到接收器,接收器接收数据
let (sendr, recvr) = (snd2.clone(), rcv1.clone());
// 在不同的线程中衍生工人
s.spawn(move |_| {
thread::sleep(Duration::from_millis(500));
// 接收数据,直到信道关闭前
for msg in recvr.iter() {
println!("Worker {:?} received {}.",
thread::current().id(), msg);
sendr.send(msg * 2).unwrap();
}
});
}
// 关闭信道,否则接收器不会关闭
// 退出 for 循坏
drop(snd2);
// 接收器
for msg in rcv2.iter() {
println!("Sink received {}", msg);
}
}).unwrap();
}
在两个线程间传递数据
这个实例示范了在单生产者、单消费者(SPSC)环境中使用 crossbeam-channel。我们构建的生成短期线程实例中,使用 crossbeam::scope
和 Scope::spawn
来管理生产者线程。在两个线程之间,使用 crossbeam_channel::unbounded
信道交换数据,这意味着可存储消息的数量没有限制。生产者线程在消息之间休眠半秒。
use std::{thread, time};
use crossbeam_channel::unbounded;
fn main() {
let (snd, rcv) = unbounded();
let n_msgs = 5;
crossbeam::scope(|s| {
s.spawn(|_| {
for i in 0..n_msgs {
snd.send(i).unwrap();
thread::sleep(time::Duration::from_millis(100));
}
});
}).unwrap();
for _ in 0..n_msgs {
let msg = rcv.recv().unwrap();
println!("Received {}", msg);
}
}
保持全局可变状态
使用 lazy_static 声明全局状态。lazy_static 创建了一个全局可用的 static ref
,它需要 Mutex
来允许变化(请参阅 RwLock
)。在 Mutex
的包裹下,保证了状态不能被多个线程同时访问,从而防止出现争用情况。必须获取 MutexGuard
,方可读取或更改存储在 Mutex
中的值。
对所有 iso 文件的 SHA256 值并发求和
下面的实例计算了当前目录中每个扩展名为 iso 的文件的 SHA256 哈希值。线程池生成的线程数与使用 num_cpus::get
获取的系统内核数相等。Walkdir::new
遍历当前目录,并调用 execute
来执行读取和计算 SHA256 哈希值的操作。
将绘制分形的线程分派到线程池
此实例通过从朱莉娅集绘制分形来生成图像,该集合具有用于分布式计算的线程池。
使用 ImageBuffer::new
为指定宽度和高度的输出图像分配内存,Rgb::from_channels
信道则计算输出图像的 RGB 像素值。使用 ThreadPool
创建线程池,线程池中的线程数量和使用 num_cpus::get
获取的系统内核数相等。ThreadPool::execute
将每个像素作为单独的作业接收。
mpsc::channel
信道接收作业,Receiver::recv
接收器则检索作业。ImageBuffer::put_pixel
处理数据,设置像素颜色。最后,ImageBuffer::save
将图像存储为 output.png
。