二十九 功能完善(算力关联、接口上报、调用反馈)

在网关系统合并的工程下,拉取分支完善功能流程。包括:算力关联、接口上报、调用反馈。

设计

本章的扩展功能主要从几个方面来考虑:

  1. 网关的注册中心需要提供一个网关算力与RPC服务的分配关系。

    groupld —1vn-> gatewayld —1vn—> systemld

    • 10001 -> api-gateway-g3 -> api-gateway-test-01-provider
    • 10001 -> api-gateway-g3 -> api-gateway-test-02-provider
    • 10001 -> api-gateway-g4 -> api-gateway-test-03-provider
    • 10001 -> api-gateway-g4 -> api-gateway-test-04-provider
    • 10002 -> api-gateway-g5 -> api-gateway-test-05-provider
    • 10002 -> api-gateway-g5 -> api-gatewav-test-06-provider
  2. RPC 应用上报的 SDK 中需要添加一个开关,是否允许上报。这样可以更方便的从应用中摘除 SDK 的功能,而不需要注释掉多行代码,更便于测试。

  3. 核心通信组件 core 中需要在返回的通信协议中携带上是网关算力的地址,这样可以更方便看到负载起的作用。

实现

center 提供匹配接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 网关算力与系统挂载配置
* groupId --1vn--> gatewayId --1vn--> systemId
* 10001 -> api-gateway-g3 -> api-gateway-test-01-provider
* 10001 -> api-gateway-g3 -> api-gateway-test-02-provider
* 10001 -> api-gateway-g4 -> api-gateway-test-03-provider
* 10001 -> api-gateway-g4 -> api-gateway-test-04-provider
* 10002 -> api-gateway-g5 -> api-gateway-test-05-provider
* 10002 -> api-gateway-g5 -> api-gateway-test-06-provider
*/
@PostMapping(value = "distributionGatewayServerNode", produces = "application/json;charset=utf-8")
public Result<Boolean> distributionGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId, @RequestParam String systemId) {
try {
configManageService.distributionGatewayServerNode(groupId, gatewayId, systemId);
logger.info("网关算力与系统挂载配置成功。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId);
return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true);
} catch (DuplicateKeyException e) {
logger.warn("网关算力与系统挂载配置失败,唯一索引冲突。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e);
return new Result<>(ResponseCode.INDEX_DUP.getCode(), ResponseCode.INDEX_DUP.getInfo(), true);
} catch (Exception e) {
logger.error("网关算力与系统挂载配置异常。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e);
return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false);
}
}

供外部手动绑定网关与 RPC 应用服务启动时自动绑定网关

SDK 上报开关以及指定绑定的网关

GatewaySDKServiceProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package cn.ray.gateway.sdk.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* @author Ray
* @date 2023/5/29 15:26
* @description 应用配置
*/
@ConfigurationProperties(prefix = "api-gateway-sdk")
public class GatewaySDKServiceProperties {

/** 网关注册中心地址 */
private String address;
/** 系统标识 */
private String systemId;
/** 系统名称 */
private String systemName;
/** RPC 注册中心;zookeeper://127.0.0.1:2181*/
private String systemRegistry;
/** 对应的网关算力分组 */
private String groupId;
/** 对应的网关算力 */
private String gatewayId;
/** 是否启动接口上报 */
private boolean enabled = true;

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

public String getSystemId() {
return systemId;
}

public void setSystemId(String systemId) {
this.systemId = systemId;
}

public String getSystemName() {
return systemName;
}

public void setSystemName(String systemName) {
this.systemName = systemName;
}

public String getSystemRegistry() {
return systemRegistry;
}

public void setSystemRegistry(String systemRegistry) {
this.systemRegistry = systemRegistry;
}

public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

public String getGatewayId() {
return gatewayId;
}

public void setGatewayId(String gatewayId) {
this.gatewayId = gatewayId;
}

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
}

GatewayCenterService 增加向网关中心绑定网关的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void doDistributionGatewayServerNode(String address, String groupId, String gatewayId, String systemId) {
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("groupId",groupId);
paramMap.put("gatewayId",gatewayId);
paramMap.put("systemId", systemId);
String resultStr;
try {
resultStr = HttpUtil.post(address + "/wg/admin/config/distributionGatewayServerNode", paramMap, 2000);
} catch (Exception e) {
logger.error("RPC应用服务分配异常,链接资源不可用:{}", address + "/wg/admin/config/distributionGatewayServerNode");
throw e;
}
Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() {
});
logger.info("RPC应用服务分配成功 systemId:{} 注册结果:{}", systemId, resultStr);
if (!"0000".equals(result.getCode()))
throw new GatewayException("RPC应用服务分配异常 [systemId:" + systemId + "] ");
}

