更新時間:2022-09-16 來源:黑馬程序員 瀏覽量:
在之前的內容中,我們講解了消費者端服務發現與提供者端服務暴露的相關內容,同時也知道消費者端通過內置的負載均衡算法獲取合適的調用invoker進行遠程調用。那么,本章節重點關注的就是遠程調用過程即網絡通信。
網絡通信位于Remoting模塊:
- Remoting 實現是 Dubbo 協議的實現,如果你選擇 RMI 協議,整個 Remoting 都不會用上;
- Remoting 內部再劃為 `Transport 傳輸層` 和 `Exchange 信息交換層`;
- Transport 層只負責單向消息傳輸,是對 Mina, Netty, Grizzly 的抽象,它也可以擴展 UDP 傳輸;
- Exchange 層是在傳輸層之上封裝了 Request-Response 語義;
網絡通信的問題:
客戶端與服務端連通性問題
粘包拆包問題
異步多線程數據一致問題
通信協議
dubbo內置,dubbo協議 ,rmi協議,hessian協議,http協議,webservice協議,thrift協議,rest協議,grpc協議,memcached協議,redis協議等10種通訊協議。各個協議特點如下
dubbo協議
Dubbo 缺省協議采用單一長連接和 NIO 異步通訊,適合于小數據量大并發的服務調用,以及服務消費者機器數遠大于服務提供者機器數的情況。
缺省協議,使用基于 mina `1.1.7` 和 hessian `3.2.1` 的 tbremoting 交互。
- 連接個數:單連接
- 連接方式:長連接
- 傳輸協議:TCP
- 傳輸方式:NIO 異步傳輸
- 序列化:Hessian 二進制序列化
- 適用范圍:傳入傳出參數數據包較小(建議小于100K),消費者比提供者個數多,單一消費者無法壓滿提供者,盡量不要用 dubbo 協議傳輸大文件或超大字符串。
- 適用場景:常規遠程服務方法調用
rmi協議
RMI 協議采用 JDK 標準的 `java.rmi.*` 實現,采用阻塞式短連接和 JDK 標準序列化方式。
- 連接個數:多連接
- 連接方式:短連接
- 傳輸協議:TCP
- 傳輸方式:同步傳輸
- 序列化:Java 標準二進制序列化
- 適用范圍:傳入傳出參數數據包大小混合,消費者與提供者個數差不多,可傳文件。
- 適用場景:常規遠程服務方法調用,與原生RMI服務互操作
hessian協議
Hessian 協議用于集成 Hessian 的服務,Hessian 底層采用 Http 通訊,采用 Servlet 暴露服務,Dubbo 缺省內嵌 Jetty 作為服務器實現。
Dubbo 的 Hessian 協議可以和原生 Hessian 服務互操作,即:
- 提供者用 Dubbo 的 Hessian 協議暴露服務,消費者直接用標準 Hessian 接口調用
- 或者提供方用標準 Hessian 暴露服務,消費方用 Dubbo 的 Hessian 協議調用。
- 連接個數:多連接
- 連接方式:短連接
- 傳輸協議:HTTP
- 傳輸方式:同步傳輸
- 序列化:Hessian二進制序列化
- 適用范圍:傳入傳出參數數據包較大,提供者比消費者個數多,提供者壓力較大,可傳文件。
- 適用場景:頁面傳輸,文件傳輸,或與原生hessian服務互操作
http協議
基于 HTTP 表單的遠程調用協議,采用 Spring 的 HttpInvoker 實現
- 連接個數:多連接
- 連接方式:短連接
- 傳輸協議:HTTP
- 傳輸方式:同步傳輸
- 序列化:表單序列化
- 適用范圍:傳入傳出參數數據包大小混合,提供者比消費者個數多,可用瀏覽器查看,可用表單或URL傳入參數,暫不支持傳文件。
- 適用場景:需同時給應用程序和瀏覽器 JS 使用的服務。
webservice協議
基于 WebService 的遠程調用協議,基于 Apache CXF 實現](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。
可以和原生 WebService 服務互操作,即:
- 提供者用 Dubbo 的 WebService 協議暴露服務,消費者直接用標準 WebService 接口調用,
- 或者提供方用標準 WebService 暴露服務,消費方用 Dubbo 的 WebService 協議調用。
- 連接個數:多連接
- 連接方式:短連接
- 傳輸協議:HTTP
- 傳輸方式:同步傳輸
- 序列化:SOAP 文本序列化(http + xml)
- 適用場景:系統集成,跨語言調用
thrift協議
當前 dubbo 支持 [[1\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn1)的 thrift 協議是對 thrift 原生協議 [[2\]](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/thrift.html#fn2) 的擴展,在原生協議的基礎上添加了一些額外的頭信息,比如 service name,magic number 等。
rest協議
基于標準的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡寫)實現的REST調用支持
grpc協議
Dubbo 自 2.7.5 版本開始支持 gRPC 協議,對于計劃使用 HTTP/2 通信,或者想利用 gRPC 帶來的 Stream、反壓、Reactive 編程等能力的開發者來說, 都可以考慮啟用 gRPC 協議。
- 為期望使用 gRPC 協議的用戶帶來服務治理能力,方便接入 Dubbo 體系
- 用戶可以使用 Dubbo 風格的,基于接口的編程風格來定義和使用遠程服務
memcached協議
基于 memcached實現的 RPC 協議
redis協議
基于 Redis 實現的 RPC 協議
序列化
序列化就是將對象轉成字節流,用于網絡傳輸,以及將字節流轉為對象,用于在收到字節流數據后還原成對象。序列化的優勢有很多,例如安全性更好、可跨平臺等。我們知道dubbo基于netty進行網絡通訊,在`NettyClient.doOpen()`方法中可以看到Netty的相關類
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } });
然后去看NettyCodecAdapter 類最后進入ExchangeCodec類的encodeRequest方法,如下:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. byte[] header = new byte[HEADER_LENGTH];
是的,就是Serialization接口,默認是Hessian2Serialization序列化接口。
Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。
- dubbo序列化:阿里尚未開發成熟的高效java序列化實現,阿里不建議在生產環境使用它。
- **hessian2序列化:hessian是一種跨語言的高效二進制序列化方式。但這里實際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認啟用的序列化方式。**
- json序列化:目前有兩種實現,一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實現的簡單json庫,但其實現都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進制序列化。
- java序列化:主要是采用JDK自帶的Java序列化實現,性能很不理想。
最近幾年,各種新的高效序列化方式層出不窮,不斷刷新序列化性能的上限,最典型的包括:
- 專門針對Java語言的:Kryo,FST等等
- 跨語言的:Protostuff,ProtoBuf,Thrift,Avro,MsgPack等等
這些序列化方式的性能多數都顯著優于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我們可以為 dubbo 引入 Kryo 和 FST 這兩種高效 Java 來優化 dubbo 的序列化。
使用Kryo和FST非常簡單,只需要在dubbo RPC的XML配置中添加一個屬性即可:
<dubbo:protocol name="dubbo" serialization="kryo"/>
網絡通信
dubbo中數據格式
解決socket中數據粘包拆包問題,一般有三種方式
* 定長協議(數據包長度一致)
* 定長的協議是指協議內容的長度是固定的,比如協議byte長度是50,當從網絡上讀取50個byte后,就進行decode解碼操作。定長協議在讀取或者寫入時,效率比較高,因為數據緩存的大小基本都確定了,就好比數組一樣,缺陷就是適應性不足,以RPC場景為例,很難估計出定長的長度是多少。
* 特殊結束符(數據尾:通過特殊的字符標識#)
* 相比定長協議,如果能夠定義一個特殊字符作為每個協議單元結束的標示,就能夠以變長的方式進行通信,從而在數據傳輸和高效之間取得平衡,比如用特殊字符`\n`。特殊結束符方式的問題是過于簡單的思考了協議傳輸的過程,對于一個協議單元必須要全部讀入才能夠進行處理,除此之外必須要防止用戶傳輸的數據不能同結束符相同,否則就會出現紊亂。
* 變長協議(協議頭+payload模式)
* 這種一般是自定義協議,會以定長加不定長的部分組成,其中定長的部分需要描述不定長的內容長度。
* dubbo就是使用這種形式的數據傳輸格式
Dubbo 框架定義了私有的RPC協議,其中請求和響應協議的具體內容我們使用表格來展示。
Dubbo 數據包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(Magic),數據包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調用消息,比如方法名稱,參數列表等。下面簡單列舉一下消息頭的內容。
| 偏移量(Bit) | 字段 | 取值 |
| ----------- | ------------ | ------------------------------------------------------------ |
| 0 ~ 7 | 魔數高位 | 0xda00 |
| 8 ~ 15 | 魔數低位 | 0xbb |
| 16 | 數據包類型 | 0 - Response, 1 - Request |
| 17 | 調用方式 | 僅在第16位被設為1的情況下有效,0 - 單向調用,1 - 雙向調用 |
| 18 | 事件標識 | 0 - 當前數據包是請求或響應包,1 - 當前數據包是心跳包 |
| 19 ~ 23 | 序列化器編號 | 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization |
| 24 ~ 31 | 狀態 | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ...... |
| 32 ~ 95 | 請求編號 | 共8字節,運行時生成 |
| 96 ~ 127 | 消息體長度 | 運行時計算
消費方發送請求
(1)發送請求
為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個調用路徑貼出來。
proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> MockClusterInvoker#invoke(Invocation) —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多個 Filter 調用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)
dubbo消費方,自動生成代碼對象如下
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { private InvocationHandler handler; public String sayHello(String string) { // 將參數存儲到 Object 數組中 Object[] arrobject = new Object[]{string}; // 調用 InvocationHandler 實現類的 invoke 方法得到調用結果 Object object = this.handler.invoke(this, methods[0], arrobject); // 返回調用結果 return (String)object; } }
InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內部封裝了服務降級邏輯。下面簡單看一下:
public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 獲取 mock 配置值 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { // 無 mock 邏輯,直接調用其他 Invoker 對象的 invoke 方法, // 比如 FailoverClusterInvoker result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { // force:xxx 直接執行 mock 邏輯,不發起遠程調用 result = doMockInvoke(invocation, null); } else { // fail:xxx 表示消費方對調用服務失敗后,再執行 mock 邏輯,不拋出異常 try { result = this.invoker.invoke(invocation); } catch (RpcException e) { // 調用失敗,執行 mock 邏輯 result = doMockInvoke(invocation, e); } } return result; }
考慮到前文已經詳細分析過 FailoverClusterInvoker,因此本節略過 FailoverClusterInvoker,直接分析 DubboInvoker。
public abstract class AbstractInvoker<T> implements Invoker<T> { public Result invoke(Invocation inv) throws RpcException { if (destroyed.get()) { throw new RpcException("Rpc invoker for service ..."); } RpcInvocation invocation = (RpcInvocation) inv; // 設置 Invoker invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { // 設置 attachment invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { // 添加 contextAttachments 到 RpcInvocation#attachment 變量中 invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // 設置異步信息到 RpcInvocation#attachment 中 invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 抽象方法,由子類實現 return doInvoke(invocation); } catch (InvocationTargetException e) { // ... } catch (RpcException e) { // ... } catch (Throwable e) { return new RpcResult(e); } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; // 省略其他方法 }
上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調用 doInvoke 執行后續的調用。doInvoke 是一個抽象方法,需要由子類實現,下面到 DubboInvoker 中看一下。
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); //將目標方法以及版本好作為參數放入到Invocation中 inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); //獲得客戶端連接 ExchangeClient currentClient; //初始化invoker的時候,構建的一個遠程通信連接 if (clients.length == 1) { //默認 currentClient = clients[0]; } else { //通過取模獲得其中一個連接 currentClient = clients[index.getAndIncrement() % clients.length]; } try { //表示當前的方法是否存在返回值 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); //isOneway 為 true,表示“單向”通信 if (isOneway) {//異步無返回值 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { //存在返回值 //是否采用異步 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); responseFuture.whenComplete((obj, t) -> { if (t != null) { asyncRpcResult.completeExceptionally(t); } else { asyncRpcResult.complete((AppResponse) obj); } }); RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult)); return asyncRpcResult; } } //省略無關代碼 }
最終進入到HeaderExchangeChannel#request方法,拼裝Request并將請求發送出去
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed tosend request " + request + ", cause: The channel " + this + " is closed!"); } // 創建請求對象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { //NettyClient channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
(2)請求編碼
在netty啟動時,我們設置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:
public class ExchangeCodec extends TelnetCodec { // 消息頭長度 protected static final int HEADER_LENGTH = 16; // 魔數內容 protected static final short MAGIC = (short) 0xdabb; protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; protected static final byte FLAG_REQUEST = (byte) 0x80; protected static final byte FLAG_TWOWAY = (byte) 0x40; protected static final byte FLAG_EVENT = (byte) 0x20; protected static final int SERIALIZATION_MASK = 0x1f; private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class); public Short getMagicCode() { return MAGIC; } @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { // 對 Request 對象進行編碼 encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 對 Response 對象進行編碼,后面分析 encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } } protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // 創建消息頭字節數組,長度為 16 byte[] header = new byte[HEADER_LENGTH]; // 設置魔數 Bytes.short2bytes(MAGIC, header); // 設置數據包類型(Request/Response)和序列化器編號 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); // 設置通信方式(單向/雙向) if (req.isTwoWay()) { header[2] |= FLAG_TWOWAY; } // 設置事件標識 if (req.isEvent()) { header[2] |= FLAG_EVENT; } // 設置請求編號,8個字節,從第4個字節開始設置 Bytes.long2bytes(req.getId(), header, 4); // 獲取 buffer 當前的寫位置 int savedWriteIndex = buffer.writerIndex(); // 更新 writerIndex,為消息頭預留 16 個字節的空間 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 創建序列化器,比如 Hessian2ObjectOutput ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { // 對事件數據進行序列化操作 encodeEventData(channel, out, req.getData()); } else { // 對請求數據進行序列化操作 encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 獲取寫入的字節數,也就是消息體長度 int len = bos.writtenBytes(); checkPayload(channel, len); // 將消息體長度寫入到消息頭中 Bytes.int2bytes(len, header, 12); // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備 buffer.writerIndex(savedWriteIndex); // 從 savedWriteIndex 下標處寫入消息頭 buffer.writeBytes(header); // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } // 省略其他方法 }
以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到 header 數組中。然后對 Request 對象的 data 字段執行序列化操作,序列化后的數據最終會存儲到 ChannelBuffer 中。序列化操作執行完后,可得到數據序列化后的長度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節數組 header 寫入到 ChannelBuffer 中,整個編碼過程就結束了。本節的最后,我們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:
public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; // 依次序列化 dubbo version、path、version out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); // 序列化調用方法名 out.writeUTF(inv.getMethodName()); // 將參數類型轉換為字符串,并進行序列化 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { // 對運行時參數進行序列化 out.writeObject(encodeInvocationArgument(channel, inv, i)); } // 序列化 attachments out.writeObject(inv.getAttachments()); } }
至此,關于服務消費方發送請求的過程就分析完了,接下來我們來看一下服務提供方是如何接收請求的。
提供方接收請求
(1) 請求解碼
這里直接分析請求數據的解碼邏輯,忽略中間過程,如下:
public class ExchangeCodec extends TelnetCodec { @Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); // 創建消息頭字節數組 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; // 讀取消息頭數據 buffer.readBytes(header); // 調用重載方法進行后續解碼工作 return decode(channel, buffer, readable, header); } @Override protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // 檢查魔數是否相等 if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } // 通過 telnet 命令行發送的數據包不包含消息頭,所以這里 // 調用 TelnetCodec 的 decode 方法對數據包進行解碼 return super.decode(channel, buffer, readable, header); } // 檢測可讀數據量是否少于消息頭長度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // 從消息頭中獲取消息體長度 int len = Bytes.bytes2int(header, 12); // 檢測消息體長度是否超出限制,超出則拋出異常 checkPayload(channel, len); int tt = len + HEADER_LENGTH; // 檢測可讀的字節數是否小于實際的字節數 if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { // 繼續進行解碼工作 return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
上面方法通過檢測消息頭中的魔數是否與規定的魔數相等,提前攔截掉非常規數據包,比如通過 telnet 命令行發出的數據包。接著再對消息體長度,以及可讀字節數進行檢測。最后調用 decodeBody 方法進行后續的解碼工作,ExchangeCodec 中實現了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運行時 DubboCodec 中的 decodeBody 方法會被調用。下面我們來看一下該方法的代碼。
public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { // 獲取消息頭中的第三個字節,并通過邏輯與運算得到序列化器編號 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 獲取調用編號 long id = Bytes.bytes2long(header, 4); // 通過邏輯與運算得到調用類型,0 - Response,1 - Request if ((flag & FLAG_REQUEST) == 0) { // 對響應結果進行解碼,得到 Response 對象。這個非本節內容,后面再分析 // ... } else { // 創建 Request 對象 Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); // 通過邏輯與運算得到通信方式,并設置到 Request 對象中 req.setTwoWay((flag & FLAG_TWOWAY) != 0); // 通過位運算檢測數據包是否為事件類型 if ((flag & FLAG_EVENT) != 0) { // 設置心跳事件到 Request 對象中 req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { // 對心跳包進行解碼,該方法已被標注為廢棄 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { // 對事件數據進行解碼 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcInvocation inv; // 根據 url 參數判斷是否在 IO 線程上對消息體進行解碼 if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); // 在當前線程,也就是 IO 線程上進行后續的解碼工作。此工作完成后,可將 // 調用方法名、attachment、以及調用參數解析出來 inv.decode(); } else { // 僅創建 DecodeableRpcInvocation 對象,但不在當前線程上執行解碼邏輯 inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } // 設置 data 到 Request 對象中 req.setData(data); } catch (Throwable t) { // 若解碼過程中出現異常,則將 broken 字段設為 true, // 并將異常對象設置到 Reqeust 對象中 req.setBroken(true); req.setData(t); } return req; } } }
如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會調用 DecodeableRpcInvocation 的 decode 方法進行后續的解碼工作。此工作完成后,可將調用方法名、attachment、以及調用參數解析出來。
(2)調用服務
解碼器將數據包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,并將這個對象繼續向下傳遞。整個調用棧如下:
NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent) —> AbstractPeer#received(Channel, Object) —> MultiMessageHandler#received(Channel, Object) —> HeartbeatHandler#received(Channel, Object) —> AllChannelHandler#received(Channel, Object) —> ExecutorService#execute(Runnable) // 由線程池執行后續的調用邏輯
這里我們直接分析調用棧中的分析第一個和最后一個調用方法邏輯。如下:
考慮到篇幅,以及很多中間調用的邏輯并非十分重要,所以這里就不對調用棧中的每個方法都進行分析了。這里我們直接分析最后一個調用方法邏輯。如下:
public class ChannelEventRunnable implements Runnable { private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; @Override public void run() { // 檢測通道狀態,對于請求或響應消息,此時 state = RECEIVED if (state == ChannelState.RECEIVED) { try { // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續的調用 handler.received(channel, message); } catch (Exception e) { logger.warn("... operation error, channel is ... message is ..."); } } // 其他消息類型通過 switch 進行處理 else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("... operation error, channel is ..."); } break; case DISCONNECTED: // ... case SENT: // ... case CAUGHT: // ... default: logger.warn("unknown state: " + state + ", message is " + message); } } } }
如上,請求和響應消息出現頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一個中轉站,它的 run 方法中并不包含具體的調用邏輯,僅用于將參數傳給其他 ChannelHandler 對象進行處理,該對象類型為 DecodeHandler。
public class DecodeHandler extends AbstractChannelHandlerDelegate { public DecodeHandler(ChannelHandler handler) { super(handler); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { // 對 Decodeable 接口實現類對象進行解碼 decode(message); } if (message instanceof Request) { // 對 Request 的 data 字段進行解碼 decode(((Request) message).getData()); } if (message instanceof Response) { // 對 Request 的 result 字段進行解碼 decode(((Response) message).getResult()); } // 執行后續邏輯 handler.received(channel, message); } private void decode(Object message) { // Decodeable 接口目前有兩個實現類, // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult if (message != null && message instanceof Decodeable) { try { // 執行解碼邏輯 ((Decodeable) message).decode(); } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } } } }
DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的 Request 對象會繼續向后傳遞
public class DubboProtocol extends AbstractProtocol { public static final String NAME = "dubbo"; private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 獲取 Invoker 實例 Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { // 回調相關,忽略 } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 通過 Invoker 調用具體的服務 return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: ..."); } // 忽略其他方法 } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { // 忽略回調和本地存根相關邏輯 // ... int port = channel.getLocalAddress().getPort(); // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如: // dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880 String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); // 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象, // 服務導出過程中會將 <serviceKey, DubboExporter> 映射關系存儲到 exporterMap 集合中 DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) throw new RemotingException(channel, "Not found exported service ..."); // 獲取 Invoker 對象,并返回 return exporter.getInvoker(); } // 忽略其他方法 }
在之前課程中介紹過,服務全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過 Invoker 的 invoke 方法調用服務邏輯
public abstract class AbstractProxyInvoker<T> implements Invoker<T> { @Override public Result invoke(Invocation invocation) throws RpcException { try { // 調用 doInvoke 執行后續的調用,并將調用結果封裝到 RpcResult 中,并 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method ..."); } } protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable; }
如上,doInvoke 是一個抽象方法,這個需要由具體的 Invoker 實例實現。Invoker 實例是在運行時通過 JavassistProxyFactory 創建的,創建邏輯如下:
public class JavassistProxyFactory extends AbstractProxyFactory { // 省略其他方法 @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 創建匿名類對象 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 調用 invokeMethod 方法進行后續的調用 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現類,并實現 invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
/** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; // 省略其他方法 public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { // 類型轉換 demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { // 根據方法名調用指定的方法 if ("sayHello".equals(string) && arrclass.length == 1) { return demoService.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString()); } }
到這里,整個服務調用過程就分析完了。最后把調用過程貼出來,如下:
ChannelEventRunnable#run() —> DecodeHandler#received(Channel, Object) —> HeaderExchangeHandler#received(Channel, Object) —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) —> Filter#invoke(Invoker, Invocation) —> AbstractProxyInvoker#invoke(Invocation) —> Wrapper0#invokeMethod(Object, String, Class[], Object[]) —> DemoServiceImpl#sayHello(String)
提供方返回調用結果
服務提供方調用指定服務后,會將調用結果封裝到 Response 對象中,并將該對象返回給服務消費方。服務提供方也是通過 NettyChannel 的 send 方法將 Response 對象返回,這里就不在重復分析了。本節我們僅需關注 Response 對象的編碼過程即可。
public class ExchangeCodec extends TelnetCodec { public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { // 對響應對象進行編碼 encodeResponse(channel, buffer, (Response) msg); } else { super.encode(channel, buffer, msg); } } protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { Serialization serialization = getSerialization(channel); // 創建消息頭字節數組 byte[] header = new byte[HEADER_LENGTH]; // 設置魔數 Bytes.short2bytes(MAGIC, header); // 設置序列化器編號 header[2] = serialization.getContentTypeId(); if (res.isHeartbeat()) header[2] |= FLAG_EVENT; // 獲取響應狀態 byte status = res.getStatus(); // 設置響應狀態 header[3] = status; // 設置請求編號 Bytes.long2bytes(res.getId(), header, 4); // 更新 writerIndex,為消息頭預留 16 個字節的空間 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (status == Response.OK) { if (res.isHeartbeat()) { // 對心跳響應結果進行序列化,已廢棄 encodeHeartbeatData(channel, out, res.getResult()); } else { // 對調用結果進行序列化 encodeResponseData(channel, out, res.getResult(), res.getVersion()); } } else { // 對錯誤信息進行序列化 out.writeUTF(res.getErrorMessage()) }; out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 獲取寫入的字節數,也就是消息體長度 int len = bos.writtenBytes(); checkPayload(channel, len); // 將消息體長度寫入到消息頭中 Bytes.int2bytes(len, header, 12); // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備 buffer.writerIndex(savedWriteIndex); // 從 savedWriteIndex 下標處寫入消息頭 buffer.writeBytes(header); // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { // 異常處理邏輯不是很難理解,但是代碼略多,這里忽略了 } } } public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { Result result = (Result) data; // 檢測當前協議版本是否支持帶有 attachment 集合的 Response 對象 boolean attach = Version.isSupportResponseAttachment(version); Throwable th = result.getException(); // 異常信息為空 if (th == null) { Object ret = result.getValue(); // 調用結果為空 if (ret == null) { // 序列化響應類型 out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE); } // 調用結果非空 else { // 序列化響應類型 out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE); // 序列化調用結果 out.writeObject(ret); } } // 異常信息非空 else { // 序列化響應類型 out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION); // 序列化異常對象 out.writeObject(th); } if (attach) { // 記錄 Dubbo 協議版本 result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); // 序列化 attachments 集合 out.writeObject(result.getAttachments()); } } }
以上就是 Response 對象編碼的過程,和前面分析的 Request 對象編碼過程很相似。如果大家能看 Request 對象的編碼邏輯,那么這里的 Response 對象的編碼邏輯也不難理解,就不多說了。接下來我們再來分析雙向通信的最后一環 —— 服務消費方接收調用結果。
消費方接收調用結果
服務消費方在收到響應數據后,首先要做的事情是對響應數據進行解碼,得到 Response 對象。然后再將該對象傳遞給下一個入站處理器,這個入站處理器就是 NettyHandler。接下來 NettyHandler 會將這個對象繼續向下傳遞,最后 AllChannelHandler 的 received 方法會收到這個對象,并將這個對象派發到線程池中。這個過程和服務提供方接收請求的過程是一樣的,因此這里就不重復分析了。
(1)響應數據解碼
響應數據解碼邏輯主要的邏輯封裝在 DubboCodec 中,我們直接分析這個類的代碼。如下:
public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 獲取請求編號 long id = Bytes.bytes2long(header, 4); // 檢測消息類型,若下面的條件成立,表明消息類型為 Response if ((flag & FLAG_REQUEST) == 0) { // 創建 Response 對象 Response res = new Response(id); // 檢測事件標志位 if ((flag & FLAG_EVENT) != 0) { // 設置心跳事件 res.setEvent(Response.HEARTBEAT_EVENT); } // 獲取響應狀態 byte status = header[3]; // 設置響應狀態 res.setStatus(status); // 如果響應狀態為 OK,表明調用過程正常 if (status == Response.OK) { try { Object data; if (res.isHeartbeat()) { // 反序列化心跳數據,已廢棄 data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) { // 反序列化事件數據 data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcResult result; // 根據 url 參數決定是否在 IO 線程上執行解碼邏輯 if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { // 創建 DecodeableRpcResult 對象 result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); // 進行后續的解碼工作 result.decode(); } else { // 創建 DecodeableRpcResult 對象 result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } // 設置 DecodeableRpcResult 對象到 Response 對象中 res.setResult(data); } catch (Throwable t) { // 解碼過程中出現了錯誤,此時設置 CLIENT_ERROR 狀態碼到 Response 對象中 res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } // 響應狀態非 OK,表明調用過程出現了異常 else { // 反序列化異常信息,并設置到 Response 對象中 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else { // 對請求數據進行解碼,前面已分析過,此處忽略 } } }
以上就是響應數據的解碼過程,上面邏輯看起來是不是似曾相識。對的,我們在前面章節分析過 DubboCodec 的 decodeBody 方法中關于請求數據的解碼過程,該過程和響應數據的解碼過程很相似。下面,我們繼續分析調用結果的反序列化過程
public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable { private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class); private Channel channel; private byte serializationType; private InputStream inputStream; private Response response; private Invocation invocation; private volatile boolean hasDecoded; public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) { Assert.notNull(channel, "channel == null"); Assert.notNull(response, "response == null"); Assert.notNull(is, "inputStream == null"); this.channel = channel; this.response = response; this.inputStream = is; this.invocation = invocation; this.serializationType = id; } @Override public void encode(Channel channel, OutputStream output, Object message) throws IOException { throw new UnsupportedOperationException(); } @Override public Object decode(Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); // 反序列化響應類型 byte flag = in.readByte(); switch (flag) { case DubboCodec.RESPONSE_NULL_VALUE: break; case DubboCodec.RESPONSE_VALUE: handleValue(in); break; case DubboCodec.RESPONSE_WITH_EXCEPTION: handleException(in); break; // 返回值為空,且攜帶了 attachments 集合 case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: handleAttachment(in); break; //返回值不為空,且攜帶了 attachments 集合 case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS: handleValue(in); handleAttachment(in); break; // 異常對象不為空,且攜帶了 attachments 集合 case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: handleException(in); handleAttachment(in); break; default: throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag); } if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } return this; }
正常調用下,線程會進入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后線程會從 invocation 變量(大家探索一下 invocation 變量的由來)中獲取返回值類型,接著對調用結果進行反序列化,并將序列化后的結果存儲起來。最后對 attachments 集合進行反序列化,并存到指定字段中。
異步轉同步
Dubbo發送數據至服務方后,在通信層面是異步的,通信線程并不會等待結果數據返回。而我們在使用Dubbo進行RPC調用缺省就是同步的,這其中就涉及到了異步轉同步的操作。
而在2.7.x版本中,這種自實現的異步轉同步操作進行了修改。新的`DefaultFuture`繼承了`CompletableFuture`,新的`doReceived(Response res)`方法如下:
private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } }
通過`CompletableFuture#complete`方法來設置異步的返回結果,且刪除舊的`get()`方法,使用`CompletableFuture#get()`方法:
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); }
使用`CompletableFuture`完成了異步轉同步的操作。
異步多線程數據一致
這里簡單說明一下。一般情況下,服務消費方會并發調用多個服務,每個用戶線程發送請求后,會調用 get 方法進行等待。 一段時間后,服務消費方的線程池會收到多個響應對象。這個時候要考慮一個問題,如何將每個響應對象傳遞給相應的 Future 對象,不出錯。答案是通過調用**編號**。Future 被創建時,會要求傳入一個 Request 對象。此時 DefaultFuture 可從 Request 對象中獲取調用編號,并將 <調用編號, DefaultFuture 對象> 映射關系存入到靜態 Map 中,即 FUTURES。線程池中的線程在收到 Response 對象后,會根據 Response 對象中的調用編號到 FUTURES 集合中取出相應的 DefaultFuture 對象,然后再將 Response 對象設置到 DefaultFuture 對象中。這樣用戶線程即可從 DefaultFuture 對象中獲取調用結果了。整個過程大致如下圖:
private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); CHANNELS.put(id, channel); }
心跳檢查
Dubbo采用雙向心跳的方式檢測Client端與Server端的連通性。
我們再來看看 Dubbo 是如何設計應用層心跳的。Dubbo 的心跳是雙向心跳,客戶端會給服務端發送心跳,反之,服務端也會向客戶端發送心跳。
創建定時器
public class HeaderExchangeClient implements ExchangeClient { private final Client client; private final ExchangeChannel channel; private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL); private HeartbeatTimerTask heartBeatTimerTask; private ReconnectTimerTask reconnectTimerTask; public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; this.channel = new HeaderExchangeChannel(client); if (startTimer) { URL url = client.getUrl(); //開啟心跳失敗之后處理重連,斷連的邏輯定時任務 startReconnectTask(url); //開啟發送心跳請求定時任務 startHeartBeatTask(url); } }
Dubbo 在 `HeaderExchangeClient `初始化時開啟了兩個定時任務
`startReconnectTask` 主要用于定時發送心跳請求
`startHeartBeatTask` 主要用于心跳失敗之后處理重連,斷連的邏輯
發送心跳請求
詳細解析下心跳檢測定時任務的邏輯 `HeartbeatTimerTask#doTask`:
protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat) || (lastWrite != null && now() - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); } }
前面已經介紹過,**Dubbo 采取的是雙向心跳設計**,即服務端會向客戶端發送心跳,客戶端也會向服務端發送心跳,接收的一方更新 lastRead 字段,發送的一方更新 lastWrite 字段,超過心跳間隙的時間,便發送心跳請求給對端。這里的 lastRead/lastWrite 同樣會被同一個通道上的普通調用更新,通過更新這兩個字段,實現了只在連接空閑時才會真正發送空閑報文的機制,符合我們一開始科普的做法。
處理重連和斷連
繼續研究下重連和斷連定時器都實現了什么 `ReconnectTimerTask#doTask`。
protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long now = now(); if (!channel.isConnected()) { ((Client) channel).reconnect(); // check pong at client } else if (lastRead != null && now - lastRead > idleTimeout) { ((Client) channel).reconnect(); } }
第二個定時器則負責根據客戶端、服務端類型來對連接做不同的處理,當超過設置的心跳總時間之后,客戶端選擇的是重新連接,服務端則是選擇直接斷開連接。這樣的考慮是合理的,客戶端調用是強依賴可用連接的,而服務端可以等待客戶端重新建立連接。
Dubbo 對于建立的每一個連接,同時在客戶端和服務端開啟了 2 個定時器,一個用于定時發送心跳,一個用于定時重連、斷連,執行的頻率均為各自檢測周期的 1/3。定時發送心跳的任務負責在連接空閑時,向對端發送心跳包。定時重連、斷連的任務負責檢測 lastRead 是否在超時周期內仍未被更新,如果判定為超時,客戶端處理的邏輯是重連,服務端則采取斷連的措施。