二十七 实现网关算力节点动态负载功能

基于第25章 Nginx 的负载模型,第26章动态刷新 Nginx 的配置和实现,把网关算力节点动态刷新到 Nginx 配置中,完成动态负载的功能实现。

设计

这里我们要实现的核心功能就是当有网关算力节点注册到注册中心的时候,可以被动态的配置到 Nginx 负载中,这样就可以实现动态的变更操作了。

在不使用 Nginx 代理的时候,前面章节使用网关都是通过直接方式的方式操作,如 http://10.20.111.132:7397/wg/activity/sayHi?str=1 那么现在因为有负载的设计,希望把来自于不同 URL的请求负载到不同的网关算力上去,所以这里的访问地址将变更为:http://10.20.111.132:8090/10001/wg/activity/sayHi?str=10001 从第1个地址到第2个地址来看,变化的点主要是端口由原来的访问网关算力节点到访问 Nginx,同时多了一个 10001 的路径。这个 10001 就是数据库中 group_id 网关分组的配置。我们也是用这个配置来区分访问哪一组网关。

27-设计

api-gateway-center 管理着网关算力的注册,并把注册的配置信息动态刷新到 Nginx 配置中。

同时在 Nginx 的配置中会重写URL,也就是把 10001 这个根目录路径给去掉,让它的功能只是负责路由即可,剩下的与原有直接访问网关算力不变。如果是在 通信组件中 core 中处理这样的uri的话,以后就必须带有类似的前缀才可以处理请求。而这样通过 nginx 重写,即使以后不需要做负载也可以直接访问网关算力节点。

实现

api-gateway-center

  1. GatewayContgManage#registerGatewayServerNode:在网关算力节点注册的时候,调用在第26章实现的动态刷新 Nginx 配置服务。此外注意本章节新添加了 Nginx 路径重新配置,以及在 yml 配置文件中添加了 Nginx IP 的配置。

LoadBalancingService 负载均衡配置服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package cn.ray.gateway.center.domain.docker.service;

