1 Star 0 Fork 11

coder_lw / wiki

forked from deepinwiki / wiki 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
rust 异步开发.md 34.46 KB
一键复制 编辑 原始数据 按行查看 历史
htqx 提交于 2023-03-15 23:53 . rust 异步开发

rust 异步开发

前言

rust 很重要的应用场合就是异步开发。它的类型系统很擅长处理异步数据安全,性能也很高。不过相关的库还不是很成熟,比较难用。

原生异步接口

原生线程

  1. thread::spawn(F)->JoinHandle<T>
    1. F:FnOnce()->T
    2. F:Send + 'static
    3. T:Send + 'static
  2. JoinHandle<T>
    1. thread(&self)->&Thread
    2. join(self)->Result<T>

可见,原生的线程要求跨线程的数据具备 'static 生命周期。和接受一个零参数的执行函数。不过可以利用闭包输入参数,但要符合 Send 约束。

use std::thread;

let hander = thread::spawn(||{42});
hander.join.unwrap(); // 同步等待结果

原生线程简单,但是它使用了系统有限的线程资源,因而代价太大。

在语言(运行时)层面提供的方案:

  1. 绿色线程:保存上下文,来回切换
  2. 基于回调:没有上下文切换,但需要任务自己保存状态
  3. promises(承诺) / 组合子(combinators):改写回调的复杂调度关系,由状态机决定下一步
    1. pending : 等候
    2. fulfilled: 完成
    3. rejected: 拒绝
  4. futures(期货):相当于延迟执行(lazy evaluated)的 promises
    1. ready: 就绪
    2. pending: 等候

send / sync

特征:

  1. send : 对象可以跨线程传递
  2. sync : 对象可以跨线程共享

哪些能跨线程传递?

  1. 基本类型
  2. 大部分类型
// & mut
impl<'_, T> Send for &'_ mut T
where
    T: Send + ?Sized

// &
impl<'_, T> Send for &'_ T
where
    T: Sync + ?Sized

哪些能跨线程共享?

  1. 基本类型,i32 等
  2. 大部分类型
  3. 定义:只有 &T 是 Send 时, T 才是 Sync

编译器会在符合安全条件下自动实现这两个特征。

那究竟怎样的条件才是符合? rust 安全界限内的产品,都是符合的。它实际上是借用规则的翻版。

  1. String : Send ,因为移动所有权,只有一个对象拥有所有权,不会出现两个线程拥有所有权
  2. &mut String :Send , 因为只有一个可读借用,逻辑同上
  3. &String : Send ,因为是只读借用,任意多也不会修改对象
    1. 所以根据定义: String : Sync 支持共享,是同步对象

可见同步规则都没有违反借用规则。也就是借用规则和同步规则是类似的。那为什么还需要定义同步规则呢(Send / Sync)?

这是因为我们之前引入了内部可变性类型,这些类型中,有一部分是不支持同步的。他们在穿越线程时,会有问题。比如: Rc, RefCell。另外,其他不安全组件,如原生指针,或者封装了不安全代码却没有对跨线程优化的组件,也是不支持跨线程的。

也就是如果经过对跨线程精心优化的不安全组件,它大可以通过标记 Send、Sync 来表明支持同步。这就是 rust 类库的做法,也是为什么需要标记的原因。

问题二:为什么需要封装成智能指针,而不用原生的借用?

这就要从生命周期去理解,线程什么时候消亡,这个是很难确定的,也很难去安排不同线程的读写顺序,就算安排好了,颗粒度也是比较粗糙的,并发性能会降低。也就是在多线程环境下,往往都是要求你同时拥有读写线程的,不能说等到读线程完毕,再来启动写线程。因此静态的借用规则就完全没有用了,只能依赖智能指针在运行时动态执行借用规则。

不过好消息是:借用规则永远值得信赖!只要借用规则执行到位,不管是静态还是动态,它实际都是安全的。这并不影响 rust 安全语言的承诺。

// 因为不同线程,所以 a 可能比 pa 更快挂掉,pa 将指向无效地址
// 所以 spawn 对传递的生命周期要求是 'static 的
// 简而言之,普通的借用不适合用于传递到另一个线程
let a = 1;
let pa = &a;
thread::spawn(move ||pa); 

// 转移所有权
// 一个普通对象转移所有权,它就到了另一个线程
// 没办法共享的使用这个对象
let a= 1;
thread::spawn(move ||a);
thread::spwan(move ||a); // 已经在上一步转移了

