目录

RPC | Thrift 发送过程

这边梳理一下整个RPC通信以及数据包的过程(从看源代码角度出发的),首先是客户端调用ping这个函数,

1
2
3
4
5
6
Test test;
test.num1 = 1000;
test.num2 = 1000;
test.str = "000000";
test.bs = "111111";
client.ping(test);

那么调用这个函数其实是分为两步的,一部是send_ping,另一部分是recv_ping,下面我们专注于send_ping部分。

1
2
3
4
5
void CalculatorClient::ping(const Test& test)
{   
  send_ping(test);
  recv_ping();
} 

send_ping部分主要如下所示,首先是写入message开始的相关信息,之后写入相关参数的信息,最后写入message结束标识符。最后transport层再写入end,这个函数返回的就是整个buffer区的大小,当然还需要flush掉。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void CalculatorClient::send_ping(const Test& test)
{   
  int32_t cseqid = 0;
    
  oprot_->writeMessageBegin("ping", ::apache::thrift::protocol::T_CALL, cseqid);
  
  Calculator_ping_pargs args;
  args.test = &test;
  args.write(oprot_);
    
  oprot_->writeMessageEnd();
    
  oprot_->getTransport()->writeEnd();
  oprot_->getTransport()->flush();
}

最后来贴一下server端和client端的配置信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// client 
stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);

// server
stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);

1. 写入message开始的相关信息

如下所示,是TBinaryProtocol的写入的函数,首先是4 bytes的version,再是4 bytes(string length)+函数名长度+4 bytes的seqid。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageBegin(const std::string& name,
                                                                     const TMessageType messageType,
                                                                     const int32_t seqid) {
  if (this->strict_write_) {
    int32_t version = (VERSION_1) | ((int32_t)messageType);
    uint32_t wsize = 0;
    wsize += writeI32(version);
    wsize += writeString(name);
    wsize += writeI32(seqid);
    return wsize;
  } else {
    uint32_t wsize = 0;
    wsize += writeString(name);
    wsize += writeByte((int8_t)messageType);
    wsize += writeI32(seqid);
    return wsize;
  }
}

2. 传递参数部分

接下来传递的是函数参数的相关信息了,下面分以下几种方式进行讨论

2.1. 当参数类型是int_32等

参数部分的字节数是1 byte的参数Type和2 bytes的参数ID,通过这两个量可以来定位参数的位置,从而把相应参数传进去。当参数传递完成之后,最后写入field stop标志符。

1.为什么fieldend为0?个人觉得因为前面已经知道了参数type,也就没有必要在写入end标识符了。

2.bool、int_8、int_16、int_32、int_64、double是同理的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
uint32_t Calculator_add_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Calculator_add_pargs");	// 0 byte

  // 3 bytes(参数ID和Type)
  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);	
  xfer += oprot->writeI32((*(this->num1)));	// int type is 4 bytes
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2);
  xfer += oprot->writeI32((*(this->num2)));
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
  return xfer;
}

2.1.1. writeStructBegin是写入0 byte

1
2
3
4
5
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructBegin(const char* name) {
  (void)name;
  return 0;
}

2.1.2. writeFieldBegin是写入3 bytes

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldBegin(const char* name,
                                                                   const TType fieldType,
                                                                   const int16_t fieldId) {
  (void)name;
  uint32_t wsize = 0;
  wsize += writeByte((int8_t)fieldType);
  wsize += writeI16(fieldId);
  return wsize;
}

2.1.3. writeI32函数是写入4 bytes

1
2
3
4
5
6
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI32(const int32_t i32) {
  int32_t net = (int32_t)ByteOrder_::toWire32(i32);
  this->trans_->write((uint8_t*)&net, 4);
  return 4;
}

2.1.4. writeFieldEnd函数是写入0字节

1
2
3
4
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldEnd() {
  return 0;
}

2.1.5. writeFieldStop是写入1字节

1
2
3
4
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldStop() {
  return writeByte((int8_t)T_STOP);
}

2.1.6. writeStructEnd函数是写入0字节

1
2
3
4
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructEnd() {
  return 0;
}

2.2. 当参数类型是字符串

当参数类型是字符串的话,同样需要写入1 byte的参数Type和2 bytes的参数ID,但是除写入字符串外,还要写入4字节的字符串长度。通过字符串长度字段,可以完整的将参数中的字符串传进去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Calculator_ping_pargs");	// 0 byte

  // 3 bytes(参数ID和Type)
  xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRING, 1);	
  // string length + 4 bytes(store the string length)
  xfer += oprot->writeString((*(this->test)));	
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
  return xfer;
}

2.2.1. writeString函数4字节的字符串长度+字符串本身

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeString(const StrType& str) {
  if (str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)()))
    throw TProtocolException(TProtocolException::SIZE_LIMIT);
  uint32_t size = static_cast<uint32_t>(str.size());
  uint32_t result = writeI32((int32_t)size);
  if (size > 0) {
    this->trans_->write((uint8_t*)str.data(), size);
  }
  return result + size;
}

2.3. 当参数类型是bytes数组的时候

它还是会当做string来写入,同传字符串一样

