1. Understanding the Thrift Framework
To understand the Thrift framework, let's walk through it with the architecture diagram provided by the official documentation.
- client/server: For the client, this layer is simply the client's own business-logic code; for the server, Thrift ships several ready-made server models.
- Thrift's Protocol layer: An RPC framework must correctly transmit both the invocation information and the returned results, and the protocol layer handles this serialization and deserialization. Several implementations exist: TBinaryProtocol serializes to a binary wire format with its own layout rules, while TJSONProtocol serializes to JSON (a short sketch at the end of this section contrasts the two).
- Thrift's Transport layer: RPC is a specialized kind of network programming, so a transport layer is needed to wrap the underlying network communication, and that is what Transport does. It is not limited to raw network I/O; it may further wrap the Protocol layer's data. The Framed transport, for example, prepends an extra 4-byte field that stores the length of the Protocol layer's byte string.
When TCP/IP serves as the lower-level communication protocol, the whole communication process looks like the figure below.
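The protocol layer's job is easiest to see in isolation. Below is a minimal sketch, assuming a stdcxx-era Thrift build (matching the version this article quotes; newer Thrift uses std::shared_ptr), that serializes the same i32 with TBinaryProtocol and TJSONProtocol into a TMemoryBuffer and dumps the bytes. Treat it as illustrative rather than canonical usage:

```cpp
// Minimal sketch (illustrative, not canonical usage): serialize the same
// i32 with TBinaryProtocol and TJSONProtocol and compare the bytes.
#include <cstdio>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/transport/TBufferTransports.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

static void dump(const char* tag, const stdcxx::shared_ptr<TMemoryBuffer>& mem) {
  uint8_t* data;
  uint32_t len;
  mem->getBuffer(&data, &len);
  printf("%s (%u bytes):", tag, len);
  for (uint32_t i = 0; i < len; ++i) printf(" %02x", data[i]);
  printf("\n");
}

int main() {
  stdcxx::shared_ptr<TMemoryBuffer> binBuf(new TMemoryBuffer());
  TBinaryProtocol bin(binBuf);
  bin.writeI32(42);  // expected: 00 00 00 2a (4 bytes, big-endian)
  dump("binary", binBuf);

  stdcxx::shared_ptr<TMemoryBuffer> jsonBuf(new TMemoryBuffer());
  TJSONProtocol json(jsonBuf);
  json.writeI32(42);  // expected: the ASCII text "42"
  dump("json", jsonBuf);
  return 0;
}
```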
2. Overview of Thrift's Classes
The diagram is taken from the CSDN post Thrift源码解析(一)主要类概述. It dissects the Java 1.0.0 version of the Thrift source; although the rest of this series is based on C++, it still makes a useful reference.
Based on the diagram, the classes fall roughly into the following groups:
- TTransport: client-side transport-layer classes;
- TServerTransport: server-side transport-layer classes;
- TProtocol: serialization and deserialization classes;
- TServer: classes implementing the server's I/O event-flow models;
- TProcessor: classes handling function and interface invocation;
Below we focus on the code of the TTransport and TProtocol layers.
3. Protocol Layer Source Code
As mentioned above, the Protocol layer is responsible for serialization. Thrift's serialization protocols mainly include the following:
- TBinaryProtocol
- TCompactProtocol
- TJSONProtocol
- ......
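Among these, TCompactProtocol shrinks integers with a zigzag-plus-varint scheme. The following standalone sketch re-implements that encoding for illustration only (it is not Thrift's actual code):

```cpp
// Zigzag maps 0, -1, 1, -2 to 0, 1, 2, 3 so that small negative numbers
// also encode into few varint bytes.
#include <cstdint>
#include <cstdio>

static uint32_t zigzag32(int32_t n) {
  return (static_cast<uint32_t>(n) << 1) ^ static_cast<uint32_t>(n >> 31);
}

// Varint: 7 payload bits per byte; the high bit says "more bytes follow".
static size_t writeVarint32(uint32_t v, uint8_t* out) {
  size_t i = 0;
  while (v > 0x7F) {
    out[i++] = static_cast<uint8_t>((v & 0x7F) | 0x80);
    v >>= 7;
  }
  out[i++] = static_cast<uint8_t>(v);
  return i;
}

int main() {
  uint8_t buf[5];
  size_t n = writeVarint32(zigzag32(-2), buf);
  // Prints "1 byte(s): 03" -- versus ff ff ff fe (4 bytes) in TBinaryProtocol.
  printf("%zu byte(s): %02x\n", n, buf[0]);
  return 0;
}
```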
All of these protocols inherit from the abstract class TProtocol, so let's first look at roughly what TProtocol contains (see the C++ source for the full picture). The class is mostly a set of abstract read and write functions: write handles serialization and read handles deserialization.
class TProtocol {
public:
virtual ~TProtocol();
/**
* Writing functions.
*/
virtual uint32_t writeMessageBegin_virt(const std::string& name,
const TMessageType messageType,
const int32_t seqid) = 0;
virtual uint32_t writeMessageEnd_virt() = 0;
virtual uint32_t writeStructBegin_virt(const char* name) = 0;
virtual uint32_t writeStructEnd_virt() = 0;
virtual uint32_t writeFieldBegin_virt(const char* name,
const TType fieldType,
const int16_t fieldId) = 0;
virtual uint32_t writeFieldEnd_virt() = 0;
virtual uint32_t writeFieldStop_virt() = 0;
......
virtual uint32_t writeBool_virt(const bool value) = 0;
virtual uint32_t writeByte_virt(const int8_t byte) = 0;
virtual uint32_t writeI16_virt(const int16_t i16) = 0;
virtual uint32_t writeI32_virt(const int32_t i32) = 0;
virtual uint32_t writeI64_virt(const int64_t i64) = 0;
virtual uint32_t writeDouble_virt(const double dub) = 0;
virtual uint32_t writeString_virt(const std::string& str) = 0;
virtual uint32_t writeBinary_virt(const std::string& str) = 0;
/**
* Reading functions
*/
virtual uint32_t readMessageBegin_virt(std::string& name,
TMessageType& messageType,
int32_t& seqid) = 0;
virtual uint32_t readMessageEnd_virt() = 0;
virtual uint32_t readStructBegin_virt(std::string& name) = 0;
virtual uint32_t readStructEnd_virt() = 0;
virtual uint32_t readFieldBegin_virt(std::string& name, TType& fieldType, int16_t& fieldId) = 0;
......
};
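Before diving into a concrete protocol, it helps to see who calls these virtuals. The sketch below is a hypothetical, hand-written approximation of what thrift-generated serialization code for a struct like `Point {1: i32 x; 2: i32 y}` does with a TProtocol; the real generated code differs in shape, but the call sequence is the point:

```cpp
#include <thrift/protocol/TProtocol.h>

uint32_t Point_write(apache::thrift::protocol::TProtocol* oprot, int32_t x, int32_t y) {
  using apache::thrift::protocol::T_I32;
  uint32_t xfer = 0;
  xfer += oprot->writeStructBegin("Point");
  xfer += oprot->writeFieldBegin("x", T_I32, 1);  // field name, type, id from the IDL
  xfer += oprot->writeI32(x);
  xfer += oprot->writeFieldEnd();
  xfer += oprot->writeFieldBegin("y", T_I32, 2);
  xfer += oprot->writeI32(y);
  xfer += oprot->writeFieldEnd();
  xfer += oprot->writeFieldStop();  // the T_STOP byte discussed below
  xfer += oprot->writeStructEnd();
  return xfer;  // total bytes written, mirroring the uint32_t returns above
}
```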
Next let's focus on TBinaryProtocol's concrete implementation, picking out a few of the key functions.
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
if (this->strict_write_) {
int32_t version = (VERSION_1) | ((int32_t)messageType);
uint32_t wsize = 0;
wsize += writeI32(version);
wsize += writeString(name);
wsize += writeI32(seqid);
return wsize;
} else {
uint32_t wsize = 0;
wsize += writeString(name);
wsize += writeByte((int8_t)messageType);
wsize += writeI32(seqid);
return wsize;
}
}
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeFieldStop() {
return writeByte((int8_t)T_STOP);
}
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeI32(const int32_t i32) {
int32_t net = (int32_t)ByteOrder_::toWire32(i32);
this->trans_->write((uint8_t*)&net, 4);
return 4;
}
template <class Transport_, class ByteOrder_>
template <typename StrType>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeString(const StrType& str) {
if (str.size() > static_cast<size_t>((std::numeric_limits<int32_t>::max)()))
throw TProtocolException(TProtocolException::SIZE_LIMIT);
uint32_t size = static_cast<uint32_t>(str.size());
uint32_t result = writeI32((int32_t)size);
if (size > 0) {
this->trans_->write((uint8_t*)str.data(), size);
}
return result + size;
}
template <class Transport_, class ByteOrder_>
uint32_t TBinaryProtocolT<Transport_, ByteOrder_>::writeBinary(const std::string& str) {
return TBinaryProtocolT<Transport_, ByteOrder_>::writeString(str);
}
- writeMessageBegin: serializes the opening part of a message, which you can think of as the message header. In Thrift, a message represents one interface call, a call's result, or an exception. The function calls the corresponding write functions to serialize and write out the Thrift version, the message name, the seqid, and other basics; for a function call, the name field is the function's name.
- writeFieldStop: after all fields have been serialized, writeByte is called to write a T_STOP marker, signaling the end.
- writeI32: serializes an int; it writes the value through the transport instance that the protocol layer wraps.
- writeString: serializes a string; it writes the 4-byte string length followed by the string's contents.
- writeBinary: in C++, the binary type is simply written as a string.
The corresponding read operations are the inverse of the write operations above; consult the source if you want the details.
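Putting writeMessageBegin together with the write functions above, the sketch below dumps the strict-mode header bytes for a hypothetical call named "ping" (again assuming a stdcxx-era Thrift build):

```cpp
// Hedged sketch: dump the strict-mode header produced by
// writeMessageBegin("ping", T_CALL, 1). With VERSION_1 = 0x80010000 and
// T_CALL = 1, the expected bytes are:
//   80 01 00 01                 version | message type
//   00 00 00 04 70 69 6e 67     length-prefixed name "ping"
//   00 00 00 01                 seqid
#include <cstdio>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>

using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

int main() {
  stdcxx::shared_ptr<TMemoryBuffer> mem(new TMemoryBuffer());
  TBinaryProtocol prot(mem);  // strict_write_ defaults to true
  prot.writeMessageBegin("ping", T_CALL, 1);
  uint8_t* data;
  uint32_t len;
  mem->getBuffer(&data, &len);
  for (uint32_t i = 0; i < len; ++i) printf("%02x ", data[i]);
  printf("\n");
  return 0;
}
```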
4. Transport Layer Source Code
The protocol layer is in charge of serializing the data to be transmitted; the transport layer is in charge of actually moving the byte stream. Transport is not just the raw network transmission, though; it is also a wrapper around the stream from the layer above. Common transport-layer implementations include the following:
- TSocket: a blocking socket; wraps the native socket API;
- TFramedTransport: transmits data frame by frame. It wraps a TMemoryInputTransport as its input stream and a TByteArrayOutputStream as its output stream, serving as an in-memory read/write buffer. When TFramedTransport's flush method runs, it first writes the output stream's length as a 4-byte message header and then writes the message body (frame layout: a 4-byte int32_t length followed by the payload bytes; see the sketch after this list). Both TBufferedTransport and TFramedTransport are buffered; TBufferedTransport simply sends four bytes fewer than TFramedTransport because it omits the length header;
- TFileTransport: transmits in the form of files;
- TZlibTransport: compresses with zlib; used in combination with another transport;
- ......
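The frame layout is easy to observe by stacking TFramedTransport on a TMemoryBuffer instead of a socket. A minimal sketch, assuming a stdcxx-era Thrift build:

```cpp
#include <cstdio>
#include <thrift/transport/TBufferTransports.h>

using namespace apache::thrift;
using namespace apache::thrift::transport;

int main() {
  stdcxx::shared_ptr<TMemoryBuffer> mem(new TMemoryBuffer());
  TFramedTransport framed(mem);
  const uint8_t payload[] = {'h', 'e', 'l', 'l', 'o'};
  framed.write(payload, sizeof(payload));  // buffered in memory, not yet sent
  framed.flush();  // emits the 4-byte length header, then the body
  uint8_t* data;
  uint32_t len;
  mem->getBuffer(&data, &len);
  // Expected 9 bytes: 00 00 00 05 68 65 6c 6c 6f
  for (uint32_t i = 0; i < len; ++i) printf("%02x ", data[i]);
  printf("\n");
  return 0;
}
```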
The base class of all transport-layer implementations in Thrift is TTransport. Let's see what this base class contains; only the main parts are listed below, and the source has the rest.
/**
* Helper template to hoist readAll implementation out of TTransport
*/
template <class Transport_>
uint32_t readAll(Transport_& trans, uint8_t* buf, uint32_t len) {
uint32_t have = 0;
uint32_t get = 0;
while (have < len) {
get = trans.read(buf + have, len - have);
if (get <= 0) {
throw TTransportException(TTransportException::END_OF_FILE, "No more data to read.");
}
have += get;
}
return have;
}
/**
* Generic interface for a method of transporting data. A TTransport may be
* capable of either reading or writing, but not necessarily both.
*/
class TTransport {
public:
virtual bool isOpen() { return false; }
virtual void open() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
}
virtual void close() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
}
uint32_t read(uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
return read_virt(buf, len);
}
virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
}
uint32_t readAll(uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
return readAll_virt(buf, len);
}
virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
return apache::thrift::transport::readAll(*this, buf, len);
}
virtual uint32_t readEnd() {
// default behaviour is to do nothing
return 0;
}
void write(const uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
write_virt(buf, len);
}
virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) {
throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
}
virtual uint32_t writeEnd() {
// default behaviour is to do nothing
return 0;
}
virtual void flush() {
// default behaviour is to do nothing
}
......
};
The code above mainly covers the following operations:
- isOpen: used to check whether the underlying transport link is ready;
- open: used to open the underlying transport link;
- close: used to close the underlying transport link;
- read: used to read data from the link;
- write: used to write data to the link;
- flush: used to push buffered in-memory data out onto the link;
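To see how these hooks are meant to be filled in, here is a hedged sketch of a toy transport: a logging decorator that forwards everything to an inner transport. It is illustrative only; real Thrift decorators usually derive from TVirtualTransport to avoid virtual-call overhead on the hot path:

```cpp
#include <cstdio>
#include <thrift/transport/TTransport.h>

using namespace apache::thrift;
using namespace apache::thrift::transport;

class TLoggingTransport : public TTransport {
public:
  explicit TLoggingTransport(stdcxx::shared_ptr<TTransport> inner) : inner_(inner) {}
  bool isOpen() { return inner_->isOpen(); }
  void open() { inner_->open(); }
  void close() { inner_->close(); }
  uint32_t read_virt(uint8_t* buf, uint32_t len) {
    uint32_t got = inner_->read(buf, len);
    fprintf(stderr, "read %u byte(s)\n", got);
    return got;
  }
  void write_virt(const uint8_t* buf, uint32_t len) {
    fprintf(stderr, "write %u byte(s)\n", len);
    inner_->write(buf, len);
  }
  void flush() { inner_->flush(); }

private:
  stdcxx::shared_ptr<TTransport> inner_;
};
```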
Those are the operations the base class abstracts for every transport. Next, let's take TFramedTransport as an example of what a concrete implementation looks like.
/**
* Framed transport. All writes go into an in-memory buffer until flush is
* called, at which point the transport writes the length of the entire
* binary chunk followed by the data payload. This allows the receiver on the
* other end to always do fixed-length reads.
*/
class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> {
public:
void open() { transport_->open(); }
bool isOpen() { return transport_->isOpen(); }
bool peek() { return (rBase_ < rBound_) || transport_->peek(); }
void close() {
flush();
transport_->close();
}
virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
virtual void writeSlow(const uint8_t* buf, uint32_t len);
virtual void flush();
uint32_t readEnd();
uint32_t writeEnd();
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
stdcxx::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
/*
* TVirtualTransport provides a default implementation of readAll().
* We want to use the TBufferBase version instead.
*/
using TBufferBase::readAll;
/**
* Returns the origin of the underlying transport
*/
virtual const std::string getOrigin() { return transport_->getOrigin(); }
/**
* Set the maximum size of the frame at read
*/
void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
/**
* Get the maximum size of the frame at read
*/
uint32_t getMaxFrameSize() { return maxFrameSize_; }
protected:
virtual bool readFrame();
void initPointers() {
setReadBuffer(NULL, 0);
setWriteBuffer(wBuf_.get(), wBufSize_);
// Pad the buffer so we can insert the size later.
int32_t pad = 0;
this->write((uint8_t*)&pad, sizeof(pad));
}
stdcxx::shared_ptr<TTransport> transport_;
uint32_t rBufSize_;
uint32_t wBufSize_;
boost::scoped_array<uint8_t> rBuf_;
boost::scoped_array<uint8_t> wBuf_;
uint32_t bufReclaimThresh_;
uint32_t maxFrameSize_;
};
In the code above there is a member variable, stdcxx::shared_ptr<TTransport> transport_, which refers to the lower-level TTransport, typically a TSocket. The operations already implemented here, such as close and open, are ultimately delegated to this lower-level transport. Let's now look at the flush function.
void TFramedTransport::flush() {
int32_t sz_hbo, sz_nbo;
assert(wBufSize_ > sizeof(sz_nbo));
// Slip the frame size into the start of the buffer.
sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
if (sz_hbo > 0) {
// Note that we reset wBase_ (with a pad for the frame size)
// prior to the underlying write to ensure we're in a sane state
// (i.e. internal buffer cleaned) if the underlying write throws
// up an exception
wBase_ = wBuf_.get() + sizeof(sz_nbo);
// Write size and frame body.
transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
}
// Flush the underlying transport.
transport_->flush();
// reclaim write buffer
if (wBufSize_ > bufReclaimThresh_) {
wBufSize_ = DEFAULT_BUFFER_SIZE;
wBuf_.reset(new uint8_t[wBufSize_]);
setWriteBuffer(wBuf_.get(), wBufSize_);
// reset wBase_ with a pad for the frame size
int32_t pad = 0;
wBase_ = wBuf_.get() + sizeof(pad);
}
}
As the doc comment on the TFramedTransport class states, the write operations above merely append to an in-memory buffer; it is flush that actually pushes the data onto the link and sends it over the network.
Framed transport. All writes go into an in-memory buffer until flush is called, at which point the transport writes the length of the entire binary chunk followed by the data payload. This allows the receiver on the other end to always do fixed-length reads.
TFramedTransport's flush first computes the total length of the data in the buffer, writes that length into the 4-byte header field (these 4 bytes record how long the whole byte string is), and then calls the lower-level transport's (e.g. TSocket's) flush to send the data out. For example, with a 10-byte payload the buffer holds a 4-byte pad followed by 10 payload bytes, so sz_hbo is 10 and the single underlying write covers 14 bytes.
// Write size and frame body.
transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
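On the receiving side, readFrame() does the mirror image: read exactly four header bytes, decode the length, then issue a fixed-length read for the body. A simplified sketch of that logic (readOneFrame and its parameters are illustrative names, not Thrift API; assumes POSIX ntohl):

```cpp
#include <arpa/inet.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TTransportException.h>

uint32_t readOneFrame(apache::thrift::transport::TTransport& trans,
                      uint8_t* out, uint32_t maxLen) {
  int32_t sz_nbo;
  trans.readAll(reinterpret_cast<uint8_t*>(&sz_nbo), sizeof(sz_nbo));
  uint32_t sz = ntohl(static_cast<uint32_t>(sz_nbo));  // frame body length
  if (sz > maxLen) {
    throw apache::thrift::transport::TTransportException("frame too large");
  }
  trans.readAll(out, sz);  // readAll loops until exactly sz bytes arrive
  return sz;
}
```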
In practice TFramedTransport is almost always paired with TSocket; as shown below, the TFramedTransport wraps the lower-level TSocket.
stdcxx::shared_ptr<TTransport> socket(new TSocket("192.168.250.192", 9090));
stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket));
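Continuing that snippet, a typical client then layers a protocol on top and opens the whole stack (a sketch assuming the same includes and usings as above):

```cpp
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
transport->open();   // TFramedTransport::open() opens the underlying TSocket
// ... generated client code drives `protocol` here ...
transport->flush();  // sends the buffered frame: 4-byte length + body
transport->close();
```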
Now let's take a rough look at TSocket's implementation; see the source code for more detail.
bool TSocket::isOpen() {
return (socket_ != THRIFT_INVALID_SOCKET);
}
void TSocket::openConnection(struct addrinfo* res) {
if (isOpen()) {
return;
}
if (!path_.empty()) {
socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
} else {
socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
}
if (socket_ == THRIFT_INVALID_SOCKET) {
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
}
......
}
void TSocket::write(const uint8_t* buf, uint32_t len) {
uint32_t sent = 0;
while (sent < len) {
uint32_t b = write_partial(buf + sent, len - sent);
if (b == 0) {
throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
}
sent += b;
}
}
uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
if (socket_ == THRIFT_INVALID_SOCKET) {
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
}
uint32_t sent = 0;
......
int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
......
return b;
}
From TSocket's implementation we can see that underneath everything is still the native socket API. The higher-level transports essentially all rely on TSocket for the actual network transmission; the layers above merely add buffering, compression, and similar logic.
References and Recommended Blogs