// 转移智能指针的所有权
// 智能指针自身实现了内部共享持有对象的方法
// 因为并不是传递引用,而是智能指针的所有权,所以没有生命周期的问题
let a = 1;
let pa = Arc::new(a); // 转移所有权到智能指针内部(托管)
let c1 = pa.clone();
let c2 = pa.clone();// 克隆体管理的是同一个内部对象
thread::spawn(move ||c1);
thread::spawn(move ||c2); 

// 需要写操作,可以加一个内部可写的包装
let a = 1;
let pa = Arc::new(Muttex::new(a));
thread::spawn(move || *pa.lock().unwrap() += 1);

Futures

Futures(期货)是一种叫协程的技术。协程和多线程的区别是,协程它是基于任务视角,如果一个任务开始执行,但没有结束,另一个任务又开始,他们就是并发的。任务之间通过协作的方式,暂时停止自己,去执行另一个任务,又由另一个任务在适当的时候重启自己。

这段表述有两个重点:

  1. 协作式(多线程很多是抢占式)
  2. 协程可以在一个线程中实现

协程因为它并不是一定要开启新的线程(线程是有限的系统资源),所以它相对而言更加高效率。

它有点类似 javascript 中的 Promise (承诺)。这两个都是一种寓意,即对未来值的封装。协程的结果就是未来值,对于同步代码来说,它不关心(封装)辅助线程是怎么运作的,它只在乎它的结果。

可见,这是以一种用同步视角来处理异步问题的方式,所以更受程序员的欢迎。

如果你了解 Promise 异步模型,它核心技术就是一个状态机。它能够在函数的某个点暂停,然后从这个点恢复执行。基本原理是类似 goto 跳转,在程序片段间插入各种跳转点,记录上次的跳出的位置,然后这次跳转回去,从而达到暂停继续的效果。

异步函数转化为 future 状态机,从而能够在函数之间的某个点之间暂停和恢复。暂停之后去哪里?这里就需要一个调度器(执行器),来控制不同的 feture 对象,一个对象释放控制权,调度器就运行另一个对象。

future 对象提供了 poll 轮询结构,让调度器可以知道对象的状态,Ready(T) 就绪携带运行结果 T,而 Pending 等待可以让调度可以去调用队列中的下一个 future 对象。这个模型严格来说还是靠对象之间自主的释放控制权,是协作式的异步模型。

那么调度器该何时再来轮询 pending 后的对象呢?

不停的轮询?rust 的设计是让事件源(reactor) 通过唤醒器(wake)来通知执行器(executor),然后执行器就可以把 future 对象加入轮询队列中继续轮询。

而唤醒器是在执行器 poll future 的时候通过参数传递进去的。future 遇到 pending 的时候,就将自己和唤醒器注册到 reactor 反应器,而响应器直接和系统异步 io 通信,io 就绪的时候再通过唤醒器通知执行器。

这个机制比较复杂的属于这个部分。因为涉及到底层系统 io,它现在也还不是标准库的一部分,由第三方库提供支持。

构成:

  1. 状态机 : future 对象
    1. 生成器: 以直观的方式构造状态机
      1. async / await 返回一个 future 对象
  2. 执行器 (executor) : 对状态机进行轮询调度
  3. 反应器(reactor) / 事件源: 等待事件发生,通知执行器唤醒状态机
    1. 唤醒器(waker)和上下文(context) : 负责协调执行器和事件源的通信

其中,并发处理的执行器和反应器,在 rust 中不是内置部分(标准库),而设计成由第三方库(异步运行时)来独立提供:

  1. async-std
  2. tokio
  3. smol

标准库则提供了:

  1. Future trait
  2. async / await
  3. Waker

准标准库 futures (里面的概念将来可能会成为标准,所以要优先使用,避免自造):

  1. channel: 通道
    1. oneshot : 单次通道
    2. mpsc: 多生产,单消费
  2. executor : 执行器
  3. future : 期货/组合子算式
  4. io: 异步io api
  5. lock: 同步锁
  6. never: 永不值(即!)
  7. sink:异步接收器
  8. stream:异步流(类似迭代器)
  9. task:任务/上下文/唤醒器

概念:

  1. 叶子期货(leaf futures): 实际的底层非阻塞运行的 futures
  2. 非叶子期货(non leaf futures): 如 async 创建的包装的 future.这是一种可暂停的状态机。

经典流程:执行器内部建立一个任务队列,里面是 future 对象,读取和轮询其中一个,将执行器当前的上下文状态、唤醒器和 future 状态机绑定,这一步是为了让状态机知道怎么和控制它的执行器通信(当然实际状态机会将通信任务委派给 reactor 反应器)。

