这边梳理一下整个RPC通信以及数据包的过程(从看源代码角度出发的),首先是客户端调用ping
这个函数,
Test test;
test.num1 = 1000;
test.num2 = 1000;
test.str = "000000";
test.bs = "111111";
client.ping(test);
那么调用这个函数其实是分为两步的,一部是send_ping
,另一部分是recv_ping
,下面我们专注于send_ping
部分。
void CalculatorClient::ping(const Test& test)
{
send_ping(test);
recv_ping();
}
send_ping
部分主要如下所示,首先是写入message开始的相关信息,之后写入相关参数的信息,最后写入message结束标识符。最后transport层再写入end,这个函数返回的就是整个buffer区的大小,当然还需要flush掉。
void CalculatorClient::send_ping(const Test& test)
{
int32_t cseqid = 0;
oprot_->writeMessageBegin("ping", ::apache::thrift::protocol::T_CALL, cseqid);
Calculator_ping_pargs args;
args.test = &test;
args.write(oprot_);
oprot_->writeMessageEnd();
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
}
最后来贴一下server端和client端的配置信息
// client
stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);
// server
stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);
1. 写入message开始的相关信息
如下所示,是TBinaryProtocol的写入的函数,首先是4 bytes的version,再是4 bytes(string length)+函数名长度+4 bytes的seqid。
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
if (this->strict_write_) {
int32_t version = (VERSION_1) | ((int32_t)messageType);
uint32_t wsize = 0;
wsize += writeI32(version);
wsize += writeString(name);
wsize += writeI32(seqid);
return wsize;
} else {
uint32_t wsize = 0;
wsize += writeString(name);
wsize += writeByte((int8_t)messageType);
wsize += writeI32(seqid);
return wsize;
}
}
2. 传递参数部分
接下来传递的是函数参数的相关信息了,下面分以下几种方式进行讨论
2.1. 当参数类型是int_32等
参数部分的字节数是1 byte的参数Type和2 bytes的参数ID,通过这两个量可以来定位参数的位置,从而把相应参数传进去。当参数传递完成之后,最后写入field stop标志符。
1.为什么fieldend为0?个人觉得因为前面已经知道了参数type,也就没有必要在写入end标识符了。
2.bool、int_8、int_16、int_32、int_64、double是同理的。
uint32_t Calculator_add_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("Calculator_add_pargs"); // 0 byte
// 3 bytes(参数ID和Type)
xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);
xfer += oprot->writeI32((*(this->num1))); // int type is 4 bytes
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2);
xfer += oprot->writeI32((*(this->num2)));
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop(); // 1 byte
xfer += oprot->writeStructEnd(); // 0 byte
return xfer;
}
2.1.1. writeStructBegin是写入0 byte
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructBegin(const char* name) {
(void)name;
return 0;
}
2.1.2. writeFieldBegin是写入3 bytes
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldBegin(const char* name,
const TType fieldType,
const int16_t fieldId) {
(void)name;
uint32_t wsize = 0;
wsize += writeByte((int8_t)fieldType);
wsize += writeI16(fieldId);
return wsize;
}
2.1.3. writeI32函数是写入4 bytes
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI32(const int32_t i32) {
int32_t net = (int32_t)ByteOrder_::toWire32(i32);
this->trans_->write((uint8_t*)&net, 4);
return 4;
}
2.1.4. writeFieldEnd函数是写入0字节
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldEnd() {
return 0;
}
2.1.5. writeFieldStop是写入1字节
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldStop() {
return writeByte((int8_t)T_STOP);
}
2.1.6. writeStructEnd函数是写入0字节
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructEnd() {
return 0;
}
2.2. 当参数类型是字符串
当参数类型是字符串的话,同样需要写入1 byte的参数Type和2 bytes的参数ID,但是除写入字符串外,还要写入4字节的字符串长度。通过字符串长度字段,可以完整的将参数中的字符串传进去。
uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("Calculator_ping_pargs"); // 0 byte
// 3 bytes(参数ID和Type)
xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRING, 1);
// string length + 4 bytes(store the string length)
xfer += oprot->writeString((*(this->test)));
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldStop(); // 1 byte
xfer += oprot->writeStructEnd(); // 0 byte
return xfer;
}
2.2.1. writeString函数4字节的字符串长度+字符串本身
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeString(const StrType& str) {
if (str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)()))
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint32_t size = static_cast<uint32_t>(str.size());
uint32_t result = writeI32((int32_t)size);
if (size > 0) {
this->trans_->write((uint8_t*)str.data(), size);
}
return result + size;
}
2.3. 当参数类型是bytes数组的时候
它还是会当做string来写入,同传字符串一样
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeBinary(const std::string& str) {
return TBinaryProtocolT<Transport_, ByteOrder_>::writeString(str);
}
2.4. 当参数类型是struct时
参数类型为struct时,那么同其他参数类型一样,也需要拿出3个字节存储struct的类型和ID,之后是依次存储struct中的内容,最后写入1 byte的field stop。
uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("Calculator_ping_pargs"); // 0 byte
xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRUCT, 1); // 3 bytes
xfer += (*(this->test)).write(oprot); // 具体bytes number看下面这段代码
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldStop(); // 1 byte
xfer += oprot->writeStructEnd(); // 0 byte
return xfer;
}
下面这段代码是依次存储struct 中的内容。
uint32_t Test::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("Test"); // 0 byte
xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1); // 3 bytes
xfer += oprot->writeI32(this->num1); // int type is 4 bytes
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_DOUBLE, 2); // 3 bytes
xfer += oprot->writeDouble(this->num2); // double type is 8 bytes
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldBegin("str", ::apache::thrift::protocol::T_STRING, 3); // 3 bytes
xfer += oprot->writeString(this->str); // string length + 4 bytes(store the string length)
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldBegin("bs", ::apache::thrift::protocol::T_STRING, 4); // 3 bytes
xfer += oprot->writeBinary(this->bs); // string length + 4 bytes(store the string length)
xfer += oprot->writeFieldEnd(); // 0 byte
xfer += oprot->writeFieldStop(); // 1 byte
xfer += oprot->writeStructEnd(); // 0 byte
return xfer;
}
3. 写入message end的部分
对于写入message end来说,写入的是0 byte。
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageEnd() {
return 0;
}
4. 最后是transport层的操作
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();
由于我们TFramedTransport,所以要比TBufferedTransport多传4 bytes,先来看一下这两个函数,其中writeEnd函数并没有写入任何东西,而是直接返回缓冲区的大小。
uint32_t TFramedTransport::writeEnd() {
return static_cast<uint32_t>(wBase_ - wBuf_.get());
}
在这边transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
可以看到flush的时候是把字节串的内容和4 bytes的字节串的长度写进去了。wBase_ = wBuf_.get() + sizeof(sz_nbo);
这一部分的操作是重置wBase_
,从wBuf_
后面4 bytes的地方开始。
void TFramedTransport::flush() {
int32_t sz_hbo, sz_nbo;
assert(wBufSize_ > sizeof(sz_nbo));
// Slip the frame size into the start of the buffer.
sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
if (sz_hbo > 0) {
// Note that we reset wBase_ (with a pad for the frame size)
// prior to the underlying write to ensure we're in a sane state
// (i.e. internal buffer cleaned) if the underlying write throws
// up an exception
wBase_ = wBuf_.get() + sizeof(sz_nbo);
// Write size and frame body.
transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
}
// Flush the underlying transport.
transport_->flush();
// reclaim write buffer
if (wBufSize_ > bufReclaimThresh_) {
wBufSize_ = DEFAULT_BUFFER_SIZE;
wBuf_.reset(new uint8_t[wBufSize_]);
setWriteBuffer(wBuf_.get(), wBufSize_);
// reset wBase_ with a pad for the frame size
int32_t pad = 0;
wBase_ = wBuf_.get() + sizeof(pad);
}
}
TBufferedTransport和TFramedTransport都是有缓存的,均继承TBufferBase,调用下一层TTransport类进行读写操作,结构极为相似。只是TFramedTransport以帧为传输单位,帧结构为:4个字节(int32_t)+传输字节串,头4个字节是存储后面字节串的长度,该字节串才是正确需要传输的数据,因此TFramedTransport每传一帧要比TBufferedTransport和TSocket多传4个字节;
本段参考
xiazemin 的泽民博客 https://xiazemin.github.io/MyBlog/web/2019/06/12/transport.html#
5. 总结整理
假如client端和server端采用的都是TFramedTransport和TBinaryProtocol配置,RPC过程封包如下
-
首先是message begin的写入:4 bytes的version字段+4 bytes用来存储函数名长度的字段+函数名长度字段+4bytes的sequence ID字段;
-
接下去是函数参数部分的写入,针对不同参数的类型来说,都会写入1 byte参数的类型和2 bytes的参数ID(用于定位),之后再写入参数的值,而对于binary数组或者string,那么在写入参数值之前需要先写入binary数组的长度或者string的长度(为了得到正确的参数值),当函数参数部分的内容写完之后会最后写入1 byte的field stop(表示函数参数部分结束)。针对struct类型也是如此,struct也是一个参数类型,所以首先需要用1 byte参数的类型和2 bytes的参数ID来表示传入的struct参数,之后还需要对struct的内容进行写入,而struct的内容的写入同写函数部分内容类似,为表示struct 的内容写入完全,最后也需要写入field stop。当struct的内容写入完全之后,在传递参数这块最后依旧还得写入一个file stop表示函数参数部分结束。
-
之后是message end的写入,只是针对上述提到的配置,不会写入任何内容;
-
最后由于是TFramedTransport,那么会多加一个4 bytes的字段,用来表示上述字节串的长度;
本文参考及推荐博客