更新時間:2022-09-19 來源:黑馬程序員 瀏覽量:
概述
dubbo是一個簡單易用的RPC框架,通過簡單的提供者,消費者配置就能完成無感的網絡調用。那么在dubbo中是如何將提供者的服務暴露出去,消費者又是如何獲取到提供者相關信息的呢?這就是本章我們要討論的內容。
dubbo與spring的整合
在了解dubbo的服務注冊和服務發現之前,我們首先需要掌握一個知識點:Spring中自定義Schema。
Spring自定義Schema
Dubbo 現在的設計是完全無侵入,也就是使用者只依賴于配置契約。在 Dubbo 中,可以使用 XML 配置相關信息,也可以用來引入服務或者導出服務。配置完成,啟動工程,Spring 會讀取配置文件,生成注入相關Bean。那 Dubbo 如何實現自定義 XML 被 Spring 加載讀取呢?
從 Spring 2.0 開始,Spring 開始提供了一種基于 XML Schema 格式擴展機制,用于定義和配置 bean。
入門案例
學習和使用Spring XML Schema 擴展機制并不難,需要下面幾個步驟:
1. 創建配置屬性的JavaBean對象
2. 創建一個 XML Schema 文件,描述自定義的合法構建模塊,也就是xsd文件。
3. 自定義處理器類,并實現`NamespaceHandler`接口。
4. 自定義解析器,實現`BeanDefinitionParser`接口(最關鍵的部分)。
5. 編寫Spring.handlers和Spring.schemas文件配置所有部件
定義JavaBean對象,在spring中此對象會根據配置自動創建
public class User { private String id; private String name; private Integer age; //省略getter setter方法 }
在META-INF下定義`user.xsd`文件,使用xsd用于描述標簽的規則
<?xml version="1.0" encoding="UTF-8"?> <xsd:schema xmlns="http://www.itheima.com/schema/user" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans" targetNamespace="http://www.itheima.com/schema/user" elementFormDefault="qualified" attributeFormDefault="unqualified"> <xsd:import namespace="http://www.springframework.org/schema/beans" /> <xsd:element name="user"> <xsd:complexType> <xsd:complexContent> <xsd:extension base="beans:identifiedType"> <xsd:attribute name="name" type="xsd:string" /> <xsd:attribute name="age" type="xsd:int" /> </xsd:extension> </xsd:complexContent> </xsd:complexType> </xsd:element> </xsd:schema>
Spring讀取xml文件時,會根據標簽的命名空間找到其對應的NamespaceHandler,我們在NamespaceHandler內會注冊標簽對應的解析器BeanDefinitionParser。
package com.itheima.schema; import org.springframework.beans.factory.xml.NamespaceHandlerSupport; public class UserNamespaceHandler extends NamespaceHandlerSupport { public void init() { registerBeanDefinitionParser("user", new UserBeanDefinitionParser()); } }
BeanDefinitionParser是標簽對應的解析器,Spring讀取到對應標簽時會使用該類進行解析;
public class UserBeanDefinitionParser extends AbstractSingleBeanDefinitionParser { protected Class getBeanClass(Element element) { return User.class; } protected void doParse(Element element, BeanDefinitionBuilder bean) { String name = element.getAttribute("name"); String age = element.getAttribute("age"); String id = element.getAttribute("id"); if (StringUtils.hasText(id)) { bean.addPropertyValue("id", id); } if (StringUtils.hasText(name)) { bean.addPropertyValue("name", name); } if (StringUtils.hasText(age)) { bean.addPropertyValue("age", Integer.valueOf(age)); } } }
定義spring.handlers文件,內部保存命名空間與NamespaceHandler類的對應關系;必須放在classpath下的META-INF文件夾中。
```proprties
http\://www.itheima.com/schema/user=com.itheima.schema.UserNamespaceHandler
```
定義spring.schemas文件,內部保存命名空間對應的xsd文件位置;必須放在classpath下的META-INF文件夾中。
http\://www.itheima.com/schema/user.xsd=META-INF/user.xsd
代碼準備好了之后,就可以在spring工程中進行使用和測試,定義spring配置文件,導入對應約束。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:itheima="http://www.itheima.com/schema/user" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.itheima.com/schema/user http://www.itheima.com/schema/user.xsd"> <itheima:user id="user" name="zhangsan" age="12"></itheima:user> </beans>
編寫測試類,通過spring容器獲取對象user
public class SchemaDemo { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring/applicationContext.xml"); User user = (User)ctx.getBean("user"); System.out.println(user); } }
dubbo中的相關對象
Dubbo是運行在spring容器中,dubbo的配置文件也是通過spring的配置文件applicationContext.xml來加載,所以dubbo的自定義配置標簽實現,其實同樣依賴spring的xml schema機制
可以看出Dubbo所有的組件都是由`DubboBeanDefinitionParser`解析,并通過registerBeanDefinitionParser方法來注冊到spring中最后解析對應的對象。這些對象中我們重點關注的有以下兩個:
ServiceBean:服務提供者暴露服務的核心對象
ReferenceBean:服務消費者發現服務的核心對象
RegistryConfig:定義注冊中心的核心配置對象
服務暴露
前面主要探討了 Dubbo 中 schema 、 XML 的相關原理 , 這些內容對理解框架整體至關重要 , 在此基礎上我們繼續探討服務是如何依靠前面的配置進行服務暴露。
名詞解釋
在 Dubbo 的核心領域模型中:
Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起 invoke 調用,它有可能是一個本地的實現,也可能是一個遠程的實現,也可能一個集群實現。在服務提供方,Invoker用于調用服務提供類。在服務消費方,Invoker用于執行遠程調用。
- Protocol 是服務域,它是 Invoker 暴露和引用的主功能入口,它負責 Invoker 的生命周期管理。
- export:暴露遠程服務
- refer:引用遠程服務
- proxyFactory:獲取一個接口的代理類
- getInvoker:針對server端,將服務對象,如DemoServiceImpl包裝成一個Invoker對象
- getProxy:針對client端,創建接口的代理對象,例如DemoService的接口。
- Invocation 是會話域,它持有調用過程中的變量,比如方法名,參數等
整體流程
在詳細探討服務暴露細節之前 , 我們先看一下整體duubo的服務暴露原理
在整體上看,Dubbo 框架做服務暴露分為兩大部分 , 第一步將持有的服務實例通過代理轉換成 Invoker, 第二步會把 Invoker 通過具體的協議 ( 比如 Dubbo ) 轉換成 Exporter, 框架做了這層抽象也大大方便了功能擴展 。
服務提供方暴露服務的藍色初始化鏈,時序圖如下:
源碼分析
(1) 導出入口
服務導出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一個事件響應方法,該方法會在收到 Spring 上下文刷新事件后執行服務導出操作。方法代碼如下:
public void onApplicationEvent(ContextRefreshedEvent event) { // 是否有延遲導出 && 是否已導出 && 是不是已被取消導出 if (isDelay() && !isExported() && !isUnexported()) { // 導出服務 export(); } }
onApplicationEvent 方法在經過一些判斷后,會決定是否調用 export 方法導出服務。在export 根據配置執行相應的動作。最終進入到doExportUrls導出服務方法。
private void doExportUrls() { // 加載注冊中心鏈接 List<URL> registryURLs = loadRegistries(true); // 遍歷 protocols,并在每個協議下導出服務 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
關于多協議多注冊中心導出服務首先是根據配置,以及其他一些信息組裝 URL。前面說過,URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 如果協議名為空,或空串,則將協議名變量設置為 dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); //略 // 獲取上下文路徑 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } // 獲取 host 和 port String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 組裝 URL URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); // 省略無關代碼 }
上面的代碼首先是將一些信息,比如版本、時間戳、方法名以及各種配置對象的字段信息放入到 map 中,最后將 map 和主機名等數據傳給 URL 構造方法創建 URL 對象。前置工作做完,接下來就可以進行服務導出了。服務導出分為導出到本地 (JVM),和導出到遠程。在深入分析服務導出的源碼前,我們先來從宏觀層面上看一下服務導出邏輯。如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 省略無關代碼 String scope = url.getParameter(Constants.SCOPE_KEY); // 如果 scope = none,則什么都不做 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // scope != remote,導出到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // scope != local,導出到遠程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { //省略無關代碼 // 為服務提供類(ref)生成 Invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 導出服務,并生成 Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } // 不存在注冊中心,僅導出服務 } else { //略 } } } this.urls.add(url); }
上面代碼根據 url 中的 scope 參數決定服務導出方式,分別如下:
- scope = none,不導出服務
- scope != remote,導出到本地
- scope != local,導出到遠程
不管是導出到本地,還是遠程。進行服務導出之前,均需要先創建 Invoker,這是一個很重要的步驟。因此下面先來分析 Invoker 的創建過程。Invoker 是由 ProxyFactory 創建而來,Dubbo 默認的 ProxyFactory 實現類是 JavassistProxyFactory。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創建過程。如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 為目標類創建 Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 創建匿名 Invoker 類對象,并實現 doInvoke 方法。 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目標方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
如上,JavassistProxyFactory 創建了一個繼承自 AbstractProxyInvoker 類的匿名對象,并覆寫了抽象方法 doInvoke。
(2) 導出服務到本地
Invoke創建成功之后,接下來我們來看本地導出
private void exportLocal(URL url) { // 如果 URL 的協議頭等于 injvm,說明已經導出到本地了,無需再次導出 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) // 設置協議頭為 injvm .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 創建 Invoker,并導出服務,這里的 protocol 會在運行時調用 InjvmProtocol 的 export 方法 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); } }
exportLocal 方法比較簡單,首先根據 URL 協議頭決定是否導出服務。若需導出,則創建一個新的 URL 并將協議頭、主機名以及端口設置成新的值。然后創建 Invoker,并調用 InjvmProtocol 的 export 方法導出服務。下面我們來看一下 InjvmProtocol 的 export 方法都做了哪些事情。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 創建 InjvmExporter return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
如上,InjvmProtocol 的 export 方法僅創建了一個 InjvmExporter,無其他邏輯。到此導出服務到本地就分析完了。
(3) 導出服務到遠程
接下來,我們繼續分析導出服務到遠程的過程。導出服務到遠程包含了服務導出與服務注冊兩個過程。先來分析服務導出邏輯。我們把目光移動到 RegistryProtocol 的 export 方法上。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 導出服務 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 獲取注冊中心 URL URL registryUrl = getRegistryUrl(originInvoker); // 根據 URL 加載 Registry 實現類,比如 ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 獲取已注冊的服務提供者 URL,比如: final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 獲取 register 參數 boolean register = registeredProviderUrl.getParameter("register", true); // 向服務提供者與消費者注冊表中注冊服務提供者 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 根據 register 的值決定是否注冊服務 if (register) { // 向注冊中心注冊服務 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 獲取訂閱 URL,比如: final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // 創建監聽器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 向注冊中心進行訂閱 override 數據 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 創建并返回 DestroyableExporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
上面代碼看起來比較復雜,主要做如下一些操作:
1. 調用 doLocalExport 導出服務
2. 向注冊中心注冊服務
3. 向注冊中心進行訂閱 override 數據
4. 創建并返回 DestroyableExporter
下面先來分析 doLocalExport 方法的邏輯,如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); // 訪問緩存 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { // 創建 Invoker 為委托類對象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 調用 protocol 的 export 方法導出服務 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // 寫緩存 bounds.put(key, exporter); } } } return exporter; }
接下來,我們把重點放在 Protocol 的 export 方法上。假設運行時協議為 dubbo,此處的 protocol 變量會在運行時加載 DubboProtocol,并調用 DubboProtocol 的 export 方法。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // 獲取服務標識,理解成服務坐標也行。由服務組名,服務名,服務版本號以及端口組成。比如: // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 String key = serviceKey(url); // 創建 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 將 <key, exporter> 鍵值對放入緩存中 exporterMap.put(key, exporter); //省略無關代碼 // 啟動服務器 openServer(url); // 優化序列化 optimizeSerialization(url); return exporter; }
(4) 開啟Netty服務
如上,我們重點關注 DubboExporter 的創建以及 openServer 方法,其他邏輯看不懂也沒關系,不影響理解服務導出過程。下面分析 openServer 方法。
private void openServer(URL url) { // 獲取 host:port,并將其作為服務器實例的 key,用于標識當前的服務器實例 String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { // 訪問緩存 ExchangeServer server = serverMap.get(key); if (server == null) { // 創建服務器實例 serverMap.put(key, createServer(url)); } else { // 服務器已創建,則根據 url 中的配置重置服務器 server.reset(url); } } }
接下來分析服務器實例的創建過程。如下:
private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, // 添加心跳檢測配置到 url 中 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 獲取 server 參數,默認為 netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加編碼解碼器參數 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 創建 ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server..."); } // 獲取 client 參數,可指定 netty,mina str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 獲取所有的 Transporter 實現類名稱集合,比如 supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 檢測當前 Dubbo 所支持的 Transporter 實現類名稱列表中, // 是否包含 client 所表示的 Transporter,若不包含,則拋出異常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type..."); } } return server; }
如上,createServer 包含三個核心的邏輯。第一是檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常。第二是創建服務器實例。第三是檢測是否支持 client 參數所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測操作所對應的代碼比較直白了,無需多說。但創建服務器的操作目前還不是很清晰,我們繼續往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger,默認為 HeaderExchanger。 // 緊接著調用 HeaderExchanger 的 bind 方法創建 ExchangeServer 實例 return getExchanger(url).bind(url, handler); }
上面代碼比較簡單,就不多說了。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 創建 HeaderExchangeServer 實例,該方法包含了多個邏輯,分別如下: // 1. new HeaderExchangeHandler(handler) // 2. new DecodeHandler(new HeaderExchangeHandler(handler)) // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handlers 元素數量大于1,則創建 ChannelHandler 分發器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取自適應 Transporter 實例,并調用實例方法 return getTransporter().bind(url, handler); }
如上,getTransporter() 方法獲取的 Transporter 是在運行時動態創建的,類名為 TransporterAdaptive,也就是自適應拓展類。TransporterAdaptive 會在運行時根據傳入的 URL 參數決定加載什么類型的 Transporter,默認為 NettyTransporter。調用`NettyTransporter.bind(URL, ChannelHandler)`方法。創建一個`NettyServer`實例。調用`NettyServer.doOPen()`方法,服務器被開啟,服務也被暴露出來了。
(5) 服務注冊
本節內容以 Zookeeper 注冊中心作為分析目標,其他類型注冊中心大家可自行分析。下面從服務注冊的入口方法開始分析,我們把目光再次移到 RegistryProtocol 的 export 方法上。如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // ${導出服務} // 省略其他代碼 boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 注冊服務 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 訂閱 override 數據 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 省略部分代碼 }
RegistryProtocol 的 export 方法包含了服務導出,注冊,以及數據訂閱等邏輯。其中服務導出邏輯上一節已經分析過了,本節將分析服務注冊邏輯,相關代碼如下:
public void register(URL registryUrl, URL registedProviderUrl) { // 獲取 Registry Registry registry = registryFactory.getRegistry(registryUrl); // 注冊服務 registry.register(registedProviderUrl); }
register 方法包含兩步操作,第一步是獲取注冊中心實例,第二步是向注冊中心注冊服務。接下來分兩節內容對這兩步操作進行分析。
這里以 Zookeeper 注冊中心為例進行分析。下面先來看一下 getRegistry 方法的源碼,這個方法由 AbstractRegistryFactory 實現。如下:
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); LOCK.lock(); try { // 訪問緩存 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 緩存未命中,創建 Registry 實例 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry..."); } // 寫入緩存 REGISTRIES.put(key, registry); return registry; } finally { LOCK.unlock(); } } protected abstract Registry createRegistry(URL url);
如上,getRegistry 方法先訪問緩存,緩存未命中則調用 createRegistry 創建 Registry。在此方法中就是通過`new ZookeeperRegistry(url, zookeeperTransporter)`實例化一個注冊中心
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 獲取組名,默認為 dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { // group = "/" + group group = Constants.PATH_SEPARATOR + group; } this.root = group; // 創建 Zookeeper 客戶端,默認為 CuratorZookeeperTransporter zkClient = zookeeperTransporter.connect(url); // 添加狀態監聽器 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
在上面的代碼代碼中,我們重點關注 ZookeeperTransporter 的 connect 方法調用,這個方法用于創建 Zookeeper 客戶端。創建好 Zookeeper 客戶端,意味著注冊中心的創建過程就結束了。接下來,再來分析一下 Zookeeper 客戶端的創建過程。
public ZookeeperClient connect(URL url) { // 創建 CuratorZookeeperClient return new CuratorZookeeperClient(url); }
繼續向下看。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { private final CuratorFramework client; public CuratorZookeeperClient(URL url) { super(url); try { // 創建 CuratorFramework 構造器 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 構建 CuratorFramework 實例 client = builder.build(); //省略無關代碼 // 啟動客戶端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
CuratorZookeeperClient 構造方法主要用于創建和啟動 CuratorFramework 實例。至此Zookeeper客戶端就已經啟動了
下面我們將 Dubbo 的 demo 跑起來,然后通過 Zookeeper 可視化客戶端 [ZooInspector](https://github.com/apache/zookeeper/tree/b79af153d0f98a4f3f3516910ed47234d7b3d74e/src/contrib/zooinspector) 查看節點數據。如下:
![img](http://dubbo.apache.org/docs/zh-cn/source_code_guide/sources/images/service-registry.png)
從上圖中可以看到DemoService 這個服務對應的配置信息最終被注冊到了zookeeper節點下。搞懂了服務注冊的本質,那么接下來我們就可以去閱讀服務注冊的代碼了。
protected void doRegister(URL url) { try { // 通過 Zookeeper 客戶端創建節點,節點路徑由 toUrlPath 方法生成,路徑格式如下: // /${group}/${serviceInterface}/providers/${url} // 比如 // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1...... zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register..."); } }
如上,ZookeeperRegistry 在 doRegister 中調用了 Zookeeper 客戶端創建服務節點。節點路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,如下:
public void create(String path, boolean ephemeral) { if (!ephemeral) { // 如果要創建的節點類型非臨時節點,那么這里要檢測節點是否存在 if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { // 遞歸創建上一級路徑 create(path.substring(0, i), false); } // 根據 ephemeral 的值創建臨時或持久節點 if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
好了,到此關于服務注冊的過程就分析完了。整個過程可簡單總結為:先創建注冊中心實例,之后再通過注冊中心實例注冊服務。
總結
1. 在有注冊中心,需要注冊提供者地址的情況下,ServiceConfig 解析出的 URL 格式為:`registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{服務名}/{版本號}")`
2. 基于 Dubbo SPI 的自適應機制,通過 URL `registry://` 協議頭識別,就調用 RegistryProtocol#export() 方法
1. 將具體的服務類名,比如 `DubboServiceRegistryImpl`,通過 ProxyFactory 包裝成 Invoker 實例
2. 調用 doLocalExport 方法,使用 DubboProtocol 將 Invoker 轉化為 Exporter 實例,并打開 Netty 服務端監聽客戶請求
3. 創建 Registry 實例,連接 Zookeeper,并在服務節點下寫入提供者的 URL 地址,注冊服務
4. 向注冊中心訂閱 override 數據,并返回一個 Exporter 實例
3. 根據 URL 格式中的 `"dubbo://service-host/{服務名}/{版本號}"`中協議頭 `dubbo://` 識別,調用 DubboProtocol#export() 方法,開發服務端口
4. RegistryProtocol#export() 返回的 Exporter 實例存放到 ServiceConfig 的 `List exporters` 中
服務發現
在學習了服務暴露原理之后 , 接下來重點探討服務是如何消費的 。 這里主要講解如何通過注冊中心進行服務發現進行遠程服務調用等細節 。
服務發現流程
在詳細探討服務暴露細節之前 , 我們先看一下整體duubo的服務消費原理
在整體上看 , Dubbo 框架做服務消費也分為兩大部分 , 第一步通過持有遠程服務實例生成Invoker, 這個 Invoker 在客戶端是核心的遠程代理對象 。 第二步會把 Invoker 通過動態代理轉換成實現用戶接口的動態代理引用 。
服務消費方引用服務的藍色初始化鏈,時序圖如下:
源碼分析
(1) 引用入口
服務引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現了這個方法。
public Object getObject() throws Exception { return get(); } public synchronized T get() { // 檢測 ref 是否為空,為空則通過 init 方法創建 if (ref == null) { // init 方法主要用于處理配置,以及調用 createProxy 生成代理類 init(); } return ref; }
Dubbo 提供了豐富的配置,用于調整和優化框架行為,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置的正確性。
private void init() { // 創建代理類 ref = createProxy(map);
此方法代碼很長,主要完成的配置加載,檢查,以及創建引用的代理對象。這里要從 createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用于創建代理對象的。但實際上并非如此,該方法還會調用其他方法構建以及合并 Invoker 實例。具體細節如下。
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); ........... isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl) // 本地引用略 if (isJvmRefer) { } else { // 點對點調用略 if (url != null && url.length() > 0) { } else { // 加載注冊中心 url List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 參數到 url 中,并將 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } } // 單個注冊中心或服務提供者(服務直連,下同) if (urls.size() == 1) { // 調用 RegistryProtocol 的 refer 構建 Invoker 實例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多個注冊中心或多個服務提供者,或者兩者混合 } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 獲取所有的 Invoker for (URL url : urls) { // 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時 // 根據 url 協議頭加載指定的 Protocol 實例,并調用實例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null) { // 如果注冊中心鏈接不為空,則將使用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 創建 StaticDirectory 實例,并由 Cluster 對多個 Invoker 進行合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } //省略無關代碼 // 生成代理類 return (T) proxyFactory.getProxy(invoker); }
上面代碼很多,不過邏輯比較清晰。
1、如果是本地調用,直接jvm 協議從內存中獲取實例
2、如果只有一個注冊中心,直接通過 Protocol 自適應拓展類構建 Invoker 實例接口
3、如果有多個注冊中心,此時先根據 url 構建 Invoker。然后再通過 Cluster 合并多個 Invoker,最后調用 ProxyFactory 生成代理類。
(2) 創建客戶端
在服務消費方,Invoker 用于執行遠程調用。Invoker 是由 Protocol 實現類構建而來。Protocol 實現類有很多,這里分析DubboProtocol。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // 創建 DubboInvoker DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起來比較簡單,創建一個DubboInvoker。通過構造方法傳入遠程調用的client對象。默認情況下,Dubbo 使用 NettyClient 進行通信。接下來,我們簡單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) { // 是否共享連接 boolean service_share_connect = false; // 獲取連接數,默認為0,表示未配置 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 如果未配置 connections,則共享連接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { // 獲取共享客戶端 clients[i] = getSharedClient(url); } else { // 初始化新的客戶端 clients[i] = initClient(url); } } return clients; }
這里根據 connections 數量決定是獲取共享客戶端還是創建新的客戶端實例,getSharedClient 方法中也會調用 initClient 方法,因此下面我們一起看一下這個方法。
private ExchangeClient initClient(URL url) { // 獲取客戶端類型,默認為 netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); //省略無關代碼 ExchangeClient client; try { // 獲取 lazy 配置,并根據配置值決定創建的客戶端類型 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 創建懶加載 ExchangeClient 實例 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 創建普通 ExchangeClient 實例 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..."); } return client; }
initClient 方法首先獲取用戶配置的客戶端類型,默認為 netty。下面我們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 獲取 Exchanger 實例,默認為 HeaderExchangeClient return getExchanger(url).connect(url, handler); }
如上,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 這里包含了多個調用,分別如下: // 1. 創建 HeaderExchangeHandler 對象 // 2. 創建 DecodeHandler 對象 // 3. 通過 Transporters 構建 Client 實例 // 4. 創建 HeaderExchangeClient 對象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
這里的調用比較多,我們這里重點看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handler 數量大于1,則創建一個 ChannelHandler 分發器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取 Transporter 自適應拓展類,并調用 connect 方法生成 Client 實例 return getTransporter().connect(url, handler); }
如上,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的 Transporter 實現類。若用戶未配置客戶端類型,則默認加載 NettyTransporter,并調用該類的 connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 創建 NettyClient 對象 return new NettyClient(url, listener); }
(3) 注冊
這里就已經創建好了NettyClient對象。關于 DubboProtocol 的 refer 方法就分析完了。接下來,繼續分析 RegistryProtocol 的 refer 方法邏輯。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 取 registry 參數值,并將其設置為協議頭 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 獲取注冊中心實例 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 將 url 查詢字符串轉為 Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 獲取 group 配置 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 通過 SPI 加載 MergeableCluster 實例,并調用 doRefer 繼續執行服務引用邏輯 return doRefer(getMergeableCluster(), registry, type, url); } } // 調用 doRefer 繼續執行服務引用邏輯 return doRefer(cluster, registry, type, url); }
上面代碼首先為 url 設置協議頭,然后根據 url 參數加載注冊中心實例。然后獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。這里的重點是 doRefer 方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 創建 RegistryDirectory 實例 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 設置注冊中心和協議 directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 生成服務消費者鏈接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 注冊服務消費者,在 consumers 目錄下新節點 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 訂閱 providers、configurators、routers 等節點數據 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合并為一個 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法創建一個 RegistryDirectory 實例,然后生成服務者消費者鏈接,并向注冊中心進行注冊。注冊完畢后,緊接著訂閱 providers、configurators、routers 等節點下的數據。完成訂閱后,RegistryDirectory 會收到這幾個節點下的子節點信息。由于一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節點,這個時候就需要 Cluster 將多個服務節點合并為一個,并生成一個 Invoker。
(4)創建代理對象
Invoker 創建完畢后,接下來要做的事情是為服務接口生成代理對象。有了代理對象,即可進行遠程調用。代理對象生成的入口方法為 ProxyFactory 的 getProxy,接下來進行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException { // 調用重載方法 return getProxy(invoker, false); } public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; // 獲取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { // 切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; // 設置服務接口類和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // 加載接口類 interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } // 為 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827 if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; // 創建新的 interfaces 數組 interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); // 設置 GenericService.class 到數組中 interfaces[len] = GenericService.class; } // 調用重載方法 return getProxy(invoker, interfaces); } public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代碼都是用來獲取 interfaces 數組的,我們繼續往下看。getProxy(Invoker, Class[]) 這個方法是一個抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實現代碼。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { // 生成 Proxy 子類(Proxy 是抽象類)。并調用 Proxy 子類的 newInstance 方法創建 Proxy 實例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代碼并不多,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創建 InvokerInvocationHandler 對象,并將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。下面以 org.apache.dubbo.demo.DemoService 這個接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
好了,到這里代理類生成邏輯就分析完了。整個過程比較復雜,大家需要耐心看一下。
總結
1. 從注冊中心發現引用服務:在有注冊中心,通過注冊中心發現提供者地址的情況下,ReferenceConfig 解析出的 URL 格式為:`registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode("conumer-host/com.foo.FooService?version=1.0.0")`。
2. 通過 URL 的registry://協議頭識別,就會調用RegistryProtocol#refer()方法
(1). 查詢提供者 URL,如 `dubbo://service-host/com.foo.FooService?version=1.0.0` ,來獲取注冊中心
(2). 創建一個 RegistryDirectory 實例并設置注冊中心和協議
(3). 生成 conusmer 連接,在 consumer 目錄下創建節點,向注冊中心注冊
(4). 注冊完畢后,訂閱 providers,configurators,routers 等節點的數據
(5). 通過 URL 的 `dubbo://` 協議頭識別,調用 `DubboProtocol#refer()` 方法,創建一個 ExchangeClient 客戶端并返回 DubboInvoker 實例
3. 由于一個服務可能會部署在多臺服務器上,這樣就會在 providers 產生多個節點,這樣也就會得到多個 DubboInvoker 實例,就需要 RegistryProtocol 調用 Cluster 將多個服務提供者節點偽裝成一個節點,并返回一個 Invoker
4. Invoker 創建完畢后,調用 ProxyFactory 為服務接口生成代理對象,返回提供者引用