当状态机遇到中断(如 pending),状态机将在反应器注册等待事件(将唤醒器传递给反应器),然后控制权回到执行器,所以执行器可以开始轮询下一个 futures 对象。当反应器中的等待事件到来,即表示第一个对象准备就绪,反应器会通过唤醒器通知执行器将中断了的对象重新放入轮询队列中。

因为这个唤醒器是执行器定制的,所以它能够控制执行器来再次轮询 future 对象。

用异步领域的话:执行器扮演计算资源的角色,而反应器扮演 IO 资源的角色。rust 中的设计,让你可以随意搭配不同的执行器和反应器。因为存在媒介(waker)。

它的简化模型:

// 线程运行状态信息
enum Poll<T>{
   Ready(T), // 完毕,携带返回信息 T
   Pending, // 运行中
}
// 基于轮询的多线程接口
trait Future {
   type Output;
   fn poll(&mut self, wake:fn())->Poll<Self::Output>;
}

机制:

  1. poll() : 轮询线程状态
  2. wake: 当遇到特定事件,调用 wake 回调函数。

真实的样子:

pub trait Future {
    type Output; // 返回类型
    pub fn poll( // 轮询主函数
        self: Pin<&mut Self>,  // 固定指针的 self
        cx: &mut Context<'_> // 上下文和回调函数
    ) -> Poll<Self::Output>; // 状态和返回值
}

pub enum Poll<T> {
    Ready(T), // 得到返回值
    Pending, // 运行中
}

pub struct Pin<P>  // 固定指针位置
where P:Deref,
<P as Deref>::Target: Unpin { // 指针目标类型为 Unpin
   const fn new(P)->Pin<P>, // 固定包装
   const fn into_inner(Pin<P>)->P, // 解包装
   const unsafe fn new_unchecked(P)->Pin<P>, // !Unpin 也可以
}

pub auto trait Unpin { } // 支持解除移动的指针

pub struct Context<'a> {  // 上下文
   fn from_waker(&'a Waker)->Context<'a>, // 包装 Waker
   fn waker(&self)->&'a Waker, // 解包装
}

