项目笔记API网关二十九 功能完善(算力关联、接口上报、调用反馈)
hiriki在网关系统合并的工程下,拉取分支完善功能流程。包括:算力关联、接口上报、调用反馈。
设计
本章的扩展功能主要从几个方面来考虑:
网关的注册中心需要提供一个网关算力与RPC服务的分配关系。
groupld —1vn-> gatewayld —1vn—> systemld
- 10001 -> api-gateway-g3 -> api-gateway-test-01-provider
- 10001 -> api-gateway-g3 -> api-gateway-test-02-provider
- 10001 -> api-gateway-g4 -> api-gateway-test-03-provider
- 10001 -> api-gateway-g4 -> api-gateway-test-04-provider
- 10002 -> api-gateway-g5 -> api-gateway-test-05-provider
- 10002 -> api-gateway-g5 -> api-gatewav-test-06-provider
RPC 应用上报的 SDK 中需要添加一个开关,是否允许上报。这样可以更方便的从应用中摘除 SDK 的功能,而不需要注释掉多行代码,更便于测试。
核心通信组件 core 中需要在返回的通信协议中携带上是网关算力的地址,这样可以更方便看到负载起的作用。
实现
center 提供匹配接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@PostMapping(value = "distributionGatewayServerNode", produces = "application/json;charset=utf-8") public Result<Boolean> distributionGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId, @RequestParam String systemId) { try { configManageService.distributionGatewayServerNode(groupId, gatewayId, systemId); logger.info("网关算力与系统挂载配置成功。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true); } catch (DuplicateKeyException e) { logger.warn("网关算力与系统挂载配置失败,唯一索引冲突。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e); return new Result<>(ResponseCode.INDEX_DUP.getCode(), ResponseCode.INDEX_DUP.getInfo(), true); } catch (Exception e) { logger.error("网关算力与系统挂载配置异常。groupId:{} gatewayId:{} systemId:{}", groupId, gatewayId, systemId, e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false); } }
|
供外部手动绑定网关与 RPC 应用服务启动时自动绑定网关
SDK 上报开关以及指定绑定的网关
GatewaySDKServiceProperties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| package cn.ray.gateway.sdk.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "api-gateway-sdk") public class GatewaySDKServiceProperties {
private String address; private String systemId; private String systemName; private String systemRegistry; private String groupId; private String gatewayId; private boolean enabled = true;
public String getAddress() { return address; }
public void setAddress(String address) { this.address = address; }
public String getSystemId() { return systemId; }
public void setSystemId(String systemId) { this.systemId = systemId; }
public String getSystemName() { return systemName; }
public void setSystemName(String systemName) { this.systemName = systemName; }
public String getSystemRegistry() { return systemRegistry; }
public void setSystemRegistry(String systemRegistry) { this.systemRegistry = systemRegistry; }
public String getGroupId() { return groupId; }
public void setGroupId(String groupId) { this.groupId = groupId; }
public String getGatewayId() { return gatewayId; }
public void setGatewayId(String gatewayId) { this.gatewayId = gatewayId; }
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; } }
|
GatewayCenterService 增加向网关中心绑定网关的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void doDistributionGatewayServerNode(String address, String groupId, String gatewayId, String systemId) { Map<String, Object> paramMap = new HashMap<>(); paramMap.put("groupId",groupId); paramMap.put("gatewayId",gatewayId); paramMap.put("systemId", systemId); String resultStr; try { resultStr = HttpUtil.post(address + "/wg/admin/config/distributionGatewayServerNode", paramMap, 2000); } catch (Exception e) { logger.error("RPC应用服务分配异常,链接资源不可用:{}", address + "/wg/admin/config/distributionGatewayServerNode"); throw e; } Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() { }); logger.info("RPC应用服务分配成功 systemId:{} 注册结果:{}", systemId, resultStr); if (!"0000".equals(result.getCode())) throw new GatewayException("RPC应用服务分配异常 [systemId:" + systemId + "] "); }
|
GatewaySDKApplication 在系统信息写入之后,绑定到指定网关
1 2 3 4 5 6 7 8 9 10 11 12
| @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { ApiProducerClazz apiProducerClazz = bean.getClass().getAnnotation(ApiProducerClazz.class); if (null == apiProducerClazz) { return bean; } logger.info("\n应用注册:系统信息 \nsystemId: {} \nsystemName: {} \nsystemType: {} \nsystemRegistry: {}", properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry()); gatewayCenterService.doRegisterApplication(properties.getAddress(), properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry()); logger.info("\n应用分配:系统信息 \nsystemId: {} \nsystemName: {} \n网关信息 \n groupId: {} \ngatewayId: {}", properties.getSystemId(), properties.getSystemName(), properties.getGroupId(), properties.getGatewayId()); gatewayCenterService.doDistributionGatewayServerNode(properties.getAddress(), properties.getGroupId(), properties.getGatewayId(), properties.getSystemId()); …………
|
GatewaySDKAutoConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package cn.ray.gateway.sdk.config;
import cn.ray.gateway.sdk.application.GatewaySDKApplication; import cn.ray.gateway.sdk.domain.service.GatewayCenterService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration @EnableConfigurationProperties(GatewaySDKServiceProperties.class) @ConditionalOnProperty( prefix = "api-gateway-sdk", name = "enabled", havingValue = "true", matchIfMissing = true ) public class GatewaySDKAutoConfig {
@Bean public GatewayCenterService gatewayCenterService() { return new GatewayCenterService(); }
@Bean public GatewaySDKApplication gatewaySDKApplication(GatewaySDKServiceProperties properties, GatewayCenterService gatewayCenterService) { return new GatewaySDKApplication(properties, gatewayCenterService); }
}
|
这是一个Spring Boot配置类,用于配置网关SDK服务。它包括以下内容:
@Configuration
注解:表明这是一个配置类,用于定义Spring Bean。
@EnableConfigurationProperties(GatewaySDKServiceProperties.class)
注解:用于启用GatewaySDKServiceProperties
类的自动配置属性功能,将配置属性注入到Spring Bean中。
@ConditionalOnProperty
注解:用于在应用程序启动时检查是否存在具有指定属性的属性源。
@Bean
注解:用于定义Spring Bean。
在这个配置类中,定义了两个Spring Bean:
GatewayCenterService
:用于处理网关服务中心的业务逻辑。
GatewaySDKApplication
:用于启动网关SDK应用程序,并将GatewaySDKServiceProperties
类和GatewayCenterService
类注入到应用程序中。
当api-gateway-sdk.enabled
属性存在且值为true
时,这个配置类才会生效,否则将不会创建这两个Spring Bean。
通信层添加反馈信息
RequestParser 增加判断请求头是否存在 Test 的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean isTest() {
Optional<Map.Entry<String, String>> header = request.headers().entries().stream().filter( val -> val.getKey().equals("Test") ).findAny();
Map.Entry<String, String> entry = header.orElse(null); return null != entry; }
|
ProtocolDataHandler 如果请求头中存在 Test ,返回结果带上IP端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Override protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) { logger.info("网关接收请求【 消息 】 uri:{} method:{}", request.uri(), request.method());
try { RequestParser requestParser = new RequestParser(request); String uri = requestParser.getUri(); if (null == uri) return; Map<String, Object> args = requestParser.parse();
GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri); IGenericReference reference = gatewaySession.getMapper(); SessionResult result = reference.$invoke(args);
if (requestParser.isTest()) { result.setNode(node()); } DefaultFullHttpResponse response = ResponseParser.parse("0000".equals(result.getCode()) ? GatewayResult.buildSuccess(result.getData(), result.getNode()) : GatewayResult.buildError(AgreementConstants.ResponseCode._404.getCode(), "网关协议调用失败!")); channel.writeAndFlush(response); } catch (Exception e) { DefaultFullHttpResponse response = ResponseParser.parse(GatewayResult.buildError(AgreementConstants.ResponseCode._502.getCode(), "【消息】网关协议调用失败!" + e.getMessage())); channel.writeAndFlush(response); } }
private String node() { return gatewaySessionFactory.getConfiguration().getHostName() + ":" + gatewaySessionFactory.getConfiguration().getPort(); }
|
这样方便研发人员测试负载均衡的功能,也不让外部得到服务的IP端口
测试
api-gateway-test#application.yml
1 2 3 4 5 6 7 8
| api-gateway-sdk: address: http://127.0.0.1:8901 systemId: api-gateway-test systemName: 网关SDK测试工程 systemRegistry: zookeeper://10.20.111.132:2181 groupId: 10001 gateway-id: api-gateway-g3 enabled: true
|
api-gateway-test
gateway_distribution
http://10.20.111.132:8090/10001/wg/activity/sayHi?str=10001
没有 Test 请求头
存在 Test 请求头
思考
感觉这里的 systemId 应该以 groupId 进行分配,因为这里如果一个 group 下有两个 gateway ,此时这个系统只与其中一个 gateway 绑定,那么负载均衡的时候就会导致同组的另一个网关虽然也被路由到了,但是由于并没有进行配置映射,导致请求失败。(可以在 RPC 应用服务向网关系统分配时,查询相同groupId 的gateway进行分配,不过之后可能Redis发布订阅时, 订阅 topic 需要由 gatewayId 替换为 groupId 。
当然也可以把这里的 group 认为是一个系统而不是一个网关集群,相同的 gatewayId 认为是一个集群,这样或许说得通。
目前存在的优化点:
网关算力:
- 目前 Netty 功能主要是http -> dubbo泛化调用,可以同时支持多协议请求转发
- 目前采用是无池连接,可以新增连接池,支持配置
- 鉴权的修改调整,感觉可以拓展增加个鉴权链条或者策略,支持动态配置、多维度校验、多方式校验
- 目前 core 核心模块中 Dubbo 的泛化调用主要是用 zookeeper,可以考虑加入其他注册中心的支持比如 nacos,而且还可以加入更丰富的泛化调用参数支持。
接口上报 SDK:
- 每次扫描一个bean 就 HTTP 请求了几次 center,可以改成扫描后缓存,在容器完成后一次性注册,或者改为 MQ 形式进行异步通知(整体项目内有多种类似场景,如循环查库这种也算)
- 可以考虑加入engine、center 节点的状态监控处理。
- 心跳、请求重试、Netty 的重启机制。