Back

/ 10 min read

(施工中)Rust实现自己的Future与runtime

什么是异步编程?

    ​异步编程允许我们在执行一块带有阻塞的代码时,无需等待阻塞完毕,就可以先行跳转其他代码块进行代码的执行。当阻塞恢复时,通过执行器的调度方案,在某一时刻(并不及时)回到之前被阻塞的代码块,进行继续执行。

image-20240613180436685     通常来说异步编程是并发编程的范畴,而不是并行编程的范畴。异步编程的实现一般依赖于协程,所以异步编程往往并不依赖于内核和硬件的支持,更加依赖于协程的实现。反直觉的事情是,协程起源于汇编语言。

    综上所述,异步编程可以依赖于第三方库,或者我们自己实现一个执行器,并在其中进行自己的调度,这在某些无法多线程的硬件环境中极为有用。

    异步编程在使用中极为常见,所以大部分语言都会使用关键字来定义一个异步函数。通常这组关键字为Async/Await

在Rust中,语言本身只为我们提供了Async/Await这组关键字。并不额外的为我们提供异步运行时,这导致我们想要使用异步时,需要额外的引入运行时库。

协程调度者的策略

​    协程是一种更轻量级的线程,协程的调度者(executor )是线程中自行实现的一套调度机制运行者。对于系统内核的时间片调度,协程的实现往往采用更简单的方式,例如Golang中的有栈协程goroutine,和在前端广为人知的Promise事件循环以及Python中的generator

​    Rust的实现为无栈协程,它的实现方式是通过FutureTask的方式,将协程的状态保存在全局线程区,通过状态机的方式,来实现协程的调度。


​    有栈协程可以在任意函数中挂起,阻塞恢复时立即接着执行;有栈协程通过遇到阻塞时,将整条函数栈保存到一段空间中(也可能固定当前这段栈空间),并且保存寄存器值,当阻塞恢复时,从空间中恢复栈状态与寄存器状态。优点是任意函数中都可以开辟协程运行。而缺点正是需要资源保存栈、恢复栈。

​    而无栈协程不能在任意函数中挂起,常见的实现,你只能在async函数中调用无阻塞的await过程。因为我们预设了在async的环境中执行异步,所以我们只需要保存async函数此时的上下文即可。上下文保存在全局线程区,当阻塞恢复的时候,无栈协程不像有栈协程一样恢复所有状态,而是通过状态机,直接执行接下来的程序语句。

​    我们本文将注重无栈协程的实现,会从最基本的阻塞场景进行探究,我会尽力将一个协程执行器的构建过程讲清楚。

实现部分

本节所有代码,均能通过访问我的gitea仓库来进行获取。gitea地址

Section1-1 模拟阻塞

​    我们要实现异步执行时,那么我们就必须模拟一个阻塞的场景,但是我们并不能让代码真正的阻塞。为了能在主线程的事件循环中转移控制权,我们定义一个函数,它的作用是接受一个参数delay_time,在至少经过delay_time时间后,打印一个字符串。

use std::time::{Duration, Instant};
#[allow(dead_code)]
fn delay_task(delay_time: usize) -> impl Fn() -> () {
// 记录当前时间
let start_time = Instant::now();
return move || {
if Instant::now() - start_time > Duration::from_secs(delay_time as u64) {
println!("已经经过了至少: {:?}秒", delay_time);
}
};
}
fn main() {
let delay_seconds_time: usize = 5;
let task = delay_task(delay_seconds_time);
loop {
task();
std::thread::sleep(Duration::from_millis(500)); // 休眠500ms
}
}

​    上述代码实现了一个最基本的delay_task模拟阻塞的函数,使用了一个闭包来暂存start_time,并且在每次调用时,判断是否已经经过了delay_time时间,如果是则打印字符串。 ​    程序运行后,主线程每500ms会进行一次task()的调用,task()任务被推进。当5s之后,我们会看到每隔500ms打印一次字符串。这并不是我们想要的结果。造成这种问题的原因是我们的主线程并不能知道task()内部的状态,从而选择执行或不执行,接下来我们稍微改进一下代码,来让主线程可以知道task()的状态。

Section1-2 返回状态

