1. Gateway–服务网关

1.1 网关简介

没有网关的存在,我们只能在客户端记录每个微服务的地址,然后分别去调用

这样的架构,会存在着诸多的问题:

  • 客户端多次请求不同的微服务,增加客户端代码或配置编写的复杂性
  • 认证复杂,每个服务都需要独立认证
  • 存在跨域请求,在一定场景下处理相对复杂

上面的这些问题可以借助API网关来解决

所谓的API网关,就是指系统的统一入口,它封装了应用程序的内部结构,为客户端提供统一服务

一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等

在业界比较流行的网关,有下面这些:

SpringCloudalibaba加入网关后整体架构图

在业界比较流行的网关,有下面这些:

  • Ngnix+lua

    使用nginx的反向代理和负载均衡可实现对api服务器的负载均衡及高可用

    lua是一种脚本语言,可以来编写一些简单的逻辑, nginx支持lua脚本

  • Kong

    基于Nginx+Lua开发,性能高,稳定,有多个可用的插件(限流、鉴权等等)可以开箱即用

    问题: 只支持Http协议;二次开发,自由扩展困难;提供管理API,缺乏更易用的管控、配置方式

  • Zuul

    Netflix开源的网关,功能丰富,使用JAVA开发,易于二次开发

    问题:缺乏管控,无法动态配 置;依赖组件较多;处理Http请求依赖的是Web容器,性能不如Nginx

  • Spring Cloud Gateway

    Spring公司为了替换Zuul而开发的网关服务,将在下面具体介绍

注意:SpringCloud alibaba技术栈中并没有提供自己的网关,我们可以采用Spring Cloud Gateway 来做网关

1.2 Gateway简介

Spring Cloud Gateway是Spring公司基于Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式

它的目标是替代 Netflix Zuul,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能

例如:安全,监控和限流

优点:

  • 性能强劲:是第一代网关Zuul的1.6倍

  • 功能强大:内置了很多实用的功能,例如转发、监控、限流等

  • 设计优雅,容易扩展

缺点:

  • 其实现依赖Netty与WebFlux,不是传统的Servlet编程模型,学习成本高

  • 不能将其部署在Tomcat、Jetty等Servlet容器里,只能打成jar包执行

  • 需要Spring Boot 2.0及以上的版本,才支持

1.3 Gateway快速入门

基本步骤:

  1. 创建模块,导入依赖
  2. 创建主类
  3. 添加配置文件
  4. 启动项目,用网关去访问

创建api-gateway模块,导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shop-parent</artifactId>
<groupId>cn.jyw</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>api-gateway</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
</dependencies>
</project>

创建主类

1
2
3
4
5
6
@SpringBootApplication
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class,args);
}
}

