Building on the Nginx load-balancing model from Chapter 25 and the dynamic Nginx configuration refresh implemented in Chapter 26, this chapter dynamically writes the gateway compute nodes into the Nginx configuration, completing the dynamic load-balancing feature.
Design
The core feature to implement here is that when a gateway compute node registers with the registration center, it is dynamically added to the Nginx load-balancing configuration, so the upstream node list can be changed at runtime.
When the Nginx proxy is not used, the previous chapters accessed the gateway directly, e.g. http://10.20.111.132:7397/wg/activity/sayHi?str=1. Now that load balancing is in place, we want requests arriving at different URLs to be balanced across different gateway compute nodes, so the access address becomes http://10.20.111.132:8090/10001/wg/activity/sayHi?str=10001. Comparing the two addresses, the port changes from pointing at a gateway compute node to pointing at Nginx, and an extra path segment, 10001, appears. This 10001 is the gateway group_id configured in the database, and it is what we use to decide which gateway group a request is routed to.
api-gateway-center manages the registration of gateway compute nodes and dynamically refreshes the registered configuration into the Nginx configuration.
Nginx also rewrites the URL, stripping the leading 10001 path segment so that it is used only for routing; everything downstream stays the same as direct access to a gateway compute node. If this URI handling were done in the core communication component instead, every request from then on would have to carry such a prefix. By doing the rewrite in Nginx, the gateway compute node can still be accessed directly later even when load balancing is not needed.
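To make that trade-off concrete, here is a minimal, hypothetical Java sketch (not project code) of the prefix stripping that core would have to do on every request if Nginx did not rewrite the path; the group prefix /10001 is taken from the example URL above:

```java
// Hypothetical sketch only: how core would have to normalize URIs if Nginx did NOT rewrite them.
public class GroupPrefixStripper {

    // Strip a leading "/{groupId}" segment so the HTTP-to-RPC mapping can be matched.
    public static String strip(String uri, String groupId) {
        String prefix = "/" + groupId;
        if (uri.startsWith(prefix + "/")) {
            return uri.substring(prefix.length());
        }
        return uri;
    }

    public static void main(String[] args) {
        // "/10001/wg/activity/sayHi" -> "/wg/activity/sayHi"
        System.out.println(strip("/10001/wg/activity/sayHi", "10001"));
    }
}
```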
Implementation
api-gateway-center
- GatewayContgManage#registerGatewayServerNode: when a gateway compute node registers, it calls the dynamic Nginx configuration refresh service implemented in Chapter 26. Also note that this chapter adds the Nginx path-rewrite configuration and an Nginx IP entry in the yml configuration file.
LoadBalancingService — load-balancing configuration service
```java
package cn.ray.gateway.center.domain.docker.service;

import cn.ray.gateway.center.domain.docker.model.aggregates.NginxConfig;
import cn.ray.gateway.center.domain.docker.model.vo.LocationVO;
import cn.ray.gateway.center.domain.docker.model.vo.UpstreamVO;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.DockerClientConfig;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Service
public class LoadBalancingService extends AbstractLoadBalancing {

    private Logger logger = LoggerFactory.getLogger(LoadBalancingService.class);

    @Value("${nginx.server_name}")
    private String nginxServerName;

    // Render the nginx.conf content and write it to the local file
    @Override
    protected String createNginxConfigFile(NginxConfig nginxConfig) throws IOException {
        String nginxConfigContentStr = buildNginxConfig(nginxConfig.getUpstreamList(), nginxConfig.getLocationList());
        File file = new File("/data/nginx/nginx.conf");
        if (!file.exists()) {
            boolean success = file.createNewFile();
            if (success) {
                logger.info("nginx.conf file created successfully.");
            } else {
                logger.info("nginx.conf file already exists.");
            }
        }
        FileWriter writer = new FileWriter(file);
        writer.write(nginxConfigContentStr);
        writer.close();
        return file.getAbsolutePath();
    }

    // Copy the file at containerFilePath out of the given container to localNginxPath via the Docker API
    @Override
    protected void copyDockerFile(String applicationName, String containerFilePath, String localNginxPath) throws InterruptedException, IOException {
        DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
                .withDockerHost("unix:///var/run/docker.sock").build();

        DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();

        try (TarArchiveInputStream tarStream = new TarArchiveInputStream(
                dockerClient.copyArchiveFromContainerCmd(applicationName, containerFilePath).exec())) {
            unTar(tarStream, new File(localNginxPath));
        }
        dockerClient.close();
    }

    // Extract the tar stream returned by the Docker copy command into destFile
    private static void unTar(TarArchiveInputStream tis, File destFile) throws IOException {
        TarArchiveEntry tarEntry = null;
        while ((tarEntry = tis.getNextTarEntry()) != null) {
            if (tarEntry.isDirectory()) {
                if (!destFile.exists()) {
                    destFile.mkdirs();
                }
            } else {
                FileOutputStream fos = new FileOutputStream(destFile);
                IOUtils.copy(tis, fos);
                fos.close();
            }
        }
        tis.close();
    }

    // Run `nginx -s reload` inside the Nginx container to apply the new configuration
    @Override
    protected void refreshNginxConfig(String nginxName) throws InterruptedException, IOException {
        DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder()
                .withDockerHost("unix:///var/run/docker.sock").build();

        DockerClient dockerClient = DockerClientBuilder.getInstance(config).build();

        String containerId = dockerClient.listContainersCmd()
                .withNameFilter(new ArrayList<String>() {{
                    add(nginxName);
                }})
                .exec()
                .get(0)
                .getId();

        ExecCreateCmdResponse execCreateCmdResponse = dockerClient
                .execCreateCmd(containerId)
                .withCmd("nginx", "-s", "reload")
                .exec();

        dockerClient.execStartCmd(execCreateCmdResponse.getId())
                .exec(new ResultCallback.Adapter<>()).awaitCompletion();

        dockerClient.close();
    }

    // Assemble the full nginx.conf text, filling in the upstream and location placeholders
    private String buildNginxConfig(List<UpstreamVO> upstreamList, List<LocationVO> locationList) {
        String config = "\n" +
                "user nginx;\n" +
                "worker_processes auto;\n" +
                "\n" +
                "error_log /var/log/nginx/error.log notice;\n" +
                "pid /var/run/nginx.pid;\n" +
                "\n" +
                "\n" +
                "events {\n" +
                "    worker_connections 1024;\n" +
                "}\n" +
                "\n" +
                "\n" +
                "http {\n" +
                "    include /etc/nginx/mime.types;\n" +
                "    default_type application/octet-stream;\n" +
                "\n" +
                "    log_format main '$remote_addr - $remote_user [$time_local] \"$request\" '\n" +
                "                    '$status $body_bytes_sent \"$http_referer\" '\n" +
                "                    '\"$http_user_agent\" \"$http_x_forwarded_for\"';\n" +
                "\n" +
                "    access_log /var/log/nginx/access.log main;\n" +
                "\n" +
                "    sendfile on;\n" +
                "    #tcp_nopush on;\n" +
                "\n" +
                "    keepalive_timeout 65;\n" +
                "\n" +
                "    #gzip on;\n" +
                "\n" +
                "    include /etc/nginx/conf.d/*.conf;\n" +
                "\n" +
                "    # Load-balanced server list. Reload command: docker exec Nginx nginx -s reload\n" +
                "upstream_config_placeholder" +
                "\n" +
                "    # HTTP server\n" +
                "    server {\n" +
                "        # Listen on port 80 for HTTP\n" +
                "        listen 80;\n" +
                "\n" +
                "        # IP/domain used for access\n" +
                "        server_name " + nginxServerName + ";\n" +
                "\n" +
                "        # Index page\n" +
                "        index index.html;\n" +
                "\n" +
                "        # Reverse-proxy paths (bound to an upstream); location sets the mapped path\n" +
                "        # location / {\n" +
                "        #     proxy_pass http://" + nginxServerName + ":9001;\n" +
                "        # }\n" +
                "\n" +
                "location_config_placeholder" +
                "    }\n" +
                "}\n";

        StringBuilder upstreamStr = new StringBuilder();
        for (UpstreamVO upstream : upstreamList) {
            upstreamStr.append("\t").append("upstream").append(" ").append(upstream.getName()).append(" {\r\n");
            upstreamStr.append("\t").append("\t").append(upstream.getStrategy()).append("\r\n").append("\r\n");
            List<String> servers = upstream.getServers();
            for (String server : servers) {
                upstreamStr.append("\t").append("\t").append("server").append(" ").append(server).append(";\r\n");
            }
            upstreamStr.append("\t").append("}").append("\r\n").append("\r\n");
        }

        StringBuilder locationStr = new StringBuilder();
        for (LocationVO location : locationList) {
            locationStr.append("\t").append("\t").append("location").append(" ").append(location.getName()).append(" {\r\n");
            locationStr.append("\t").append("\t").append("\t").append("rewrite ^").append(location.getName()).append("(.*)$ /$1 break;").append("\r\n");
            locationStr.append("\t").append("\t").append("\t").append("proxy_pass").append(" ").append(location.getProxy_pass()).append("\r\n");
            locationStr.append("\t").append("\t").append("}").append("\r\n").append("\r\n");
        }

        config = config.replace("upstream_config_placeholder", upstreamStr.toString());
        config = config.replace("location_config_placeholder", locationStr.toString());
        return config;
    }
}
```
application.yml configuration file
```yml
server:
  port: 8001

nginx:
  server_name: 10.20.111.132

spring:
  datasource:
    username: root
    password: 123456
    url: jdbc:mysql://localhost:3306/api-gateway?useUnicode=true&characterEncoding=UTF-8
    driver-class-name: com.mysql.cj.jdbc.Driver
  redis:
    host: localhost
    port: 6379

mybatis:
  mapper-locations: classpath:/mapper/*.xml
  config-location: classpath:/config/mybatis-config.xml
```
GatewayContgManage#registerGatewayServerNode
```java
@PostMapping(value = "registerGateway")
public Result<Boolean> registerGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId, @RequestParam String gatewayName, @RequestParam String gatewayAddress) {
    try {
        logger.info("注册网关服务节点 gatewayId:{} gatewayName:{} gatewayAddress:{}", gatewayId, gatewayName, gatewayAddress);
        // 1. Register the gateway compute node
        boolean isSuccess = configManageService.registerGatewayServerNode(groupId, gatewayId, gatewayName, gatewayAddress);

        // 2. Query all registered nodes and group them by groupId
        List<GatewayServerDetailVO> gatewayServerDetailVOList = configManageService.queryGatewayServerDetailList();
        Map<String, List<GatewayServerDetailVO>> gatewayServerDetailMap = gatewayServerDetailVOList.stream()
                .collect(Collectors.groupingBy(GatewayServerDetailVO::getGroupId));
        Set<String> uniqueGroupIdList = gatewayServerDetailMap.keySet();

        // 3. Build the location configuration: /{groupId}/ is proxied to the upstream named {groupId}
        List<LocationVO> locationList = new ArrayList<>();
        for (String name : uniqueGroupIdList) {
            locationList.add(new LocationVO("/" + name + "/", "http://" + name + ";"));
        }

        // 4. Build the upstream configuration: one upstream per group, all node addresses as servers
        List<UpstreamVO> upstreamList = new ArrayList<>();
        for (String name : uniqueGroupIdList) {
            List<String> servers = gatewayServerDetailMap.get(name).stream()
                    .map(GatewayServerDetailVO::getGatewayAddress)
                    .collect(Collectors.toList());
            upstreamList.add(new UpstreamVO(name, "least_conn;", servers));
        }

        // 5. Refresh the Nginx configuration (the service implemented in Chapter 26)
        loadBalancingService.updateNginxConfig(new NginxConfig(upstreamList, locationList));

        return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), isSuccess);
    } catch (Exception e) {
        logger.error("注册网关服务节点异常", e);
        return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false);
    }
}
```
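To illustrate what this method hands to the refresh service, here is a small sketch with illustrative values only, assuming the test setup later in this chapter (group 10001 with two engine nodes at 10.20.111.132:7397 and :7398) and the project's UpstreamVO / LocationVO / NginxConfig classes shown above; the real data comes from the registration records in the database:

```java
// Illustrative values only — in practice these come from the registered gateway nodes.
List<String> servers = Arrays.asList("10.20.111.132:7397", "10.20.111.132:7398");

// Rendered as: upstream 10001 { least_conn; server 10.20.111.132:7397; server 10.20.111.132:7398; }
List<UpstreamVO> upstreamList = new ArrayList<>();
upstreamList.add(new UpstreamVO("10001", "least_conn;", servers));

// Rendered as: location /10001/ { rewrite ^/10001/(.*)$ /$1 break; proxy_pass http://10001; }
List<LocationVO> locationList = new ArrayList<>();
locationList.add(new LocationVO("/10001/", "http://10001;"));

loadBalancingService.updateNginxConfig(new NginxConfig(upstreamList, locationList));
```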
api-gateway-core
- RequestParser#parse: bug fix — getContentType() is now called only after the request method has been determined to be POST, so GET requests no longer have to carry a Content-Type header.
RequestParser — request parser
```java
package cn.ray.gateway.core.socket.agreement;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RequestParser {

    private final FullHttpRequest request;

    public RequestParser(FullHttpRequest request) {
        this.request = request;
    }

    public Map<String, Object> parse() {
        HttpMethod method = request.method();
        if (HttpMethod.GET == method) {
            Map<String, Object> parameterMap = new HashMap<>();
            QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
            decoder.parameters().forEach((key, value) -> parameterMap.put(key, value.get(0)));
            return parameterMap;
        } else if (HttpMethod.POST == method) {
            // Bug fix: only read Content-Type once the request is known to be POST
            String contentType = getContentType();
            switch (contentType) {
                case "multipart/form-data":
                    Map<String, Object> parameterMap = new HashMap<>();
                    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
                    decoder.offer(request);
                    decoder.getBodyHttpDatas().forEach(data -> {
                        Attribute attribute = (Attribute) data;
                        try {
                            parameterMap.put(data.getName(), attribute.getValue());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                    return parameterMap;
                case "application/json":
                    ByteBuf byteBuf = request.content().copy();
                    if (byteBuf.isReadable()) {
                        String content = byteBuf.toString(StandardCharsets.UTF_8);
                        return JSON.parseObject(content);
                    }
                    break;
                case "none":
                    return new HashMap<>();
                default:
                    throw new RuntimeException("未实现的协议类型 Content-Type : " + contentType);
            }
        }
        throw new RuntimeException("未实现的请求类型 HttpMethod : " + method);
    }

    private String getContentType() {
        Optional<Map.Entry<String, String>> header = request.headers().entries().stream()
                .filter(val -> val.getKey().equals("Content-Type"))
                .findAny();
        Map.Entry<String, String> entry = header.orElse(null);
        if (null == entry) return "none";
        String contentType = entry.getValue();
        int indexOf = contentType.indexOf(";");
        if (indexOf > 0) {
            return contentType.substring(0, indexOf);
        } else {
            return contentType;
        }
    }

    public String getUri() {
        String uri = request.uri();
        int idx = uri.indexOf("?");
        uri = idx > 0 ? uri.substring(0, idx) : uri;
        if (uri.equals("/favicon.ico")) return null;
        return uri;
    }
}
```
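To see the effect of the fix, here is a small standalone sketch (an assumed demo class, not project code) that builds a Netty GET request with a query string and no Content-Type header and runs it through RequestParser:

```java
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;

import java.util.Map;

public class RequestParserDemo {
    public static void main(String[] args) {
        // A GET request carrying a query string and no Content-Type header
        FullHttpRequest request = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1, HttpMethod.GET, "/wg/activity/sayHi?str=10001");

        RequestParser parser = new RequestParser(request);
        Map<String, Object> params = parser.parse();

        System.out.println(parser.getUri()); // /wg/activity/sayHi
        System.out.println(params);          // {str=10001}
    }
}
```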
Testing
Testing this chapter requires starting three services: api-gateway-center, api-gateway-engine, and api-gateway-test.
- Note: start the supporting services ZooKeeper, Nginx, and Redis as the gateway requires, and update the corresponding IP settings.
- api-gateway-center provides the registration-center service.
- When api-gateway-engine starts, the engine's assist component pulls from the registration center the RPC application interface/method configuration that belongs to this compute node's core, and completes the HTTP-to-RPC mapping locally.
- api-gateway-test provides the RPC interfaces used for testing.
Once these services are up, we can call the test interface. During testing, first call the gateway directly, bypassing the Nginx proxy, to confirm the gateway works; then access it through Nginx. A small sketch of this two-step check follows.
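A minimal sketch of the check, using the example addresses from the Design section above and the Java 11+ HttpClient (the IP, ports, and group id are this chapter's sample environment — adjust to your own setup):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GatewayAccessCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // 1. Direct access to a gateway compute node (no Nginx)
        HttpRequest direct = HttpRequest.newBuilder(
                URI.create("http://10.20.111.132:7397/wg/activity/sayHi?str=1")).GET().build();

        // 2. Access through the Nginx proxy; /10001/ selects the gateway group and is rewritten away
        HttpRequest viaNginx = HttpRequest.newBuilder(
                URI.create("http://10.20.111.132:8090/10001/wg/activity/sayHi?str=10001")).GET().build();

        System.out.println(client.send(direct, HttpResponse.BodyHandlers.ofString()).body());
        System.out.println(client.send(viaNginx, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```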
api-gateway-center
api-gateway-engine
api-gateway-test
The nginx.conf configuration file at this point:
```nginx
user nginx;
worker_processes auto;

error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;


events {
    worker_connections 1024;
}


http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;

    sendfile on;
    #tcp_nopush on;

    keepalive_timeout 65;

    #gzip on;

    include /etc/nginx/conf.d/*.conf;

    # ... (generated upstream and server blocks follow)
```
Then use IDEA's multiple run configurations to start another gateway engine instance with a modified configuration file.
The nginx.conf configuration file after the change:
```nginx
user nginx;
worker_processes auto;

error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;


events {
    worker_connections 1024;
}


http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;

    sendfile on;
    #tcp_nopush on;

    keepalive_timeout 65;

    #gzip on;

    include /etc/nginx/conf.d/*.conf;

    # ... (generated upstream and server blocks follow)
```
Now send requests to the gateway through the Nginx proxy again:
engine(7398)
engine(7397)
We can see that the requests are distributed across the different gateway services.
Thoughts
Problem:
When the engine is deployed in Docker it cannot connect to the Redis instance on the host: RedisMessageListenerContainer : Connection failure occurred. Restarting subscription task after 5000 ms
Solution:
A likely cause is that, inside a container, localhost refers to the container itself, so an engine configured with a localhost Redis address cannot reach the Redis running on the host. For now I switched to the Redis instance on my personal cloud server.
About Nginx
```nginx
location /10001/ {
    rewrite ^/10001/(.*)$ /$1 break;
    proxy_pass http://10001;
}
```
This is the Nginx `location` configuration that handles requests whose path starts with `/10001/`. Specifically:
- `location /10001/`: matches request paths beginning with `/10001/`.
- `rewrite ^/10001/(.*)$ /$1 break;`: uses a regular expression to capture whatever follows `/10001/` and rewrites the path without that prefix (`/$1` is the captured content); the `break` directive then ends rewrite processing. For example, a request for `/10001/example` is rewritten to `/example`.
- `proxy_pass http://10001;`: forwards the request to the backend upstream named `10001`; when a client sends a request, Nginx passes it to `http://10001` and returns the response to the client.
Taken together, this configuration strips the `/10001/` prefix from the request path and forwards the rewritten request to the backend group named `10001` for processing.
I am still not very familiar with Docker and Nginx, so I will need to dig into them more deeply later.