GatewaySDKApplication 在系统信息写入之后,绑定到指定网关

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
ApiProducerClazz apiProducerClazz = bean.getClass().getAnnotation(ApiProducerClazz.class);
if (null == apiProducerClazz) {
return bean;
}
// 1. 系统信息
logger.info("\n应用注册:系统信息 \nsystemId: {} \nsystemName: {} \nsystemType: {} \nsystemRegistry: {}", properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry());
gatewayCenterService.doRegisterApplication(properties.getAddress(), properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry());
logger.info("\n应用分配:系统信息 \nsystemId: {} \nsystemName: {} \n网关信息 \n groupId: {} \ngatewayId: {}", properties.getSystemId(), properties.getSystemName(), properties.getGroupId(), properties.getGatewayId());
gatewayCenterService.doDistributionGatewayServerNode(properties.getAddress(), properties.getGroupId(), properties.getGatewayId(), properties.getSystemId());
…………

GatewaySDKAutoConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.ray.gateway.sdk.config;

import cn.ray.gateway.sdk.application.GatewaySDKApplication;
import cn.ray.gateway.sdk.domain.service.GatewayCenterService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author Ray
* @date 2023/5/29 15:48
* @description 网关SDK配置服务
*/
@Configuration
@EnableConfigurationProperties(GatewaySDKServiceProperties.class)
@ConditionalOnProperty(
prefix = "api-gateway-sdk",
name = "enabled",
havingValue = "true",
matchIfMissing = true
)
public class GatewaySDKAutoConfig {

@Bean
public GatewayCenterService gatewayCenterService() {
return new GatewayCenterService();
}

@Bean
public GatewaySDKApplication gatewaySDKApplication(GatewaySDKServiceProperties properties, GatewayCenterService gatewayCenterService) {
return new GatewaySDKApplication(properties, gatewayCenterService);
}

}

这是一个Spring Boot配置类,用于配置网关SDK服务。它包括以下内容:

  • @Configuration注解:表明这是一个配置类,用于定义Spring Bean。
  • @EnableConfigurationProperties(GatewaySDKServiceProperties.class)注解:用于启用GatewaySDKServiceProperties类的自动配置属性功能,将配置属性注入到Spring Bean中。
  • @ConditionalOnProperty注解:用于在应用程序启动时检查是否存在具有指定属性的属性源。
  • @Bean注解:用于定义Spring Bean。

在这个配置类中,定义了两个Spring Bean:

  • GatewayCenterService:用于处理网关服务中心的业务逻辑。
  • GatewaySDKApplication:用于启动网关SDK应用程序,并将GatewaySDKServiceProperties类和GatewayCenterService类注入到应用程序中。

api-gateway-sdk.enabled属性存在且值为true时,这个配置类才会生效,否则将不会创建这两个Spring Bean。

通信层添加反馈信息

RequestParser 增加判断请求头是否存在 Test 的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean isTest() {

// `Optional`类型是Java 8中引入的用于表示可能为空的值的容器类型。
// 通过使用`Optional`,可以更好地处理可能出现`null`的情况,避免空指针异常。
//1. `request.headers()`:`request`是一个请求对象,这里调用了`headers()`方法来获取请求头。
//2. `.entries()`:这个方法用于将请求头转换为一个包含键值对的集合,每个键值对表示一个请求头的键和值。
//3. `.stream()`:将集合转换为一个流,以便进行后续的操作。
//4. `.filter(...)`:这里使用`filter`方法对流中的元素进行筛选。筛选条件是键名等于"Content-Type"的元素。
//5. `val -> val.getKey().equals("Content-Type")`:这是一个Lambda表达式,它表示筛选条件。`val`代表流中的元素,这里指的是请求头的一个键值对。`val.getKey()`返回键的值,然后与"Content-Type"进行比较。
//6. `.findAny()`:找到任意一个满足筛选条件的元素。如果存在符合条件的元素,则返回一个`Optional`对象,否则返回空的`Optional`对象。
//最终,`header`变量将包含满足条件的键值对,如果存在的话。
Optional<Map.Entry<String, String>> header = request.headers().entries().stream().filter(
val -> val.getKey().equals("Test")
).findAny();

// 如果 header 存在 Content-Type ,那么该值将被返回;如果header为空,那么null将被返回。
Map.Entry<String, String> entry = header.orElse(null);
return null != entry;
}