import cn.ray.gateway.center.domain.docker.model.aggregates.NginxConfig;
import cn.ray.gateway.center.domain.docker.model.vo.LocationVO;
import cn.ray.gateway.center.domain.docker.model.vo.UpstreamVO;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* @author Ray
* @date 2023/6/3 23:33
* @description 负载均衡配置服务
*/
@Service
public class LoadBalancingService extends AbstractLoadBalancing {

private Logger logger = LoggerFactory.getLogger(LoadBalancingService.class);

@Value("${nginx.server_name}")
private String nginxServerName;

@Override
protected String createNginxConfigFile(NginxConfig nginxConfig) throws IOException {
// 创建文件
String nginxConfigContentStr = buildNginxConfig(nginxConfig.getUpstreamList(), nginxConfig.getLocationList());
File file = new File("/data/nginx/nginx.conf");
if (!file.exists()) {
boolean success = file.createNewFile();
if (success) {
logger.info("nginx.conf file created successfully.");
} else {
logger.info("nginx.conf file already exists.");
}
}
// 写入内容
FileWriter writer = new FileWriter(file);
writer.write(nginxConfigContentStr);
writer.close();
// 返回结果
return file.getAbsolutePath();
}

/**
* 拷贝容器文件到本地案例;https://github.com/docker-java/docker-java/issues/991
*/
@Override
protected void copyDockerFile(String applicationName, String containerFilePath, String localNginxPath) throws InterruptedException, IOException {
// Docker client
DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
.withDockerHost("unix:///var/run/docker.sock").build();

DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();

// Copy file from container
try (TarArchiveInputStream tarStream = new TarArchiveInputStream(
dockerClient.copyArchiveFromContainerCmd(applicationName,
containerFilePath).exec())) {
unTar(tarStream, new File(localNginxPath));
}
dockerClient.close();
}

private static void unTar(TarArchiveInputStream tis, File destFile)
throws IOException {
TarArchiveEntry tarEntry = null;
while ((tarEntry = tis.getNextTarEntry()) != null) {
if (tarEntry.isDirectory()) {
if (!destFile.exists()) {
destFile.mkdirs();
}
} else {
FileOutputStream fos = new FileOutputStream(destFile);
IOUtils.copy(tis, fos);
fos.close();
}
}
tis.close();
}

@Override
protected void refreshNginxConfig(String nginxName) throws InterruptedException, IOException {
// Docker client
DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
.withDockerHost("unix:///var/run/docker.sock").build();

DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();

String containerId = dockerClient.listContainersCmd()
.withNameFilter(new ArrayList<String>() {{
add(nginxName);
}})
.exec()
.get(0)
.getId();

ExecCreateCmdResponse execCreateCmdResponse = dockerClient
.execCreateCmd(containerId)
.withCmd("nginx", "-s", "reload")
.exec();

dockerClient.execStartCmd(execCreateCmdResponse.getId())
.exec(new ResultCallback.Adapter<>()).awaitCompletion();

dockerClient.close();
}

private String buildNginxConfig(List<UpstreamVO> upstreamList, List<LocationVO> locationList) {
String config = "\n" +
"user nginx;\n" +
"worker_processes auto;\n" +
"\n" +
"error_log /var/log/nginx/error.log notice;\n" +
"pid /var/run/nginx.pid;\n" +
"\n" +
"\n" +
"events {\n" +
" worker_connections 1024;\n" +
"}\n" +
"\n" +
"\n" +
"http {\n" +
" include /etc/nginx/mime.types;\n" +
" default_type application/octet-stream;\n" +
"\n" +
" log_format main '$remote_addr - $remote_user [$time_local] \"$request\" '\n" +
" '$status $body_bytes_sent \"$http_referer\" '\n" +
" '\"$http_user_agent\" \"$http_x_forwarded_for\"';\n" +
"\n" +
" access_log /var/log/nginx/access.log main;\n" +
"\n" +
" sendfile on;\n" +
" #tcp_nopush on;\n" +
"\n" +
" keepalive_timeout 65;\n" +
"\n" +
" #gzip on;\n" +
"\n" +
" include /etc/nginx/conf.d/*.conf;\n" +
"\n" +
" # 设定负载均衡的服务器列表 命令:docker exec Nginx nginx -s reload\n" +
"upstream_config_placeholder" +
"\n" +
" # HTTP服务器\n" +
" server {\n" +
" # 监听80端口,用于HTTP协议\n" +
" listen 80;\n" +
"\n" +
" # 定义使用IP/域名访问\n" +
" server_name " + nginxServerName + ";\n" +
"\n" +
" # 首页\n" +
" index index.html;\n" +
"\n" +
" # 反向代理的路径(upstream绑定),location 后面设置映射的路径\n" +
" # location / {\n" +
" # proxy_pass http://" + nginxServerName + ":9001;\n" +
" # }\n" +
"\n" +
"location_config_placeholder" +
" }\n" +
"}\n";

// 组装配置 Upstream
StringBuilder upstreamStr = new StringBuilder();
for (UpstreamVO upstream : upstreamList) {
upstreamStr.append("\t").append("upstream").append(" ").append(upstream.getName()).append(" {\r\n");
upstreamStr.append("\t").append("\t").append(upstream.getStrategy()).append("\r\n").append("\r\n");
List<String> servers = upstream.getServers();
for (String server : servers) {
upstreamStr.append("\t").append("\t").append("server").append(" ").append(server).append(";\r\n");
}
upstreamStr.append("\t").append("}").append("\r\n").append("\r\n");
}

// 组装配置 Location
StringBuilder locationStr = new StringBuilder();
for (LocationVO location : locationList) {
// location /api01/
locationStr.append("\t").append("\t").append("location").append(" ").append(location.getName()).append(" {\r\n");
// rewrite ^/api01/(.*)$ /$1 break; 设置重写URL,在代理后去掉根路径 api01 此字段只是配合路由,不做处理
locationStr.append("\t").append("\t").append("\t").append("rewrite ^").append(location.getName()).append("(.*)$ /$1 break;").append("\r\n");
// proxy_pass http://api01;
locationStr.append("\t").append("\t").append("\t").append("proxy_pass").append(" ").append(location.getProxy_pass()).append("\r\n");
locationStr.append("\t").append("\t").append("}").append("\r\n").append("\r\n");
}

// 替换配置
config = config.replace("upstream_config_placeholder", upstreamStr.toString());
config = config.replace("location_config_placeholder", locationStr.toString());
return config;
}

}

