soul模塊,Soul 網關源碼閱讀(六)Sofa請求處理概覽

 2023-10-12 阅读 28 评论 0

摘要:Soul 網關源碼閱讀(六)Sofa請求處理概覽 簡介 ????今天來探索一下Sofa請求處理流程,看看和前面的HTTP、Dubbo有什么異同 Sofa示例運行 PS:如果請求加上參數運行不成功,請更新最新版本,此問題在新版本中已經修復:https://blo

Soul 網關源碼閱讀(六)Sofa請求處理概覽


簡介

????今天來探索一下Sofa請求處理流程,看看和前面的HTTP、Dubbo有什么異同

Sofa示例運行

PS:如果請求加上參數運行不成功,請更新最新版本,此問題在新版本中已經修復:https://blog.csdn.net/baidu_27627251/article/details/112726694
Add sofa param resolve service

????首先運行下官方的Sofa示例,首先啟動下mysql和zookeeper,這里使用docker啟動:

docker run -dit --name zk -p 2181:2181 zookeepe
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

????然后運行Soul-admin,Soul-Bootst,在管理圖界面起用sofa插件

soul模塊,????運行官方示例:soul-examples --> soul-examples-sofa

????這里有個坑,需要注意,啟動后,bootstrap打印日志中沒有sofa插件,請求一直失敗

o.d.s.w.configuration.SoulConfiguration  : load plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[sign] [org.dromara.soul.plugin.sign.SignPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[resilience4j] [org.dromara.soul.plugin.resilience4j.Resilience4JPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[alibaba-dubbo-body-param] [org.dromara.soul.plugin.alibaba.dubbo.param.BodyParamPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[dubbo] [org.dromara.soul.plugin.alibaba.dubbo.AlibabaDubboPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[response] [org.dromara.soul.plugin.alibaba.dubbo.response.DubboResponsePlugin]
```xml&ensp;&ensp;&ensp;&ensp;查看初始的plugins也是沒有sofa```javapublic SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);final List<SoulPlugin> soulPlugins = pluginList.stream().sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));return new SoulWebHandler(soulPlugins);}

????經過探索和老哥的討論,發現是每天起用sofa的相關依賴

????我們在Bootstrap的pom.xml文件中添加下面的依賴,然后重啟

        <!--        sofa plugin start--><dependency><groupId>com.alipay.sofa</groupId><artifactId>sofa-rpc-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.0.1</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.1</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.1</version></dependency><dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-plugin-sofa</artifactId><version>${project.version}</version></dependency><!--        sofa plugin end-->

????然后查看日志打印,出現了sofa相關的插件

