Tutorial 中的 promiseDemotriggerDemo 的实现和 simipleTimer 类似,就不具体分析了。本文主要来分析一下 echoServer 的实现,这个程序涉及网络收发包,代码非常绕。

echoServer

这个是一个网络应用程序,其异步回调流程更难理解,涉及到 RequestStreamwaitNext 的使用。

接口定义

首先注意到, EchoServerInterface  的定义,以及每个 Request 的定义都包含一个 serialize 方法。

struct EchoServerInterface {
	constexpr static FileIdentifier file_identifier = 3152015;
	RequestStream<struct GetInterfaceRequest> getInterface;
	RequestStream<struct EchoRequest> echo;
	RequestStream<struct ReverseRequest> reverse;

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, echo, reverse);
	}
};

struct GetInterfaceRequest {
	constexpr static FileIdentifier file_identifier = 12004156;
	ReplyPromise<EchoServerInterface> reply;

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, reply);
	}
};

struct EchoRequest {
	constexpr static FileIdentifier file_identifier = 10624019;
	std::string message;
	// this variable has to be called reply!
	ReplyPromise<std::string> reply;

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, message, reply);
	}
};

struct ReverseRequest {
	constexpr static FileIdentifier file_identifier = 10765955;
	std::string message;
	// this variable has to be called reply!
	ReplyPromise<std::string> reply;

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, message, reply);
	}
};

服务的实现

ACTOR Future<Void> echoServer() {
	state EchoServerInterface echoServer;
	echoServer.getInterface.makeWellKnownEndpoint(UID(-1, ++tokenCounter), TaskPriority::DefaultEndpoint);
	loop {
		choose {
			when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
				req.reply.send(echoServer);
			}
			when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { req.reply.send(req.message); }
			when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) {
				req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
			}
		}
	}
}

这个代码经过 flow 编译器处理后,生成的代码结构其实和 simpleTimer 差不多,这里就不展开了,理解这个代码的难点在于理解报文是如何收发的,以及报文的收发如何和 ACTOR 结合起来

注册 well known endpoint

这个服务的实现函数,首先调用了 echoServer.getInterface.makeWellKnownEndpoint() 方法。

  • RequestStream  有个私有成员 NetNotifiedQueue<T>* queue;
  • makeWellKnownEndpoint(token, taskID) 会调用 queue->makeWellKnownEndpoint(token, taskID)
  • NetNotifiedQueue 是用于网络应用的 NotifiedQueue,同时也继承了 FlowReceiver
  • makeWellKnownEndpoint 就是  FlowReceiver  的成员函数。 FlowReceiver 对一个应用的接收者,表示某个服务端的收包的一方。该成员函数会调用 lowTransport::transport().addWellKnownEndpoint(endpoint, this, taskID)
  • 这个 FlowTransport::transport() 指向的是 Net2.global(INetwork::enFlowTransport)),就是一个 FlowTransport 实例,这个函数的作用是把这个应用,也就是 Endpoint,注册到一个列表中(后续收到报文时,会从这个列表中查找,得到对应的 receiver):

      void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) {
      	endpoint.addresses = self->localAddresses;
      	ASSERT(receiver->isStream());
      	self->endpoints.insertWellKnown(receiver, endpoint.token, taskID);
      }
    

这行代码之后,剩下的代码逻辑就是:循环接收消息,然后发送响应。但是,看完这些代码,你会发现,自己根本不知道网络报文是怎么收上来的。

Listen and Accept Connections

网络应用程序,肯定需要监听和接受新的连接,所以在一个 flow 应用的代码流程上,肯定有地方做这些事情。

这里就要看一下整个应用的启动代码: FlowTransport::transport()::bind(publicAddress, publicAddress)bind 函数会调用一个全局定义的 listen 函数。