application.yml 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server:
port: 8001

nginx:
server_name: 10.20.111.132

spring:
datasource:
username: root
password: 123456
url: jdbc:mysql://localhost:3306/api-gateway?useUnicode=true&characterEncoding=UTF-8
driver-class-name: com.mysql.cj.jdbc.Driver
redis:
host: localhost
port: 6379

mybatis:
mapper-locations: classpath:/mapper/*.xml
config-location: classpath:/config/mybatis-config.xml

GatewayContgManage#registerGatewayServerNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 注册网关服务节点
*
* @param groupId 分组标识
* @param gatewayId 网关标识
* @param gatewayName 网关名称
* @param gatewayAddress 网关地址
* @return
*/
@PostMapping(value = "registerGateway")
public Result<Boolean> registerGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId, @RequestParam String gatewayName, @RequestParam String gatewayAddress) {
try {
logger.info("注册网关服务节点 gatewayId:{} gatewayName:{} gatewayAddress:{}", gatewayId, gatewayName, gatewayAddress);
// 1. 注册&更新网关算力信息
boolean isSuccess = configManageService.registerGatewayServerNode(groupId, gatewayId, gatewayName, gatewayAddress);
// 2. 读取最新网关算力数据【由于可能来自于多套注册中心,所以从数据库或者Redis中获取,更为准确】
List<GatewayServerDetailVO> gatewayServerDetailVOList = configManageService.queryGatewayServerDetailList();
// 3. 组装Nginx网关刷新配置信息
Map<String, List<GatewayServerDetailVO>> gatewayServerDetailMap = gatewayServerDetailVOList.stream()
.collect(Collectors.groupingBy(GatewayServerDetailVO::getGroupId));
Set<String> uniqueGroupIdList = gatewayServerDetailMap.keySet();
// 3.1 Location 信息
List<LocationVO> locationList = new ArrayList<>();
for (String name : uniqueGroupIdList) {
// location /api01/ {
// rewrite ^/api01/(.*)$ /$1 break;
// proxy_pass http://api01;
// }
locationList.add(new LocationVO("/" + name + "/", "http://" + name + ";"));
}
// 3.2 Upstream 信息
List<UpstreamVO> upstreamList = new ArrayList<>();
for (String name : uniqueGroupIdList) {
// upstream api01 {
// least_conn;
// server 172.20.10.12:9001;
// #server 172.20.10.12:9002;
// }
List<String> servers = gatewayServerDetailMap.get(name).stream()
.map(GatewayServerDetailVO::getGatewayAddress)
.collect(Collectors.toList());
upstreamList.add(new UpstreamVO(name, "least_conn;", servers));
}
// 4. 刷新Nginx配置
loadBalancingService.updateNginxConfig(new NginxConfig(upstreamList, locationList));
return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), isSuccess);
} catch (Exception e) {
logger.error("注册网关服务节点异常", e);
return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false);
}
}

api-gateway-core

  1. RequestParser#parse:bug 优化,将获取请求 getContenType() 放在判断时 POST 方法后,这样就不会导致 GET 方法也必须带上 content-Type 了。