o.d.s.w.configuration.SoulConfiguration  : load plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[sign] [org.dromara.soul.plugin.sign.SignPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[resilience4j] [org.dromara.soul.plugin.resilience4j.Resilience4JPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[sofa-body-param] [org.dromara.soul.plugin.sofa.param.BodyParamPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[dubbo] [org.dromara.soul.plugin.alibaba.dubbo.AlibabaDubboPlugin]
// 新出現的sofa相關的
o.d.s.w.configuration.SoulConfiguration  : load plugin:[sofa] [org.dromara.soul.plugin.sofa.SofaPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[response] [org.dromara.soul.plugin.alibaba.dubbo.response.DubboResponsePlugin]
o.d.s.w.configuration.SoulConfiguration  : load plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
// 新出現的sofa相關的
o.d.s.w.configuration.SoulConfiguration  : load plugin:[response] [org.dromara.soul.plugin.sofa.response.SofaResponsePlugin]

soul端口號、????日志中還打印了成功加載sofa相關的metadata

o.d.s.p.s.cache.ApplicationConfigCache   : init sofa reference success there meteData is :MetaData
o.d.s.p.s.cache.ApplicationConfigCache   : init sofa reference success there meteData is :MetaData
o.d.s.p.s.cache.ApplicationConfigCache   : init sofa reference success there meteData is :MetaData

????訪問鏈接: http://localhost:9195/sofa/findAll ,成功返回如下請求

{"code": 200,"message": "Access to success!","data": {"name": "hello world Soul Sofa , findAll","id": "998932133"}
}

源碼解析

PS:Debug時間過程,會導致超時,這是正常的

????首先找到我們非常熟悉的切入點函數: SoulWebHandler ,在下面的方法中打上斷點,然后逐步進入每個plugin觀察其行為

        public Mono<Void> execute(final ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < plugins.size()) {SoulPlugin plugin = plugins.get(this.index++);Boolean skip = plugin.skip(exchange);if (skip) {return this.execute(exchange);}return plugin.execute(exchange, this);}return Mono.empty();});}

GlobalPlugin

????進入其中,執行處理邏輯,通過上篇的分析,我們知道大致作用是將請求類型放入exchange中

SignPlugin/WafPlugin/RateLimiterPlugin/HystrixPlugin/Resilience4JPlugin

什么叫網關,????進入其中,但plugin沒有起用,不執行邏輯

DividePlugin/WebClientPlugin/WebSocketPlugin

????通過類型判斷,跳過,不執行

BodyParamPlugin

????這個plugin在dubbo的時候也是要執行,我們來看看它干了寫啥事。從下面邏輯中大概能看出先判斷是否符合執行條件,然后將請求地址替換成真實的后端地址

    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {final ServerHttpRequest request = exchange.getRequest();final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);// 判斷類型是不是sofaif (Objects.nonNull(soulContext) && RpcTypeEnum.SOFA.getName().equals(soulContext.getRpcType())) {MediaType mediaType = request.getHeaders().getContentType();ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {return body(exchange, serverRequest, chain);}if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {return formData(exchange, serverRequest, chain);}// 進行路徑替換,換成后端服務器的return query(exchange, serverRequest, chain);}return chain.execute(exchange);}private Mono<Void> query(final ServerWebExchange exchange, final ServerRequest serverRequest, final SoulPluginChain chain) {exchange.getAttributes().put(Constants.SOFA_PARAMS,HttpParamConverter.ofString(() -> serverRequest.uri().getQuery()));return chain.execute(exchange);}

????這里有個非常有趣的現象,我們第四篇分析中,dubbo也走了一模一樣的類,在上面函數邏輯中,我們看出它并不能兼容dubbo,那dubbo是如何走這個類的呢?

????通過調試我們發現,當同時啟動dubbo和sofa的時候,會生成兩個BodyParamPlugin,名稱是一模一樣的,但里面的判斷類型換了,很神奇,猜測這個類是動態生成之類的手段,這里先不探索了,可以后面研究研究

AlibabaDubboPlugin

soul暗語標簽、????通過類型判斷,跳過

SofaPlugin

????這個從名字就看出來是核心類,我們看看它具體干了啥。通過下面注釋的地方,可以看出和dubbo請求的非常的相像。進行路由匹配,成功后rpc調用,獲得結果后放入exchange中

    # AbstractSoulPluginpublic Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {String pluginName = named();final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);if (pluginData != null && pluginData.getEnabled()) {final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);if (CollectionUtils.isEmpty(selectors)) {return handleSelectorIsNull(pluginName, exchange, chain);}final SelectorData selectorData = matchSelector(exchange, selectors);if (Objects.isNull(selectorData)) {return handleSelectorIsNull(pluginName, exchange, chain);}selectorLog(selectorData, pluginName);final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());if (CollectionUtils.isEmpty(rules)) {return handleRuleIsNull(pluginName, exchange, chain);}// 判斷是否有路由規則能匹配上RuleData rule;if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {//get lastrule = rules.get(rules.size() - 1);} else {rule = matchRule(exchange, rules);}if (Objects.isNull(rule)) {return handleRuleIsNull(pluginName, exchange, chain);}ruleLog(rule, pluginName);// 匹配上后執行處理邏輯return doExecute(exchange, chain, selectorData, rule);}return chain.execute(exchange);}# SofaPluginprotected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {String body = exchange.getAttribute(Constants.SOFA_PARAMS);SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;MetaData metaData = exchange.getAttribute(Constants.META_DATA);if (!checkMetaData(metaData)) {assert metaData != null;log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error = SoulResultWrap.error(SoulResultEnum.SOFA_HAVE_BODY_PARAM.getCode(), SoulResultEnum.SOFA_HAVE_BODY_PARAM.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// 這里得到結果,跟下去final Mono<Object> result = sofaProxyService.genericInvoker(body, metaData, exchange);return result.then(chain.execute(exchange));}# SofaProxyServicepublic Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {// 根據請求路徑,獲得rpc中的consumer ConsumerConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());reference = ApplicationConfigCache.getInstance().initRef(metaData);}GenericService genericService = reference.refer();Pair<String[], Object[]> pair;if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {pair = new ImmutablePair<>(new String[]{}, new Object[]{});} else {pair = sofaParamResolveService.buildParameter(body, metaData.getParameterTypes());}CompletableFuture<Object> future = new CompletableFuture<>();RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback<Object>() {@Overridepublic void onAppResponse(final Object o, final String s, final RequestBase requestBase) {future.complete(o);}@Overridepublic void onAppException(final Throwable throwable, final String s, final RequestBase requestBase) {future.completeExceptionally(throwable);}@Overridepublic void onSofaException(final SofaRpcException e, final String s, final RequestBase requestBase) {future.completeExceptionally(e);}});// 通過函數名,能猜到是rpc調用,然后得到結果,并將結果放入exchange中genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());return Mono.fromFuture(future.thenApply(ret -> {if (Objects.isNull(ret)) {ret = Constants.SOFA_RPC_RESULT_EMPTY;}exchange.getAttributes().put(Constants.SOFA_RPC_RESULT, ret);exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());return ret;})).onErrorMap(SoulException::new);}

MonitorPlugin

????不跳過,但插件沒有開啟

DubboResponsePlugin/WebClientResponsePlugin

????通過類型判斷,跳過執行

SofaResponsePlugin

????通過上幾篇分析和名字能猜出來是將響應返回給客戶端的,通過下面代碼的邏輯也可以看出

    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {return chain.execute(exchange).then(Mono.defer(() -> {// 從exchange中獲取結果final Object result = exchange.getAttribute(Constants.SOFA_RPC_RESULT);if (Objects.isNull(result)) {Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result));// 熟悉的返回響應的函數return WebFluxResultUtils.result(exchange, success);}));}

總結

soul服務器端口登錄?????上面的plugin流程大致如下:

  • GlobalPlugin : 將請求類型置入
  • SignPlugin : 跳過不執行邏輯
  • WafPlugin : 跳過不執行邏輯
  • RateLimiterPlugin : 跳過不執行邏輯
  • HystrixPlugin : 跳過不執行邏輯
  • Resilience4JPlugin : 跳過不執行邏輯
  • DividePlugin : 跳過不執行邏輯
  • WebClientPlugin : 跳過不執行邏輯
  • WebSocketPlugin : 跳過不執行邏輯
  • BodyParamPlugin : 執行RPC的請求路徑替換,替換成真實的服務器后端路徑,作用類似于dividePlugin;不同rpc有相關的這個插件名,也就是會有多個BodyParamPlugin
  • AlibabaDubboPlugin : 跳過不執行邏輯
  • SofaPlugin : 發送請求到后臺服務器,拿到結果,寫入exchange
  • MonitorPlugin : 跳過不執行邏輯
  • DubboResponsePlugin : 跳過不執行邏輯
  • WebClientResponsePlugin : 跳過不執行邏輯
  • SofaResponsePlugin : 從exchange中拿到響應,發送給客戶端

????經過這幾篇的分析,我們進一步優化我們對Soul網關的請求流程,大致如下:

在這里插入圖片描述

????更新了我們對處理流程中一些類的認知:

  • 通過上篇分析,得到GlobalPlugin的具體作用,是置入請求類型
  • BodyParamPlugin 作用類似于 dividePlugin,能進行路由匹配,匹配后將路徑修改真實的后端服務器路徑;并且能動態的生成同名的但針對不同rpc實現的plugin

Soul網關源碼分析文章列表

Github

  • Soul 源碼閱讀(一) 概覽
  • Soul 源碼閱讀(二)代碼初步運行
  • Soul 源碼閱讀(三)HTTP請求處理概覽
  • Soul 網關源碼閱讀(四)Dubbo請求概覽
  • Soul網關源碼閱讀(五)請求類型探索
  • Soul 網關源碼閱讀(六)Sofa請求處理概覽

掘金

  • Soul 網關源碼閱讀(一) 概覽 #掘金文章# https://juejin.cn/post/6917864624423436296
  • Soul 網關源碼閱讀(二)代碼初步運行 #掘金文章# https://juejin.cn/post/6917865804121767944
  • Soul 網關源碼閱讀(三)請求處理概覽 #掘金文章# https://juejin.cn/post/6917866538712334343
  • Soul 網關源碼閱讀(四)Dubbo請求概覽 #掘金文章# https://juejin.cn/post/6917867369909977102
  • Soul網關源碼閱讀(五)請求類型探索 #掘金文章# https://juejin.cn/post/6918575905962983438

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/135566.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息