程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

RPC | Thrift 发送过程

发表于 2019-10-20 | 分类于 RPC | 0 | 阅读次数 2032

这边梳理一下整个RPC通信以及数据包的过程(从看源代码角度出发的),首先是客户端调用ping这个函数,

Test test;
test.num1 = 1000;
test.num2 = 1000;
test.str = "000000";
test.bs = "111111";
client.ping(test);

那么调用这个函数其实是分为两步的,一部是send_ping,另一部分是recv_ping,下面我们专注于send_ping部分。

void CalculatorClient::ping(const Test& test)
{   
  send_ping(test);
  recv_ping();
} 

send_ping部分主要如下所示,首先是写入message开始的相关信息,之后写入相关参数的信息,最后写入message结束标识符。最后transport层再写入end,这个函数返回的就是整个buffer区的大小,当然还需要flush掉。

void CalculatorClient::send_ping(const Test& test)
{   
  int32_t cseqid = 0;
    
  oprot_->writeMessageBegin("ping", ::apache::thrift::protocol::T_CALL, cseqid);
  
  Calculator_ping_pargs args;
  args.test = &test;
  args.write(oprot_);
    
  oprot_->writeMessageEnd();
    
  oprot_->getTransport()->writeEnd();
  oprot_->getTransport()->flush();
}

最后来贴一下server端和client端的配置信息

// client 
stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);

// server
stdcxx::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
CalculatorClient client(protocol);

1. 写入message开始的相关信息

如下所示,是TBinaryProtocol的写入的函数,首先是4 bytes的version,再是4 bytes(string length)+函数名长度+4 bytes的seqid。

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageBegin(const std::string& name,
                                                                     const TMessageType messageType,
                                                                     const int32_t seqid) {
  if (this->strict_write_) {
    int32_t version = (VERSION_1) | ((int32_t)messageType);
    uint32_t wsize = 0;
    wsize += writeI32(version);
    wsize += writeString(name);
    wsize += writeI32(seqid);
    return wsize;
  } else {
    uint32_t wsize = 0;
    wsize += writeString(name);
    wsize += writeByte((int8_t)messageType);
    wsize += writeI32(seqid);
    return wsize;
  }
}

2. 传递参数部分

接下来传递的是函数参数的相关信息了,下面分以下几种方式进行讨论

2.1. 当参数类型是int_32等

参数部分的字节数是1 byte的参数Type和2 bytes的参数ID,通过这两个量可以来定位参数的位置,从而把相应参数传进去。当参数传递完成之后,最后写入field stop标志符。

1.为什么fieldend为0?个人觉得因为前面已经知道了参数type,也就没有必要在写入end标识符了。

2.bool、int_8、int_16、int_32、int_64、double是同理的。

uint32_t Calculator_add_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Calculator_add_pargs");	// 0 byte

  // 3 bytes(参数ID和Type)
  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);	
  xfer += oprot->writeI32((*(this->num1)));	// int type is 4 bytes
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_I32, 2);
  xfer += oprot->writeI32((*(this->num2)));
  xfer += oprot->writeFieldEnd();

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
  return xfer;
}

2.1.1. writeStructBegin是写入0 byte

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructBegin(const char* name) {
  (void)name;
  return 0;
}

2.1.2. writeFieldBegin是写入3 bytes

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldBegin(const char* name,
                                                                   const TType fieldType,
                                                                   const int16_t fieldId) {
  (void)name;
  uint32_t wsize = 0;
  wsize += writeByte((int8_t)fieldType);
  wsize += writeI16(fieldId);
  return wsize;
}

2.1.3. writeI32函数是写入4 bytes

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI32(const int32_t i32) {
  int32_t net = (int32_t)ByteOrder_::toWire32(i32);
  this->trans_->write((uint8_t*)&net, 4);
  return 4;
}

2.1.4. writeFieldEnd函数是写入0字节

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldEnd() {
  return 0;
}

2.1.5. writeFieldStop是写入1字节

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldStop() {
  return writeByte((int8_t)T_STOP);
}

2.1.6. writeStructEnd函数是写入0字节

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeStructEnd() {
  return 0;
}

2.2. 当参数类型是字符串

当参数类型是字符串的话,同样需要写入1 byte的参数Type和2 bytes的参数ID,但是除写入字符串外,还要写入4字节的字符串长度。通过字符串长度字段,可以完整的将参数中的字符串传进去。

uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Calculator_ping_pargs");	// 0 byte

  // 3 bytes(参数ID和Type)
  xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRING, 1);	
  // string length + 4 bytes(store the string length)
  xfer += oprot->writeString((*(this->test)));	
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
  return xfer;
}

2.2.1. writeString函数4字节的字符串长度+字符串本身

uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeString(const StrType& str) {
  if (str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)()))
    throw TProtocolException(TProtocolException::SIZE_LIMIT);
  uint32_t size = static_cast<uint32_t>(str.size());
  uint32_t result = writeI32((int32_t)size);
  if (size > 0) {
    this->trans_->write((uint8_t*)str.data(), size);
  }
  return result + size;
}

2.3. 当参数类型是bytes数组的时候

它还是会当做string来写入,同传字符串一样

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeBinary(const std::string& str) {
  return TBinaryProtocolT<Transport_, ByteOrder_>::writeString(str);
}

2.4. 当参数类型是struct时

参数类型为struct时,那么同其他参数类型一样,也需要拿出3个字节存储struct的类型和ID,之后是依次存储struct中的内容,最后写入1 byte的field stop。

