1. Gateway–服务网关 1.1 网关简介 没有网关的存在,我们只能在客户端记录每个微服务的地址,然后分别去调用
这样的架构,会存在着诸多的问题:
客户端多次请求不同的微服务,增加客户端代码或配置编写的复杂性
认证复杂,每个服务都需要独立认证
存在跨域请求,在一定场景下处理相对复杂
上面的这些问题可以借助API网关
来解决
所谓的API网关,就是指系统的统一入口
,它封装了应用程序的内部结构,为客户端提供统一服务
一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等
在业界比较流行的网关,有下面这些:
在业界比较流行的网关,有下面这些:
Ngnix+lua
使用nginx的反向代理和负载均衡可实现对api服务器的负载均衡及高可用
lua是一种脚本语言,可以来编写一些简单的逻辑, nginx支持lua脚本
Kong
基于Nginx+Lua开发,性能高,稳定,有多个可用的插件(限流、鉴权等等)可以开箱即用
问题
: 只支持Http协议;二次开发,自由扩展困难;提供管理API,缺乏更易用的管控、配置方式
Zuul
Netflix开源的网关,功能丰富,使用JAVA开发,易于二次开发
问题
:缺乏管控,无法动态配 置;依赖组件较多;处理Http请求依赖的是Web容器,性能不如Nginx
Spring Cloud Gateway
Spring公司为了替换Zuul而开发的网关服务,将在下面具体介绍
注意
:SpringCloud alibaba技术栈中并没有提供自己的网关,我们可以采用Spring Cloud Gateway 来做网关
1.2 Gateway简介 Spring Cloud Gateway是Spring公司基于Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式
它的目标是替代 Netflix Zuul,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能
例如:安全,监控和限流
优点:
缺点:
其实现依赖Netty与WebFlux,不是传统的Servlet编程模型,学习成本高
不能将其部署在Tomcat、Jetty等Servlet容器里,只能打成jar包执行
需要Spring Boot 2.0及以上的版本,才支持
1.3 Gateway快速入门 基本步骤:
创建模块,导入依赖
创建主类
添加配置文件
启动项目,用网关去访问
创建api-gateway
模块,导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <parent > <artifactId > shop-parent</artifactId > <groupId > cn.jyw</groupId > <version > 1.0-SNAPSHOT</version > </parent > <modelVersion > 4.0.0</modelVersion > <artifactId > api-gateway</artifactId > <dependencies > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > </dependencies > </project >
创建主类
1 2 3 4 5 6 @SpringBootApplication public class GatewayApplication { public static void main (String[] args) { SpringApplication.run(GatewayApplication.class,args); } }
添加配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 server: port: 7000 spring: application: name: api-gateway cloud: gateway: routes: - id: product_route uri: http://localhost:8081 order: 1 predicates: - Path=/product-serv/** filters: - StripPrefix=1
启动项目, 并通过网关去访问微服务
localhost:7000/product-serv/product/1
1.4 Gateway整合Nacos 步骤:
引入依赖
主类上添加注解@EnableDiscoveryClient
修改配置文件
引入依赖
1 2 3 4 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency >
添加注解
1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class GatewayApplication { public static void main (String[] args) { SpringApplication.run(GatewayApplication.class,args); } }
修改配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 server: port: 7000 spring: application: name: api-gateway cloud: nacos: discovery: server-addr: 127.0 .0 .1 :8848 gateway: discovery: locator: enabled: true routes: - id: product_route uri: lb://service-product order: 1 predicates: - Path=/product-serv/** filters: - StripPrefix=1
测试
localhost:7000/product-serv/product/1
还有一种简化版
(但不推荐)
不写路由配置 (全部默认)
1 2 3 4 5 6 7 8 9 10 11 12 13 server: port: 7000 spring: application: name: api-gateway cloud: nacos: discovery: server-addr: 127.0 .0 .1 :8848 gateway: discovery: locator: enabled: true
localhost:7000/service-product/product/1
这时候,就发现只要按照网关地址/微服务/接口 的格式去访问,就可以得到成功响应
1.5 Gateway核心架构
基本概念
路由(Route) 是 gateway 中最基本的组件之一,表示一个具体的路由信息载体
主要定义了下面的几个信息:
id ,路由标识符,区别于其他 Route
uri ,路由指向的目的地 uri,即客户端请求最终被转发到的微服务
order ,用于多个 Route 之间的排序,数值越小排序越靠前,匹配优先级越高
predicate ,断言的作用是进行条件判断,只有断言都返回真,才会真正的执行路由
filter ,过滤器用于修改请求和响应信息
执行流程
执行流程大体如下:
Gateway Client
向Gateway Server
发送请求
请求首先会被HttpWebHandlerAdapter
进行提取组装成网关上下文
然后网关的上下文会传递到DispatcherHandler
,它负责将请求分发给 RoutePredicateHandlerMapping
RoutePredicateHandlerMapping
负责路由查找,并根据路由断言判断路由是否可用
如果过断言成功,由FilteringWebHandler
创建过滤器链并调用
请求会一次经过PreFilter
–微服务 –PostFilter
的方法,最终返回响应
1.6 断言 Predicate(断言, 谓词) 用于进行条件判断,只有断言都返回真,才会真正的执行路由
断言就是说: 在什么条件下 才能进行路由转发
1.6.1 内置路由断言工厂 SpringCloud Gateway包括许多内置的断言工厂,所有这些断言都与HTTP请求的不同属性匹配。具体如下:
基于Datetime类型的断言工厂
此类型的断言根据时间做判断,主要有三个:
AfterRoutePredicateFactory: 接收一个日期参数,判断请求日期是否晚于指定日期 BeforeRoutePredicateFactory: 接收一个日期参数,判断请求日期是否早于指定日期 BetweenRoutePredicateFactory: 接收两个日期参数,判断请求日期是否在指定时间段内
-After=2019-12-31T23:59:59.789+08:00[Asia/Shanghai]
基于远程地址的断言工厂
RemoteAddrRoutePredicateFactory:接收一个IP地址段,判断请求主 机地址是否在地址段中
-RemoteAddr=192.168.1.1/24
基于Cookie的断言工厂
CookieRoutePredicateFactory:接收两个参数,cookie 名字和一个正则表达式
判断请求 cookie是否具有给定名称且值与正则表达式匹配
-Cookie=chocolate, ch.
基于Header的断言工厂
HeaderRoutePredicateFactory:接收两个参数,标题名称和正则表达式
判断请求Header是否 具有给定名称且值与正则表达式匹配
-Header=X-Request-Id, \d+
基于Host的断言工厂 HostRoutePredicateFactory:接收一个参数,主机名模式
判断请求的Host是否满足匹配规则
-Host=**.testhost.org
基于Method请求方法的断言工厂
MethodRoutePredicateFactory:接收一个参数,判断请求类型是否跟指定的类型匹配
-Method=GET
基于Path请求路径的断言工厂
PathRoutePredicateFactory:接收一个参数,判断请求的URI部分是否满足路径规则
-Path=/foo/{segment}
基于Query请求参数的断言工厂
QueryRoutePredicateFactory :接收两个参数,请求param和正则表达式, 判断请求参数是否具有给定名称且值与正则表达式匹配
-Query=baz, ba.
基于路由权重的断言工厂
WeightRoutePredicateFactory:接收一个[组名,权重], 然后对于同一个组内的路由按照权重转发
routes:
-id: weight_route1 uri: host1 predicates:
-Path=/product/* *
-Weight=group3, 1
-id: weight_route2 uri: host2 predicates:
-Path=/product/* *
-Weight= group3, 9
路径相同 按权重1:9发送
内置路由断言工厂的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 server: port: 7000 spring: application: name: api-gateway cloud: nacos: discovery: server-addr: 127.0 .0 .1 :8848 gateway: discovery: locator: enabled: true routes: - id: product_route uri: lb://service-product order: 1 predicates: - Path=/product-serv/** - Before=2019-11-28T00:00:00.000+08:00 - Method=POST filters: - StripPrefix=1 - id: order_route uri: lb://service-order order: 1 predicates: - Path=/order-serv/** filters: - StripPrefix=1
1.6.2 自定义断言工厂 我们来设定一个场景: 假设我们的应用仅仅让age在(min,max)之间的人来访问
第1步:在配置文件中,添加一个Age的断言配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 server: port: 7000 spring: application: name: api-gateway cloud: nacos: discovery: server-addr: 127.0 .0 .1 :8848 gateway: discovery: locator: enabled: true routes: - id: product_route uri: lb://service-product order: 1 predicates: - Path=/product-serv/** - Age=18,60 filters: - StripPrefix=1
第2步:自定义一个断言工厂, 实现断言方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Component public class AgeRoutePredicateFactory extends AbstractRoutePredicateFactory <AgeRoutePredicateFactory.Config> { public AgeRoutePredicateFactory () { super (AgeRoutePredicateFactory.Config.class); } public List<String> shortcutFieldOrder () { return Arrays.asList("minAge" , "maxAge" ); } public Predicate<ServerWebExchange> apply (AgeRoutePredicateFactory.Config config) { return new Predicate <ServerWebExchange>() { @Override public boolean test (ServerWebExchange serverWebExchange) { String ageStr = serverWebExchange.getRequest().getQueryParams().getFirst("age" ); if (StringUtils.isNotEmpty(ageStr)) { int age = Integer.parseInt(ageStr); if (age < config.getMaxAge() && age > config.getMinAge()) { return true ; } else { return false ; } } return false ; } }; } @Data @NoArgsConstructor public static class Config { private int minAge; private int maxAge; } }
1.7 过滤器
作用
过滤器就是在请求的传递过程中,对请求和响应做一些手脚
生命周期
在Gateway中, Filter的生命周期只有两个:“pre” 和 “post”
分类
Gateway 的Filter从作用范围可分为两种: GatewayFilter(局部过滤器 )与GlobalFilter(全局过滤器 )
GatewayFilter:应用到单个路由或者一个分组的路由上
GlobalFilter:应用到所有的路由上。
1.7.1 局部过滤器
内置局部过滤器
过滤器工厂
作用
参数
AddRequestHeader
为原始请求添加Header
Header的名称及值
AddRequestParameter
为原始请求添加请求参数
参数名称及值
AddResponseHeader
为原始响应添加Header
Header的名称及值
DedupeResponseHeader
剔除响应头中重复的值
需要去重的Header名称及去重策略
Hystrix
为路由引入Hystrix的断路器保护
HystrixCommand的名称
FallbackHeaders
为fallbackUri的请求头中添加具体的异常信息
Header的名称
PrefixPath
为原始请求路径添加前缀
前缀路径
PreserveHostHeader
为请求添加一个 preserveHostHeader=true的属性,路由过滤器会检查该属性以决定是否要发送原始的Host
无
RequestRateLimiter
用于对请求限流,限流算法为令牌桶
keyResolver rateLimiter statusCode denyEmptyKey emptyKeyStatus
RedirectTo
将原始请求重定向到指定的URL
http状态码及重定向的 url
RemoveHopByHopHeadersFilter
为原始请求删除IETF组织规定的一系列Header
默认就会启用,可以通过配置指定仅删除哪些 Header
RemoveRequestHeader
为原始请求删除某个Header
Header名称
RemoveResponseHeader
为原始响应删除某个Header
Header名称
RewritePath
重写原始的请求路径
原始路径正则表达式以及重写后路径的正则表达式
RewriteResponseHeader
重写原始响应中的某个Header
Header名称,值的正则表达式,重写后的值
SaveSession
在转发请求之前,强制执行 WebSession::save操作
无
secureHeaders
为原始响应添加一系列起安全作用的响应头
无,支持修改这些安全响应头的值
SetPath
修改原始的请求路径
修改后的路径
SetResponseHeader
修改原始响应中某个Header的值
Header名称,修改后的值
SetStatus
修改原始响应的状态码
HTTP 状态码,可以是数字,也可以是字符串
StripPrefix
用于截断原始请求的路径
使用数字表示要截断的路径的数量
Retry
针对不同的响应进行重试
retries、statuses、 methods、series
RequestSize
设置允许接收最大请求包的大小。如果请求包大小超过设置的值,则返回 413 Payload Too Large
请求包大小,单位为字节,默认值为5M
ModifyRequestBody
在转发请求之前修改原始请求体内容
修改后的请求体内容
ModifyResponseBody
修改原始响应体的内容
修改后的响应体内容
1.7.2 自定义局部过滤器
第1步:在配置文件中,添加一个Log的过滤器配置
1 2 3 filters: - StripPrefix=1 - Log=true,false
第2步:自定义一个过滤器工厂,实现方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Component public class LogGatewayFilterFactory extends AbstractGatewayFilterFactory <LogGatewayFilterFactory.Config> { public LogGatewayFilterFactory () { super (LogGatewayFilterFactory.Config.class); } @Override public List<String> shortcutFieldOrder () { return Arrays.asList("consoleLog" , "cacheLog" ); } @Override public GatewayFilter apply (LogGatewayFilterFactory.Config config) { return new GatewayFilter () { @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { if (config.isCacheLog()) { System.out.println("cacheLog已经开启了...." ); } if (config.isConsoleLog()) { System.out.println("consoleLog已经开启了...." ); } return chain.filter(exchange); } }; } @Data @NoArgsConstructor public static class Config { private boolean consoleLog; private boolean cacheLog; } }
1.7.3 全局过滤器 开发中的鉴权逻辑:
当客户端第一次请求服务时,服务端对用户进行信息认证(登录)
认证通过,将用户信息进行加密形成token,返回给客户端,作为登录凭证
以后每次请求,客户端都携带认证的token
服务端对token进行解密,判断是否有效
下面的我们自定义一个GlobalFilter,去校验所有请求的请求参数中是否包含“token”
如果不包含请求参数“token”则不转发路由,否则执行正常的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component public class AuthGlobalFilter implements GlobalFilter , Ordered { @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { String token = exchange.getRequest().getQueryParams().getFirst("token" ); if (StringUtils.isBlank(token)) { System.out.println("鉴权失败" ); exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } @Override public int getOrder () { return 0 ; } }
1.8 网关限流 Sentinel支持对SpringCloud Gateway、Zuul等主流网关进行限流
从1.6.0版本开始,Sentinel提供了SpringCloud Gateway的适配模块,可以提供两种资源维度的限流:
route维度:即在Spring配置文件中配置的路由条目,资源名为对应的routeId
自定义API维度:用户可以利用Sentinel提供的API来自定义一些API分组
导入依赖
1 2 3 4 <dependency > <groupId > com.alibaba.csp</groupId > <artifactId > sentinel-spring-cloud-gateway-adapter</artifactId > </dependency >
编写配置类
基于Sentinel 的Gateway限流是通过其提供的Filter来完成的
使用时只需注入对应的 SentinelGatewayFilter
实例以及 SentinelGatewayBlockExceptionHandler
实例即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Configuration public class GatewayConfiguration { private final List<ViewResolver> viewResolvers; private final ServerCodecConfigurer serverCodecConfigurer; public GatewayConfiguration (ObjectProvider<List<ViewResolver>> viewResolversProvider, ServerCodecConfigurer serverCodecConfigurer) { this .viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList); this .serverCodecConfigurer = serverCodecConfigurer; } @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public GlobalFilter sentinelGatewayFilter () { return new SentinelGatewayFilter (); } @PostConstruct public void initGatewayRules () { Set<GatewayFlowRule> rules = new HashSet <>(); rules.add( new GatewayFlowRule ("product_route" ) .setCount(1 ) .setIntervalSec(1 ) ); GatewayRuleManager.loadRules(rules); } @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler () { return new SentinelGatewayBlockExceptionHandler (viewResolvers, serverCodecConfigurer); } @PostConstruct public void initBlockHandlers () { BlockRequestHandler blockRequestHandler = new BlockRequestHandler () { @Override public Mono<ServerResponse> handleRequest (ServerWebExchange serverWebExchange, Throwable throwable) { Map map = new HashMap <>(); map.put("code" , 0 ); map.put("message" , "接口被限流了" ); return ServerResponse.status(HttpStatus.OK). contentType(MediaType.APPLICATION_JSON_UTF8). body(BodyInserters.fromObject(map)); } }; GatewayCallbackManager.setBlockHandler(blockRequestHandler); } }
自定义API分组
自定义API分组是一种更细粒度的限流规则定义
与上面不同在配置初始化的限流参数与自定义API分组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Configuration public class GatewayConfiguration { private final List<ViewResolver> viewResolvers; private final ServerCodecConfigurer serverCodecConfigurer; public GatewayConfiguration (ObjectProvider<List<ViewResolver>> viewResolversProvider, ServerCodecConfigurer serverCodecConfigurer) { this .viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList); this .serverCodecConfigurer = serverCodecConfigurer; } @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public GlobalFilter sentinelGatewayFilter () { return new SentinelGatewayFilter (); } @PostConstruct public void initGatewayRules () { Set<GatewayFlowRule> rules = new HashSet <>(); rules.add(new GatewayFlowRule ("product_api1" ).setCount(1 ).setIntervalSec(1 )); rules.add(new GatewayFlowRule ("product_api2" ).setCount(1 ).setIntervalSec(1 )); GatewayRuleManager.loadRules(rules); } @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler () { return new SentinelGatewayBlockExceptionHandler (viewResolvers, serverCodecConfigurer); } @PostConstruct public void initBlockHandlers () { BlockRequestHandler blockRequestHandler = new BlockRequestHandler () { public Mono<ServerResponse> handleRequest (ServerWebExchange serverWebExchange, Throwable throwable) { Map map = new HashMap <>(); map.put("code" , 0 ); map.put("message" , "接口被限流了" ); return ServerResponse.status(HttpStatus.OK). contentType(MediaType.APPLICATION_JSON_UTF8). body(BodyInserters.fromObject(map)); } }; GatewayCallbackManager.setBlockHandler(blockRequestHandler); } @PostConstruct private void initCustomizedApis () { Set<ApiDefinition> definitions = new HashSet <>(); ApiDefinition api1 = new ApiDefinition ("product_api1" ) .setPredicateItems(new HashSet <ApiPredicateItem>() {{ add(new ApiPathPredicateItem ().setPattern("/product-serv/product/api1/**" ). setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX)); }}); ApiDefinition api2 = new ApiDefinition ("product_api2" ) .setPredicateItems(new HashSet <ApiPredicateItem>() {{ add(new ApiPathPredicateItem ().setPattern("/product-serv/product/api2/demo1" )); }}); definitions.add(api1); definitions.add(api2); GatewayApiDefinitionManager.loadApiDefinitions(definitions); } }
2. Sleuth–链路追踪 2.1 链路追踪介绍 在大型系统的微服务化构建中,一个系统被拆分成了许多模块
这些模块负责不同的功能,组合成系统,最终可以提供丰富的功能
在这种架构中,一次请求往往需要涉及到多个服务
互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心,也就意味着这种架构形式也会存在一些问题:
如何快速发现问题?
如何判断故障影响范围?
如何梳理服务依赖以及依赖的合理性?
如何分析链路性能问题以及实时容量规划?
分布式链路追踪 (Distributed Tracing),就是将一次分布式请求还原成调用链路,进行日志记录,性能监控并将一次分布式请求的调用情况集中展示
比如各个服务节点上的耗时、请求具体到达哪 台机器上、每个服务节点的请求状态等等
常见的链路追踪技术有下面这些:
cat
由大众点评开源,基于Java开发的实时应用监控平台,包括实时应用监控,业务监控
集成方案是通过代码埋点的方式来实现监控,比如: 拦截器,过滤器等
对代码的侵入性很大,集成成本较高。风险较大。
zipkin
由Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现
该产品结合spring-cloud-sleuth 使用较为简单, 集成很方便, 但是功能较简单
pinpoint
Pinpoint是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具
特点是支持多种插件,UI功能强大,接入端无代码侵入。
skywalking
SkyWalking是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具
特点是支持多种插件,UI功能较强,接入端无代码侵入。目前已加入Apache孵化器
Sleuth
SpringCloud 提供的分布式系统中链路追踪解决方案 但是没与UI
注意
:SpringCloud alibaba技术栈中并没有提供自己的链路追踪技术的,我们可以采用Sleuth + Zinkin来做链路追踪解决方案
2.2 Sleuth入门 2.2.1 Sleuth介绍 SpringCloud Sleuth主要功能就是在分布式系统中提供追踪解决方案
它大量借用了Google Dapper的设计, 先来了解一下Sleuth中的术语和相关概念
Trace
由一组Trace Id相同的Span串联形成一个树状结构
为了实现请求跟踪,当请求到达分布式系统的入口端点时,只需要服务跟踪框架为该请求创建一个唯一的标识(即TraceId),同时在分布式系 统内部流转的时候,框架始终保持传递该唯一值,直到整个请求的返回
那么我们就可以使用该唯一标识将所有的请求串联起来,形成一条完整的请求链路
Span
代表了一组基本的工作单元
为了统计各处理单元的延迟,当请求到达各个服务组件的时候,也通过一个唯一标识(SpanId)来标记它的开始、具体过程和结束
通过SpanId的开始和结束时间戳,就能统计该span的调用时间
除此之外,我们还可以获取如事件的名称、请求信息等元数据
Annotation
用它记录一段时间内的事件,内部使用的重要注释:
cs(Client Send)客户端发出请求,开始一个请求的生命
sr(Server Received)服务端接受到请求开始进行处理, sr-cs = 网络延迟(服务调用的时间)
ss(Server Send)服务端处理完毕准备发送到客户端,ss - sr = 服务器上的请求处理时间
cr(Client Reveived)客户端接受到服务端的响应,请求结束。 cr - sr = 请求的总时间
2.2.2 Sleuth入门
修改父工程引入Sleuth依赖
1 2 3 4 5 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-sleuth</artifactId > </dependency >
启动微服务,调用之后,我们可以在控制台观察到sleuth的日志输出
微服务名称, traceId, spanid,是否将链路的追踪结果输出到第三方平台
[api-gateway,3977125f73391553,3977125f73391553,false]
[service-order,3977125f73391553,57547b5bf71f8242,false]
[service-product,3977125f73391553,449f5b3f3ef8d5c5,false]
其中 3977125f73391553 是TraceId, 57547b5bf71f8242 是SpanId,依次调用有一个全局的 TraceId,将调用链路串起来
仔细分析每个微服务的日志,不难看出请求的具体过程
2.3 Zipkin的集成 2.3.1 ZipKin介绍 Zipkin 是 Twitter 的一个开源项目,它基于Google Dapper实现,它致力于收集服务的定时数据, 以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现
我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的REST API接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源
除了面向开发的 API 接口之外,它也提供了方便的UI组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,
比如:可以查询某段时间内各用户请求的处理时间等
Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch
上图展示了 Zipkin 的基础架构,它主要由 4 个核心组件构成:
Collector:
收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为 Zipkin内部处理的 Span 格式,以支持后续的存储、分析、展示等功能
Storage:
存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中
RESTful API:
API 组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等
Web UI:
UI 组件, 基于API组件实现的上层应用。通过UI组件用户可以方便而有直观地查询和分析跟踪信息
Zipkin分为两端,一个是 Zipkin服务端,一个是 Zipkin客户端,客户端也就是微服务的应用
客户端会配置服务端的 URL 地址,一旦发生服务间的调用的时候,会被配置在微服务里面的 Sleuth 的监听器监听,并生成相应的 Trace 和 Span 信息发送给服务端
2.3.2 ZipKin服务端安装
第1步: 下载ZipKin的jar包
1 https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec
访问上面的网址,即可得到一个jar包,这就是ZipKin服务端的jar包
第2步: 通过命令行,输入下面的命令启动ZipKin Server
1 java -jar zipkin-server-2 .12.9 -exec .jar
第3步:通过浏览器访问 http://localhost:9411访问
2.3.3 Zipkin客户端集成 基本步骤:
导入依赖
添加配置
访问微服务
在需要使用的微服务上导入依赖
1 2 3 4 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-zipkin</artifactId > </dependency >
添加配置
1 2 3 4 5 6 7 spring: zipkin: base-url: http://127.0.0.1:9411/ discovery-client-enabled: false sleuth: sampler: probability: 1.0
2.4 ZipKin数据持久化 Zipkin Server默认会将追踪数据信息保存到内存,但这种方式不适合生产环境
Zipkin支持将追踪 数据持久化到mysql
数据库或elasticsearch
中
2.4.1 使用mysql实现数据持久化
第1步: 创建mysql数据环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 CREATE TABLE IF NOT EXISTS zipkin_spans ( `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit', `trace_id` BIGINT NOT NULL, `id` BIGINT NOT NULL, `name` VARCHAR(255) NOT NULL, `parent_id` BIGINT, `debug` BIT(1), `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL', `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query' ) ENGINE = InnoDB ROW_FORMAT = COMPRESSED CHARACTER SET = utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_spans ADD UNIQUE KEY (`trace_id_high`, `trace_id`, `id`) COMMENT 'ignore insert on duplicate'; ALTER TABLE zipkin_spans ADD INDEX (`trace_id_high`, `trace_id`, `id`) COMMENT 'for joining with zipkin_annotations'; ALTER TABLE zipkin_spans ADD INDEX (`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds'; ALTER TABLE zipkin_spans ADD INDEX (`name`) COMMENT 'for getTraces and getSpanNames'; ALTER TABLE zipkin_spans ADD INDEX (`start_ts`) COMMENT 'for getTraces ordering and range'; CREATE TABLE IF NOT EXISTS zipkin_annotations ( `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit', `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id', `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id', `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1', `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB', `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation', `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp', `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address', `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null', `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null' ) ENGINE = InnoDB ROW_FORMAT = COMPRESSED CHARACTER SET = utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_annotations ADD UNIQUE KEY (`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate'; ALTER TABLE zipkin_annotations ADD INDEX (`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans'; ALTER TABLE zipkin_annotations ADD INDEX (`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds'; ALTER TABLE zipkin_annotations ADD INDEX (`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames'; ALTER TABLE zipkin_annotations ADD INDEX (`a_type`) COMMENT 'for getTraces'; ALTER TABLE zipkin_annotations ADD INDEX (`a_key`) COMMENT 'for getTraces'; ALTER TABLE zipkin_annotations ADD INDEX (`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job'; CREATE TABLE IF NOT EXISTS zipkin_dependencies ( `day` DATE NOT NULL, `parent` VARCHAR(255) NOT NULL, `child` VARCHAR(255) NOT NULL, `call_count` BIGINT ) ENGINE = InnoDB ROW_FORMAT = COMPRESSED CHARACTER SET = utf8 COLLATE utf8_general_ci; ALTER TABLE zipkin_dependencies ADD UNIQUE KEY (`day`, `parent`, `child`);
第2步: 在启动ZipKin Server的时候,指定数据保存的mysql的信息
1 java -jar zipkin-server-2 .12.9 -exec .jar --STORAGE_TYPE =mysql --MYSQL_HOST =127.0 .0.1 --MYSQL_TCP_PORT =3306 --MYSQL_DB =zipkin --MYSQL_USER =root --MYSQL_PASS =root
2.4.2 使用elasticsearch实现数据持久化
第1步: 下载elasticsearch
下载地址:https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-8-4
第2步: 启动elasticsearch
elasticsearch.bat文件
第3步: 在启动ZipKin Server的时候,指定数据保存的elasticsearch的信息
1 java -jar zipkin-server-2 .12.9 -exec .jar --STORAGE_TYPE =elasticsearch --ES-HOST =localhost:9200
3. RocketMQ–消息驱动 3.1 MQ简介 MQ(Message Queue)是一种跨进程的通信机制,用于传递消息
通俗点说,就是一个先进先出的数据结构
异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功
异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦
主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列
同时,由于使用了消息列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合
流量削峰
流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增
秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况
为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ
秒杀处理流程如下所述:
用户发起海量秒杀请求到秒杀业务处理系统
秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ
下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户
用户收到秒杀成功的通知。
3.2 常见的MQ产品 目前业界有很多MQ产品,比较出名的有下面这些:
ZeroMQ
号称最快的消息队列系统
,尤其针对大吞吐量的需求场景
扩展性好,开发比较灵活,采用C语言实现
,实际上只是一个socket库的重新封装
如果做为消息队列使用,需要开发大量的代码
ZeroMQ仅提供非持久性的队列
,也就是说如果down机,数据将会丢失
RabbitMQ
使用erlang语言
开发,性能较好,适合于企业级的开发
但是不利于做二次开发和维护
ActiveMQ
历史悠久的Apache开源项目
已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,支持持久化到数据库
对队列数较多的情况支持不好
RocketMQ
阿里巴巴的MQ中间件,由java语言
开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单
Kafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,现在主要用于大数据
相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布 式系统
3.3 环境搭建
第一步: 下载RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
环境要求
Linux 64位操作系统
64bit JDK 1.8+
第二步: 安装RocketMQ
上传文件到Linux系统
1 2 [root @jyw rocketmq ] rocketmq-all-4 .4.0 -bin-release .zip
解压到安装目录
1 2 [root @jyw src ] [root @jyw src ]
第三步:启动RocketMQ
修改启动内存
启动NameServer
1 2 3 4 5 6 [root @jyw rocketmq ] [1 ] 1467 [root @jyw rocketmq ] [root @jyw rocketmq ]
启动Broker
1 2 [root @jyw rocketmq ] [root @jyw rocketmq ]
第四步: 测试RocketMQ
测试消息发送
1 2 [root @jyw rocketmq ] [root @jyw rocketmq ]
测试消息接收
1 2 [root @jyw rocketmq ] [root @jyw rocketmq ]
第五步: 关闭RocketMQ
1 2 [root @jyw rocketmq ] [root @jyw rocketmq ]
3.4 RocketMQ的架构及概念
如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer
Broker (邮递员)
Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能
NameServer (邮局)
消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
Producer (寄件人)
消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息
Consumer (收件人)
消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
Topic (地区)
用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
Message Queue (邮件)
为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个 Message Queue读取消息
Message
Message 是消息的载体
Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组
Consumer Group
消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组
3.5 RocketMQ控制台安装
第一步: 下载工程
https://github.com/apache/rocketmq-externals/releases
第二步: 修改配置文件
1 2 3 server.port =7777 #项目启动后的端口号 rocketmq.config.namesrvAddr =192.168.114.130:9876 #nameserv的地址,注意防火墙要开启9876端口
第三步: 打成jar包,并启动
1 2 3 4 mvn clean package -Dmaven .test.skip=true java -jar target/rocketmq-console-ng-1 .0.0 .jar
第四步: 访问控制台
127.0.0.1:7777
3.6 消息发送和接收演示 1 2 3 4 5 6 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.2</version > </dependency >
3.6.1 发送消息 消息发送步骤:
创建消息生产者, 指定生产者所属的组名
指定Nameserver地址
启动生产者
创建消息对象,指定主题、标签和消息体
发送消息
关闭生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class RocketMQSendMessageTest { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer ("myproducer-group" ); producer.setNamesrvAddr("192.168.109.131:9876" ); producer.start(); Message message = new Message ("myTopic" , "myTag" , ("Test RocketMQ Message" ).getBytes()); SendResult result = producer.send(message, 10000 ); System.out.println(result); producer.shutdown(); } }
3.6.2 接收消息 消息接收步骤:
创建消息消费者, 指定消费者所属的组名
指定Nameserver地址
指定消费者订阅的主题和标签
设置回调函数,编写处理消息的方法
启动消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class RocketMQReceiveMessageTest { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("myconsumer-group" ); consumer.setNamesrvAddr("192.168.109.131:9876" ); consumer.subscribe("myTopic" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("Message===>" + list); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("启动消费者成功了" ); } }
3.7 案例 3.7.1 订单微服务发送消息
在order里添加依赖
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.2</version > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.4.0</version > </dependency >
添加配置
1 2 3 4 rocketmq: name-server: 192.168 .114 .130 :9876 producer: group: shop-order
编写控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @GetMapping("/order/prod/{pid}") public Order order (@PathVariable("pid") Integer pid) { log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息" , pid); Product product = productService.findByPid(pid); if (product.getPid() == -1 ) { Order order = new Order (); order.setPname("下单失败" ); return order; } log.info("查询到{}号商品的信息,内容是:{}" , pid, JSON.toJSONString(product)); Order order = new Order (); order.setUid(1 ); order.setUsername("测试用户" ); order.setPid(pid); order.setPname(product.getPname()); order.setPprice(product.getPprice()); order.setNumber(1 ); orderService.save(order); log.info("创建订单成功,订单信息为{}" , JSON.toJSONString(order)); rocketMQTemplate.convertAndSend("order-topic" ,order); return order; }
3.7.2 用户微服务订阅消息
user模块依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <parent > <artifactId > shop-parent</artifactId > <groupId > cn.jyw</groupId > <version > 1.0-SNAPSHOT</version > </parent > <modelVersion > 4.0.0</modelVersion > <artifactId > shop-user</artifactId > <properties > <maven.compiler.source > 8</maven.compiler.source > <maven.compiler.target > 8</maven.compiler.target > </properties > <dependencies > <dependency > <groupId > cn.jyw</groupId > <artifactId > shop-common</artifactId > <version > 1.0-SNAPSHOT</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.0.2</version > </dependency > <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.4.0</version > </dependency > </dependencies > </project >
主类
1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class UserApplication { public static void main (String[] args) { SpringApplication.run(UserApplication.class); } }
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 server: port: 8071 spring: application: name: service-user datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql:///shop?serverTimezone=UTC&useUnicode=true&c3haracterEncoding=utf-8&useSSL=true username: root password: root jpa: properties: hibernate: hbm2ddl: auto: update dialect: org.hibernate.dialect.MySQL5InnoDBDialect cloud: nacos: discovery: server-addr: 127.0 .0 .1 :8848 rocketmq: name-server: 192.168 .114 .130 :9876
编写短信接收
1 2 3 4 5 6 7 8 9 @Slf4j @Service @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class SmsService implements RocketMQListener <Order> { @Override public void onMessage (Order order) { log.info("收到一个订单信息{},接下来发送短信" , JSON.toJSONString(order)); } }
3.8 发送不同类型的消息 3.8.1 普通消息 RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送
可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等
可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式
发送方通过回调接口接收服务器响应,并对响应结果进行处理
异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等
单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @RunWith(SpringRunner.class) @SpringBootTest(classes = OrderApplication.class) public class MessageTypeTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void testSyncSend () { SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1" , "这是一条同步消息" ); System.out.println(sendResult); } @Test public void testAsyncSend () throws InterruptedException { rocketMQTemplate.asyncSend("test-topic-1" , "这是一条异步消息" , new SendCallback () { @Override public void onSuccess (SendResult sendResult) { System.out.println(sendResult); } @Override public void onException (Throwable throwable) { System.out.println(throwable); } }); Thread.sleep(30000000 ); } @Test public void testOneWay () { rocketMQTemplate.sendOneWay("test-topic-1" , "这是一条单向消息" ); } }
三种发送方式的对比
发送方式
发送TPS
发送结果反馈
可靠性
同步发送
快
有
不丢失
异步发送
快
有
不丢失
单向发送
最快
无
可能丢失
3.8.2 顺序消息 顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型
1 2 3 4 5 6 public void testSyncSendOrderly () { rocketMQTemplate.syncSendOrderly("test-topic-1" , "这是一条异步顺序消息" , "xxxx" ); }
3.8.3 事务消息 RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致
事务消息交互流程
两个概念:
半事务消息
:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端 但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态 处于该种状态下的消息即半事务消息
消息回查
:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失 RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时 需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查
事务消息发送步骤:
发送方将半事务消息发送至RocketMQ服务端
RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事 务消息
发送方开始执行本地事务逻辑
发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback), 服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息; 服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息
事务消息回查步骤:
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
代码实现
日志实体类 TxLogDao ServiceImpl ServiceImplListener Controller 1 2 3 4 5 6 7 8 @Entity(name = "shop_txlog") @Data public class TxLog { @Id private String txId; private Date date; }
1 2 public interface TxLogDao extends JpaRepository <TxLog, String> {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Service public class OrderServiceImpl { @Autowired private OrderDao orderDao; @Autowired private TxLogDao txLogDao; @Autowired private RocketMQTemplate rocketMQTemplate; public void createOrderBefore (Order order) { String txId = UUID.randomUUID().toString(); rocketMQTemplate.sendMessageInTransaction( "tx_producer_group" , "tx_topic" , MessageBuilder.withPayload(order).setHeader("txId" , txId).build(), order ); } @Transactional public void createOrder (String txId, Order order) { orderDao.save(order); TxLog txLog = new TxLog (); txLog.setTxId(txId); txLog.setDate(new Date ()); txLogDao.save(txLog); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Service @RocketMQTransactionListener(txProducerGroup = "tx_producer_group") public class OrderServiceImplListener implements RocketMQLocalTransactionListener { @Autowired private OrderServiceImpl OrderServiceImpl; @Autowired private TxLogDao txLogDao; @Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) { String txId = (String) msg.getHeaders().get("txId" ); try { Order order = (Order) arg; OrderServiceImpl.createOrder(txId,order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction (Message msg) { String txId = (String) msg.getHeaders().get("txId" ); TxLog txLog = txLogDao.findById(txId).get(); if (txLog != null ){ return RocketMQLocalTransactionState.COMMIT; }else { return RocketMQLocalTransactionState.ROLLBACK; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @RestController @Slf4j public class OrderController { @Autowired private RestTemplate restTemplate; @Autowired private OrderServiceImpl orderService; @Autowired private ProductService productService; @GetMapping("/order/prod/{pid}") public Order order (@PathVariable("pid") Integer pid) { log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息" , pid); Product product = productService.findByPid(pid); if (product.getPid() == -100 ) { Order order = new Order (); order.setOid(-100L ); order.setPname("下单失败" ); return order; } log.info("查询到{}号商品的信息,内容是:{}" , pid, JSON.toJSONString(product)); Order order = new Order (); order.setUid(1 ); order.setUsername("测试用户" ); order.setPid(pid); order.setPname(product.getPname()); order.setPprice(product.getPprice()); order.setNumber(1 ); orderService.createOrderBefore(order); log.info("创建订单成功,订单信息为{}" , JSON.toJSONString(order)); return order; } }
3.9 RocketMQ支持两种消息模式 RocketMQ支持两种消息模式:
广播消费 : 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理
集群消费 : 一条消息只能被一个消费者实例消费
1 2 3 4 5 6 @RocketMQMessageListener( consumerGroup = "shop-user", //消费者组名 topic = "order-topic",//消费主题 consumeMode = ConsumeMode.CONCURRENTLY,//消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序) messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(广播) CLUSTERING(集群,默认) )