use std::time::{Duration, Instant};
#[allow(dead_code)]
fn delay_task(delay_time: usize) -> impl Fn() -> bool {
// 记录当前时间
let start_time = Instant::now();
return move || {
if Instant::now() - start_time > Duration::from_secs(delay_time as u64) {
println!("已经经过了至少: {:?}秒", delay_time);
return true;
}
false
};
}
const TIME_PIECE: usize = 500;
fn main() {
let delay_seconds_time: usize = 5;
let task01 = delay_task(delay_seconds_time);
let task02 = delay_task(delay_seconds_time * 2);
let mut task01_is_done = false;
let mut task02_is_done = false;
loop {
if !task01_is_done {
task01_is_done = task01();
}
if !task02_is_done {
task02_is_done = task02();
}
if task01_is_done && task02_is_done {
break;
}
std::thread::sleep(Duration::from_millis(TIME_PIECE as u64)); // 休眠500ms
}
println!("退出主线程!");
}

​    我们修改了delay_task函数的返回值,现在它能够返回是否执行过一次的状态(虽然并不优雅)。我们还在主线程定义了另一个task02任务,这样我们才能更清楚的看到主线程的执行流程。主线程需要等待task01task02都执行完毕后才能退出。

​    我们上面的这段代码存在的问题是,我们的主线程退出的逻辑和task强耦合,并且极度依赖于硬编码写入的中断条件。所以我们再次修改一下代码,让返回值和主线程的退出逻辑分离。

Section1-3 事件循环和任务抽象

use std::{
borrow::BorrowMut,
cell::RefCell,
rc::Rc,
time::{Duration, Instant},
};
type TaskStatus = Rc<RefCell<bool>>;
#[allow(dead_code)]
fn delay_task(delay_time: usize) -> (impl FnMut() -> (), TaskStatus) {
// 记录当前时间
let start_time = Instant::now();
let status = Rc::new(RefCell::new(false));
let mut _status = status.clone();
return (
move || {
if !(*_status.borrow())
&& (Instant::now() - start_time > Duration::from_secs(delay_time as u64))
{
println!("已经经过了至少: {:?}秒", delay_time);
_status.borrow_mut().replace(true);
}
},
status.clone(),
);
}
const TIME_PIECE: usize = 500;
pub struct Task<F, T>
where
F: FnMut() -> T,
{
task_poll_fn: F,
task_status: TaskStatus,
}
impl<F, T> Task<F, T>
where
F: FnMut() -> T,
{
pub fn new(mut task_init_fn: impl FnMut() -> (F, TaskStatus)) -> Task<F, T> {
let (task_poll_fn, task_status) = task_init_fn();
Task {
task_poll_fn,
task_status,
}
}
pub fn run(&mut self) {
if !(*self.task_status.borrow()) {
(self.task_poll_fn)();
}
}
}
fn event_loop<F, T>(task_list: &mut Vec<Task<F, T>>)
where
F: FnMut() -> T,
{
loop{
let mut is_all_done = true;
for task in task_list.iter_mut() {
if !(*task.task_status.borrow()) {
is_all_done = false;
task.run();
break;
}
}
if is_all_done {
break;
}
std::thread::sleep(Duration::from_millis(TIME_PIECE as u64));
}
println!("事件循环退出!");
}
fn main() {
let mut task_list = vec![];
let delay_seconds_time: usize = 5;
task_list.push(Task::new(move || {
return delay_task(delay_seconds_time);
}));
task_list.push(Task::new(move || {
return delay_task(delay_seconds_time * 2);
}));
event_loop(&mut task_list);
println!("退出主线程!");
}

​    经过我们的不懈努力,我们的代码越来越抽象了,接下来我们来看看这些新增加的代码都对结构做了什么样的优化。

​    首先我们定义了一个结构体TaskTask是任务的抽象,它接收一个task_init_fn函数,这个函数的作用是开启并初始化一个任务,并且返回这个任务的推进函数task_pool和任务状态task_statusTask结构体的run方法,会在任务未完成的情况下,调用task_pool函数,推进任务。

​    其次,我们将原本的任务循环抽象了出来,并创建了一个新的函数event_loopevent_loop接收一个任务列表,它会在任务列表中的任务全部完成后退出。event_loop函数的实现是一个简单的循环,每次循环都会遍历任务列表,如果有任务未完成,就调用任务的run方法,推进任务。如果所有任务都完成了,就退出循环。

Section2-1 标准中的Future

TODO…