Introduction

flow 其实是一种语言,提供了一种在 C++ 中编写异步程序的方式。在 Linux 上,flow 的底层依赖于

  • boost::asio,用于实现网络异步。
  • kernel 的 ASIO,用于实现 IO 异步。
  • 以及自己实现的定时器调度,用于实现定时器异步。

在 flow 最常见的 loop choose ... when 其实类似于 Golang 的 for + select + channel 的方式。一个 ACTOR 可以理解为一个 goroutine。但是,因为 flow 是以 C++ 为基础的,所以它看起来会比较奇怪,比较难以理解。

这个系列文章主要用于梳理 FoundationDB (fdb) flow 是如何工作的,以帮助大家看懂代码(官方文档实在太少)。

Future and Promise

Future 和 Promise 相当于一个简化的 Golang 的 channel,Future 可以用来等待和接受,Promise 则用来触发和发送。

在 flow 的语境下,Future 表示一个 ACTOR 等待的值,Promise  表示一个 ACTOR 会承诺提供的值。当一个 ACTOR 创建了一个 Promise 之后,需要把该 Promise 对应的 future 提供给等待者 (promise.getFuture()),等待者会 wait 这个 future,达到传递值的目的。

Future 和 Promise 都采用了 Pimpl 的模式来实现,主要实现放在成员 SAV<T> *sav 中。

SAV: Single Assignment Variable

SAV 申请了一段连续内存来保存一个对象,同时绑定了一个 Error ,当 Error 已经被设置过时,获取这个对象会抛出异常。这个错误主要用来控制给对象只能被设置一次,比如各种 send 函数,只能调用一次。

SAV 初始化的时候,需要指定有几个 futures,有几个 promises。futures 表示有多少个对象在等待回调,promises 表示有多少个对象会触发这个回调。futures 为 0,表示没有人在等待回调,意味着这个 SAV 可以被删除了。

SAV 继承自 Callback ,通过一个双向链表来保存需要回调的函数。

sav.send(value)

将 value 移动到 sav.value_storage 中,将 error_state 设置为 SET_ERROR_CODE,然后调用 Callback<T>::next->fire(value)。这个函数是整个 flow 语言的核心,对一个 promise 调用 send 时,相当于触发了它对应的回调函数,从而让等待在对应 future 上的函数继续执行。

sav.send(Error)

这个和 sav.send(value) 的类似,不过 error_state 会设置为参数传递进来的 err,然后调用 Callback<T>::next->error(err)

Promise 相关的函数

以下几个函数是和 promise 相关的。

finishSendAndDelPromiseRef

this->error_state = SET_ERROR_CODE

callback

if (!--promises && !futures) destroy()

一个 promise 已经完成,判断是否还有 promise 或者 future,都没有则 destroy。

sendAndDelPromiseRef

if (promises == 1 && !futures) destroy()

else 调用 fnishSendAndDelPromiseRef

作为一个 promise 提供者,调用这个方法发送 promise,然后减少引用计数。

sendErrorAndDelPromiseRef

和上面的方法作用类似,只不过用于 sendError 的情况

addCallbackAndDelFutureRef(cb)

将当前对象 this 添加到 cb 的队列中

if (Callback<T>::next != this) sav.futures-

如果当前对象和回调的对象不同,则减少当前对象的 futures。对于一个刚创建的对象,其 next == this,所以这里不会调用。如果添加过 callback 了,那么这里会执行  futures–。根据注释,这个是逻辑上保持 futures 计数平衡的措施。不过,这个代码太绕,注释也不清晰,不知道是什么含义。

Callback

本质上是一个双向链表,提供了 fire, error, wait 等虚函数。

Callback 被继承使用,例如 SAV, ActorCallback,继承者指定如何实现 fire 等函数。

例如 ActorCallback 会指定调用注册的对象的 a_callback_fire 函数和 a_callback_error 函数。这两个函数是在 ACTOR 生成的代码中自动生成的,属于 XxxActorState 类。

template <class ActorType, int CallbackNumber, class ValueType>

struct ActorCallback : Callback<ValueType> {

virtual void fire(ValueType const& value) override { static_cast<ActorType*>(this)->a_callback_fire(this, value); }

virtual void error(Error e) override { static_cast<ActorType*>(this)->a_callback_error(this, e); }

};

在整个 flow 程序里,有一点是比较难理解的: future 和 actor 都是 callback。

Actor

每个 ACTOR 会被生成一系列的代码,包括原始函数,一个 Actor 类,和一个用于保存状态的 ActorState 类,主要的逻辑在 ActorState 类里。