添加配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 7000
spring:
application:
name: api-gateway
cloud:
gateway:
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
- id: product_route # 当前路由的标识, 要求唯一
uri: http://localhost:8081 # 请求要转发到的地址
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/product-serv/** # 当请求路径满足Path指定的规则时,才进行路由转发
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉1层路径

启动项目, 并通过网关去访问微服务

localhost:7000/product-serv/product/1

1.4 Gateway整合Nacos

步骤:

  1. 引入依赖
  2. 主类上添加注解@EnableDiscoveryClient
  3. 修改配置文件

引入依赖

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

添加注解

1
2
3
4
5
6
7
@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class,args);
}
}

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server:
port: 7000
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: true # 让gateway可以发现nacos中的微服务
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
- id: product_route # 当前路由的标识, 要求唯一
uri: lb://service-product # lb指的是从nacos中按照名称获取微服务,并遵循负载均衡策略
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/product-serv/** # 当请求路径满足Path指定的规则时,才进行路由转发
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉1层路径

测试

localhost:7000/product-serv/product/1

还有一种简化版(但不推荐)

不写路由配置 (全部默认)

1
2
3
4
5
6
7
8
9
10
11
12
13
server:
port: 7000
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: true # 让gateway可以发现nacos中的微服务

localhost:7000/service-product/product/1

这时候,就发现只要按照网关地址/微服务/接口的格式去访问,就可以得到成功响应

1.5 Gateway核心架构

基本概念

路由(Route) 是 gateway 中最基本的组件之一,表示一个具体的路由信息载体

主要定义了下面的几个信息:

  • id,路由标识符,区别于其他 Route
  • uri,路由指向的目的地 uri,即客户端请求最终被转发到的微服务
  • order,用于多个 Route 之间的排序,数值越小排序越靠前,匹配优先级越高
  • predicate,断言的作用是进行条件判断,只有断言都返回真,才会真正的执行路由
  • filter,过滤器用于修改请求和响应信息

执行流程

Gateway工作流程

执行流程大体如下:

  1. Gateway ClientGateway Server发送请求
  2. 请求首先会被HttpWebHandlerAdapter进行提取组装成网关上下文
  3. 然后网关的上下文会传递到DispatcherHandler,它负责将请求分发给 RoutePredicateHandlerMapping
  4. RoutePredicateHandlerMapping负责路由查找,并根据路由断言判断路由是否可用
  5. 如果过断言成功,由FilteringWebHandler创建过滤器链并调用
  6. 请求会一次经过PreFilter微服务PostFilter的方法,最终返回响应

1.6 断言

Predicate(断言, 谓词) 用于进行条件判断,只有断言都返回真,才会真正的执行路由

断言就是说: 在什么条件下 才能进行路由转发

1.6.1 内置路由断言工厂

SpringCloud Gateway包括许多内置的断言工厂,所有这些断言都与HTTP请求的不同属性匹配。具体如下:

  • 基于Datetime类型的断言工厂

    此类型的断言根据时间做判断,主要有三个:

    AfterRoutePredicateFactory: 接收一个日期参数,判断请求日期是否晚于指定日期 BeforeRoutePredicateFactory: 接收一个日期参数,判断请求日期是否早于指定日期 BetweenRoutePredicateFactory: 接收两个日期参数,判断请求日期是否在指定时间段内

    -After=2019-12-31T23:59:59.789+08:00[Asia/Shanghai]

  • 基于远程地址的断言工厂

    RemoteAddrRoutePredicateFactory:接收一个IP地址段,判断请求主 机地址是否在地址段中

    -RemoteAddr=192.168.1.1/24

  • 基于Cookie的断言工厂

    CookieRoutePredicateFactory:接收两个参数,cookie 名字和一个正则表达式

    判断请求 cookie是否具有给定名称且值与正则表达式匹配

    -Cookie=chocolate, ch.

  • 基于Header的断言工厂

    HeaderRoutePredicateFactory:接收两个参数,标题名称和正则表达式

    判断请求Header是否 具有给定名称且值与正则表达式匹配

    -Header=X-Request-Id, \d+

  • 基于Host的断言工厂 HostRoutePredicateFactory:接收一个参数,主机名模式

    判断请求的Host是否满足匹配规则

    -Host=**.testhost.org

  • 基于Method请求方法的断言工厂

    MethodRoutePredicateFactory:接收一个参数,判断请求类型是否跟指定的类型匹配

    -Method=GET

  • 基于Path请求路径的断言工厂

    PathRoutePredicateFactory:接收一个参数,判断请求的URI部分是否满足路径规则

    -Path=/foo/{segment}

  • 基于Query请求参数的断言工厂

    QueryRoutePredicateFactory :接收两个参数,请求param和正则表达式, 判断请求参数是否具有给定名称且值与正则表达式匹配

    -Query=baz, ba.

  • 基于路由权重的断言工厂

    WeightRoutePredicateFactory:接收一个[组名,权重], 然后对于同一个组内的路由按照权重转发

    routes:

    -id: weight_route1 uri: host1 predicates:

    -Path=/product/**

    -Weight=group3, 1

    -id: weight_route2 uri: host2 predicates:

    -Path=/product/**

    -Weight= group3, 9

路径相同 按权重1:9发送

内置路由断言工厂的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server:
port: 7000
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: true
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
- id: product_route # 当前路由的标识, 要求唯一
uri: lb://service-product # 请求要转发到的地址
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/product-serv/** # 当请求路径满足Path指定的规则时,才进行路由转发
- Before=2019-11-28T00:00:00.000+08:00 #限制请求时间在2019-11-28之前
- Method=POST #限制请求方式为POST
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉1层路径
- id: order_route # 当前路由的标识, 要求唯一
uri: lb://service-order # 请求要转发到的地址
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/order-serv/** # 当请求路径满足Path指定的规则时,才进行路由转发
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉1层路径

1.6.2 自定义断言工厂

我们来设定一个场景: 假设我们的应用仅仅让age在(min,max)之间的人来访问

第1步:在配置文件中,添加一个Age的断言配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
server:
port: 7000
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: true
routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
- id: product_route # 当前路由的标识, 要求唯一
uri: lb://service-product # 请求要转发到的地址
order: 1 # 路由的优先级,数字越小级别越高
predicates: # 断言(就是路由转发要满足的条件)
- Path=/product-serv/** # 当请求路径满足Path指定的规则时,才进行路由转发
- Age=18,60 # 限制年龄只有在18到60岁之间的人能访问
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉1层路径

第2步:自定义一个断言工厂, 实现断言方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//这是一个自定义的路由断言工厂类,要求有两个
//1 名字必须是 配置+RoutePredicateFactory
//2 必须继承AbstractRoutePredicateFactory<配置类>
@Component
public class AgeRoutePredicateFactory extends AbstractRoutePredicateFactory<AgeRoutePredicateFactory.Config> {

//构造函数
public AgeRoutePredicateFactory() {
super(AgeRoutePredicateFactory.Config.class);
}

//读取配置文件的中参数值 给他赋值到配置类中的属性上
public List<String> shortcutFieldOrder() {
//这个位置的顺序必须跟配置文件中的值的顺序对应
return Arrays.asList("minAge", "maxAge");
}

//断言逻辑
public Predicate<ServerWebExchange> apply(AgeRoutePredicateFactory.Config config) {
return new Predicate<ServerWebExchange>() {
@Override
public boolean test(ServerWebExchange serverWebExchange) {
//1 接收前台传入的age参数
String ageStr = serverWebExchange.getRequest().getQueryParams().getFirst("age");

//2 先判断是否为空
if (StringUtils.isNotEmpty(ageStr)) {
//3 如果不为空,再进行路由逻辑判断
int age = Integer.parseInt(ageStr);
if (age < config.getMaxAge() && age > config.getMinAge()) {
return true;
} else {
return false;
}
}
return false;
}
};
}

//配置类,用于接收配置文件中的对应参数
@Data
@NoArgsConstructor
public static class Config {
private int minAge;//18
private int maxAge;//60
}
}

1.7 过滤器

作用

过滤器就是在请求的传递过程中,对请求和响应做一些手脚

生命周期

在Gateway中, Filter的生命周期只有两个:“pre” 和 “post”

  • PRE: 这种过滤器在请求被路由之前调用

    我们可利用这种过滤器实现身份验证、在集群中选择请求的微服务、记录调试信息等

  • POST:这种过滤器在路由到微服务以后执行

    这种过滤器可用来为响应添加标准的HTTP Header、收集统计信息和指标、将响应从微服务发送给客户端等

分类

Gateway 的Filter从作用范围可分为两种: GatewayFilter(局部过滤器)与GlobalFilter(全局过滤器)

  • GatewayFilter:应用到单个路由或者一个分组的路由上
  • GlobalFilter:应用到所有的路由上。

1.7.1 局部过滤器

内置局部过滤器

过滤器工厂 作用 参数
AddRequestHeader 为原始请求添加Header Header的名称及值
AddRequestParameter 为原始请求添加请求参数 参数名称及值
AddResponseHeader 为原始响应添加Header Header的名称及值
DedupeResponseHeader 剔除响应头中重复的值 需要去重的Header名称及去重策略
Hystrix 为路由引入Hystrix的断路器保护 HystrixCommand的名称
FallbackHeaders 为fallbackUri的请求头中添加具体的异常信息 Header的名称
PrefixPath 为原始请求路径添加前缀 前缀路径
PreserveHostHeader 为请求添加一个 preserveHostHeader=true的属性,路由过滤器会检查该属性以决定是否要发送原始的Host
RequestRateLimiter 用于对请求限流,限流算法为令牌桶 keyResolver
rateLimiter
statusCode
denyEmptyKey
emptyKeyStatus
RedirectTo 将原始请求重定向到指定的URL http状态码及重定向的 url
RemoveHopByHopHeadersFilter 为原始请求删除IETF组织规定的一系列Header 默认就会启用,可以通过配置指定仅删除哪些 Header
RemoveRequestHeader 为原始请求删除某个Header Header名称
RemoveResponseHeader 为原始响应删除某个Header Header名称
RewritePath 重写原始的请求路径 原始路径正则表达式以及重写后路径的正则表达式
RewriteResponseHeader 重写原始响应中的某个Header Header名称,值的正则表达式,重写后的值
SaveSession 在转发请求之前,强制执行 WebSession::save操作
secureHeaders 为原始响应添加一系列起安全作用的响应头 无,支持修改这些安全响应头的值
SetPath 修改原始的请求路径 修改后的路径
SetResponseHeader 修改原始响应中某个Header的值 Header名称,修改后的值
SetStatus 修改原始响应的状态码 HTTP 状态码,可以是数字,也可以是字符串
StripPrefix 用于截断原始请求的路径 使用数字表示要截断的路径的数量
Retry 针对不同的响应进行重试 retries、statuses、 methods、series
RequestSize 设置允许接收最大请求包的大小。如果请求包大小超过设置的值,则返回 413 Payload Too Large 请求包大小,单位为字节,默认值为5M
ModifyRequestBody 在转发请求之前修改原始请求体内容 修改后的请求体内容
ModifyResponseBody 修改原始响应体的内容 修改后的响应体内容

1.7.2 自定义局部过滤器

第1步:在配置文件中,添加一个Log的过滤器配置

1
2
3
filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
- StripPrefix=1 # 转发之前去掉1层路径
- Log=true,false # 控制日志是否开启

​ 第2步:自定义一个过滤器工厂,实现方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//自定义局部过滤器
@Component
public class LogGatewayFilterFactory
extends AbstractGatewayFilterFactory<LogGatewayFilterFactory.Config> {

//构造函数
public LogGatewayFilterFactory() {
super(LogGatewayFilterFactory.Config.class);
}

//读取配置文件中的参数 赋值到 配置类中
@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList("consoleLog", "cacheLog");
}

//过滤器逻辑
@Override
public GatewayFilter apply(LogGatewayFilterFactory.Config config) {
return new GatewayFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if (config.isCacheLog()) {
System.out.println("cacheLog已经开启了....");
}
if (config.isConsoleLog()) {
System.out.println("consoleLog已经开启了....");
}

return chain.filter(exchange);
}
};
}

//配置类 接收配置参数
@Data
@NoArgsConstructor
public static class Config {
private boolean consoleLog;
private boolean cacheLog;
}
}

1.7.3 全局过滤器

开发中的鉴权逻辑:

  • 当客户端第一次请求服务时,服务端对用户进行信息认证(登录)
  • 认证通过,将用户信息进行加密形成token,返回给客户端,作为登录凭证
  • 以后每次请求,客户端都携带认证的token
  • 服务端对token进行解密,判断是否有效

下面的我们自定义一个GlobalFilter,去校验所有请求的请求参数中是否包含“token”

如果不包含请求参数“token”则不转发路由,否则执行正常的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//自定义全局过滤器需要实现GlobalFilter和Ordered接口
@Component
public class AuthGlobalFilter implements GlobalFilter, Ordered {
//完成判断逻辑
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain
chain) {
String token = exchange.getRequest().getQueryParams().getFirst("token");
if (StringUtils.isBlank(token)) {
System.out.println("鉴权失败");
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
//调用chain.filter继续向下游执行
return chain.filter(exchange);
}
//顺序,数值越小,优先级越高
@Override
public int getOrder() {
return 0;
}
}

1.8 网关限流

Sentinel支持对SpringCloud Gateway、Zuul等主流网关进行限流

Sentinel网关限流

从1.6.0版本开始,Sentinel提供了SpringCloud Gateway的适配模块,可以提供两种资源维度的限流:

  • route维度:即在Spring配置文件中配置的路由条目,资源名为对应的routeId
  • 自定义API维度:用户可以利用Sentinel提供的API来自定义一些API分组

导入依赖

1
2
3
4
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>

编写配置类

基于Sentinel 的Gateway限流是通过其提供的Filter来完成的

使用时只需注入对应的 SentinelGatewayFilter实例以及 SentinelGatewayBlockExceptionHandler 实例即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;
private final ServerCodecConfigurer serverCodecConfigurer;

public GatewayConfiguration(ObjectProvider<List<ViewResolver>>
viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers =
viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}

// 初始化一个限流的过滤器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}

// 配置初始化的限流参数
@PostConstruct
public void initGatewayRules() {
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("product_route") //资源名称,对应路由id
.setCount(1) // 限流阈值
.setIntervalSec(1) // 统计时间窗口,单位是秒,默认是 1 秒
);
GatewayRuleManager.loadRules(rules);
}

// 配置限流的异常处理器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler
sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers,
serverCodecConfigurer);
}

// 自定义限流异常页面
@PostConstruct
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
@Override
public Mono<ServerResponse> handleRequest(ServerWebExchange
serverWebExchange, Throwable throwable) {
Map map = new HashMap<>();
map.put("code", 0);
map.put("message", "接口被限流了");
return ServerResponse.status(HttpStatus.OK).
contentType(MediaType.APPLICATION_JSON_UTF8).
body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}
}

自定义API分组

自定义API分组是一种更细粒度的限流规则定义

与上面不同在配置初始化的限流参数与自定义API分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Configuration
public class GatewayConfiguration {
private final List<ViewResolver> viewResolvers;

private final ServerCodecConfigurer serverCodecConfigurer;

public GatewayConfiguration(ObjectProvider<List<ViewResolver>> viewResolversProvider,
ServerCodecConfigurer serverCodecConfigurer) {
this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList);
this.serverCodecConfigurer = serverCodecConfigurer;
}

// 初始化一个限流的过滤器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}

// 配置初始化的限流参数
@PostConstruct
public void initGatewayRules() {
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(new GatewayFlowRule("product_api1").setCount(1).setIntervalSec(1));
rules.add(new GatewayFlowRule("product_api2").setCount(1).setIntervalSec(1));
GatewayRuleManager.loadRules(rules);
}

// 配置限流的异常处理器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() {
return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer);
}

// 自定义限流异常页面
@PostConstruct
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
public Mono<ServerResponse> handleRequest(ServerWebExchange serverWebExchange, Throwable throwable) {
Map map = new HashMap<>();
map.put("code", 0);
map.put("message", "接口被限流了");
return ServerResponse.status(HttpStatus.OK).
contentType(MediaType.APPLICATION_JSON_UTF8).
body(BodyInserters.fromObject(map));
}
};
GatewayCallbackManager.setBlockHandler(blockRequestHandler);
}

//自定义API分组
@PostConstruct
private void initCustomizedApis() {
Set<ApiDefinition> definitions = new HashSet<>();
ApiDefinition api1 = new ApiDefinition("product_api1")
.setPredicateItems(new HashSet<ApiPredicateItem>() {{
// 以/product-serv/product/api1 开头的请求
add(new ApiPathPredicateItem().setPattern("/product-serv/product/api1/**").
setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX));
}});
ApiDefinition api2 = new ApiDefinition("product_api2")
.setPredicateItems(new HashSet<ApiPredicateItem>() {{
// 以/product-serv/product/api2/demo1 完成的url路径匹配
add(new ApiPathPredicateItem().setPattern("/product-serv/product/api2/demo1"));
}});
definitions.add(api1);
definitions.add(api2);
GatewayApiDefinitionManager.loadApiDefinitions(definitions);
}

}

2. Sleuth–链路追踪

2.1 链路追踪介绍

在大型系统的微服务化构建中,一个系统被拆分成了许多模块

这些模块负责不同的功能,组合成系统,最终可以提供丰富的功能

在这种架构中,一次请求往往需要涉及到多个服务

互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心,也就意味着这种架构形式也会存在一些问题:

  • 如何快速发现问题?
  • 如何判断故障影响范围?
  • 如何梳理服务依赖以及依赖的合理性?
  • 如何分析链路性能问题以及实时容量规划?

分布式链路追踪(Distributed Tracing),就是将一次分布式请求还原成调用链路,进行日志记录,性能监控并将一次分布式请求的调用情况集中展示

比如各个服务节点上的耗时、请求具体到达哪 台机器上、每个服务节点的请求状态等等

常见的链路追踪技术有下面这些:

  • cat

    由大众点评开源,基于Java开发的实时应用监控平台,包括实时应用监控,业务监控

    集成方案是通过代码埋点的方式来实现监控,比如: 拦截器,过滤器等

    对代码的侵入性很大,集成成本较高。风险较大。

  • zipkin

    由Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现

    该产品结合spring-cloud-sleuth 使用较为简单, 集成很方便, 但是功能较简单

  • pinpoint

    Pinpoint是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具

    特点是支持多种插件,UI功能强大,接入端无代码侵入。

  • skywalking

    SkyWalking是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具

    特点是支持多种插件,UI功能较强,接入端无代码侵入。目前已加入Apache孵化器

  • Sleuth

    SpringCloud 提供的分布式系统中链路追踪解决方案 但是没与UI

注意:SpringCloud alibaba技术栈中并没有提供自己的链路追踪技术的,我们可以采用Sleuth + Zinkin来做链路追踪解决方案

2.2 Sleuth入门

2.2.1 Sleuth介绍

SpringCloud Sleuth主要功能就是在分布式系统中提供追踪解决方案

它大量借用了Google Dapper的设计, 先来了解一下Sleuth中的术语和相关概念

  • Trace

    由一组Trace Id相同的Span串联形成一个树状结构

    为了实现请求跟踪,当请求到达分布式系统的入口端点时,只需要服务跟踪框架为该请求创建一个唯一的标识(即TraceId),同时在分布式系 统内部流转的时候,框架始终保持传递该唯一值,直到整个请求的返回

    那么我们就可以使用该唯一标识将所有的请求串联起来,形成一条完整的请求链路

  • Span

    代表了一组基本的工作单元

    为了统计各处理单元的延迟,当请求到达各个服务组件的时候,也通过一个唯一标识(SpanId)来标记它的开始、具体过程和结束

    通过SpanId的开始和结束时间戳,就能统计该span的调用时间

    除此之外,我们还可以获取如事件的名称、请求信息等元数据

  • Annotation

    用它记录一段时间内的事件,内部使用的重要注释:

    cs(Client Send)客户端发出请求,开始一个请求的生命

    sr(Server Received)服务端接受到请求开始进行处理, sr-cs = 网络延迟(服务调用的时间)

    ss(Server Send)服务端处理完毕准备发送到客户端,ss - sr = 服务器上的请求处理时间

    cr(Client Reveived)客户端接受到服务端的响应,请求结束。 cr - sr = 请求的总时间

2.2.2 Sleuth入门

修改父工程引入Sleuth依赖

1
2
3
4
5
<!--链路追踪 Sleuth-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

启动微服务,调用之后,我们可以在控制台观察到sleuth的日志输出

微服务名称, traceId, spanid,是否将链路的追踪结果输出到第三方平台

[api-gateway,3977125f73391553,3977125f73391553,false]

[service-order,3977125f73391553,57547b5bf71f8242,false]

[service-product,3977125f73391553,449f5b3f3ef8d5c5,false]

其中 3977125f73391553 是TraceId, 57547b5bf71f8242 是SpanId,依次调用有一个全局的 TraceId,将调用链路串起来

仔细分析每个微服务的日志,不难看出请求的具体过程

2.3 Zipkin的集成

2.3.1 ZipKin介绍

Zipkin 是 Twitter 的一个开源项目,它基于Google Dapper实现,它致力于收集服务的定时数据, 以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现

我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的REST API接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源

除了面向开发的 API 接口之外,它也提供了方便的UI组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,

比如:可以查询某段时间内各用户请求的处理时间等

Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch

Zipkin 的基础架构

上图展示了 Zipkin 的基础架构,它主要由 4 个核心组件构成:

  • Collector:

    收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为 Zipkin内部处理的 Span 格式,以支持后续的存储、分析、展示等功能

  • Storage:

    存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中

  • RESTful API:

    API 组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等

  • Web UI:

    UI 组件, 基于API组件实现的上层应用。通过UI组件用户可以方便而有直观地查询和分析跟踪信息

Zipkin分为两端,一个是 Zipkin服务端,一个是 Zipkin客户端,客户端也就是微服务的应用

客户端会配置服务端的 URL 地址,一旦发生服务间的调用的时候,会被配置在微服务里面的 Sleuth 的监听器监听,并生成相应的 Trace 和 Span 信息发送给服务端

2.3.2 ZipKin服务端安装

第1步: 下载ZipKin的jar包

1
https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec

访问上面的网址,即可得到一个jar包,这就是ZipKin服务端的jar包

第2步: 通过命令行,输入下面的命令启动ZipKin Server

1
java -jar zipkin-server-2.12.9-exec.jar

第3步:通过浏览器访问 http://localhost:9411访问

2.3.3 Zipkin客户端集成

基本步骤:

  1. 导入依赖

  2. 添加配置

  3. 访问微服务

在需要使用的微服务上导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

添加配置

1
2
3
4
5
6
7
spring:
zipkin:
base-url: http://127.0.0.1:9411/ #zipkin server的请求地址
discovery-client-enabled: false #让nacos把它当成一个URL,而不要当做服务名
sleuth:
sampler:
probability: 1.0 #采样的百分比

2.4 ZipKin数据持久化

Zipkin Server默认会将追踪数据信息保存到内存,但这种方式不适合生产环境

Zipkin支持将追踪 数据持久化到mysql数据库或elasticsearch

2.4.1 使用mysql实现数据持久化

第1步: 创建mysql数据环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
CREATE TABLE IF NOT EXISTS zipkin_spans
(
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this
means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL,
`id` BIGINT NOT NULL,
`name` VARCHAR(255) NOT NULL,
`parent_id` BIGINT,
`debug` BIT(1),
`start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs
query and to implement TTL',
`duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration
and maxDuration query'
) ENGINE = InnoDB
ROW_FORMAT = COMPRESSED
CHARACTER SET = utf8
COLLATE
utf8_general_ci;
ALTER TABLE zipkin_spans
ADD UNIQUE KEY (`trace_id_high`, `trace_id`, `id`)
COMMENT 'ignore insert on duplicate';
ALTER TABLE zipkin_spans
ADD INDEX (`trace_id_high`, `trace_id`, `id`)
COMMENT 'for joining with zipkin_annotations';
ALTER TABLE zipkin_spans
ADD INDEX (`trace_id_high`, `trace_id`) COMMENT 'for
getTracesByIds';
ALTER TABLE zipkin_spans
ADD INDEX (`name`) COMMENT 'for getTraces and
getSpanNames';
ALTER TABLE zipkin_spans
ADD INDEX (`start_ts`) COMMENT 'for getTraces
ordering and range';
CREATE TABLE IF NOT EXISTS zipkin_annotations
(
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this
means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL COMMENT 'coincides with
zipkin_spans.trace_id',
`span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
`a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or
Annotation.value if type == -1',
`a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller
than 64KB',
`a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if
Annotation',
`a_timestamp` BIGINT COMMENT 'Used to implement TTL;
Annotation.timestamp or zipkin_spans.timestamp',
`endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is
null',
`endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint
is null, or no IPv6 address',
`endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint
is null',
`endpoint_service_name` VARCHAR(255) COMMENT 'Null when
Binary/Annotation.endpoint is null'
) ENGINE = InnoDB
ROW_FORMAT = COMPRESSED
CHARACTER SET = utf8
COLLATE
utf8_general_ci;
ALTER TABLE zipkin_annotations
ADD UNIQUE KEY (`trace_id_high`, `trace_id`,
`span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations
ADD INDEX (`trace_id_high`, `trace_id`,
`span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations
ADD INDEX (`trace_id_high`, `trace_id`)
COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations
ADD INDEX (`endpoint_service_name`) COMMENT
'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations
ADD INDEX (`a_type`) COMMENT 'for getTraces';
ALTER TABLE zipkin_annotations
ADD INDEX (`a_key`) COMMENT 'for getTraces';
ALTER TABLE zipkin_annotations
ADD INDEX (`trace_id`, `span_id`, `a_key`)
COMMENT 'for dependencies job';
CREATE TABLE IF NOT EXISTS zipkin_dependencies
(
`day` DATE NOT NULL,
`parent` VARCHAR(255) NOT NULL,
`child` VARCHAR(255) NOT NULL,
`call_count` BIGINT
) ENGINE = InnoDB
ROW_FORMAT = COMPRESSED
CHARACTER SET = utf8
COLLATE
utf8_general_ci;
ALTER TABLE zipkin_dependencies
ADD UNIQUE KEY (`day`, `parent`, `child`);

第2步: 在启动ZipKin Server的时候,指定数据保存的mysql的信息

1
java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=mysql --MYSQL_HOST=127.0.0.1 --MYSQL_TCP_PORT=3306 --MYSQL_DB=zipkin --MYSQL_USER=root --MYSQL_PASS=root

2.4.2 使用elasticsearch实现数据持久化

第1步: 下载elasticsearch

下载地址:https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-8-4

第2步: 启动elasticsearch

elasticsearch.bat文件

第3步: 在启动ZipKin Server的时候,指定数据保存的elasticsearch的信息

1
java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=elasticsearch --ES-HOST=localhost:9200

3. RocketMQ–消息驱动

3.1 MQ简介

MQ(Message Queue)是一种跨进程的通信机制,用于传递消息

通俗点说,就是一个先进先出的数据结构

异步解耦

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦

主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列

同时,由于使用了消息列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合

流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛

在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增

秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况

为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ

秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ
  3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户
  4. 用户收到秒杀成功的通知。

3.2 常见的MQ产品

目前业界有很多MQ产品,比较出名的有下面这些:

  • ZeroMQ

    号称最快的消息队列系统,尤其针对大吞吐量的需求场景

    扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装

    如果做为消息队列使用,需要开发大量的代码

    ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失

  • RabbitMQ

    使用erlang语言开发,性能较好,适合于企业级的开发

    但是不利于做二次开发和维护

  • ActiveMQ

    历史悠久的Apache开源项目

    已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,支持持久化到数据库

    对队列数较多的情况支持不好

  • RocketMQ

    阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单

  • Kafka

    Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,现在主要用于大数据

    相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布 式系统

3.3 环境搭建

第一步: 下载RocketMQ

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

环境要求

  • Linux 64位操作系统
  • 64bit JDK 1.8+

第二步: 安装RocketMQ

  1. 上传文件到Linux系统
1
2
[root@jyw rocketmq]# ls /usr/local/src/
rocketmq-all-4.4.0-bin-release.zip
  1. 解压到安装目录
1
2
[root@jyw src]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@jyw src]# mv rocketmq-all-4.4.0-bin-release ../rocketmq

第三步:启动RocketMQ

  1. 修改启动内存
1
2
3
# 编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
  1. 启动NameServer
1
2
3
4
5
6
[root@jyw rocketmq]# nohup ./bin/mqnamesrv &
[1] 1467
# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@jyw rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log
# 查看端口
[root@jyw rocketmq]# netstat -an | grep 9876
  1. 启动Broker
1
2
[root@jyw rocketmq]# nohup bin/mqbroker -n localhost:9876 &
[root@jyw rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log

第四步: 测试RocketMQ

  1. 测试消息发送
1
2
[root@jyw rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@jyw rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
  1. 测试消息接收
1
2
[root@jyw rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@jyw rocketmq]# bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

第五步: 关闭RocketMQ

1
2
[root@jyw rocketmq]# bin/mqshutdown broker
[root@jyw rocketmq]# bin/mqshutdown namesrv

3.4 RocketMQ的架构及概念

 RocketMQ的架构

如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer

  • Broker(邮递员)

    Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能

  • NameServer(邮局)

    消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息

  • Producer(寄件人)

    消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息

  • Consumer(收件人)

    消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息

  • Topic(地区)

    用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息

  • Message Queue(邮件)

    为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个 Message Queue读取消息

  • Message

    Message 是消息的载体

  • Producer Group

    生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组

  • Consumer Group

    消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组

3.5 RocketMQ控制台安装

第一步: 下载工程

https://github.com/apache/rocketmq-externals/releases

第二步: 修改配置文件

1
2
3
# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.114.130:9876 #nameserv的地址,注意防火墙要开启9876端口

第三步: 打成jar包,并启动

1
2
3
4
# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

第四步: 访问控制台

127.0.0.1:7777

3.6 消息发送和接收演示

1
2
3
4
5
6
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>

3.6.1 发送消息

消息发送步骤:

  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RocketMQSendMessageTest {

//发送消息
public static void main(String[] args) throws Exception {
//1.创建消息生产者,并且设置生产组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

//2 为生产者设置NameServer的地址
producer.setNamesrvAddr("192.168.109.131:9876");

//3 启动生产者
producer.start();

//4 构建消息对象,主要是设置消息的主题 标签 内容
Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

//5 发送消息 第二个参数代表超时时间
SendResult result = producer.send(message, 10000);
System.out.println(result);

//6 关闭生产者
producer.shutdown();
}
}

3.6.2 接收消息

消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RocketMQReceiveMessageTest {

//接收消息
public static void main(String[] args) throws Exception {

//1 创建消费者,并且为其指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");

//2 为消费者设置NameServer的地址
consumer.setNamesrvAddr("192.168.109.131:9876");

//3 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");

//4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
//处理获取到的消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//消费逻辑
System.out.println("Message===>" + list);

//返回消费成功状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//5 启动消费者
consumer.start();
System.out.println("启动消费者成功了");
}
}

3.7 案例

3.7.1 订单微服务发送消息

在order里添加依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

添加配置

1
2
3
4
rocketmq:
name-server: 192.168.114.130:9876 #rocketMQ服务的地址
producer:
group: shop-order # 生产者组

编写控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@GetMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid);

//调用商品微服务,查询商品信息
Product product = productService.findByPid(pid);
if (product.getPid() == -1) {
Order order = new Order();
order.setPname("下单失败");
return order;
}

log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));

//下单(创建订单)
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);

orderService.save(order);

log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));
rocketMQTemplate.convertAndSend("order-topic",order);

return order;
}

3.7.2 用户微服务订阅消息

user模块依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shop-parent</artifactId>
<groupId>cn.jyw</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>shop-user</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>cn.jyw</groupId>
<artifactId>shop-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
</project>

主类

1
2
3
4
5
6
7
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class);
}
}

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
server:
port: 8071
spring:
application:
name: service-user
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql:///shop?serverTimezone=UTC&useUnicode=true&c3haracterEncoding=utf-8&useSSL=true
username: root
password: root
jpa:
properties:
hibernate:
hbm2ddl:
auto: update
dialect: org.hibernate.dialect.MySQL5InnoDBDialect
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
rocketmq:
name-server: 192.168.114.130:9876 #rocketMQ服务的地址

编写短信接收

1
2
3
4
5
6
7
8
9
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
}
}

3.8 发送不同类型的消息

3.8.1 普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送

可靠同步发送

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等

可靠异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式

发送方通过回调接口接收服务器响应,并对响应结果进行处理

异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等

单向发送

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApplication.class)
public class MessageTypeTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;

//同步消息
@Test
public void testSyncSend() {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
System.out.println(sendResult);
}

//异步消息
@Test
public void testAsyncSend() throws InterruptedException {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
//让线程不要终止
Thread.sleep(30000000);
}

//单向消息
@Test
public void testOneWay() {
rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
}
}

三种发送方式的对比

发送方式 发送TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失

3.8.2 顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型

1
2
3
4
5
6
//同步顺序消息[异步顺序 单向顺序写法类似]
public void testSyncSendOrderly() {
//第三个参数用于队列的选择
rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息",
"xxxx");
}

3.8.3 事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致

事务消息交互流程

RocketMQ事务信息交互流程

两个概念:

  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端
    但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态
    处于该种状态下的消息即半事务消息

  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失
    RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时
    需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查

事务消息发送步骤:

  1. 发送方将半事务消息发送至RocketMQ服务端
  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事 务消息
  3. 发送方开始执行本地事务逻辑
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),
    服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;
    服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息

事务消息回查步骤:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

代码实现

1
2
3
4
5
6
7
8
//消息事物状态记录
@Entity(name = "shop_txlog")
@Data
public class TxLog {
@Id
private String txId;
private Date date;
}
1
2
public interface TxLogDao extends JpaRepository<TxLog, String> {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
public class OrderServiceImpl {

@Autowired
private OrderDao orderDao;

@Autowired
private TxLogDao txLogDao;

@Autowired
private RocketMQTemplate rocketMQTemplate;


public void createOrderBefore(Order order) {
String txId = UUID.randomUUID().toString();
//发送半事务消息
rocketMQTemplate.sendMessageInTransaction(
"tx_producer_group",
"tx_topic",
MessageBuilder.withPayload(order).setHeader("txId", txId).build(),
order
);
}

@Transactional
public void createOrder(String txId, Order order) {
//保存订单
orderDao.save(order);
TxLog txLog = new TxLog();
txLog.setTxId(txId);
txLog.setDate(new Date());
//记录事物日志
txLogDao.save(txLog);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImplListener implements RocketMQLocalTransactionListener {

@Autowired
private OrderServiceImpl OrderServiceImpl;

@Autowired
private TxLogDao txLogDao;

//执行本地事物
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

String txId = (String) msg.getHeaders().get("txId");

try {
//本地事物
Order order = (Order) arg;
OrderServiceImpl.createOrder(txId,order);

return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

//消息回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String txId = (String) msg.getHeaders().get("txId");
TxLog txLog = txLogDao.findById(txId).get();

if (txLog != null){
//本地事物(订单)成功了
return RocketMQLocalTransactionState.COMMIT;
}else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@RestController
@Slf4j
public class OrderController {

@Autowired
private RestTemplate restTemplate;

@Autowired
private OrderServiceImpl orderService;

@Autowired
private ProductService productService;

@GetMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {
log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid);

//调用商品微服务,查询商品信息
Product product = productService.findByPid(pid);

if (product.getPid() == -100) {
Order order = new Order();
order.setOid(-100L);
order.setPname("下单失败");
return order;
}

log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));

//下单(创建订单)
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setNumber(1);

orderService.createOrderBefore(order);

log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));

return order;
}
}

3.9 RocketMQ支持两种消息模式

RocketMQ支持两种消息模式:

  • 广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理
  • 集群消费: 一条消息只能被一个消费者实例消费
1
2
3
4
5
6
@RocketMQMessageListener(
consumerGroup = "shop-user", //消费者组名
topic = "order-topic",//消费主题
consumeMode = ConsumeMode.CONCURRENTLY,//消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序)
messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(广播) CLUSTERING(集群,默认)
)