pub struct Waker { // 唤醒器
   fn wake(self), // 唤醒
   fn wake_by_ref(&self), // 唤醒
   fn will_wake(&self, &Waker)->bool, // 是否同一个任务
   unsafe fn from_raw(RawWaker)->Waker, // 从原始唤醒器创建一个唤醒器
   fn from(Arc<W:'static + Wake + Send + Sync>)->Waker, // 从 Arc<W> 唤醒器创建一个唤醒器
}

pub trait Wake { // 唤醒器
    pub fn wake(self: Arc<Self>); // 唤醒
    pub fn wake_by_ref(self: &Arc<Self>) { ... } // 唤醒
}

pub struct RawWaker { // 原始唤醒器
   const fn new(*const(), &'static RawWAkerVTable)->RawWaker, // 数据指针,虚函数表指针
   fn from(Arc<W>)->RawWaker, // 从 Arc<W> 唤醒器创建一个原始唤醒器
}

pub struct RawWakerVTable { // 虚函数表
   const fn new(
      clone: unsafe fn(*const ()) -> RawWaker, // 克隆时调用
      wake: unsafe fn(*const ()), // 唤醒时调用
      wake_by_ref: unsafe fn(*const ()), // 唤醒时调用
      drop: unsafe fn(*const ()) // 释放时调用
   )->RawWakerVTable
}

备注:rust 关于 async /await 的架构上还在探索,很多代码没有成为正式版。现在只需要了解一下高层架构,和怎么使用。

吐槽:个人认为 rust 的异步架构还是相当复杂,甚至对新手来说有点恶心的。需要从易用性上做改进,不要把入门的问题复杂化,去提供一个无所不包,面面俱到的超级框架。这种东西对学习来说是一种负担,它应该用在复杂的工程应用,而对大多数初级程序员来说是浪费精力。

言归正传,唤醒器的设计看上去挺复杂,它实际目的是实现动态分配,为啥不用特征对象这种 rust 原生的动态分配方案呢?一个原因它要实现克隆,这是原生做不到的,另一个原因似乎是为了更大范围的应用,比如嵌入式。如果你了解动态分配,它的原理实际上就是使用类似虚函数表这种数据结构,只不过由程序员自己处理罢了。

这里面并没有执行器和反应器的相关 api。

执行器的原理是调用 future 的 poll 来得到状态。这个 poll 是在执行器上运行的,所以它被归类为计算资源。要怎样实现一个好的计算分配,可以让其在多个线程上运行,拥有任务队列,接收不同的 future 请求。

反应器是处理事件,所谓事件很大程度就是 io 事件,比如读取网络,这些在系统中是原生的,就是异步运行机制的,而反应器封装系统原生的这些机制,触发相关事件。所以它对于它的开发者来说,主要是一个封装的任务。

可以清晰的观察到这两个东西,是负责不同的任务的。rust 标准库并没有包含反应器,可能正是因为它和系统底层机制有很大的关联。

对于初学者来说,可能会质疑 rust 的异步机制为什么非要设计得那么复杂,如果有异步,就开启一个异步线程执行不就完了,费那么多事干嘛?但我认为,需要异步的人可能很大程度都是这种高性能要求的客户,所以 rust 就这么设计了。

Pin

自引用结构需要使用 Pin 固定指针。否则有些内存操作将会不安全。

Waker

Context 只是对 Waker 的简单包装。Waker 是一个实现动态分派 api 的结构。目的是可以适配不同的执行器,从而对执行器 executor 和反应器 reactor 解耦合。

机制演示

对于 future 机制的简单演示.要注意, 机制和使用是两码事.我们不需要去做这些事情,也可以使用 async 来开发,也就是我们实际上只需要编写 async 函数。

fn backcall() { // 回调函数
   println!("backcall...");
}
struct FnWaker(fn()); // 自定义 waker
impl Wake for FnWaker {
   fn wake(self: Arc<Self>) {
      self.0();
   }
}
async fn f1()->i32{ // future
   println!("async ...");
   108
}

#[derive(Default)]
pub struct xFuture(i32); // 自定义 future, future 可能会跨线程,这里简化了成员的同步封装

impl Future for xFuture {
type Output = i32;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
   match self.0 {
      3 => Poll::Ready(3), // 执行结束
      _ => {
         self.get_mut().0 += 1; // 修改 future 状态
         cx.waker().wake_by_ref(); // 回调函数
         Poll::Pending // 执行器(有可能)根据这个返回状态暂停线程
      }
   }
}
}

fn run(){
   let waker = Arc::new(FnWaker(backcall)); // 自定义 wake
   let waker = Waker::from(waker); // 转换成Waker
   let mut cx = Context::from_waker(&waker); // 再转换成 Context
   let mut f = f1(); // future(可以自定义,async 的定义有点不一样)
   let mut f = xFuture::default(); // 使用自定义 future
   let pf = unsafe { Pin::new_unchecked(&mut f) }; // 固定指针
   loop { // 轮询
      match pf.poll(&mut cx) { 
         Poll::Ready(result) => { // async 这里只会同步处理
            println!("Ready:{}", result);
            return result; // 跳出
         }
         Poll::Pending => { // async 不运行,因为其中没有pending 状态,自己定义的可以.
            println!("pending...");
         }
      }
   }
}

注解:

  1. run() 是执行器,这里只是轮询一个 future,实际应该是一个队列
  2. f1() 这个是普通的 async 函数,这种是不会有pending的,因为它压根没有调用什么实际的异步函数(或await 叶子 future)。而实际叶子 future 会有 pending 状态的。叶子 future 实际上就是类似以上自定义的 xFuture 实现,会定义 pending 状态该干什么。而非叶子 future 它只是 poll 叶子节点,如果叶子有 pending 它才会 pending,否则就只有 ready,也就是它自身并不会主动定义什么 pending。强调这点,是避免误会,async 或者 await 并不一定会产生中断的效果。
  3. FnWaker 保存执行器一部分信息(唤醒器),这里是一个没有意义的回调函数,实际这个函数应该是让当前 future 重新回到执行器的任务队列,供下次轮询。
    1. wake() 函数实际是并不是由 future 使用,而是注册给 reactor 反应器根据事件是否完成来调用。这里为了简易起见,由 pending 时直接调用了。

从使用者角度,我们并不需要去轮询什么 future ,以上都是运行机制的一部分,都是库作者应该去考虑的。我们要做的只是写好 async 函数(并配置好执行环境)。

另外一个悲剧的事实,就是标准库(STD)还没有完整的包含这套东西的实现(实现但没完全实现),而只是实现了语言层面的支持,如 async / await,也就是 future 本身。其他如执行器,waker 依赖第三方库来实现(新人噩梦):