listen 函数,准确的说,是一个 ACTOR,会循环接受新的连接:

ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr ) {
	state ActorCollectionNoErrors incoming;  // Actors monitoring incoming connections that haven't yet been associated with a peer
	state Reference<IListener> listener = INetworkConnections::net()->listen( listenAddr );
	state uint64_t connectionCount = 0;
	try {
		loop {
			Reference<IConnection> conn = wait( listener->accept() );
			if(conn) {
				TraceEvent("ConnectionFrom", conn->getDebugID()).suppressFor(1.0)
					.detail("FromAddress", conn->getPeerAddress())
					.detail("ListenAddress", listenAddr.toString());
				incoming.add( connectionIncoming(self, conn) );
			}
			connectionCount++;
			if( connectionCount%(FLOW_KNOBS->ACCEPT_BATCH_SIZE) == 0 ) {
				wait(delay(0, TaskPriority::AcceptSocket));
			}
		}
	} catch (Error& e) {
		TraceEvent(SevError, "ListenError").error(e);
		throw;
	}
}
  • listerner = INetworkConnections::net()->listen(listenAddr) ,这里其实就是调用了 Net2::listen()Net2::listen() 会返回一个 Listener 实例,这个对象使用 boost::asio 来实现异步网络应用。Listener  的 doAccept 函数,会使用 asio  的 async_accept 来等待连接完成(会立即返回),当连接完成后,设置一个 BindPromise 来通知调用者。
  • 全局 listen 函数会 wait (listener->accept()),如果返回成功,那么就会对返回的 conn 调用  connectionIncoming(). listen 函数中还使用了 ActorCollection 来保存所有的监听连接。
  • connectionIncoming() 里会先调用 wait(conn->acceptHandshake() 完成连接的握手,然后调用一个 connectionReader() 执行读操作。

Read from connection

connectionReader() 就是不停的从连接读取数据,然后执行回调。

  • 该函数里会直接处理第一个连接报文,就是当一个新的客户端连接上来时,会先发送的一些私有协议连接报文。
  • 然后调用 scanPackets 来处理收到的报文. scanPackets 里会检查 checksum, 然后创建一个 ArenaReader 来读取报文的内容,首先会读取 UID, 然后调用 deliver() 函数,将报文传递给应用协议,也就是本例中的 EchoServer。
    • 在 flow 中,识别应用是靠 Endpoint.token,所以要先读出 UID。通过 UID 在 TransportData 中找到对应的 endpoint。这个时候,结合上面的提到的对 FlowTransport::addWellKnownEndpoint 的调用,我们就可以把我们定义的 EchoServer 和 flow 的收包流程连接上,也就是在收包的 ACTOR 中,会通过 UID 找到对应的 receiver,也就是我们的调用 FlowTransport::addWellKnownEndpoint 方法时使用的 RequestStream 对象。
  • 接下来,就是看看收到报文时,在 deliver 函数中,回到了哪个函数?从代码中,可以看到,调用的是 receiver->receive() 函数,这个方法,在 NetNotifiedQueue 中实现,参数是一个 ArenaObjectReader

      template <class T>
      struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> {
      	using FastAllocated<NetNotifiedQueue<T>>::operator new;
      	using FastAllocated<NetNotifiedQueue<T>>::operator delete;
    
      	NetNotifiedQueue(int futures, int promises) : NotifiedQueue<T>(futures, promises) {}
      	NetNotifiedQueue(int futures, int promises, const Endpoint& remoteEndpoint)
      	  : NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true) {}
    
      	void destroy() override { delete this; }
      	void receive(ArenaObjectReader& reader) override {
      		this->addPromiseRef();
      		T message;
      		reader.deserialize(message);
      		this->send(std::move(message));
      		this->delPromiseRef();
      	}
      	bool isStream() const override { return true; }
      };
    
  • 这个方法先调用了 reader 的 deserialize 方法来,然后调用了 send 方法来触发回调。回调的是 flow 生成的代码,在这个例子中,主要的部分是 EchoServer 的 req.reply.send(echoServer) 。到这里,我们可以衔接上如下的 flow 代码:

      when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
      				req.reply.send(echoServer);
      			}
    

    req 就是上面 this->send(std::move(message)) 函数的参数。

  • 然后,我们的 flow 代码会执行 req.reply.send(echoServer) ,这个 reply 是什么时候初始化的?从整个代码流程看,只可能是在 reader.deserialize(message) 的时候初始化的

Write to connection