ProtocolDataHandler 如果请求头中存在 Test ,返回结果带上IP端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求【 消息 】 uri:{} method:{}", request.uri(), request.method());

try {
// 1. 解析请求参数
RequestParser requestParser = new RequestParser(request);
String uri = requestParser.getUri();
if (null == uri) return;
Map<String, Object> args = requestParser.parse();

// 2. 调用会话服务
GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
IGenericReference reference = gatewaySession.getMapper();
SessionResult result = reference.$invoke(args);

if (requestParser.isTest()) {
result.setNode(node());
}
// 3. 封装返回结果
DefaultFullHttpResponse response = ResponseParser.parse("0000".equals(result.getCode()) ? GatewayResult.buildSuccess(result.getData(), result.getNode()) : GatewayResult.buildError(AgreementConstants.ResponseCode._404.getCode(), "网关协议调用失败!"));
channel.writeAndFlush(response);
} catch (Exception e) {
// 4. 封装返回结果
DefaultFullHttpResponse response = ResponseParser.parse(GatewayResult.buildError(AgreementConstants.ResponseCode._502.getCode(), "【消息】网关协议调用失败!" + e.getMessage()));
channel.writeAndFlush(response);
}
}

private String node() {
return gatewaySessionFactory.getConfiguration().getHostName() + ":" + gatewaySessionFactory.getConfiguration().getPort();
}

这样方便研发人员测试负载均衡的功能,也不让外部得到服务的IP端口

测试

api-gateway-test#application.yml

1
2
3
4
5
6
7
8
api-gateway-sdk:
address: http://127.0.0.1:8901 # 注册中心;从这里获取接口信息以及完成注册网关操作
systemId: api-gateway-test
systemName: 网关SDK测试工程
systemRegistry: zookeeper://10.20.111.132:2181
groupId: 10001
gateway-id: api-gateway-g3
enabled: true

api-gateway-test

29-测试-1

gateway_distribution

29-测试-2

http://10.20.111.132:8090/10001/wg/activity/sayHi?str=10001

没有 Test 请求头

29-测试-3

存在 Test 请求头

29-测试-4

思考

感觉这里的 systemId 应该以 groupId 进行分配,因为这里如果一个 group 下有两个 gateway ,此时这个系统只与其中一个 gateway 绑定,那么负载均衡的时候就会导致同组的另一个网关虽然也被路由到了,但是由于并没有进行配置映射,导致请求失败。(可以在 RPC 应用服务向网关系统分配时,查询相同groupId 的gateway进行分配,不过之后可能Redis发布订阅时, 订阅 topic 需要由 gatewayId 替换为 groupId 。

29-思考-1

29-思考-2

29-思考-3

29-思考-4

当然也可以把这里的 group 认为是一个系统而不是一个网关集群,相同的 gatewayId 认为是一个集群,这样或许说得通。

目前存在的优化点:

网关算力:

  1. 目前 Netty 功能主要是http -> dubbo泛化调用,可以同时支持多协议请求转发
  2. 目前采用是无池连接,可以新增连接池,支持配置
  3. 鉴权的修改调整,感觉可以拓展增加个鉴权链条或者策略,支持动态配置、多维度校验、多方式校验
  4. 目前 core 核心模块中 Dubbo 的泛化调用主要是用 zookeeper,可以考虑加入其他注册中心的支持比如 nacos,而且还可以加入更丰富的泛化调用参数支持。

接口上报 SDK:

  1. 每次扫描一个bean 就 HTTP 请求了几次 center,可以改成扫描后缓存,在容器完成后一次性注册,或者改为 MQ 形式进行异步通知(整体项目内有多种类似场景,如循环查库这种也算)
  2. 可以考虑加入engine、center 节点的状态监控处理。
  3. 心跳、请求重试、Netty 的重启机制。