FoundationDB flow -- Part 2
Tutorial 中的 promiseDemo
和 triggerDemo
的实现和 simipleTimer
类似,就不具体分析了。本文主要来分析一下 echoServer
的实现,这个程序涉及网络收发包,代码非常绕。
echoServer
这个是一个网络应用程序,其异步回调流程更难理解,涉及到 RequestStream
和 waitNext
的使用。
接口定义
首先注意到, EchoServerInterface
的定义,以及每个 Request 的定义都包含一个 serialize
方法。
struct EchoServerInterface {
constexpr static FileIdentifier file_identifier = 3152015;
RequestStream<struct GetInterfaceRequest> getInterface;
RequestStream<struct EchoRequest> echo;
RequestStream<struct ReverseRequest> reverse;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, echo, reverse);
}
};
struct GetInterfaceRequest {
constexpr static FileIdentifier file_identifier = 12004156;
ReplyPromise<EchoServerInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct EchoRequest {
constexpr static FileIdentifier file_identifier = 10624019;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
struct ReverseRequest {
constexpr static FileIdentifier file_identifier = 10765955;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
服务的实现
ACTOR Future<Void> echoServer() {
state EchoServerInterface echoServer;
echoServer.getInterface.makeWellKnownEndpoint(UID(-1, ++tokenCounter), TaskPriority::DefaultEndpoint);
loop {
choose {
when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
req.reply.send(echoServer);
}
when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { req.reply.send(req.message); }
when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) {
req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
}
}
}
}
这个代码经过 flow 编译器处理后,生成的代码结构其实和 simpleTimer
差不多,这里就不展开了,理解这个代码的难点在于理解报文是如何收发的,以及报文的收发如何和 ACTOR 结合起来。
注册 well known endpoint
这个服务的实现函数,首先调用了 echoServer.getInterface.makeWellKnownEndpoint()
方法。
RequestStream
有个私有成员NetNotifiedQueue<T>* queue;
makeWellKnownEndpoint(token, taskID)
会调用queue->makeWellKnownEndpoint(token, taskID)
NetNotifiedQueue
是用于网络应用的NotifiedQueue
,同时也继承了FlowReceiver
。makeWellKnownEndpoint
就是FlowReceiver
的成员函数。FlowReceiver
对一个应用的接收者,表示某个服务端的收包的一方。该成员函数会调用lowTransport::transport().addWellKnownEndpoint(endpoint, this, taskID)
-
这个
FlowTransport::transport()
指向的是Net2.global(INetwork::enFlowTransport))
,就是一个FlowTransport
实例,这个函数的作用是把这个应用,也就是 Endpoint,注册到一个列表中(后续收到报文时,会从这个列表中查找,得到对应的 receiver):void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) { endpoint.addresses = self->localAddresses; ASSERT(receiver->isStream()); self->endpoints.insertWellKnown(receiver, endpoint.token, taskID); }
这行代码之后,剩下的代码逻辑就是:循环接收消息,然后发送响应。但是,看完这些代码,你会发现,自己根本不知道网络报文是怎么收上来的。
Listen and Accept Connections
网络应用程序,肯定需要监听和接受新的连接,所以在一个 flow 应用的代码流程上,肯定有地方做这些事情。
这里就要看一下整个应用的启动代码: FlowTransport::transport()::bind(publicAddress, publicAddress)
, bind
函数会调用一个全局定义的 listen
函数。
listen
函数,准确的说,是一个 ACTOR,会循环接受新的连接:
ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr ) {
state ActorCollectionNoErrors incoming; // Actors monitoring incoming connections that haven't yet been associated with a peer
state Reference<IListener> listener = INetworkConnections::net()->listen( listenAddr );
state uint64_t connectionCount = 0;
try {
loop {
Reference<IConnection> conn = wait( listener->accept() );
if(conn) {
TraceEvent("ConnectionFrom", conn->getDebugID()).suppressFor(1.0)
.detail("FromAddress", conn->getPeerAddress())
.detail("ListenAddress", listenAddr.toString());
incoming.add( connectionIncoming(self, conn) );
}
connectionCount++;
if( connectionCount%(FLOW_KNOBS->ACCEPT_BATCH_SIZE) == 0 ) {
wait(delay(0, TaskPriority::AcceptSocket));
}
}
} catch (Error& e) {
TraceEvent(SevError, "ListenError").error(e);
throw;
}
}
listerner = INetworkConnections::net()->listen(listenAddr)
,这里其实就是调用了Net2::listen()
。Net2::listen()
会返回一个Listener
实例,这个对象使用boost::asio
来实现异步网络应用。Listener
的doAccept
函数,会使用 asio 的async_accept
来等待连接完成(会立即返回),当连接完成后,设置一个BindPromise
来通知调用者。- 全局
listen
函数会wait (listener->accept())
,如果返回成功,那么就会对返回的conn
调用connectionIncoming()
.listen
函数中还使用了ActorCollection
来保存所有的监听连接。 connectionIncoming()
里会先调用wait(conn->acceptHandshake()
完成连接的握手,然后调用一个connectionReader()
执行读操作。
Read from connection
connectionReader()
就是不停的从连接读取数据,然后执行回调。
- 该函数里会直接处理第一个连接报文,就是当一个新的客户端连接上来时,会先发送的一些私有协议连接报文。
- 然后调用
scanPackets
来处理收到的报文.scanPackets
里会检查 checksum, 然后创建一个ArenaReader
来读取报文的内容,首先会读取UID
, 然后调用deliver()
函数,将报文传递给应用协议,也就是本例中的 EchoServer。- 在 flow 中,识别应用是靠
Endpoint.token
,所以要先读出UID
。通过 UID 在TransportData
中找到对应的 endpoint。这个时候,结合上面的提到的对FlowTransport::addWellKnownEndpoint
的调用,我们就可以把我们定义的 EchoServer 和 flow 的收包流程连接上,也就是在收包的 ACTOR 中,会通过 UID 找到对应的 receiver,也就是我们的调用FlowTransport::addWellKnownEndpoint
方法时使用的RequestStream
对象。
- 在 flow 中,识别应用是靠
-
接下来,就是看看收到报文时,在
deliver
函数中,回到了哪个函数?从代码中,可以看到,调用的是receiver->receive()
函数,这个方法,在NetNotifiedQueue
中实现,参数是一个ArenaObjectReader
。template <class T> struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> { using FastAllocated<NetNotifiedQueue<T>>::operator new; using FastAllocated<NetNotifiedQueue<T>>::operator delete; NetNotifiedQueue(int futures, int promises) : NotifiedQueue<T>(futures, promises) {} NetNotifiedQueue(int futures, int promises, const Endpoint& remoteEndpoint) : NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true) {} void destroy() override { delete this; } void receive(ArenaObjectReader& reader) override { this->addPromiseRef(); T message; reader.deserialize(message); this->send(std::move(message)); this->delPromiseRef(); } bool isStream() const override { return true; } };
-
这个方法先调用了 reader 的
deserialize
方法来,然后调用了 send 方法来触发回调。回调的是 flow 生成的代码,在这个例子中,主要的部分是 EchoServer 的req.reply.send(echoServer)
。到这里,我们可以衔接上如下的 flow 代码:when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) { req.reply.send(echoServer); }
req
就是上面this->send(std::move(message))
函数的参数。 - 然后,我们的 flow 代码会执行
req.reply.send(echoServer)
,这个reply
是什么时候初始化的?从整个代码流程看,只可能是在reader.deserialize(message)
的时候初始化的。
Write to connection
GetInterfaceRequest
有一个成员 reply
,类型是 ReplyPromise<EchoServerInterface>
。当我们调用 waitNext(echoServer.getInterface.getFuture())
时,得到了一个 req
对象,作为一个服务端程序来说,获得客户端发来的一个 request 对象,可以想象得到,其中的内容应该主要包含了客户端发送的请求内容,但是这个 reply
显然是一个响应对象,而且在服务端的代码中,还调用了它的 send
方法,我们必须要搞清楚,这个 reply 对象是怎么初始化的。
这里就要再次回到上面的 deliver()
方法了的流程上。就是 NetNotifiedQueue
的 receive()
方法。
template <class T>
struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> {
...
void receive(ArenaObjectReader& reader) override {
this->addPromiseRef();
T message;
reader.deserialize(message);
this->send(std::move(message));
this->delPromiseRef();
}
bool isStream() const override { return true; }
};
这个方法会调用 render.deserialzie(message)
方法,将收到的报文反序列化为一个 C++ 对象。整个反序列化的过程涉及到 flat buffer 的实现(见 flow/flat_buffers.h 和 flow/ObjectSerializer.h 等文件),这部分不在本文的讨论范围中,这里提到这个,主要是要找出反序列化过程中,会执行的代码。在 flat buffer 的实现过程中,你会发现如下代码(flow/flat_buffers.h):
template <class Member>
std::enable_if_t<expect_serialize_member<Member>> load(Member& member, const uint8_t* current) {
SerializeFun fun(current, this->context());
if constexpr (serializable_traits<Member>::value) {
serializable_traits<Member>::serialize(fun, member);
} else {
member.serialize(fun);
}
}
这个代码使用了 C++ 的 traits 技术。上面代码中,高亮的那一行,调用了 serializable_traits<Member>::value
,用于判断一个对象是否实现了了一个 serializable_traits
的辅助对象,如果有的化,这里会返回 true
,否则就是默认值 false
。前面提到的 ReplyPromise
就实现了这个 traits,所以当反序列化程序遇到 GetInterfaceRequest
中的 reply
对象时,就会调用对应的 traits 实现,这个实现如下所示:
template <class T>
struct serializable_traits<ReplyPromise<T>> : std::true_type {
template<class Archiver>
static void serialize(Archiver& ar, ReplyPromise<T>& p) {
if constexpr (Archiver::isDeserializing) {
UID token;
serializer(ar, token);
auto endpoint = FlowTransport::transport().loadedEndpoint(token);
p = ReplyPromise<T>(endpoint);
networkSender(p.getFuture(), endpoint);
} else {
const auto& ep = p.getEndpoint().token;
serializer(ar, ep);
}
}
};
从代码可以看出,在对一个 ReplyPromise
对象进行反序列化的时候,会初始化一个 ReplyPromise
对象,然后会调用 networkSender()
方法。这个方法看起来会起一个 ACTOR,然后等到应用代码告诉它发送响应的时候再发送响应(名字后面带 er 的,一种类似 Golang Interface 的命名方式,这么说是不是觉得这个很想一个动态创建的 goroutine?)。
networkSender()
当等待输入准备好之后,调用网络接口发送数据。
ACTOR template <class T>
void networkSender(Future<T> input, Endpoint endpoint) {
try {
T value = wait(input);
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
} catch (Error& err) {
// if (err.code() == error_code_broken_promise) return;
ASSERT(err.code() != error_code_actor_cancelled);
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
}
}
在 FlowTransport::transport().sendUnreliable()
里会调用 peer->send()
来发送报文,不过这个发送报文是异步的,是将要发送的报文添加到 peer 的 unsent
队列中,然后等待 connectionWriter
ACTOR 来发送. connectionWriter
会在 connectionKeeper
中被调用,而 connectionKeeper
则会在我们上面提到的 connectionIncoming
中被调用。 connectionWriter
里会调用 conn->write
来发送报文,是在 Net2.actor.cpp 中的 Conneciton
对象的 write
方法。
总结
- flow 应用启动的时候会启动对应的 ACTOR 来实现网络监听,以便接受客户端的新连接。
- flow 使用
Endpoint
来标识应用,收到的报文会根据endpoint.token
来进行分发。 - flow 消息分发过程中的 deserialize 过程会附加上 reply 的处理,这个是利用 traits 特性实现的。