  1. std::future : 官方只是实现了语言层面的内容,如 async / await,而功能支持并没有标准化。参考:《rust 异步编程》(Asynchronous Programming in Rust):https://rust-lang.github.io/async-book/
  2. futures : 技术前瞻(原始实现,很多内容已经转化为 std),扩展功能大部分被移到 FuturesExt。
  3. async_std : 基于 std 的扩展,提供基础支持。参考:《async-std》:https://learnku.com/docs/rust-async-std/std-and-library-futures/7149
  4. tokio : 商业化比较成功的异步库。 文档:https://tokio-zh.github.io/document/getting-started/hello-world.html

async / await

async 本质上是语法糖。也就是编译器背后会帮我们生成大量的实现代码。其原理是: async --> generator(生成器) --> GenFuture(生成的实体) --> impl Future(实现了Future 特征)。

生成器这个概念在 iterator 迭代器中会涉及,具体可以查阅相关知识。也就是 async 背后是用生成器生成了一个实现 Future 的特征对象。

而 await 也是一个语法糖,它实际上是在 GenFuture 实体内部生成一个简单的执行器,对 Future 对象进行 poll 轮询(loop)。

也就是说,大体上,现在要开发异步程序,应该从 tokio 或者 async_std 出发,其中 tokio 比较成熟, async_std 比较新。

明确一下相关概念:

  1. 并发:一个任务没结束之前另一个任务就启动了。但两个任务不要求同时执行(并行)
  2. 并行:两个以上的任务同时执行
  3. 同步 / 继发:等待上一个任务完成再执行下一个任务
  4. 异步 / 并发:不等待立即执行下一步
  5. 数据同步:对数据的操作是同步的(隐含意思:有序的,协调的),异步操作往往要对读写数据进行同步,这两个概念并不冲突!

这里面有一些比较微妙的点,那就是异步的定义,我们可以理解为并发,也可以理解为并行。并发和并行的差异在于同一时刻是否执行两个以上的任务,并发只是某一时间段,存在两个任务,这可以通过切换当前执行的任务来实现。

并发是宏观的,时间段的。并行是微观的,时刻的。

rust 需要借助第三方库,实现真正的多线程调度。

范例

异步编程可以说是一种范式,它不同于基于原生多线程的做法。原生多线程它是创建一个独立运行的任务,然后等待它,并最后同步数据。这种方式其实比较容易理解,但是它占用较多的系统资源,尤其在 io 处理比较频繁的时候是不合适的。

异步编程,它并不等同于多线程,它强调的是并发,就是多个任务同时启动这点,如果把任务分割交替执行,那么它就是异步,它并不需要多线程。

future 就是这种分割的抽象,值会有两种状态,一种是 pending 等候,一种是 ready 就绪,等候的时候可以分配 cpu 给下一个任务执行,从而达到并发的效果。

而 async / await 是从普通函数中创建这种抽象包装的便利工具。利用这套工具,我们就可以进行所谓的异步编程,它本质上是模拟同步编程的编写流程。但又不等于同步编程,你得想办法区分哪些是同步操作,哪些是异步操作,想办法让两个任务能够尽可能的并发运行。

fn s1(){}
fn s2(){}
fn main(){
   s1();
   s2();
}

// 异步 A
// 虽然确实是异步代码,实际操作却亦然是 s1 --> s2
// 因为 s1 和 s2 是有先后顺序的
// await 和多线程不一样,并不会启动一个新的线程
// 它和同步代码类似,遵从代码的顺序来执行
// 那为什么要用异步?
async s1(){}
async s2(){}
fn main(){
   let run = async||{
      s1().await;
      s2().await;
   }

   block_on(run); // 阻塞当前线程并启动异步函数
}

// 异步 B
// 但是你可以用 join! 宏,它告诉执行器可以按任意顺序来执行
// 代码 A 也并非毫无意义,因为每个 await 都意味着释放一次控制权
// 执行器才能接着运行其他任务
// 系统中的任务足够多,这样并发的可能性就越高了
let run = async||{
   futures::join!(s1,s2);
}

注意,其实 async 创建的 future 的 await 实际是不会返回 pending 的,但是如果 await 一个自定义的 future,那么实际上会等于在内部有个执行器,poll 自定义的 future,这时如果遇到 pending,才会真正返回 pending。

因此,await并不代表一定会 pending,await 是相等于内部 poll 轮询,遇到真正的 pending (一般是精心设计的自定义 future 才有)才会中断当前流程。这一点是需要特别留意的。

async_std 库

原始的 future 是通过一个独立的 futures 库提供的,现在已经加核心部分纳入 std 标准库中,而其扩展部分归类到 FutureExt 。

  1. std::future::Future : 官方核心实现
  2. futures::future::Future : 原始的参考实现
  3. futures::future::FutureExt: 扩展

async_std 不依赖 futures 库,但是实现了类似的功能。

提供的基本接口:

  1. Read
  2. Write
  3. Seek
  4. BufRead
  5. Stream

期货(futures)的基本概念:

