线程安全

线程安全指的是在有多个线程执行或访问的时候,不会有意外的行为发生。也就是说,多个线程 读取 到的数据是一致的,而多个线程写入时候不会出现数据损坏。

Rust 仅保护用户不会出现数据竞争问题,但并不能保证不会出现死锁。 死锁某种意义上属于业务逻辑层面的错误,很难被发现,可以借助 parking_lot 程序库来解决这个问题。

Rust 避免数据竞争的方法是通过组合 trait 来进行约束,spawn 的声明是这样的:

1
2
3
4
5
pub fn spawn<F, T>(f: F) -> JoinHandle<T> 
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,

首先有两个泛型 FT,接受一个参数 f,返回泛型是 JoinHandle<T>,在这之后 where 语句指定了 trait 特征。

  • FnOnce() -> T:表示需要实现一个只能被调用一次的闭包。换个说法,f 是一个闭包,通过值获取所有内容并移动从环境中引用的项。
  • Send + 'static:闭包必须拥有 Send,因为需要按值传递给新线程。Send 表示从线程传递到线程是安全的,类似的,Sync 则表示线程之间传递引用是安全的。'static 则表示需要在整个生命周期内存活,因为新线程可能比它的调用者存活得更久,我们并不知道它什么时候返回,因此需要尽可能长久有效,直到程序结束。

Actor 模型

Actor 模型是一种概念模型,它使用名为 actors 的实体在 类型层面 实现并发。在 1973 年由 Carl Eddie Hewitt 首次提出,避免了锁和同步,并提供了一种向系统引入并发的简洁方法。

Actor 并发模式在 Erlang 之后流行起来。Erlang 是电信行业中非常流行的函数式编程语言,以其健壮性和天然的分布式特性而闻名。可以想到,Actor 模型设计本身就比较适合解决地理分布型问题,同时还具备很好的容错性。

Actor 模型由三个部分组成:

  • Actor:这是 Actor 模型的核心。每个 Actor 包含地址,使用这个地址可以将消息发送到某个 actor 和
    邮箱。可以理解为这是一个存储收到的消息的队列。
  • FIFO:队列通常都是先进先出(First In First Out, FIFO)。actor 的地址是必须的,这样其它的 actor 才可以给它发送消息。
  • Messages:Actor 之间仅通过消息进行通信,由 Actor 异步处理。

Actix

Actix 库依赖 tokio 实现了 Actor 模型,

Cargo.toml 中引入 actix 依赖

1
2
[dependencies]
actix = "0.12"

初始化系统

1
2
3
4
5
6
7
8
use std::io;
use actix;

fn main() -> io::Result<()> {
let system = actix::System::new();

system.run()
}

Actix 使用 Tokio 作为运行时。System::new() 创建了一个新的事件循环。System::run() 启动 Tokio 的事件循环。(形象一点,相当于开了一家邮局)

在 Actix 中,actor 的创建遵循简单的步骤:

  1. 创建一个类型
  2. 定义一个消息
  3. actor 类型实现消息的处理程序
  4. 可以将它加入到某个仲裁器(arbiter)中

每个 actor 都是在仲裁器中运行。因此,创建 actor 之后,它不会立即执行,而需要等到将 actor 放入到仲裁器线程中,它们才会开始执行。

定义 actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::io;
use actix::{System, Actor, Context};

struct MyActor;

impl Actor for MyActor {
type Context = Context<Self>;

fn started(&mut self, _ctx: &mut Self::Context) {
println!("I am alive!");
System::current().stop(); // <- stop system
}
}


fn main() -> io::Result<()> {
let system = System::new();

system.block_on(async { MyActor.start() });

system.run()
}

actor 有其 生命周期。可以通过实现 Actor Trait 来定义相关的操作。

  • started:会在 actor 启动的时候调用
  • stoppingActor::Stopping 状态时被调用
    • actor 调用了 Context::stop
    • 当前 actor 地址都被删除了或者上下文里没有了事件对象

started 方法中,println 之后就会发送关闭命令(程序终止)。

将 actor 加入 System,显然是在 block_on 中:

1
2
3
4
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.rt.block_on(fut)
}

block_on 会阻塞线程,直到异步操作完成。这一点,从 block_on 这个名字上不难想到。

Actix 提供了一个宏简化 System 的初始化:

1
2
3
4
#[actix::main]
async fn main(){
MyActor.start();
}

