Rust异步框架之Tokio

Rust异步框架之Tokio

简介

tokio中的管道

tokio 提供了几种不同功能的channel,用于异步任务之间的消息传递:

  • oneshot: 一对一发送的一次性channel,该channel只能由一个发送者发送最多一个数据,且只有一个接收者接收数据
  • mpsc: 多对一发送, 该channel 可以同时由多个发送者发送数据, 但只有一个接收者接收数据
  • broadcast: 多对多发送, 该channel可以同时有多个发送者发送数据, 也可以有多个接收者接受数据
  • watch: 一对多发送, 该channel只能有一个发送者发送数据, 可以有多个接收者接受数据

oneshot

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
let (tx, rx) = oneshot::channel::<i32>();
let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
  tokio::select! {
    _ = tx.closed() => {
      // 先等待到了对端关闭,不做任何事,select!会自动取消其它分支的任务
    }
    value = compute() => {
      // 先计算得到结果,则发送给对端
      // 但有可能刚计算完成,尚未发送时,对端刚好关闭,因此可能发送失败
      // 此处丢弃发送失败的错误
      let _ = tx.send(value);
    }
  }
});

// receiver 没有recv() 方法, rx 本身实现了Future Trait, 执行时对应的异步任务
match rx.await {
  Ok(v) => println!("got = {:?}", v),
  Err(_) => println!("the sender dropped"),
  // Err(e: RecvError) => xxx,
}

mpsc

mpsc channel 的特性是有多个发送者一个接收者。mpsc通道分为两种:

  • bounded channel: 有界通道,通道有容量限制,即通道中最多可以存放指定数量(至少为1)的消息,通过mpsc::channel()创建
  • unbounded channel: 无界通道,通道中可以无限存放消息,直到内存耗尽,通过mpsc::unbounded_channel()创建
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// tx是Sender端,rx是Receiver端
// 接收端接收数据时需修改状态,因此声明为mut
let (tx, mut rx) = mpsc::channel(100);


use tokio::{ self, runtime::Runtime, sync };

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        let (tx, mut rx) = sync::mpsc::channel::<i32>(10);

        tokio::spawn(async move {
            for i in 1..=10 {
                // if let Err(_) = tx.send(i).await {}
                if tx.send(i).await.is_err() {
                    println!("receiver closed");
                    return;
                }
            }
        });

        while let Some(i) = rx.recv().await {
            println!("received: {}", i);
        }
    });
}

参考

  1. tokio 高级用法 - lack 的个人博客

  2. Rust笔记----tokio中的channel - fan-tastic.fun

updatedupdated2024-05-102024-05-10