GetInterfaceRequest 有一个成员 reply ,类型是 ReplyPromise<EchoServerInterface>。当我们调用 waitNext(echoServer.getInterface.getFuture()) 时,得到了一个 req 对象,作为一个服务端程序来说,获得客户端发来的一个 request 对象,可以想象得到,其中的内容应该主要包含了客户端发送的请求内容,但是这个 reply 显然是一个响应对象,而且在服务端的代码中,还调用了它的 send 方法,我们必须要搞清楚,这个 reply 对象是怎么初始化的。

这里就要再次回到上面的 deliver() 方法了的流程上。就是 NetNotifiedQueuereceive() 方法。

template <class T>
struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> {
  ...
	void receive(ArenaObjectReader& reader) override {
		this->addPromiseRef();
		T message;
		reader.deserialize(message);
		this->send(std::move(message));
		this->delPromiseRef();
	}
	bool isStream() const override { return true; }
};

这个方法会调用 render.deserialzie(message) 方法,将收到的报文反序列化为一个 C++ 对象。整个反序列化的过程涉及到 flat buffer 的实现(见 flow/flat_buffers.hflow/ObjectSerializer.h 等文件),这部分不在本文的讨论范围中,这里提到这个,主要是要找出反序列化过程中,会执行的代码。在 flat buffer 的实现过程中,你会发现如下代码(flow/flat_buffers.h):

template <class Member>
	std::enable_if_t<expect_serialize_member<Member>> load(Member& member, const uint8_t* current) {
		SerializeFun fun(current, this->context());
		if constexpr (serializable_traits<Member>::value) {
			serializable_traits<Member>::serialize(fun, member);
		} else {
			member.serialize(fun);
		}
	}

这个代码使用了 C++ 的 traits 技术。上面代码中,高亮的那一行,调用了 serializable_traits<Member>::value ,用于判断一个对象是否实现了了一个 serializable_traits 的辅助对象,如果有的化,这里会返回 true,否则就是默认值 false。前面提到的 ReplyPromise 就实现了这个 traits,所以当反序列化程序遇到 GetInterfaceRequest 中的 reply 对象时,就会调用对应的 traits 实现,这个实现如下所示:

template <class T>
struct serializable_traits<ReplyPromise<T>> : std::true_type {
	template<class Archiver>
	static void serialize(Archiver& ar, ReplyPromise<T>& p) {
		if constexpr (Archiver::isDeserializing) {
			UID token;
			serializer(ar, token);
			auto endpoint = FlowTransport::transport().loadedEndpoint(token);
			p = ReplyPromise<T>(endpoint);
			networkSender(p.getFuture(), endpoint);
		} else {
			const auto& ep = p.getEndpoint().token;
			serializer(ar, ep);
		}
	}
};

从代码可以看出,在对一个 ReplyPromise 对象进行反序列化的时候,会初始化一个 ReplyPromise 对象,然后会调用 networkSender() 方法。这个方法看起来会起一个 ACTOR,然后等到应用代码告诉它发送响应的时候再发送响应(名字后面带 er 的,一种类似 Golang Interface 的命名方式,这么说是不是觉得这个很想一个动态创建的 goroutine?)。

networkSender() 当等待输入准备好之后,调用网络接口发送数据。

ACTOR template <class T>
void networkSender(Future<T> input, Endpoint endpoint) {
	try {
		T value = wait(input);
		FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
	} catch (Error& err) {
		// if (err.code() == error_code_broken_promise) return;
		ASSERT(err.code() != error_code_actor_cancelled);
		FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
	}
}

FlowTransport::transport().sendUnreliable() 里会调用 peer->send() 来发送报文,不过这个发送报文是异步的,是将要发送的报文添加到 peer 的 unsent 队列中,然后等待 connectionWriter ACTOR 来发送. connectionWriter 会在 connectionKeeper 中被调用,而 connectionKeeper 则会在我们上面提到的 connectionIncoming 中被调用。 connectionWriter 里会调用 conn->write 来发送报文,是在 Net2.actor.cpp 中的 Conneciton 对象的 write 方法。

总结

  • flow 应用启动的时候会启动对应的 ACTOR 来实现网络监听,以便接受客户端的新连接。
  • flow 使用 Endpoint 来标识应用,收到的报文会根据 endpoint.token 来进行分发。
  • flow 消息分发过程中的 deserialize 过程会附加上 reply 的处理,这个是利用 traits 特性实现的。

知识共享许可协议本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。