FoundationDB flow -- Part 2
Tutorial 中的 promiseDemo 和 triggerDemo 的实现和 simipleTimer 类似,就不具体分析了。本文主要来分析一下 echoServer 的实现,这个程序涉及网络收发包,代码非常绕。
echoServer
这个是一个网络应用程序,其异步回调流程更难理解,涉及到 RequestStream 和 waitNext 的使用。
接口定义
首先注意到, EchoServerInterface 的定义,以及每个 Request 的定义都包含一个 serialize 方法。
struct EchoServerInterface {
constexpr static FileIdentifier file_identifier = 3152015;
RequestStream<struct GetInterfaceRequest> getInterface;
RequestStream<struct EchoRequest> echo;
RequestStream<struct ReverseRequest> reverse;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, echo, reverse);
}
};
struct GetInterfaceRequest {
constexpr static FileIdentifier file_identifier = 12004156;
ReplyPromise<EchoServerInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct EchoRequest {
constexpr static FileIdentifier file_identifier = 10624019;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
struct ReverseRequest {
constexpr static FileIdentifier file_identifier = 10765955;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
服务的实现
ACTOR Future<Void> echoServer() {
state EchoServerInterface echoServer;
echoServer.getInterface.makeWellKnownEndpoint(UID(-1, ++tokenCounter), TaskPriority::DefaultEndpoint);
loop {
choose {
when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
req.reply.send(echoServer);
}
when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { req.reply.send(req.message); }
when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) {
req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
}
}
}
}
这个代码经过 flow 编译器处理后,生成的代码结构其实和 simpleTimer 差不多,这里就不展开了,理解这个代码的难点在于理解报文是如何收发的,以及报文的收发如何和 ACTOR 结合起来。
注册 well known endpoint
这个服务的实现函数,首先调用了 echoServer.getInterface.makeWellKnownEndpoint() 方法。
RequestStream有个私有成员NetNotifiedQueue<T>* queue;makeWellKnownEndpoint(token, taskID)会调用queue->makeWellKnownEndpoint(token, taskID)NetNotifiedQueue是用于网络应用的NotifiedQueue,同时也继承了FlowReceiver。makeWellKnownEndpoint就是FlowReceiver的成员函数。FlowReceiver对一个应用的接收者,表示某个服务端的收包的一方。该成员函数会调用lowTransport::transport().addWellKnownEndpoint(endpoint, this, taskID)-
这个
FlowTransport::transport()指向的是Net2.global(INetwork::enFlowTransport)),就是一个FlowTransport实例,这个函数的作用是把这个应用,也就是 Endpoint,注册到一个列表中(后续收到报文时,会从这个列表中查找,得到对应的 receiver):void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) { endpoint.addresses = self->localAddresses; ASSERT(receiver->isStream()); self->endpoints.insertWellKnown(receiver, endpoint.token, taskID); }
这行代码之后,剩下的代码逻辑就是:循环接收消息,然后发送响应。但是,看完这些代码,你会发现,自己根本不知道网络报文是怎么收上来的。
Listen and Accept Connections
网络应用程序,肯定需要监听和接受新的连接,所以在一个 flow 应用的代码流程上,肯定有地方做这些事情。
这里就要看一下整个应用的启动代码: FlowTransport::transport()::bind(publicAddress, publicAddress) , bind 函数会调用一个全局定义的 listen 函数。
listen 函数,准确的说,是一个 ACTOR,会循环接受新的连接:
ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr ) {
state ActorCollectionNoErrors incoming; // Actors monitoring incoming connections that haven't yet been associated with a peer
state Reference<IListener> listener = INetworkConnections::net()->listen( listenAddr );
state uint64_t connectionCount = 0;
try {
loop {
Reference<IConnection> conn = wait( listener->accept() );
if(conn) {
TraceEvent("ConnectionFrom", conn->getDebugID()).suppressFor(1.0)
.detail("FromAddress", conn->getPeerAddress())
.detail("ListenAddress", listenAddr.toString());
incoming.add( connectionIncoming(self, conn) );
}
connectionCount++;
if( connectionCount%(FLOW_KNOBS->ACCEPT_BATCH_SIZE) == 0 ) {
wait(delay(0, TaskPriority::AcceptSocket));
}
}
} catch (Error& e) {
TraceEvent(SevError, "ListenError").error(e);
throw;
}
}
listerner = INetworkConnections::net()->listen(listenAddr),这里其实就是调用了Net2::listen()。Net2::listen()会返回一个Listener实例,这个对象使用boost::asio来实现异步网络应用。Listener的doAccept函数,会使用 asio 的async_accept来等待连接完成(会立即返回),当连接完成后,设置一个BindPromise来通知调用者。- 全局
listen函数会wait (listener->accept()),如果返回成功,那么就会对返回的conn调用connectionIncoming().listen函数中还使用了ActorCollection来保存所有的监听连接。 connectionIncoming()里会先调用wait(conn->acceptHandshake()完成连接的握手,然后调用一个connectionReader()执行读操作。
Read from connection
connectionReader() 就是不停的从连接读取数据,然后执行回调。
- 该函数里会直接处理第一个连接报文,就是当一个新的客户端连接上来时,会先发送的一些私有协议连接报文。
- 然后调用
scanPackets来处理收到的报文.scanPackets里会检查 checksum, 然后创建一个ArenaReader来读取报文的内容,首先会读取UID, 然后调用deliver()函数,将报文传递给应用协议,也就是本例中的 EchoServer。- 在 flow 中,识别应用是靠
Endpoint.token,所以要先读出UID。通过 UID 在TransportData中找到对应的 endpoint。这个时候,结合上面的提到的对FlowTransport::addWellKnownEndpoint的调用,我们就可以把我们定义的 EchoServer 和 flow 的收包流程连接上,也就是在收包的 ACTOR 中,会通过 UID 找到对应的 receiver,也就是我们的调用FlowTransport::addWellKnownEndpoint方法时使用的RequestStream对象。
- 在 flow 中,识别应用是靠
-
接下来,就是看看收到报文时,在
deliver函数中,回到了哪个函数?从代码中,可以看到,调用的是receiver->receive()函数,这个方法,在NetNotifiedQueue中实现,参数是一个ArenaObjectReader。template <class T> struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> { using FastAllocated<NetNotifiedQueue<T>>::operator new; using FastAllocated<NetNotifiedQueue<T>>::operator delete; NetNotifiedQueue(int futures, int promises) : NotifiedQueue<T>(futures, promises) {} NetNotifiedQueue(int futures, int promises, const Endpoint& remoteEndpoint) : NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true) {} void destroy() override { delete this; } void receive(ArenaObjectReader& reader) override { this->addPromiseRef(); T message; reader.deserialize(message); this->send(std::move(message)); this->delPromiseRef(); } bool isStream() const override { return true; } }; -
这个方法先调用了 reader 的
deserialize方法来,然后调用了 send 方法来触发回调。回调的是 flow 生成的代码,在这个例子中,主要的部分是 EchoServer 的req.reply.send(echoServer)。到这里,我们可以衔接上如下的 flow 代码:when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) { req.reply.send(echoServer); }req就是上面this->send(std::move(message))函数的参数。 - 然后,我们的 flow 代码会执行
req.reply.send(echoServer),这个reply是什么时候初始化的?从整个代码流程看,只可能是在reader.deserialize(message)的时候初始化的。
Write to connection
GetInterfaceRequest 有一个成员 reply ,类型是 ReplyPromise<EchoServerInterface>。当我们调用 waitNext(echoServer.getInterface.getFuture()) 时,得到了一个 req 对象,作为一个服务端程序来说,获得客户端发来的一个 request 对象,可以想象得到,其中的内容应该主要包含了客户端发送的请求内容,但是这个 reply 显然是一个响应对象,而且在服务端的代码中,还调用了它的 send 方法,我们必须要搞清楚,这个 reply 对象是怎么初始化的。
这里就要再次回到上面的 deliver() 方法了的流程上。就是 NetNotifiedQueue 的 receive() 方法。
template <class T>
struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> {
...
void receive(ArenaObjectReader& reader) override {
this->addPromiseRef();
T message;
reader.deserialize(message);
this->send(std::move(message));
this->delPromiseRef();
}
bool isStream() const override { return true; }
};
这个方法会调用 render.deserialzie(message) 方法,将收到的报文反序列化为一个 C++ 对象。整个反序列化的过程涉及到 flat buffer 的实现(见 flow/flat_buffers.h 和 flow/ObjectSerializer.h 等文件),这部分不在本文的讨论范围中,这里提到这个,主要是要找出反序列化过程中,会执行的代码。在 flat buffer 的实现过程中,你会发现如下代码(flow/flat_buffers.h):
template <class Member>
std::enable_if_t<expect_serialize_member<Member>> load(Member& member, const uint8_t* current) {
SerializeFun fun(current, this->context());
if constexpr (serializable_traits<Member>::value) {
serializable_traits<Member>::serialize(fun, member);
} else {
member.serialize(fun);
}
}
这个代码使用了 C++ 的 traits 技术。上面代码中,高亮的那一行,调用了 serializable_traits<Member>::value ,用于判断一个对象是否实现了了一个 serializable_traits 的辅助对象,如果有的化,这里会返回 true,否则就是默认值 false。前面提到的 ReplyPromise 就实现了这个 traits,所以当反序列化程序遇到 GetInterfaceRequest 中的 reply 对象时,就会调用对应的 traits 实现,这个实现如下所示:
template <class T>
struct serializable_traits<ReplyPromise<T>> : std::true_type {
template<class Archiver>
static void serialize(Archiver& ar, ReplyPromise<T>& p) {
if constexpr (Archiver::isDeserializing) {
UID token;
serializer(ar, token);
auto endpoint = FlowTransport::transport().loadedEndpoint(token);
p = ReplyPromise<T>(endpoint);
networkSender(p.getFuture(), endpoint);
} else {
const auto& ep = p.getEndpoint().token;
serializer(ar, ep);
}
}
};
从代码可以看出,在对一个 ReplyPromise 对象进行反序列化的时候,会初始化一个 ReplyPromise 对象,然后会调用 networkSender() 方法。这个方法看起来会起一个 ACTOR,然后等到应用代码告诉它发送响应的时候再发送响应(名字后面带 er 的,一种类似 Golang Interface 的命名方式,这么说是不是觉得这个很想一个动态创建的 goroutine?)。
networkSender() 当等待输入准备好之后,调用网络接口发送数据。
ACTOR template <class T>
void networkSender(Future<T> input, Endpoint endpoint) {
try {
T value = wait(input);
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
} catch (Error& err) {
// if (err.code() == error_code_broken_promise) return;
ASSERT(err.code() != error_code_actor_cancelled);
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
}
}
在 FlowTransport::transport().sendUnreliable() 里会调用 peer->send() 来发送报文,不过这个发送报文是异步的,是将要发送的报文添加到 peer 的 unsent 队列中,然后等待 connectionWriter ACTOR 来发送. connectionWriter 会在 connectionKeeper 中被调用,而 connectionKeeper 则会在我们上面提到的 connectionIncoming 中被调用。 connectionWriter 里会调用 conn->write 来发送报文,是在 Net2.actor.cpp 中的 Conneciton 对象的 write 方法。
总结
- flow 应用启动的时候会启动对应的 ACTOR 来实现网络监听,以便接受客户端的新连接。
- flow 使用
Endpoint来标识应用,收到的报文会根据endpoint.token来进行分发。 - flow 消息分发过程中的 deserialize 过程会附加上 reply 的处理,这个是利用 traits 特性实现的。
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。