RequestParser 请求解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package cn.ray.gateway.core.socket.agreement;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* @author Ray
* @date 2023/5/16 15:29
* @description 请求解析器,解析 HTTP 请求,GET/POST, form-data/raw-json
*/
public class RequestParser {

private final FullHttpRequest request;

public RequestParser(FullHttpRequest request) {
this.request = request;
}

public Map<String,Object> parse() {
// 获取请求类型
HttpMethod method = request.method();
if (HttpMethod.GET == method) {
Map<String, Object> parameterMap = new HashMap<>();
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
decoder.parameters().forEach( (key, value) -> parameterMap.put(key,value.get(0)));
return parameterMap;
} else if (HttpMethod.POST == method) {
// 获取 Content-Type
String contentType = getContentType();
switch (contentType) {
case "multipart/form-data" :
Map<String, Object> parameterMap = new HashMap<>();
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
decoder.offer(request);
decoder.getBodyHttpDatas().forEach(
data -> {
Attribute attribute = (Attribute) data;
try {
parameterMap.put(data.getName(), attribute.getValue());
} catch (IOException e) {
e.printStackTrace();
}
}
);
return parameterMap;
case "application/json" :
ByteBuf byteBuf = request.content().copy();
if (byteBuf.isReadable()) {
String content = byteBuf.toString(StandardCharsets.UTF_8);
return JSON.parseObject(content);
}
break;
case "none":
return new HashMap<>();
default:
throw new RuntimeException("未实现的协议类型 Content-Type : " + contentType);
}
}
throw new RuntimeException("未实现的请求类型 HttpMethod : " + method);
}

private String getContentType() {

// `Optional`类型是Java 8中引入的用于表示可能为空的值的容器类型。
// 通过使用`Optional`,可以更好地处理可能出现`null`的情况,避免空指针异常。
//1. `request.headers()`:`request`是一个请求对象,这里调用了`headers()`方法来获取请求头。
//2. `.entries()`:这个方法用于将请求头转换为一个包含键值对的集合,每个键值对表示一个请求头的键和值。
//3. `.stream()`:将集合转换为一个流,以便进行后续的操作。
//4. `.filter(...)`:这里使用`filter`方法对流中的元素进行筛选。筛选条件是键名等于"Content-Type"的元素。
//5. `val -> val.getKey().equals("Content-Type")`:这是一个Lambda表达式,它表示筛选条件。`val`代表流中的元素,这里指的是请求头的一个键值对。`val.getKey()`返回键的值,然后与"Content-Type"进行比较。
//6. `.findAny()`:找到任意一个满足筛选条件的元素。如果存在符合条件的元素,则返回一个`Optional`对象,否则返回空的`Optional`对象。
//最终,`header`变量将包含满足条件的键值对,如果存在的话。
Optional<Map.Entry<String, String>> header = request.headers().entries().stream().filter(
val -> val.getKey().equals("Content-Type")
).findAny();

// 如果 header 存在 Content-Type ,那么该值将被返回;如果header为空,那么null将被返回。
Map.Entry<String, String> entry = header.orElse(null);
if (null == entry) return "none";
// application/json、multipart/form-data;
String contentType = entry.getValue();
int indexOf = contentType.indexOf(";");
if (indexOf > 0) {
return contentType.substring(0,indexOf);
} else {
return contentType;
}
}

/**
* 简单处理请求 URI
*/
public String getUri() {
String uri = request.uri();
int idx = uri.indexOf("?");
uri = idx > 0 ? uri.substring(0, idx) : uri;
if (uri.equals("/favicon.ico")) return null;
return uri;
}
}

测试

本章的测试需要启动 api-gateway-center、api-gateway-engine、api-gateway-test 三个服务。

  1. 注意开启服务:zookeeper、Nginx 、Redis, 并按照网关所需启动以及修改对应的 IP 信息。
  2. api-gateway-center 提供注册中心服务。
  3. api-gateway-engine 启动的时候会由引擎下的 assist 助手组件拉取注册中心所中归属此算力节点 core 的 RPC 应用接口方法配置信息,并在本地完成 HTTP 和 RPC 的映射。
  4. api-gateway-test 提供RPC接口测试服务。