1
2
3
4
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeBinary(const std::string& str) {
  return TBinaryProtocolT<Transport_, ByteOrder_>::writeString(str);
}

2.4. 当参数类型是struct时

参数类型为struct时,那么同其他参数类型一样,也需要拿出3个字节存储struct的类型和ID,之后是依次存储struct中的内容,最后写入1 byte的field stop。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Calculator_ping_pargs");	// 0 byte

  xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRUCT, 1); // 3 bytes
  xfer += (*(this->test)).write(oprot);	// 具体bytes number看下面这段代码
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
    
  return xfer;
}

下面这段代码是依次存储struct 中的内容。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
uint32_t Test::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Test");	// 0 byte

  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);		// 3 bytes
  xfer += oprot->writeI32(this->num1);	// int type is 4 bytes
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_DOUBLE, 2);	// 3 bytes
  xfer += oprot->writeDouble(this->num2);	// double type is 8 bytes
  xfer += oprot->writeFieldEnd(); // 0 byte 

  xfer += oprot->writeFieldBegin("str", ::apache::thrift::protocol::T_STRING, 3);	// 3 bytes
  xfer += oprot->writeString(this->str);	// string length + 4 bytes(store the string length)
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldBegin("bs", ::apache::thrift::protocol::T_STRING, 4);	// 3 bytes
  xfer += oprot->writeBinary(this->bs);	// string length + 4 bytes(store the string length)
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
  return xfer;
}

3. 写入message end的部分

对于写入message end来说,写入的是0 byte。

1
2
3
4
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageEnd() {
  return 0;
}

4. 最后是transport层的操作

1
2
oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();

由于我们TFramedTransport,所以要比TBufferedTransport多传4 bytes,先来看一下这两个函数,其中writeEnd函数并没有写入任何东西,而是直接返回缓冲区的大小。

1
2
3
uint32_t TFramedTransport::writeEnd() {
  return static_cast<uint32_t>(wBase_ - wBuf_.get());
}

在这边transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);可以看到flush的时候是把字节串的内容和4 bytes的字节串的长度写进去了。 wBase_ = wBuf_.get() + sizeof(sz_nbo);这一部分的操作是重置wBase_,从wBuf_后面4 bytes的地方开始。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void TFramedTransport::flush() {
  int32_t sz_hbo, sz_nbo;
  assert(wBufSize_ > sizeof(sz_nbo));

  // Slip the frame size into the start of the buffer.
  sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
  sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
  memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));

  if (sz_hbo > 0) {
    // Note that we reset wBase_ (with a pad for the frame size)
    // prior to the underlying write to ensure we're in a sane state
    // (i.e. internal buffer cleaned) if the underlying write throws
    // up an exception
    wBase_ = wBuf_.get() + sizeof(sz_nbo);

    // Write size and frame body.
    transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
  }

  // Flush the underlying transport.
  transport_->flush();

  // reclaim write buffer
  if (wBufSize_ > bufReclaimThresh_) {
    wBufSize_ = DEFAULT_BUFFER_SIZE;
    wBuf_.reset(new uint8_t[wBufSize_]);
    setWriteBuffer(wBuf_.get(), wBufSize_);

    // reset wBase_ with a pad for the frame size
    int32_t pad = 0;
    wBase_ = wBuf_.get() + sizeof(pad);
  }
}

TBufferedTransport和TFramedTransport都是有缓存的,均继承TBufferBase,调用下一层TTransport类进行读写操作,结构极为相似。只是TFramedTransport以帧为传输单位,帧结构为:4个字节(int32_t)+传输字节串,头4个字节是存储后面字节串的长度,该字节串才是正确需要传输的数据,因此TFramedTransport每传一帧要比TBufferedTransport和TSocket多传4个字节;

本段参考 xiazemin 的泽民博客 https://xiazemin.github.io/MyBlog/web/2019/06/12/transport.html#

5. 总结整理

假如client端和server端采用的都是TFramedTransport和TBinaryProtocol配置,RPC过程封包如下

  • 首先是message begin的写入:4 bytes的version字段+4 bytes用来存储函数名长度的字段+函数名长度字段+4bytes的sequence ID字段;

  • 接下去是函数参数部分的写入,针对不同参数的类型来说,都会写入1 byte参数的类型和2 bytes的参数ID(用于定位),之后再写入参数的值,而对于binary数组或者string,那么在写入参数值之前需要先写入binary数组的长度或者string的长度(为了得到正确的参数值),当函数参数部分的内容写完之后会最后写入1 byte的field stop(表示函数参数部分结束)。针对struct类型也是如此,struct也是一个参数类型,所以首先需要用1 byte参数的类型和2 bytes的参数ID来表示传入的struct参数,之后还需要对struct的内容进行写入,而struct的内容的写入同写函数部分内容类似,为表示struct 的内容写入完全,最后也需要写入field stop。当struct的内容写入完全之后,在传递参数这块最后依旧还得写入一个file stop表示函数参数部分结束。

  • 之后是message end的写入,只是针对上述提到的配置,不会写入任何内容;

  • 最后由于是TFramedTransport,那么会多加一个4 bytes的字段,用来表示上述字节串的长度;

本文参考及推荐博客

1.浅谈Thrift内部实现原理