Actor 和 g_network 对象强绑定,也就是和 Net2 类 (flow/Net2.actor.cpp) 强绑定,所有的 delay 和 yield 调用都作用于 g_network 对象,用于将当前 coroutine 的回调添加到 Net2 的 ready 队列中。

Net2

flow 的 Net2 是一个 flow 应用的起点,开始于 g_network->run()g_network 是一个 Net2 实例,run 方法主要的逻辑是把 ready 队列中的 task 取出并执行。

所有的 flow ACTOR 的执行,都需要先提交到 Net2 的 ready 队列。这里有四个方法,三个是 delay,一个是 yield (见 flow/flow.h 文件的最下面)。所以,ACTOR 生成的后的代码,主要通过调用 delay 和 yield 来实现 coroutine 的运行控制。

Tutorial

documentation/tutorial/tutorial.actor.cpp 包含了很多 ACTOR 示例,这个也可以用来理解 flow 的实现细节。从这里开始,我会基于 tutorial 中的例子来分析 flow 应用是如何运行的,

tutorial 文件按照从上到下的顺序提供了从简单到复杂的 flow 程序。我们可以通过对比源代码,和编译器转换后的代码来学习一个 ACTOR 的运行流程。

编译的方法可以在官方 README 查到,编译后的文件变成了 PATH_TO_BUILD/documentation/tutorial/tutorial.actor.g.cpp。

simpleTimer

这个方法实现一个简单的循环定时器,每个一定时间,打印一条输出。

ACTOR Future<Void> simpleTimer() {
	state double start_time = g_network->now();
	loop {
		wait(delay(1.0));
		std::cout << format("Time: %.2f\n", g_network->now() - start_time);
	}
}

这个函数会被编译器转换为三个部分: simpleTimer 函数, SimpleTimerActorSimpleTimerActorState

simpleTimer 函数的 signature 和原来一样,但是内部的实现全部被放到 SimpletTimeActor 中:

class SimpleTimerActor : public Actor<Void>, public ActorCallback< SimpleTimerActor, 0, Void >, public FastAllocated<SimpleTimerActor>, public SimpleTimerActorState<SimpleTimerActor> {

SimpleTimerActor 类的基类 Actor 的基类是个 SAV,用来保存一个 ACTOR 的状态:

  • 1: actor is cancelled
  • 0: actor is not waiting
  • 1-N: waiting in callback group #,就是等待的 callback group 的编号。

基类 ActorCallback 提供了对 Callback 的 fire, error 的实现,分别映射到 a_callback_fire, a_callback_error 两个调用。这两个调用都是由 XxxxActorState 类实现的。

一个 actor 被转换为 Actor 类之后,其执行入口就是这个 Actor 类的构造函数。Actor 类还支持了 cancel 操作。

XxxActorState 类用于封装 ACTOR 的逻辑,所有的 state (flow 的 state 关键字定义的变量) 都是该类的类成员,该类会根据用户写的 flow 关键字,生成很多个类函数,一般来说。另外,对于所有的 wait 和 waitNext  操作,都会生成对应的 a_callback_firea_callback_error 函数,实现异步回调。XxxActorState 类的入口是在 XxxActor 中调用的,一般是 this->a_body1(). 自动生成的 XxxActorState 类帮助用户解决了异步编程中的 callback hell 问题。

simpleTimer 的逻辑很简单,就是等待定时器到期后,打印一个字符串。 delay 是一个关键点的函数(上文提到过),对应到 Net2::delay 函数。

SimpleTimerActorState.a_body1loopBody1 封装了对 delay 的调用。这里有两个关键点:

  1. delay 调用本身会生成一个 DelayedTask,其中会封装一个 PromiseTask,然后 DelayedTask 加入到 ready 队列中。
  2. delay 调用本身会返回一个 Future,名字是 __when_expr_0,然后会执行 __when_expr_0.addCallbackAndClear(this),这个调用最终会把当前对象 this,也就是 SimpleTimerActorState 实例作为一个 callback,加入到 __when_expr_0 的回调队列中。

当 delay 生成的 DelayedTask 在 Net2::run 中被调用时,会执行的是其中封装的 PromiseTask,这个 PromiseTask 被执行时,会调用 Promise.send 方法,激活自己所对应的 Future ,也就是前面 delay 函数返回的那个。因为 send 的结果是调用 SAV 对应的 callback 操作,所以这里就会调用前面通过 __when_expr_0.addCallbackAndClear(this) 对应的 this,也就是 SimpleTimerActorStatea_callback_fire 函数。

这个函数会调用 a_body1loopBody1when1 -> a_body1loopBody1cont1,在这个函数里会调用 std::cout 打印函数。


知识共享许可协议本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。