以上服务启动完成后,就可以调用接口测试了;在测试过程中,我们先通过直连网关不走 Nginx 代理,确保网关可用。之后再走 Nginx 接口访问。

api-gateway-center

27-测试-1

api-gateway-engine

27-测试-2

api-gateway-test

27-测试-3

此时的 nginx.conf 配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
user  nginx;
worker_processes auto;

error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;


events {
worker_connections 1024;
}


http {
include /etc/nginx/mime.types;
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log /var/log/nginx/access.log main;

sendfile on;
#tcp_nopush on;

keepalive_timeout 65;

#gzip on;

include /etc/nginx/conf.d/*.conf;

# 设定负载均衡的服务器列表 命令:docker exec Nginx nginx -s reload
upstream 10001 {
least_conn;

server 10.20.111.132:7397;
}


# HTTP服务器
server {
# 监听80端口,用于HTTP协议
listen 80;

# 定义使用IP/域名访问
server_name 10.20.111.132;

# 首页
index index.html;

# 反向代理的路径(upstream绑定),location 后面设置映射的路径
# location / {
# proxy_pass http://10.20.111.132:9001;
# }

location /10001/ {
rewrite ^/10001/(.*)$ /$1 break;
proxy_pass http://10001;
}

}
}

之后再利用 idea 的 开启多配置实例,更改配置文件启动另一个网关引擎

27-测试-4

27-测试-5

27-测试-6

变更后的 nginx.conf 配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
user  nginx;
worker_processes auto;

error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;


events {
worker_connections 1024;
}


http {
include /etc/nginx/mime.types;
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log /var/log/nginx/access.log main;

sendfile on;
#tcp_nopush on;

keepalive_timeout 65;

#gzip on;

include /etc/nginx/conf.d/*.conf;

# 设定负载均衡的服务器列表 命令:docker exec Nginx nginx -s reload
upstream 10001 {
least_conn;

server 10.20.111.132:7397;
server 10.20.111.132:7398;
}


# HTTP服务器
server {
# 监听80端口,用于HTTP协议
listen 80;

# 定义使用IP/域名访问
server_name 10.20.111.132;

# 首页
index index.html;

# 反向代理的路径(upstream绑定),location 后面设置映射的路径
# location / {
# proxy_pass http://10.20.111.132:9001;
# }

location /10001/ {
rewrite ^/10001/(.*)$ /$1 break;
proxy_pass http://10001;
}

}
}

这时再通过 Nginx 代理请求网关:

engine(7398)

27-测试-7

engine(7397)

27-测试-8可以看到请求被打到不同的网关服务上了

思考

问题:

engine 部署在 docker 时无法连接宿主机 redis :RedisMessageListenerContainer : Connection failure occurred. Restarting subscription task after 5000 ms

解决方案:

这里目前是换成了个人云服务器的 Redis。

关于Nginx

1
2
3
4
location /10001/ {
rewrite ^/10001/(.*)$ /$1 break;
proxy_pass http://10001;
}

这段代码是Nginx的location配置,用于处理请求路径为/10001/的情况。

具体操作如下:

  1. location /10001/:该配置指定了匹配的请求路径为/10001/
  2. rewrite ^/10001/(.*)$ /$1 break;:这一行配置使用正则表达式匹配请求路径中/10001/后面的内容,并将其重写为不包含/10001/的路径(/$1表示匹配的内容),然后使用break指令终止重写阶段的处理。
    • 例如,如果请求路径为/10001/example,经过重写后会变成/example
  3. proxy_pass http://10001;:这一行配置指定了反向代理的目标服务器为http://10001,即将请求转发到名为10001的后端服务器。
    • 当客户端发起请求时,Nginx会将请求转发给http://10001,并将响应返回给客户端。

综合起来,这段配置的作用是将请求路径为/10001/及其后面的内容重写为不包含/10001/的路径,并将请求转发给名为10001的后端服务器进行处理。

个人对于 docker 以及 Nginx 可能还不太了解,后续需要深入去理解一下。