tokio 提供了几种不同功能的channel,用于异步任务之间的消息传递:
- oneshot: 一对一发送的一次性channel,该channel只能由一个发送者发送最多一个数据,且只有一个接收者接收数据
- mpsc: 多对一发送, 该channel 可以同时由多个发送者发送数据, 但只有一个接收者接收数据
- broadcast: 多对多发送, 该channel可以同时有多个发送者发送数据, 也可以有多个接收者接受数据
- watch: 一对多发送, 该channel只能有一个发送者发送数据, 可以有多个接收者接受数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| let (tx, rx) = oneshot::channel::<i32>();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tokio::select! {
_ = tx.closed() => {
// 先等待到了对端关闭,不做任何事,select!会自动取消其它分支的任务
}
value = compute() => {
// 先计算得到结果,则发送给对端
// 但有可能刚计算完成,尚未发送时,对端刚好关闭,因此可能发送失败
// 此处丢弃发送失败的错误
let _ = tx.send(value);
}
}
});
// receiver 没有recv() 方法, rx 本身实现了Future Trait, 执行时对应的异步任务
match rx.await {
Ok(v) => println!("got = {:?}", v),
Err(_) => println!("the sender dropped"),
// Err(e: RecvError) => xxx,
}
|
mpsc channel 的特性是有多个发送者一个接收者。mpsc通道分为两种:
- bounded channel: 有界通道,通道有容量限制,即通道中最多可以存放指定数量(至少为1)的消息,通过
mpsc::channel()
创建 - unbounded channel: 无界通道,通道中可以无限存放消息,直到内存耗尽,通过
mpsc::unbounded_channel()
创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| // tx是Sender端,rx是Receiver端
// 接收端接收数据时需修改状态,因此声明为mut
let (tx, mut rx) = mpsc::channel(100);
use tokio::{ self, runtime::Runtime, sync };
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let (tx, mut rx) = sync::mpsc::channel::<i32>(10);
tokio::spawn(async move {
for i in 1..=10 {
// if let Err(_) = tx.send(i).await {}
if tx.send(i).await.is_err() {
println!("receiver closed");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("received: {}", i);
}
});
}
|
tokio 高级用法 - lack 的个人博客
Rust笔记----tokio中的channel - fan-tastic.fun