Rust 并发编程基础:第三方并发模型
线程安全
线程安全指的是在有多个线程执行或访问的时候,不会有意外的行为发生。也就是说,多个线程 读取 到的数据是一致的,而多个线程写入时候不会出现数据损坏。
Rust 仅保护用户不会出现数据竞争问题,但并不能保证不会出现死锁。 死锁某种意义上属于业务逻辑层面的错误,很难被发现,可以借助 parking_lot 程序库来解决这个问题。
Rust 避免数据竞争的方法是通过组合 trait 来进行约束,spawn
的声明是这样的:
1 | pub fn spawn<F, T>(f: F) -> JoinHandle<T> |
首先有两个泛型 F
、T
,接受一个参数 f
,返回泛型是 JoinHandle<T>
,在这之后 where
语句指定了 trait
特征。
FnOnce() -> T
:表示需要实现一个只能被调用一次的闭包。换个说法,f
是一个闭包,通过值获取所有内容并移动从环境中引用的项。Send + 'static
:闭包必须拥有Send
,因为需要按值传递给新线程。Send
表示从线程传递到线程是安全的,类似的,Sync
则表示线程之间传递引用是安全的。'static
则表示需要在整个生命周期内存活,因为新线程可能比它的调用者存活得更久,我们并不知道它什么时候返回,因此需要尽可能长久有效,直到程序结束。
Actor 模型
Actor 模型是一种概念模型,它使用名为 actors 的实体在 类型层面 实现并发。在 1973 年由 Carl Eddie Hewitt 首次提出,避免了锁和同步,并提供了一种向系统引入并发的简洁方法。
Actor 并发模式在 Erlang 之后流行起来。Erlang 是电信行业中非常流行的函数式编程语言,以其健壮性和天然的分布式特性而闻名。可以想到,Actor 模型设计本身就比较适合解决地理分布型问题,同时还具备很好的容错性。
Actor 模型由三个部分组成:
- Actor:这是 Actor 模型的核心。每个 Actor 包含地址,使用这个地址可以将消息发送到某个 actor 和
邮箱。可以理解为这是一个存储收到的消息的队列。 - FIFO:队列通常都是先进先出(First In First Out, FIFO)。actor 的地址是必须的,这样其它的 actor 才可以给它发送消息。
- Messages:Actor 之间仅通过消息进行通信,由 Actor 异步处理。
Actix
在 Cargo.toml
中引入 actix 依赖
1 | [dependencies] |
初始化系统
1 | use std::io; |
Actix 使用 Tokio 作为运行时。System::new()
创建了一个新的事件循环。System::run()
启动 Tokio 的事件循环。(形象一点,相当于开了一家邮局)
在 Actix 中,actor 的创建遵循简单的步骤:
- 创建一个类型
- 定义一个消息
- actor 类型实现消息的处理程序
- 可以将它加入到某个仲裁器(arbiter)中
每个 actor 都是在仲裁器中运行。因此,创建 actor 之后,它不会立即执行,而需要等到将 actor 放入到仲裁器线程中,它们才会开始执行。
定义 actor
1 | use std::io; |
actor 有其 生命周期。可以通过实现 Actor
Trait 来定义相关的操作。
started
:会在 actor 启动的时候调用stopping
:Actor::Stopping
状态时被调用- actor 调用了
Context::stop
- 当前 actor 地址都被删除了或者上下文里没有了事件对象
- actor 调用了
started
方法中,println
之后就会发送关闭命令(程序终止)。
将 actor 加入 System,显然是在 block_on
中:
1 |
|
block_on
会阻塞线程,直到异步操作完成。这一点,从 block_on
这个名字上不难想到。
Actix 提供了一个宏简化 System 的初始化:
1 |
|
消息通信
actor 彼此之间的通信是靠 message 完成的。现在实现一个两数和加法,定义一个 Message,传入两个数字。定义 Message 的时候,必须指定返回的类型 rtype
:
1 |
|
定义一个 Actor,提供一个针对 Sum
消息的处理器 handle
:
1 | struct Calculator; |
由于我们使用了 #[actix::main]
可以使用 await
简洁地完成操作。
1 | // <- 启动系统,阻塞直到 future 都完成 |
await
会阻塞直到完成结果,因此 Sum
的顺序是固定的。
rayon
基于工作窃取算法的 数据并行 程序库,使编写并行代码变得非常简单。
如果你的算法支持分块,例如求和,就可以考虑使用并行。使用 rayon
会非常简单。
简单的求和:
1 | fn sum_of_squares(input: &[i32]) -> i32 { |
使用 rayon,只需要将 iter()
改成 par_iter()
:
1 | use rayon::prelude::*; |
至于有没有效果,我们可以考虑使用基准测试评估求解效率,例如 criterion 库:
数据量小的话并行其实差距不大,但是数据量大了,并行还是能提高效率的。当然,并行并不是就翻倍,会因为中间的划分,最终的整合消耗一部分计算性能。
crossbeam
提供了一套 Rust 并发编程的工具集。值得关注是 scope
,可以从其父堆栈访问数据,并保证在其父堆栈帧消失之前终止。
1 | use crossbeam::thread; |
parking_lot
提供了比标准库中现存的并发原语效率更高的替代方案。如果你觉得标准库里的 Mutex
或者 RwLock
性能无法满足需求,可以考虑使用这个库来提升效率,还提供了一个实验性的检查死锁的工具。
这是个实验性 API,需要开启 deadlock_detection
features:
1 | # ... |
使用 cargo 编译的时候加上 --features deadlock_detection
。
1 |
|
小结
Rust 可以通过第三方库使用其它的并发模型,例如 Actix 库提供的 Actor 模型、rayon 基于工作窃取算法提供的并行。我们根据需要选择最适合的并发工具。