  1. 延迟计算
  2. 异步性
  3. 独立的执行策略

任务状态 Future:

  1. Ready(T) : 就绪并返回结果
  2. Pending : 等候

任务 Task(async_std 中的核心抽象):

  1. 单次分配
  2. 反向通道(返回结果或错误信息)
  3. 携带元数据
  4. 本地存储

任务类似线程,但它并不意义对应一个线程(可能多个任务共享同一个线程),也就是它比线程要轻量。

阻塞操作:将会停止关联的所有任务,而不只当前任务,应避免使用系统的线程阻塞。

smol

这是一个轻量级多线程库。

组成:

  1. futures-lite : 对应 futures 库的小型库
    1. future
      1. block_on(): 执行器
      2. pin!(): 将 future 钉在堆栈上
      3. ready!(): 摊开一个 poll 的值,如是 pending 则立即返回,值则保存到变量。
      4. thread_local!: 线程本地变量
      5. pending<T>(): 一个只会返回 Pending 的 future
      6. ready<T>(): 返回一个 Ready(T) 的 future
      7. poll_once(): poll一次,并返回其参数值 None/Some(T)
      8. poll_fn(): poll 并返回结果。
      9. yield_now(): 在 poll 函数中制造一次 pending
      10. zip(): 等待两个期货完成,返回两则的元组 (a,b)
      11. try_zip(): 同上,但出错立即返回
      12. or(): 返回两者首先完成的期货(若同时则优先位置1)
      13. race(): 同上,但同时完成则随机返回一个
      14. catch_unwind(): 异常处理
      15. Boxed<T>: Pin<Box<dyn Future<output = T> + Send + 'static>> 的别名
      16. BoxedLocal<T>: 类似上面,但是没有 Send 修饰
      17. FutureExt: 一些 Future 的扩展 api
        1. or()
        2. race()
        3. catch_unwind()
        4. boxed()
        5. boxed_local()
    2. io
      1. AsyncRead: 异步读特征
      2. AsyncBufRead: 异步带缓冲区读特征
      3. AsyncWrite: 异步写特征
      4. Unpin: 不可钉住特征(可移动)
      5. copy(reader, writer): 从读取器读取内容到写入器,知道读取器返回'EOF'(空字节)
      6. AssertAsync<T>: 将 Read、Write、Seek 对象包装成异步版本:AsyncRead、AsyncWrite、AsyncSeek
      7. BlockOn<T>: 将异步版本同步化(阻塞)。AsyncRead+Unpin、AsyncBufRead+Unpin、AsyncWrite+Unpin、AsyncSeek+Unpin 转换为 Read、BufRead、Write、Seek
      8. BufReader<R>: 读取器升级为带缓冲区版本。(只有缓冲区没有命中才会真实读取操作,所以性能会更高)
      9. BufWrite<W>: 写入器升级为带缓冲区版本。
      10. Cursor<T>: 读写游标
      11. empty(): 空读取器
      12. repeat(): 重复读取器,如'aaaa...'
      13. sink(): 类似 /dev/null 的空写入器
      14. AsyncBufReadExt : 一些 AsyncBufRead 的扩展 api
        1. fill_buf(): 返回内部缓冲区内容(如果为空自动填充)
        2. consume():消耗缓冲区指定字节数
        3. read_until(): 读取指定结束符的所有字节
        4. read_line():读取一行
        5. lines(): 读取所有行
        6. split(): 按指定分隔符读取所有行
      15. AsyncReadExt: 一些 AsyncRead 的扩展 api
        1. read() : 读取
        2. read_vectored():读取到切片
        3. read_to_end():读取到 Vec<u8>
        4. read_to_string(): 读取到字符串
        5. read_exact(): 读取到指定大小的缓冲区
        6. take():创建指定范围的适配器(视图)
        7. bytes() : 返回字节数组(视图)
        8. chain(): 将两个流串联起来的视图
        9. boxed_reader():装箱 Pin<Box<dyn AsyncRead + Send + 'a>>
      16. AsyncSeekExt: AsyncSeek 的扩展
        1. seek(): 定位
      17. AsyncWriteExt:AsyncWrite 的扩展
        1. write(): 写入
        2. write_vectored():将切片内容写入
        3. write_all(): 将缓冲流写入
        4. flush(): 写入并刷新缓冲区
        5. close(): 关闭写入器
        6. boxed_writer():装箱Pin<Box<dyn AsyncWrite + Send + 'a>>
      18. BoxedReader: Pin<Box<dyn AsyncRead + Send + 'static>>
      19. BoxedWriter: Pin<Box<dyn AsyncWrite + Send + 'static>>
      20. split(): 将流拆分为读取器和写入器
    3. stream
      1. block_on(): 将异步流同步化,即阻塞迭代器
      2. empty(): 空流
      3. iter(): 从迭代器创建流
      4. once(): 只有一个元素的流
      5. pending(): 始终等候(pending)的流
      6. poll_fn():从映射函数(FnMut(&mut Context<'_>)->Poll<Option<T>>)返回流
      7. repeat():重复流
      8. repeat_with(): 从闭包创建重复流
      9. unfold(): 从异步闭包 FnMut(T)->Future<Option<(Item,T)>> 创建流。
      10. try_unfold(): 同上,遇到错误返回。
      11. StreamExt: Stream 的扩展
        1. poll_next()
        2. next(): 下一项
        3. try_next(): 带返回错误版本
        4. count(): 总项数
        5. map(): 闭包映射 FnMut(Item)->T
        6. flat_map(): 映射然后摊平
        7. flatten(): 摊平
        8. then(): 异步闭包映射
        9. filter(): 谓词闭包筛选
        10. filter_map(): 映射到 Option<T>,筛选掉 None
        11. take(): 前 n 个元素的流视图
        12. take_while(): 满足谓词的流视图
        13. skip(): 跳过前 n 项的流视图
        14. skip_while():跳过满足谓词的流视图
        15. step_by(): 每次的步进(1-n)
        16. chain(): 串联两个流
        17. cloned(): 克隆
        18. copied(): 拷贝
        19. collect(): 转换成集合
        20. try_collect()
        21. partition(): 根据谓词分成真假两个集合
        22. fold(): 累计器
        23. try_fold()
        24. scan():映射到 Option<B>
        25. fuse():熔断,遇到 None 之后停止生成项目。
        26. cycle(): 回环流
        27. enumerate(): 映射到枚举:(index, item)
        28. inspect(): 巡视,即对每个项调用闭包
        29. nth(): 迭代到 n+1 的位置
        30. last(): 最后项
        31. find(): 查找第一次匹配谓词的项
        32. find_map(): 映射到 Option 并返回一个 Some
        33. position(): 查找第一个匹配谓词的项,并返回下标
        34. all(): 全部满足谓词
        35. any(): 任一满足谓词
        36. for_each(): 迭代闭包
        37. try_for_each()
        38. zip(): 两个流打包成一个元组为元素的流(如没配对就停止)
        39. unzip():元组流分解成两个集合
        40. or():合流,选择其中一个流(优先位置1)
        41. race():合流,没有优先
        42. boxed()
        43. boxed_local()
      12. Boxed<T>: Pin<Box<dyn Stream<Item = T> + Send + 'static>>
      13. Boxed_Local<T>: Pin<Box<dyn Stream<Item = T> + 'static>>
  2. blocking : 用异步方式封装同步 io
    1. Executor: 执行器
      1. spawn():产生任务
    2. UnBlock: 封装同步 io 成异步接口
    3. pipe() : 单生产单消费管道
  3. polling: 封装系统底层 io 事件接口
    1. syscall!: libc 函数调用
    2. Event: 事件(句柄、可读、可写)
    3. Poller: 轮询事件
      1. add(): 添加对事件的兴趣
      2. modify(): 修改之前的事件的兴趣(读写属性)
      3. delete(): 删除
      4. wait(): 等待事件
      5. notify(): 唤醒
    4. epoll : linux 的 io 事件封装
      1. Poller: (epoll句柄、事件句柄、时间句柄)
      2. Events: 事件列表(epoll_event list, size)
    5. wepoll: windows 的 io 事件封装
      1. wepoll!: wepoll_ffi 函数调用
      2. Poller: (we 句柄, 通知与否)
      3. Events:(epoll_event list, size)

executor 执行器(三种实现):

  1. thread-local : 单线程
  2. work-stealing : 窃取型多线程
  3. blocking executor : 阻塞型多线程
pub(crate) struct ThreadLocalExecutor {
    queue: RefCell<VecDeque<Runnable>>, //非并发主队列
    injector: Arc<SegQueue<Runnable>>, //唤醒并发队列
    event: IoEvent, // io 事件线程
}

impl ThreadLocalExecutor {
    pub fn new() -> ThreadLocalExecutor{} // 创建
    pub fn enter<T>(&self, f:impl FnOnce()-> T)->T{} //检测是否单例
    pub fn spawn<T:'static>(future: impl Future<Output = T> + 'static)-> Task<T>{} // 创建并调度 task
    pub fn execute(&self)->bool{} // 执行任务
}

附录

安装 rust 并配置 vscode:

  1. 安装 rust
  2. 更新 rust,使用 rustup 更新并设置默认的工具链为 nightly 版本
  3. 安装组件:cargo,rust-analyzer-preview,rust-src, rustfmt
  4. vscode 安装插件 rust-analyzer ,并设置 nightly 版本
  5. 设置环境变量(关键)
    1. export RUSTUP_UPDATE_ROOT=https://mirrors.tuna.tsinghua.edu.cn/rustup/rustup
    2. export RUSTUP_DIST_SERVER=https://mirrors.tuna.tsinghua.edu.cn/rustup
    3. export CARGO_HOME=$HOME/.cargo
    4. export RUSTUP_HOME=$HOME/.rustup
    5. PATH 添加 ~/.cargo/bin 到前面,因为 rustup 更新cargo 和 rustc 在这个目录
  6. 设置包镜像源: ~/.cargo/config
# gentoo 安装稳定版 rust
emerge -aq rust
# 设置 bash 变量
cat > ~/.bashrc << endl
PATH=\$HOME/.cargo/bin:\$PATH
export RUSTUP_UPDATE_ROOT=https://mirrors.tuna.tsinghua.edu.cn/rustup/rustup
export RUSTUP_DIST_SERVER=https://mirrors.tuna.tsinghua.edu.cn/rustup
export CARGO_HOME=\$HOME/.cargo
export RUSTUP_HOME=\$HOME/.rustup
endl

# 同样的方法,设置 X 窗口的环境变量 ~/.xprofile
nano ~/.xprofile

# 更新 rust 工具链到夜间版本
rustup toolchain install nightly
rustup default nightly

rustup component list # 查看已经安装的组件,如果没有,安装相关组件
rustup component add rust-analyzer-preview

# 编辑
nano ~/.cargo/config

输入:

[source] #源
[source.tsinghua]
registry="https://mirrors.tuna.tsinghua.edu.cn/git/crates.io-index.git"

[source.ustc]
registry = "http://mirrors.ustc.edu.cn/crates.io-index"
[source.crates-io]
replace-with = "tsinghua"

安装 vscode 插件(图):

安装 rust-analyzer

配置:

插件配置

参考

  1. 别混淆数据争用(data race) 和竞态条件(race condition): https://blog.csdn.net/gg_18826075157/article/details/72582939
  2. 200行代码讲透Rust Futures:https://stevenbai.top/rust/futures_explained_in_200_lines_of_rust/
  3. 使用MSVC工具链和VSCode搭建Rust环境: https://segmentfault.com/a/1190000039984109
  4. 理解Rust中的Futures (一)https://www.cnblogs.com/praying/p/14140680.html
  5. async-std 官方教程:https://learnku.com/docs/rust-async-std/std-and-library-futures/7149
  6. async-std 官方教程中文翻译:https://learnku.com/docs/rust-async-std/translation-notes/7132
  7. async-std 库:https://gitee.com/mirrors/async-std#https://book.async.rs
  8. tokio 文档: https://tokio-zh.github.io/document/getting-started/hello-world.html
  9. Rust异步浅谈: https://rustcc.cn/article?id=e6d50145-4bc2-4f1e-84da-c39c8217640b
  10. rust 异步编程:https://rust-lang.github.io/async-book/04_pinning/01_chapter.html
  11. rust 异步编程(中文翻译): https://learnku.com/docs/async-book/2018
  12. Rust Async: 标准库futures api解析: https://zhuanlan.zhihu.com/p/66028983
    1. Rust Async: Pin概念解析: https://zhuanlan.zhihu.com/p/67803708
    2. Rust Async: smol源码分析-Executor篇: https://zhuanlan.zhihu.com/p/137353103
  13. async/await 如何工作 | Rust学习笔记: https://blog.51cto.com/u_14915984/2536737
    1. 异步代码的几种写法: https://blog.51cto.com/u_14915984/2538928
    2. 再谈 Send 与 Sync: https://blog.51cto.com/u_14915984/2542027
  14. 深入理解 Linux 的 epoll 机制: https://zhuanlan.zhihu.com/p/393747291
  15. 通过 Futures 库分析加深对 Rust 异步机制的理解(视频):https://www.bilibili.com/video/BV1fR4y147BU?spm_id_from=333.999.0.0
  16. 什么是协程?: https://zhuanlan.zhihu.com/p/172471249
  17. Rust异步编程 Future && async/await(视频): https://www.bilibili.com/video/BV1XZ4y1c7tD?p=1
1
https://gitee.com/coder_lw/wiki.git
git@gitee.com:coder_lw/wiki.git
coder_lw
wiki
wiki
master

搜索帮助