消息通信

actor 彼此之间的通信是靠 message 完成的。现在实现一个两数和加法,定义一个 Message,传入两个数字。定义 Message 的时候,必须指定返回的类型 rtype

1
2
3
#[derive(Message)]
#[rtype(result = "usize")]
struct Sum(usize, usize);

定义一个 Actor,提供一个针对 Sum 消息的处理器 handle

1
2
3
4
5
6
7
8
9
10
11
12
13
struct Calculator;

impl Actor for Calculator {
type Context = Context<Self>;
}

impl Handler<Sum> for Calculator {
type Result = usize; // <- 消息响应类型

fn handle(&mut self, msg: Sum, _ctx: &mut Context<Self>) -> Self::Result {
msg.0 + msg.1
}
}

由于我们使用了 #[actix::main] 可以使用 await 简洁地完成操作。

1
2
3
4
5
6
7
8
9
10
11
12
#[actix::main] // <- 启动系统,阻塞直到 future 都完成
async fn main() {
let addr = Calculator.start();

for n in 5..10 {
let res = addr.send(Sum(n, n+1)).await;
match res {
Ok(result) => println!("SUM: {}", result),
_ => println!("Communication to the actor has failed"),
}
}
}

await 会阻塞直到完成结果,因此 Sum 的顺序是固定的。

rayon

基于工作窃取算法的 数据并行 程序库,使编写并行代码变得非常简单。

如果你的算法支持分块,例如求和,就可以考虑使用并行。使用 rayon 会非常简单。

简单的求和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn sum_of_squares(input: &[i32]) -> i32 {
input.iter()
.map(|&i| i * i)
.sum()
}

fn main() {
let mut v = vec![];
for i in 1..1000 {
v.push(i);
}

println!("{}", sum_of_squares(&v));
}

使用 rayon,只需要将 iter() 改成 par_iter()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use rayon::prelude::*;

fn sum_of_squares(input: &[i32]) -> i32 {
input.par_iter() // <- 只需改成 par_iter()
.map(|&i| i * i)
.sum()
}

fn main() {
let mut v = vec![];
for i in 1..1000 {
v.push(i);
}

println!("{}", sum_of_squares(&v));
}

至于有没有效果,我们可以考虑使用基准测试评估求解效率,例如 criterion 库:

数据量小的话并行其实差距不大,但是数据量大了,并行还是能提高效率的。当然,并行并不是就翻倍,会因为中间的划分,最终的整合消耗一部分计算性能。

crossbeam

提供了一套 Rust 并发编程的工具集。值得关注是 scope,可以从其父堆栈访问数据,并保证在其父堆栈帧消失之前终止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use crossbeam::thread;

fn main() {
let var = vec![1, 2, 3];

thread::scope(|s| {
s.spawn(|_| {
println!("A child thread borrowing `var`: {:?}", var);
});
})
.unwrap();

println!("{:?}", var);
}

parking_lot

提供了比标准库中现存的并发原语效率更高的替代方案。如果你觉得标准库里的 Mutex 或者 RwLock 性能无法满足需求,可以考虑使用这个库来提升效率,还提供了一个实验性的检查死锁的工具。

这是个实验性 API,需要开启 deadlock_detection features:

1
2
3
4
5
6
7
# ...

[dependencies]
parking_lot = {version = "0.12.0", optional = true, features = ["deadlock_detection"]}

[features]
deadlock_detection = ["parking_lot"]

使用 cargo 编译的时候加上 --features deadlock_detection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#[cfg(feature = "deadlock_detection")]
{
use parking_lot::deadlock;
use std::thread;
use std::time::Duration;
// Create a background thread which checks for deadlocks every 10s
thread::spawn(move || loop {
println!("deadlocks detecting...");
thread::sleep(Duration::from_secs(10));
let deadlocks = deadlock::check_deadlock();
if deadlocks.is_empty() {
continue;
}
println!("{} deadlocks detected", deadlocks.len());
for (i, threads) in deadlocks.iter().enumerate() {
println!("Deadlock #{}", i);
for t in threads {
println!("Thread Id {:#?}", t.thread_id());
println!("{:#?}", t.backtrace());
}
}
});
}

小结

Rust 可以通过第三方库使用其它的并发模型,例如 Actix 库提供的 Actor 模型、rayon 基于工作窃取算法提供的并行。我们根据需要选择最适合的并发工具。