同步操作将从 deepinwiki/wiki 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
rust 很重要的应用场合就是异步开发。它的类型系统很擅长处理异步数据安全,性能也很高。不过相关的库还不是很成熟,比较难用。
thread::spawn(F)->JoinHandle<T>
JoinHandle<T>
join(self)->Result<T>
可见,原生的线程要求跨线程的数据具备 'static 生命周期。和接受一个零参数的执行函数。不过可以利用闭包输入参数,但要符合 Send 约束。
use std::thread;
let hander = thread::spawn(||{42});
hander.join.unwrap(); // 同步等待结果
原生线程简单,但是它使用了系统有限的线程资源,因而代价太大。
在语言(运行时)层面提供的方案:
特征:
哪些能跨线程传递?
// & mut
impl<'_, T> Send for &'_ mut T
where
T: Send + ?Sized
// &
impl<'_, T> Send for &'_ T
where
T: Sync + ?Sized
哪些能跨线程共享?
编译器会在符合安全条件下自动实现这两个特征。
那究竟怎样的条件才是符合? rust 安全界限内的产品,都是符合的。它实际上是借用规则的翻版。
可见同步规则都没有违反借用规则。也就是借用规则和同步规则是类似的。那为什么还需要定义同步规则呢(Send / Sync)?
这是因为我们之前引入了内部可变性类型,这些类型中,有一部分是不支持同步的。他们在穿越线程时,会有问题。比如: Rc, RefCell。另外,其他不安全组件,如原生指针,或者封装了不安全代码却没有对跨线程优化的组件,也是不支持跨线程的。
也就是如果经过对跨线程精心优化的不安全组件,它大可以通过标记 Send、Sync 来表明支持同步。这就是 rust 类库的做法,也是为什么需要标记的原因。
问题二:为什么需要封装成智能指针,而不用原生的借用?
这就要从生命周期去理解,线程什么时候消亡,这个是很难确定的,也很难去安排不同线程的读写顺序,就算安排好了,颗粒度也是比较粗糙的,并发性能会降低。也就是在多线程环境下,往往都是要求你同时拥有读写线程的,不能说等到读线程完毕,再来启动写线程。因此静态的借用规则就完全没有用了,只能依赖智能指针在运行时动态执行借用规则。
不过好消息是:借用规则永远值得信赖!只要借用规则执行到位,不管是静态还是动态,它实际都是安全的。这并不影响 rust 安全语言的承诺。
// 因为不同线程,所以 a 可能比 pa 更快挂掉,pa 将指向无效地址
// 所以 spawn 对传递的生命周期要求是 'static 的
// 简而言之,普通的借用不适合用于传递到另一个线程
let a = 1;
let pa = &a;
thread::spawn(move ||pa);
// 转移所有权
// 一个普通对象转移所有权,它就到了另一个线程
// 没办法共享的使用这个对象
let a= 1;
thread::spawn(move ||a);
thread::spwan(move ||a); // 已经在上一步转移了
// 转移智能指针的所有权
// 智能指针自身实现了内部共享持有对象的方法
// 因为并不是传递引用,而是智能指针的所有权,所以没有生命周期的问题
let a = 1;
let pa = Arc::new(a); // 转移所有权到智能指针内部(托管)
let c1 = pa.clone();
let c2 = pa.clone();// 克隆体管理的是同一个内部对象
thread::spawn(move ||c1);
thread::spawn(move ||c2);
// 需要写操作,可以加一个内部可写的包装
let a = 1;
let pa = Arc::new(Muttex::new(a));
thread::spawn(move || *pa.lock().unwrap() += 1);
Futures(期货)是一种叫协程的技术。协程和多线程的区别是,协程它是基于任务视角,如果一个任务开始执行,但没有结束,另一个任务又开始,他们就是并发的。任务之间通过协作的方式,暂时停止自己,去执行另一个任务,又由另一个任务在适当的时候重启自己。
这段表述有两个重点:
协程因为它并不是一定要开启新的线程(线程是有限的系统资源),所以它相对而言更加高效率。
它有点类似 javascript 中的 Promise (承诺)。这两个都是一种寓意,即对未来值的封装。协程的结果就是未来值,对于同步代码来说,它不关心(封装)辅助线程是怎么运作的,它只在乎它的结果。
可见,这是以一种用同步视角来处理异步问题的方式,所以更受程序员的欢迎。
如果你了解 Promise 异步模型,它核心技术就是一个状态机。它能够在函数的某个点暂停,然后从这个点恢复执行。基本原理是类似 goto 跳转,在程序片段间插入各种跳转点,记录上次的跳出的位置,然后这次跳转回去,从而达到暂停继续的效果。
异步函数转化为 future 状态机,从而能够在函数之间的某个点之间暂停和恢复。暂停之后去哪里?这里就需要一个调度器(执行器),来控制不同的 feture 对象,一个对象释放控制权,调度器就运行另一个对象。
future 对象提供了 poll 轮询结构,让调度器可以知道对象的状态,Ready(T) 就绪携带运行结果 T,而 Pending 等待可以让调度可以去调用队列中的下一个 future 对象。这个模型严格来说还是靠对象之间自主的释放控制权,是协作式的异步模型。
那么调度器该何时再来轮询 pending 后的对象呢?
不停的轮询?rust 的设计是让事件源(reactor) 通过唤醒器(wake)来通知执行器(executor),然后执行器就可以把 future 对象加入轮询队列中继续轮询。
而唤醒器是在执行器 poll future 的时候通过参数传递进去的。future 遇到 pending 的时候,就将自己和唤醒器注册到 reactor 反应器,而响应器直接和系统异步 io 通信,io 就绪的时候再通过唤醒器通知执行器。
这个机制比较复杂的属于这个部分。因为涉及到底层系统 io,它现在也还不是标准库的一部分,由第三方库提供支持。
构成:
其中,并发处理的执行器和反应器,在 rust 中不是内置部分(标准库),而设计成由第三方库(异步运行时)来独立提供:
标准库则提供了:
准标准库 futures (里面的概念将来可能会成为标准,所以要优先使用,避免自造):
概念:
经典流程:执行器内部建立一个任务队列,里面是 future 对象,读取和轮询其中一个,将执行器当前的上下文状态、唤醒器和 future 状态机绑定,这一步是为了让状态机知道怎么和控制它的执行器通信(当然实际状态机会将通信任务委派给 reactor 反应器)。
当状态机遇到中断(如 pending),状态机将在反应器注册等待事件(将唤醒器传递给反应器),然后控制权回到执行器,所以执行器可以开始轮询下一个 futures 对象。当反应器中的等待事件到来,即表示第一个对象准备就绪,反应器会通过唤醒器通知执行器将中断了的对象重新放入轮询队列中。
因为这个唤醒器是执行器定制的,所以它能够控制执行器来再次轮询 future 对象。
用异步领域的话:执行器扮演计算资源的角色,而反应器扮演 IO 资源的角色。rust 中的设计,让你可以随意搭配不同的执行器和反应器。因为存在媒介(waker)。
它的简化模型:
// 线程运行状态信息
enum Poll<T>{
Ready(T), // 完毕,携带返回信息 T
Pending, // 运行中
}
// 基于轮询的多线程接口
trait Future {
type Output;
fn poll(&mut self, wake:fn())->Poll<Self::Output>;
}
机制:
真实的样子:
pub trait Future {
type Output; // 返回类型
pub fn poll( // 轮询主函数
self: Pin<&mut Self>, // 固定指针的 self
cx: &mut Context<'_> // 上下文和回调函数
) -> Poll<Self::Output>; // 状态和返回值
}
pub enum Poll<T> {
Ready(T), // 得到返回值
Pending, // 运行中
}
pub struct Pin<P> // 固定指针位置
where P:Deref,
<P as Deref>::Target: Unpin { // 指针目标类型为 Unpin
const fn new(P)->Pin<P>, // 固定包装
const fn into_inner(Pin<P>)->P, // 解包装
const unsafe fn new_unchecked(P)->Pin<P>, // !Unpin 也可以
}
pub auto trait Unpin { } // 支持解除移动的指针
pub struct Context<'a> { // 上下文
fn from_waker(&'a Waker)->Context<'a>, // 包装 Waker
fn waker(&self)->&'a Waker, // 解包装
}
pub struct Waker { // 唤醒器
fn wake(self), // 唤醒
fn wake_by_ref(&self), // 唤醒
fn will_wake(&self, &Waker)->bool, // 是否同一个任务
unsafe fn from_raw(RawWaker)->Waker, // 从原始唤醒器创建一个唤醒器
fn from(Arc<W:'static + Wake + Send + Sync>)->Waker, // 从 Arc<W> 唤醒器创建一个唤醒器
}
pub trait Wake { // 唤醒器
pub fn wake(self: Arc<Self>); // 唤醒
pub fn wake_by_ref(self: &Arc<Self>) { ... } // 唤醒
}
pub struct RawWaker { // 原始唤醒器
const fn new(*const(), &'static RawWAkerVTable)->RawWaker, // 数据指针,虚函数表指针
fn from(Arc<W>)->RawWaker, // 从 Arc<W> 唤醒器创建一个原始唤醒器
}
pub struct RawWakerVTable { // 虚函数表
const fn new(
clone: unsafe fn(*const ()) -> RawWaker, // 克隆时调用
wake: unsafe fn(*const ()), // 唤醒时调用
wake_by_ref: unsafe fn(*const ()), // 唤醒时调用
drop: unsafe fn(*const ()) // 释放时调用
)->RawWakerVTable
}
备注:rust 关于 async /await 的架构上还在探索,很多代码没有成为正式版。现在只需要了解一下高层架构,和怎么使用。
吐槽:个人认为 rust 的异步架构还是相当复杂,甚至对新手来说有点恶心的。需要从易用性上做改进,不要把入门的问题复杂化,去提供一个无所不包,面面俱到的超级框架。这种东西对学习来说是一种负担,它应该用在复杂的工程应用,而对大多数初级程序员来说是浪费精力。
言归正传,唤醒器的设计看上去挺复杂,它实际目的是实现动态分配,为啥不用特征对象这种 rust 原生的动态分配方案呢?一个原因它要实现克隆,这是原生做不到的,另一个原因似乎是为了更大范围的应用,比如嵌入式。如果你了解动态分配,它的原理实际上就是使用类似虚函数表这种数据结构,只不过由程序员自己处理罢了。
这里面并没有执行器和反应器的相关 api。
执行器的原理是调用 future 的 poll 来得到状态。这个 poll 是在执行器上运行的,所以它被归类为计算资源。要怎样实现一个好的计算分配,可以让其在多个线程上运行,拥有任务队列,接收不同的 future 请求。
反应器是处理事件,所谓事件很大程度就是 io 事件,比如读取网络,这些在系统中是原生的,就是异步运行机制的,而反应器封装系统原生的这些机制,触发相关事件。所以它对于它的开发者来说,主要是一个封装的任务。
可以清晰的观察到这两个东西,是负责不同的任务的。rust 标准库并没有包含反应器,可能正是因为它和系统底层机制有很大的关联。
对于初学者来说,可能会质疑 rust 的异步机制为什么非要设计得那么复杂,如果有异步,就开启一个异步线程执行不就完了,费那么多事干嘛?但我认为,需要异步的人可能很大程度都是这种高性能要求的客户,所以 rust 就这么设计了。
自引用结构需要使用 Pin 固定指针。否则有些内存操作将会不安全。
Context 只是对 Waker 的简单包装。Waker 是一个实现动态分派 api 的结构。目的是可以适配不同的执行器,从而对执行器 executor 和反应器 reactor 解耦合。
对于 future 机制的简单演示.要注意, 机制和使用是两码事.我们不需要去做这些事情,也可以使用 async 来开发,也就是我们实际上只需要编写 async 函数。
fn backcall() { // 回调函数
println!("backcall...");
}
struct FnWaker(fn()); // 自定义 waker
impl Wake for FnWaker {
fn wake(self: Arc<Self>) {
self.0();
}
}
async fn f1()->i32{ // future
println!("async ...");
108
}
#[derive(Default)]
pub struct xFuture(i32); // 自定义 future, future 可能会跨线程,这里简化了成员的同步封装
impl Future for xFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.0 {
3 => Poll::Ready(3), // 执行结束
_ => {
self.get_mut().0 += 1; // 修改 future 状态
cx.waker().wake_by_ref(); // 回调函数
Poll::Pending // 执行器(有可能)根据这个返回状态暂停线程
}
}
}
}
fn run(){
let waker = Arc::new(FnWaker(backcall)); // 自定义 wake
let waker = Waker::from(waker); // 转换成Waker
let mut cx = Context::from_waker(&waker); // 再转换成 Context
let mut f = f1(); // future(可以自定义,async 的定义有点不一样)
let mut f = xFuture::default(); // 使用自定义 future
let pf = unsafe { Pin::new_unchecked(&mut f) }; // 固定指针
loop { // 轮询
match pf.poll(&mut cx) {
Poll::Ready(result) => { // async 这里只会同步处理
println!("Ready:{}", result);
return result; // 跳出
}
Poll::Pending => { // async 不运行,因为其中没有pending 状态,自己定义的可以.
println!("pending...");
}
}
}
}
注解:
从使用者角度,我们并不需要去轮询什么 future ,以上都是运行机制的一部分,都是库作者应该去考虑的。我们要做的只是写好 async 函数(并配置好执行环境)。
另外一个悲剧的事实,就是标准库(STD)还没有完整的包含这套东西的实现(实现但没完全实现),而只是实现了语言层面的支持,如 async / await,也就是 future 本身。其他如执行器,waker 依赖第三方库来实现(新人噩梦):
async 本质上是语法糖。也就是编译器背后会帮我们生成大量的实现代码。其原理是: async --> generator(生成器) --> GenFuture(生成的实体) --> impl Future(实现了Future 特征)。
生成器这个概念在 iterator 迭代器中会涉及,具体可以查阅相关知识。也就是 async 背后是用生成器生成了一个实现 Future 的特征对象。
而 await 也是一个语法糖,它实际上是在 GenFuture 实体内部生成一个简单的执行器,对 Future 对象进行 poll 轮询(loop)。
也就是说,大体上,现在要开发异步程序,应该从 tokio 或者 async_std 出发,其中 tokio 比较成熟, async_std 比较新。
明确一下相关概念:
这里面有一些比较微妙的点,那就是异步的定义,我们可以理解为并发,也可以理解为并行。并发和并行的差异在于同一时刻是否执行两个以上的任务,并发只是某一时间段,存在两个任务,这可以通过切换当前执行的任务来实现。
并发是宏观的,时间段的。并行是微观的,时刻的。
rust 需要借助第三方库,实现真正的多线程调度。
异步编程可以说是一种范式,它不同于基于原生多线程的做法。原生多线程它是创建一个独立运行的任务,然后等待它,并最后同步数据。这种方式其实比较容易理解,但是它占用较多的系统资源,尤其在 io 处理比较频繁的时候是不合适的。
异步编程,它并不等同于多线程,它强调的是并发,就是多个任务同时启动这点,如果把任务分割交替执行,那么它就是异步,它并不需要多线程。
future 就是这种分割的抽象,值会有两种状态,一种是 pending 等候,一种是 ready 就绪,等候的时候可以分配 cpu 给下一个任务执行,从而达到并发的效果。
而 async / await 是从普通函数中创建这种抽象包装的便利工具。利用这套工具,我们就可以进行所谓的异步编程,它本质上是模拟同步编程的编写流程。但又不等于同步编程,你得想办法区分哪些是同步操作,哪些是异步操作,想办法让两个任务能够尽可能的并发运行。
fn s1(){}
fn s2(){}
fn main(){
s1();
s2();
}
// 异步 A
// 虽然确实是异步代码,实际操作却亦然是 s1 --> s2
// 因为 s1 和 s2 是有先后顺序的
// await 和多线程不一样,并不会启动一个新的线程
// 它和同步代码类似,遵从代码的顺序来执行
// 那为什么要用异步?
async s1(){}
async s2(){}
fn main(){
let run = async||{
s1().await;
s2().await;
}
block_on(run); // 阻塞当前线程并启动异步函数
}
// 异步 B
// 但是你可以用 join! 宏,它告诉执行器可以按任意顺序来执行
// 代码 A 也并非毫无意义,因为每个 await 都意味着释放一次控制权
// 执行器才能接着运行其他任务
// 系统中的任务足够多,这样并发的可能性就越高了
let run = async||{
futures::join!(s1,s2);
}
注意,其实 async 创建的 future 的 await 实际是不会返回 pending 的,但是如果 await 一个自定义的 future,那么实际上会等于在内部有个执行器,poll 自定义的 future,这时如果遇到 pending,才会真正返回 pending。
因此,await并不代表一定会 pending,await 是相等于内部 poll 轮询,遇到真正的 pending (一般是精心设计的自定义 future 才有)才会中断当前流程。这一点是需要特别留意的。
原始的 future 是通过一个独立的 futures 库提供的,现在已经加核心部分纳入 std 标准库中,而其扩展部分归类到 FutureExt 。
async_std 不依赖 futures 库,但是实现了类似的功能。
提供的基本接口:
期货(futures)的基本概念:
任务状态 Future:
任务 Task(async_std 中的核心抽象):
任务类似线程,但它并不意义对应一个线程(可能多个任务共享同一个线程),也就是它比线程要轻量。
阻塞操作:将会停止关联的所有任务,而不只当前任务,应避免使用系统的线程阻塞。
这是一个轻量级多线程库。
组成:
pending<T>()
: 一个只会返回 Pending 的 futureready<T>()
: 返回一个 Ready(T) 的 futureBoxed<T>
: Pin<Box<dyn Future<output = T> + Send + 'static>>
的别名BoxedLocal<T>
: 类似上面,但是没有 Send 修饰AssertAsync<T>
: 将 Read、Write、Seek 对象包装成异步版本:AsyncRead、AsyncWrite、AsyncSeekBlockOn<T>
: 将异步版本同步化(阻塞)。AsyncRead+Unpin、AsyncBufRead+Unpin、AsyncWrite+Unpin、AsyncSeek+Unpin 转换为 Read、BufRead、Write、SeekBufReader<R>
: 读取器升级为带缓冲区版本。(只有缓冲区没有命中才会真实读取操作,所以性能会更高)BufWrite<W>
: 写入器升级为带缓冲区版本。Cursor<T>
: 读写游标Vec<u8>
Pin<Box<dyn AsyncRead + Send + 'a>>
Pin<Box<dyn AsyncWrite + Send + 'a>>
Pin<Box<dyn AsyncRead + Send + 'static>>
Pin<Box<dyn AsyncWrite + Send + 'static>>
FnMut(&mut Context<'_>)->Poll<Option<T>>
)返回流FnMut(T)->Future<Option<(Item,T)>>
创建流。Option<T>
,筛选掉 NoneOption<B>
Boxed<T>
: Pin<Box<dyn Stream<Item = T> + Send + 'static>>
Boxed_Local<T>
: Pin<Box<dyn Stream<Item = T> + 'static>>
executor 执行器(三种实现):
pub(crate) struct ThreadLocalExecutor {
queue: RefCell<VecDeque<Runnable>>, //非并发主队列
injector: Arc<SegQueue<Runnable>>, //唤醒并发队列
event: IoEvent, // io 事件线程
}
impl ThreadLocalExecutor {
pub fn new() -> ThreadLocalExecutor{} // 创建
pub fn enter<T>(&self, f:impl FnOnce()-> T)->T{} //检测是否单例
pub fn spawn<T:'static>(future: impl Future<Output = T> + 'static)-> Task<T>{} // 创建并调度 task
pub fn execute(&self)->bool{} // 执行任务
}
安装 rust 并配置 vscode:
# gentoo 安装稳定版 rust
emerge -aq rust
# 设置 bash 变量
cat > ~/.bashrc << endl
PATH=\$HOME/.cargo/bin:\$PATH
export RUSTUP_UPDATE_ROOT=https://mirrors.tuna.tsinghua.edu.cn/rustup/rustup
export RUSTUP_DIST_SERVER=https://mirrors.tuna.tsinghua.edu.cn/rustup
export CARGO_HOME=\$HOME/.cargo
export RUSTUP_HOME=\$HOME/.rustup
endl
# 同样的方法,设置 X 窗口的环境变量 ~/.xprofile
nano ~/.xprofile
# 更新 rust 工具链到夜间版本
rustup toolchain install nightly
rustup default nightly
rustup component list # 查看已经安装的组件,如果没有,安装相关组件
rustup component add rust-analyzer-preview
# 编辑
nano ~/.cargo/config
输入:
[source] #源
[source.tsinghua]
registry="https://mirrors.tuna.tsinghua.edu.cn/git/crates.io-index.git"
[source.ustc]
registry = "http://mirrors.ustc.edu.cn/crates.io-index"
[source.crates-io]
replace-with = "tsinghua"
安装 vscode 插件(图):
配置:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。