FoundationDB flow -- Part 1
Introduction
flow 其实是一种语言,提供了一种在 C++ 中编写异步程序的方式。在 Linux 上,flow 的底层依赖于
- boost::asio,用于实现网络异步。
- kernel 的 ASIO,用于实现 IO 异步。
- 以及自己实现的定时器调度,用于实现定时器异步。
在 flow 最常见的 loop choose ... when
其实类似于 Golang 的 for + select + channel 的方式。一个 ACTOR 可以理解为一个 goroutine。但是,因为 flow 是以 C++ 为基础的,所以它看起来会比较奇怪,比较难以理解。
这个系列文章主要用于梳理 FoundationDB (fdb) flow 是如何工作的,以帮助大家看懂代码(官方文档实在太少)。
Future and Promise
Future 和 Promise 相当于一个简化的 Golang 的 channel,Future 可以用来等待和接受,Promise 则用来触发和发送。
在 flow 的语境下,Future 表示一个 ACTOR 等待的值,Promise 表示一个 ACTOR 会承诺提供的值。当一个 ACTOR 创建了一个 Promise 之后,需要把该 Promise 对应的 future 提供给等待者 (promise.getFuture())
,等待者会 wait 这个 future,达到传递值的目的。
Future 和 Promise 都采用了 Pimpl 的模式来实现,主要实现放在成员 SAV<T> *sav
中。
SAV: Single Assignment Variable
SAV
申请了一段连续内存来保存一个对象,同时绑定了一个 Error
,当 Error
已经被设置过时,获取这个对象会抛出异常。这个错误主要用来控制给对象只能被设置一次,比如各种 send 函数,只能调用一次。
SAV
初始化的时候,需要指定有几个 futures,有几个 promises。futures 表示有多少个对象在等待回调,promises 表示有多少个对象会触发这个回调。futures 为 0,表示没有人在等待回调,意味着这个 SAV 可以被删除了。
SAV
继承自 Callback
,通过一个双向链表来保存需要回调的函数。
sav.send(value)
将 value 移动到 sav.value_storage
中,将 error_state
设置为 SET_ERROR_CODE
,然后调用 Callback<T>::next->fire(value)
。这个函数是整个 flow 语言的核心,对一个 promise 调用 send 时,相当于触发了它对应的回调函数,从而让等待在对应 future 上的函数继续执行。
sav.send(Error)
这个和 sav.send(value)
的类似,不过 error_state
会设置为参数传递进来的 err
,然后调用 Callback<T>::next->error(err)
。
Promise 相关的函数
以下几个函数是和 promise 相关的。
finishSendAndDelPromiseRef
this->error_state = SET_ERROR_CODE
callback
if (!--promises && !futures) destroy()
一个 promise 已经完成,判断是否还有 promise 或者 future,都没有则 destroy。
sendAndDelPromiseRef
if (promises == 1 && !futures) destroy()
else 调用 fnishSendAndDelPromiseRef
作为一个 promise 提供者,调用这个方法发送 promise,然后减少引用计数。
sendErrorAndDelPromiseRef
和上面的方法作用类似,只不过用于 sendError 的情况
addCallbackAndDelFutureRef(cb)
将当前对象 this 添加到 cb 的队列中
if (Callback<T>::next != this) sav.futures-
如果当前对象和回调的对象不同,则减少当前对象的 futures。对于一个刚创建的对象,其 next == this,所以这里不会调用。如果添加过 callback 了,那么这里会执行 futures–。根据注释,这个是逻辑上保持 futures 计数平衡的措施。不过,这个代码太绕,注释也不清晰,不知道是什么含义。
Callback
本质上是一个双向链表,提供了 fire, error, wait 等虚函数。
Callback
被继承使用,例如 SAV
, ActorCallback
,继承者指定如何实现 fire 等函数。
例如 ActorCallback
会指定调用注册的对象的 a_callback_fire
函数和 a_callback_error
函数。这两个函数是在 ACTOR 生成的代码中自动生成的,属于 XxxActorState 类。
template <class ActorType, int CallbackNumber, class ValueType>
struct ActorCallback : Callback<ValueType> {
virtual void fire(ValueType const& value) override { static_cast<ActorType*>(this)->a_callback_fire(this, value); }
virtual void error(Error e) override { static_cast<ActorType*>(this)->a_callback_error(this, e); }
};
在整个 flow 程序里,有一点是比较难理解的: future 和 actor 都是 callback。
Actor
每个 ACTOR 会被生成一系列的代码,包括原始函数,一个 Actor 类,和一个用于保存状态的 ActorState 类,主要的逻辑在 ActorState 类里。
Actor 和 g_network
对象强绑定,也就是和 Net2
类 (flow/Net2.actor.cpp) 强绑定,所有的 delay 和 yield 调用都作用于 g_network
对象,用于将当前 coroutine 的回调添加到 Net2
的 ready 队列中。
Net2
flow 的 Net2
是一个 flow 应用的起点,开始于 g_network->run()
, g_network
是一个 Net2
实例,run 方法主要的逻辑是把 ready 队列中的 task 取出并执行。
所有的 flow ACTOR 的执行,都需要先提交到 Net2 的 ready 队列。这里有四个方法,三个是 delay,一个是 yield (见 flow/flow.h 文件的最下面)。所以,ACTOR 生成的后的代码,主要通过调用 delay 和 yield 来实现 coroutine 的运行控制。
Tutorial
documentation/tutorial/tutorial.actor.cpp 包含了很多 ACTOR 示例,这个也可以用来理解 flow 的实现细节。从这里开始,我会基于 tutorial 中的例子来分析 flow 应用是如何运行的,
tutorial 文件按照从上到下的顺序提供了从简单到复杂的 flow 程序。我们可以通过对比源代码,和编译器转换后的代码来学习一个 ACTOR 的运行流程。
编译的方法可以在官方 README 查到,编译后的文件变成了 PATH_TO_BUILD/documentation/tutorial/tutorial.actor.g.cpp。
simpleTimer
这个方法实现一个简单的循环定时器,每个一定时间,打印一条输出。
ACTOR Future<Void> simpleTimer() {
state double start_time = g_network->now();
loop {
wait(delay(1.0));
std::cout << format("Time: %.2f\n", g_network->now() - start_time);
}
}
这个函数会被编译器转换为三个部分: simpleTimer
函数, SimpleTimerActor
和 SimpleTimerActorState
。
simpleTimer
函数的 signature 和原来一样,但是内部的实现全部被放到 SimpletTimeActor
中:
class SimpleTimerActor : public Actor<Void>, public ActorCallback< SimpleTimerActor, 0, Void >, public FastAllocated<SimpleTimerActor>, public SimpleTimerActorState<SimpleTimerActor> {
SimpleTimerActor
类的基类 Actor 的基类是个 SAV,用来保存一个 ACTOR 的状态:
- 1: actor is cancelled
- 0: actor is not waiting
- 1-N: waiting in callback group #,就是等待的 callback group 的编号。
基类 ActorCallback
提供了对 Callback 的 fire, error 的实现,分别映射到 a_callback_fire
, a_callback_error
两个调用。这两个调用都是由 XxxxActorState 类实现的。
一个 actor 被转换为 Actor 类之后,其执行入口就是这个 Actor 类的构造函数。Actor 类还支持了 cancel 操作。
XxxActorState 类用于封装 ACTOR 的逻辑,所有的 state (flow 的 state 关键字定义的变量) 都是该类的类成员,该类会根据用户写的 flow 关键字,生成很多个类函数,一般来说。另外,对于所有的 wait 和 waitNext 操作,都会生成对应的 a_callback_fire
和 a_callback_error
函数,实现异步回调。XxxActorState 类的入口是在 XxxActor 中调用的,一般是 this->a_body1()
. 自动生成的 XxxActorState 类帮助用户解决了异步编程中的 callback hell 问题。
simpleTimer 的逻辑很简单,就是等待定时器到期后,打印一个字符串。 delay
是一个关键点的函数(上文提到过),对应到 Net2::delay 函数。
SimpleTimerActorState.a_body1loopBody1
封装了对 delay 的调用。这里有两个关键点:
- delay 调用本身会生成一个
DelayedTask
,其中会封装一个PromiseTask
,然后DelayedTask
加入到 ready 队列中。 - delay 调用本身会返回一个 Future,名字是
__when_expr_0
,然后会执行__when_expr_0.addCallbackAndClear(this)
,这个调用最终会把当前对象this
,也就是SimpleTimerActorState
实例作为一个 callback,加入到__when_expr_0
的回调队列中。
当 delay 生成的 DelayedTask 在 Net2::run 中被调用时,会执行的是其中封装的 PromiseTask
,这个 PromiseTask
被执行时,会调用 Promise.send
方法,激活自己所对应的 Future ,也就是前面 delay 函数返回的那个。因为 send 的结果是调用 SAV 对应的 callback 操作,所以这里就会调用前面通过 __when_expr_0.addCallbackAndClear(this)
对应的 this
,也就是 SimpleTimerActorState
的 a_callback_fire
函数。
这个函数会调用 a_body1loopBody1when1 -> a_body1loopBody1cont1
,在这个函数里会调用 std::cout
打印函数。