uint32_t Calculator_ping_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Calculator_ping_pargs");	// 0 byte

  xfer += oprot->writeFieldBegin("test", ::apache::thrift::protocol::T_STRUCT, 1); // 3 bytes
  xfer += (*(this->test)).write(oprot);	// 具体bytes number看下面这段代码
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
    
  return xfer;
}

下面这段代码是依次存储struct 中的内容。

uint32_t Test::write(::apache::thrift::protocol::TProtocol* oprot) const {
  uint32_t xfer = 0;
  ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
  xfer += oprot->writeStructBegin("Test");	// 0 byte

  xfer += oprot->writeFieldBegin("num1", ::apache::thrift::protocol::T_I32, 1);		// 3 bytes
  xfer += oprot->writeI32(this->num1);	// int type is 4 bytes
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldBegin("num2", ::apache::thrift::protocol::T_DOUBLE, 2);	// 3 bytes
  xfer += oprot->writeDouble(this->num2);	// double type is 8 bytes
  xfer += oprot->writeFieldEnd(); // 0 byte 

  xfer += oprot->writeFieldBegin("str", ::apache::thrift::protocol::T_STRING, 3);	// 3 bytes
  xfer += oprot->writeString(this->str);	// string length + 4 bytes(store the string length)
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldBegin("bs", ::apache::thrift::protocol::T_STRING, 4);	// 3 bytes
  xfer += oprot->writeBinary(this->bs);	// string length + 4 bytes(store the string length)
  xfer += oprot->writeFieldEnd();	// 0 byte

  xfer += oprot->writeFieldStop();	// 1 byte
  xfer += oprot->writeStructEnd();	// 0 byte
  return xfer;
}

3. 写入message end的部分

对于写入message end来说,写入的是0 byte。

template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageEnd() {
  return 0;
}

4. 最后是transport层的操作

oprot_->getTransport()->writeEnd();
oprot_->getTransport()->flush();

由于我们TFramedTransport,所以要比TBufferedTransport多传4 bytes,先来看一下这两个函数,其中writeEnd函数并没有写入任何东西,而是直接返回缓冲区的大小。

uint32_t TFramedTransport::writeEnd() {
  return static_cast<uint32_t>(wBase_ - wBuf_.get());
}

在这边transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);可以看到flush的时候是把字节串的内容和4 bytes的字节串的长度写进去了。wBase_ = wBuf_.get() + sizeof(sz_nbo);这一部分的操作是重置wBase_,从wBuf_后面4 bytes的地方开始。

void TFramedTransport::flush() {
  int32_t sz_hbo, sz_nbo;
  assert(wBufSize_ > sizeof(sz_nbo));

  // Slip the frame size into the start of the buffer.
  sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
  sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
  memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));

  if (sz_hbo > 0) {
    // Note that we reset wBase_ (with a pad for the frame size)
    // prior to the underlying write to ensure we're in a sane state
    // (i.e. internal buffer cleaned) if the underlying write throws
    // up an exception
    wBase_ = wBuf_.get() + sizeof(sz_nbo);

    // Write size and frame body.
    transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
  }

  // Flush the underlying transport.
  transport_->flush();

  // reclaim write buffer
  if (wBufSize_ > bufReclaimThresh_) {
    wBufSize_ = DEFAULT_BUFFER_SIZE;
    wBuf_.reset(new uint8_t[wBufSize_]);
    setWriteBuffer(wBuf_.get(), wBufSize_);

    // reset wBase_ with a pad for the frame size
    int32_t pad = 0;
    wBase_ = wBuf_.get() + sizeof(pad);
  }
}

TBufferedTransport和TFramedTransport都是有缓存的,均继承TBufferBase,调用下一层TTransport类进行读写操作,结构极为相似。只是TFramedTransport以帧为传输单位,帧结构为:4个字节(int32_t)+传输字节串,头4个字节是存储后面字节串的长度,该字节串才是正确需要传输的数据,因此TFramedTransport每传一帧要比TBufferedTransport和TSocket多传4个字节;

本段参考
xiazemin 的泽民博客 https://xiazemin.github.io/MyBlog/web/2019/06/12/transport.html#

5. 总结整理

假如client端和server端采用的都是TFramedTransport和TBinaryProtocol配置,RPC过程封包如下

  • 首先是message begin的写入:4 bytes的version字段+4 bytes用来存储函数名长度的字段+函数名长度字段+4bytes的sequence ID字段;

  • 接下去是函数参数部分的写入,针对不同参数的类型来说,都会写入1 byte参数的类型和2 bytes的参数ID(用于定位),之后再写入参数的值,而对于binary数组或者string,那么在写入参数值之前需要先写入binary数组的长度或者string的长度(为了得到正确的参数值),当函数参数部分的内容写完之后会最后写入1 byte的field stop(表示函数参数部分结束)。针对struct类型也是如此,struct也是一个参数类型,所以首先需要用1 byte参数的类型和2 bytes的参数ID来表示传入的struct参数,之后还需要对struct的内容进行写入,而struct的内容的写入同写函数部分内容类似,为表示struct 的内容写入完全,最后也需要写入field stop。当struct的内容写入完全之后,在传递参数这块最后依旧还得写入一个file stop表示函数参数部分结束。

  • 之后是message end的写入,只是针对上述提到的配置,不会写入任何内容;

  • 最后由于是TFramedTransport,那么会多加一个4 bytes的字段,用来表示上述字节串的长度;

本文参考及推荐博客

1.浅谈Thrift内部实现原理

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/72
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# RPC # Thrift
计算机体系结构 | 计算机指令
MIT 6.828 课程 | 4-Preemptive Multitasking PartB
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%