通过 Redis 发布和订阅的模块,在网关注册中心和网关助手中分别提供相关功能,在应用接口注册到网关中心后触发通知,让网关助手完成新增接口的映射处理。
设计
这一章节的开发有一个需求的背景,我们的网关算力服务在启动过程中会拉取注册中心的接口信息,到网关算法上进行注册操作。通过这样的一个步骤,才能让我们在访问网关接口的时候,泛化调用到对应的 RPC 服务。
那么,当网关算力服务已经是部署好后,再有新的服务或者接口注册到网关注册中心的时候,那么网关算力该如何把这些信息获取到并完成网关的映射呢?
可以通过几个方案来处理;
接口的轮询,在网关算力 api-gateway-assist 助手服务中通过不断的像网关中心请求接口的方式,拉取到所有需要被注册的接口。这里可以在已经拉取的服务接口上,在Redis中做数据的记录,减少重复拉取。不过这样的方式会给网关中心带来不小的压力。
网关引擎在Spring生命周期初始化中,另起一个线程,在后台while(true)不断重复拉取所有服务,并存入缓存。
显然不论是对于网关引擎还是注册中心都消耗占用不少资源。
服务的连接,在网关助手类与网关中心建立一个 Netty 的服务,由网关中心接收到新的接口注册时候进行信息通知。但这样的长链接,已经会占用不少的资源。
网关引擎再启动一个 Netty 服务端(监听端口不能和己有网关通信服务监听端口相同),注册中心启动一个 Netty 客户端和 engine 建立连接,每次有新的服务接口注册进来后,通过管道 writeAndFlush(systemld) 通知服务端,拉取注册新的接口。但维持这样的长链接也会占用不少资源。
事件的通知,其实通过事件来通知是一个比较合适的方式,比如你可以使用 MQ、ZK等方式,也可以使用 Redis 的发布和订阅。在有接口变化的时候,可以通过消息的推送,让网关算力获取到变化的接口信息进行注册处理。那么这里就是通过 Redis 的方式进行处理。
最佳实践:使用事件发布订阅机制异步进行,比如 Redis 、MQ,这里网关引擎与注册中心采用 Redis 的发布订阅模式进行通信。新的应用接口启动注册后,触发注册中心的事件发布机制,gateway-center 向网关引擎推送新注册系统的信息,gateway-assist 收到后根据 systemId 查询对应的接口信息进行映射。
此处每启动一个新的 Rpc 应用服务注册进注册中心时,都是以 systemld 为单位,assist 注册时需要保存应用下的所有接口+所有方法。因此事件推送时仅需要传递 systemld 即可。而后续如果细化到只注册某个方法时,addmapper也需要细化拆分注册不同的模块,根据不同的信息进行不同的映射处理。(后期这里或许可以考虑策略模式?
在 api-gateway-center 工程中添加 Redis 发布消息模块,并提供应用服务注册后的事件通知操作。这个通知只会通知给对应的网关算力服务,不会全局通知。
在 api-gateway-assist 工程中开发 Redis 订阅消息模块,当收到注册中心的消息推送时,则根据系统的标识信息进行拉取服务。注意这里你可以进行细化,只把变更的信息一条条推送给网关注册,减少接口的拉取。
(这里可以在 sdk 注册接口时,向注册中心查询判断是否该接口已经注册,存在于数据库中,如果是新注册就将该接口的信息推送给注册中心,注册中心再将该消息发布,对应的网关引擎监听到该信息后,根据接口信息向网关通信组件 core 中的 Configuration 注册映射。
在 api-gateway-sdk 工程中添加对网关注册中心接口的调用,当所有的服务注册完成后,调用接口进行通知。
实现
注册中心
消息发布
在 api-gateway-center 注册中心服务中提供 Redis 的发布模块,并在应用服务注册接口后调用该方法。
添加依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.0.6.RELEASE</version> </dependency>
|
application.yml 配置
1 2 3 4
| spring: redis: host: localhost port: 6379
|
消息监听推送配置 PublisherConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package cn.ray.gateway.center.domain.message.config;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate;
@Configuration public class PublisherConfig {
@Bean public RedisTemplate<String, Object> redisMessageTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); template.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class)); return template; }
}
|
这是一个 Redis 消息监听推送配置类。它使用了Spring的注解@Configuration,表示这是一个配置类。
@Configuration 注解告诉 Spring 容器,这个类中定义了一些 Bean 对象的配置。在这个类中定义了一个名为 redisMessageTemplate 的 Bean。
@Bean 注解告诉 Spring 容器,这个方法将返回一个对象,该对象将被注册为一个 Bean ,并且可以被其他类自动注入。
方法 redisMessageTemplate 的参数是 RedisConnectionFactory ,表示需要从 Spring 容器中获取一个 Redis 连接工厂对象作为参数。
在方法内部,首先创建了一个RedisTemplate对象,并将 RedisConnectionFactory 对象设置为它的连接工厂。
RedisConnectionFactory 是 Spring Data Redis 提供的一个接口,用于获取 Redis 连接的工厂类。它可以通过配置来创建实例,或者通过自动配置的方式从应用程序的上下文中获取。
在 Spring Boot 应用程序中,可以通过配置文件或者编程方式来配置 Redis 连接工厂。通常,可以在 application.properties 或 application.yml 文件中配置 Redis 连接信息,例如 Redis 的主机名、端口号、密码等。Spring Boot 会自动读取这些配置,并根据配置创建一个 RedisConnectionFactory 的实例。
另外,还可以通过编程方式来配置 RedisConnectionFactory。在这种情况下,可以在 Spring 配置类中手动创建 RedisConnectionFactory 对象,并设置连接相关的属性,例如主机名、端口号、密码等。
总结起来,RedisConnectionFactory 可以通过配置文件或者编程方式进行配置,从而创建一个可以用于获取 Redis 连接的工厂实例。具体的配置方式可以根据项目的需求和使用的框架来确定。
然后,通过调用 template 的setDefaultSerializer方法,将 FastJsonRedisSerializer 设置为默认的序列化器。FastJsonRedisSerializer 是一个用于将对象序列化为JSON格式的序列化器。
最后,方法返回了 RedisTemplate 对象。
这个配置类的作用是创建一个 RedisTemplate 对象,并设置了默认的序列化器,以便在使用 RedisTemplate 进行消息监听推送时,能够将对象以 JSON 格式序列化存储到Redis中。
消息发布者 Publisher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package cn.ray.gateway.center.domain.message;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;
@Service public class Publisher {
@Autowired private final RedisTemplate<String, Object> redisMessageTemplate;
public Publisher(RedisTemplate<String, Object> redisMessageTemplate) { this.redisMessageTemplate = redisMessageTemplate; }
public void pushMessage(String topic, Object message) { redisMessageTemplate.convertAndSend(topic, message); } }
|
这是一个消息发布者,用于向 Redis 消息队列发送消息。它使用了 Spring 框架的 RedisTemplate 来操作 Redis,其中 redisMessageTemplate 是注入的 RedisTemplate 实例。pushMessage 方法用于将消息发送到指定的主题(topic),并且可以传递任何类型的消息(message)。在调用 redisMessageTemplate.convertAndSend 方法时,它会将消息转换为Redis支持的格式,并将其发送到指定的主题。
MessageServiceImpl 消息服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package cn.ray.gateway.center.domain.message;
import cn.ray.gateway.center.application.IMessageService; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
@Service public class MessageServiceImpl implements IMessageService {
@Value("${spring.redis.host}") private String host;
@Value("${spring.redis.port}") private Integer port;
@Resource private Publisher publisher;
@Override public Map<String, String> queryRedisConfig() { Map<String, String> config = new HashMap<>(); config.put("host",host); config.put("port",String.valueOf(port)); return config; }
@Override public void pushMessage(String gatewayId, Object message) { publisher.pushMessage(gatewayId, message); } }
|
这里包装 Redis 所在IP端口供外部查询获取,方便网关引擎配置,而不用手动去yml文件中配置,减少了用户的操作,也统一了 Redis 配置。让注册中心真正作为了整个 API 网关的中转站、配置中心。另外也向外提供了推送消息的服务。
获取 Redis 配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| package cn.ray.gateway.center.interfaces;
import cn.ray.gateway.center.application.IConfigManageService; import cn.ray.gateway.center.application.IMessageService; import cn.ray.gateway.center.domain.manage.model.aggregates.ApplicationSystemRichInfo; import cn.ray.gateway.center.domain.manage.model.vo.GatewayServerVO; import cn.ray.gateway.center.infrastructure.common.ResponseCode; import cn.ray.gateway.center.infrastructure.common.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource; import java.util.List; import java.util.Map;
@RestController @RequestMapping("/wg/admin/config") public class GatewayConfigManage {
private Logger logger = LoggerFactory.getLogger(GatewayConfigManage.class);
@Resource private IConfigManageService configManageService;
@Resource private IMessageService messageService;
@GetMapping(value = "queryServerConfig", produces = "application/json;charset=utf-8") public Result<List<GatewayServerVO>> queryServerConfig() { try { logger.info("查询网关服务配置项信息"); List<GatewayServerVO> gatewayServerVOList = configManageService.queryGatewayServerList(); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), gatewayServerVOList); } catch (Exception e) { logger.error("查询网关服务配置项信息异常", e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), null); } }
@PostMapping(value = "registerGateway") public Result<Boolean> registerGatewayServerNode(@RequestParam String groupId, @RequestParam String gatewayId, @RequestParam String gatewayName, @RequestParam String gatewayAddress) { try { logger.info("注册网关服务节点 gatewayId:{} gatewayName:{} gatewayAddress:{}", gatewayId, gatewayName, gatewayAddress); boolean isSuccess = configManageService.registerGatewayServerNode(groupId, gatewayId, gatewayName, gatewayAddress); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), isSuccess); } catch (Exception e) { logger.error("注册网关服务节点异常", e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false); } }
@PostMapping(value = "queryApplicationSystemRichInfo", produces = "application/json;charset=utf-8") public Result<ApplicationSystemRichInfo> queryApplicationSystemRichInfo(@RequestParam String gatewayId, @RequestParam String systemId) { try { logger.info("查询分配到网关下的待注册系统信息(系统、接口、方法) gatewayId:{}", gatewayId); ApplicationSystemRichInfo applicationSystemRichInfo = configManageService.queryApplicationSystemRichInfo(gatewayId, systemId); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), applicationSystemRichInfo); } catch (Exception e) { logger.error("查询分配到网关下的待注册系统信息(系统、接口、方法)异常 gatewayId:{}", gatewayId, e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), null); } }
@PostMapping(value = "queryRedisConfig", produces = "application/json;charset=utf-8") public Result<Map<String, String>> queryRedisConfig() { try { Map<String, String> redisConfig = messageService.queryRedisConfig(); logger.info("查询配置中心Redis配置信息成功"); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), redisConfig); } catch (Exception e) { logger.error("查询配置中心Redis配置信息失败", e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), null); } } }
|
事件通知
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| package cn.ray.gateway.center.interfaces;
import cn.ray.gateway.center.application.IConfigManageService; import cn.ray.gateway.center.application.IMessageService; import cn.ray.gateway.center.application.IRegisterManageService; import cn.ray.gateway.center.domain.register.model.vo.ApplicationInterfaceMethodVO; import cn.ray.gateway.center.domain.register.model.vo.ApplicationInterfaceVO; import cn.ray.gateway.center.domain.register.model.vo.ApplicationSystemVO; import cn.ray.gateway.center.infrastructure.common.Constants; import cn.ray.gateway.center.infrastructure.common.ResponseCode; import cn.ray.gateway.center.infrastructure.common.Result; import cn.ray.gateway.center.infrastructure.pojo.ApplicationSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DuplicateKeyException; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController @RequestMapping("/wg/admin/register") public class RPCRegisterManage {
private Logger logger = LoggerFactory.getLogger(RPCRegisterManage.class);
@Resource private IRegisterManageService registerManageService;
@Resource private IConfigManageService configManageService;
@Resource private IMessageService messageService;
@PostMapping(value = "registerApplication", produces = "application/json;charset=utf-8") public Result<Boolean> registerApplication(@RequestParam String systemId, @RequestParam String systemName, @RequestParam String systemType, @RequestParam String systemRegistry) { try { logger.info("注册应用服务 systemId:{}", systemId); ApplicationSystemVO applicationSystemVO = new ApplicationSystemVO(); applicationSystemVO.setSystemId(systemId); applicationSystemVO.setSystemName(systemName); applicationSystemVO.setSystemType(systemType); applicationSystemVO.setSystemRegistry(systemRegistry); registerManageService.registerApplication(applicationSystemVO); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true); } catch (DuplicateKeyException e) { logger.warn("注册应用服务重复 systemId:{}", systemId, e); return new Result<>(ResponseCode.INDEX_DUP.getCode(), e.getMessage(), true); } catch (Exception e) { logger.error("注册应用服务失败 systemId:{}", systemId, e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false); } }
@PostMapping(value = "registerApplicationInterface", produces = "application/json;charset=utf-8") public Result<Boolean> registerApplicationInterface(@RequestParam String systemId, @RequestParam String interfaceId, @RequestParam String interfaceName, @RequestParam String interfaceVersion) { try { logger.info("注册应用接口 systemId:{} interfaceId:{}", systemId, interfaceId); ApplicationInterfaceVO applicationInterfaceVO = new ApplicationInterfaceVO(); applicationInterfaceVO.setSystemId(systemId); applicationInterfaceVO.setInterfaceId(interfaceId); applicationInterfaceVO.setInterfaceName(interfaceName); applicationInterfaceVO.setInterfaceVersion(interfaceVersion); registerManageService.registerApplicationInterface(applicationInterfaceVO); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true); } catch (DuplicateKeyException e) { logger.warn("注册应用接口重复 systemId:{} interfaceId:{}", systemId, interfaceId); return new Result<>(ResponseCode.INDEX_DUP.getCode(), e.getMessage(), true); } catch (Exception e) { logger.error("注册应用接口失败 systemId:{} interfaceId:{}", systemId, interfaceId, e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false); } }
@PostMapping(value = "registerApplicationInterfaceMethod", produces = "application/json;charset=utf-8") public Result<Boolean> registerApplicationInterfaceMethod(@RequestParam String systemId, @RequestParam String interfaceId, @RequestParam String methodId, @RequestParam String methodName, @RequestParam String parameterType, @RequestParam String uri, @RequestParam String httpCommandType, @RequestParam Integer auth) { try { logger.info("注册应用接口方法 systemId:{} interfaceId:{} methodId:{}", systemId, interfaceId, methodId); ApplicationInterfaceMethodVO applicationInterfaceMethodVO = new ApplicationInterfaceMethodVO(); applicationInterfaceMethodVO.setSystemId(systemId); applicationInterfaceMethodVO.setInterfaceId(interfaceId); applicationInterfaceMethodVO.setMethodId(methodId); applicationInterfaceMethodVO.setMethodName(methodName); applicationInterfaceMethodVO.setParameterType(parameterType); applicationInterfaceMethodVO.setUri(uri); applicationInterfaceMethodVO.setHttpCommandType(httpCommandType); applicationInterfaceMethodVO.setAuth(auth); registerManageService.registerApplicationInterfaceMethod(applicationInterfaceMethodVO); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true); } catch (DuplicateKeyException e) { logger.warn("注册应用接口方法重复 systemId:{} interfaceId:{} methodId:{}", systemId, interfaceId, methodId); return new Result<>(ResponseCode.INDEX_DUP.getCode(), e.getMessage(), true); } catch (Exception e) { logger.error("注册应用接口方法 systemId:{} interfaceId:{} methodId:{}", systemId, interfaceId, methodId, e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false); } }
@PostMapping(value = "registerEvent", produces = "application/json;charset=utf-8") public Result<Boolean> registerEvent(@RequestParam String systemId) { try { String gatewayId = configManageService.queryGatewayDistribution(systemId); messageService.pushMessage(gatewayId, systemId); logger.info("应用信息注册完成通知成功 systemId:{}", systemId); return new Result<>(ResponseCode.SUCCESS.getCode(), ResponseCode.SUCCESS.getInfo(), true); } catch (Exception e) { logger.error("应用信息注册完成通知失败 systemId:{}", systemId, e); return new Result<>(ResponseCode.UN_ERROR.getCode(), e.getMessage(), false); } } }
|
这里通了一个注册事件,既可以被 api-gateway-sdk 使用,也可以被运用平台调用,进行消息推送,让网关服务拉取最新的系统信息。
网关助手
为了能收到来自注册中心的消息推送,那么网关助手服务也需要连接到对应的 Redis 服务上。那么这里并不需要用户在网关助手类中配置 Redis 的连接信息,而是通过优化的方式从注册中心拉取 Redis 连接信息,到网关助手中进行注册。因为只有这样,才能减少用户的配置,并做到统一的管理。
连接服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Bean public RedisConnectionFactory redisConnectionFactory(GatewayServiceProperties properties, GatewayCenterService gatewayCenterService) { Map<String, String> redisConfig = gatewayCenterService.queryRedisConfig(properties.getAddress()); RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); standaloneConfig.setHostName(redisConfig.get("host")); standaloneConfig.setPort(Integer.parseInt(redisConfig.get("port"))); JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(100); poolConfig.setMaxWaitMillis(30 * 1000); poolConfig.setMinIdle(20); poolConfig.setMaxIdle(40); poolConfig.setTestWhileIdle(true); JedisClientConfiguration clientConfig = JedisClientConfiguration.builder() .connectTimeout(Duration.ofSeconds(2)) .clientName("api-gateway-assist-redis-" + properties.getGatewayId()) .usePooling().poolConfig(poolConfig).build(); return new JedisConnectionFactory(standaloneConfig, clientConfig); }
@Bean public MessageListenerAdapter msgAgreementListenerAdapter(GatewayAssistantApplication gatewayAssistantApplication) { return new MessageListenerAdapter(gatewayAssistantApplication, "receiveMessage"); }
@Bean public RedisMessageListenerContainer container(GatewayServiceProperties properties, RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter msgAgreementListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(msgAgreementListenerAdapter, new PatternTopic(properties.getGatewayId())); return container; }
|
redisConnectionFactory 方法用于创建Redis连接工厂,它通过调用 GatewayCenterService 的 queryRedisConfig 方法获取注册中心的 Redis 配置信息,然后根据该信息构建 Redis 服务的连接配置。JedisClientConfiguration是Redis客户端的配置类,用于配置 Redis 连接池的参数,如最大连接数、最大空闲连接数、连接超时时间等。注意:JedisPoolConfig 一般都是固定配置,也可以从注册中心进行拉取。
msgAgreementListenerAdapter 方法用于创建消息监听器适配器,它将 GatewayAssistantApplication 类中的 receiveMessage 方法作为消息监听器。
container 方法用于创建 Redis 消息监听容器,它使用 Redis 连接工厂和消息监听器适配器来创建一个Redis消息监听容器,并将其绑定到指定的主题(topic)上。
在这个例子中,它将容器绑定到了 GatewayServiceProperties 类中定义的网关ID作为主题上。这样,当有消息发送到该主题时,该容器就会接收到并调用对应的消息监听器处理该消息。
消息订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| package cn.ray.gateway.assist.applicaton;
import cn.ray.gateway.assist.config.GatewayServiceProperties; import cn.ray.gateway.assist.domain.model.aggregates.ApplicationSystemRichInfo; import cn.ray.gateway.assist.domain.model.vo.ApplicationInterfaceMethodVO; import cn.ray.gateway.assist.domain.model.vo.ApplicationInterfaceVO; import cn.ray.gateway.assist.domain.model.vo.ApplicationSystemVO; import cn.ray.gateway.assist.domain.service.GatewayCenterService; import cn.ray.gateway.core.mapping.HttpRequestType; import cn.ray.gateway.core.mapping.HttpStatement; import cn.ray.gateway.core.session.Configuration; import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent;
import java.util.List;
public class GatewayAssistantApplication implements ApplicationContextAware, ApplicationListener<ContextClosedEvent> {
private Logger logger = LoggerFactory.getLogger(GatewayAssistantApplication.class);
private GatewayServiceProperties properties;
private GatewayCenterService gatewayCenterService;
private Configuration configuration;
private Channel gatewaySocketServerChannel;
public GatewayAssistantApplication(GatewayServiceProperties properties, GatewayCenterService gatewayCenterService, Configuration configuration, Channel gatewaySocketServerChannel) { this.properties = properties; this.gatewayCenterService = gatewayCenterService; this.configuration = configuration; this.gatewaySocketServerChannel = gatewaySocketServerChannel; }
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { try { gatewayCenterService.doRegister(properties.getAddress(), properties.getGroupId(), properties.getGatewayId(), properties.getGatewayName(), properties.getGatewayAddress());
addMappers(""); } catch (Exception e) { logger.error("网关服务启动失败,停止服务。{}", e.getMessage(), e); throw e; } } public void addMappers(String systemId) { ApplicationSystemRichInfo applicationSystemRichInfo = gatewayCenterService.pullApplicationSystemRichInfo(properties.getAddress(), properties.getGatewayId(), systemId);
List<ApplicationSystemVO> applicationSystemVOList = applicationSystemRichInfo.getApplicationSystemVOList(); if (applicationSystemVOList.isEmpty()) { logger.warn("网关{}服务注册映射为空,请排查 gatewayCenterService.pullApplicationSystemRichInfo 是否检索到此网关算力需要拉取的配置数据。", systemId); return; } for (ApplicationSystemVO system : applicationSystemVOList) { List<ApplicationInterfaceVO> interfaceList = system.getInterfaceList(); for (ApplicationInterfaceVO interfaceVO : interfaceList) { configuration.registerConfig(system.getSystemId(), system.getSystemRegistry(), interfaceVO.getInterfaceId(), interfaceVO.getInterfaceVersion()); List<ApplicationInterfaceMethodVO> methodList = interfaceVO.getMethodList(); for (ApplicationInterfaceMethodVO method : methodList) { HttpStatement httpStatement = new HttpStatement( system.getSystemId(), interfaceVO.getInterfaceId(), method.getMethodId(), method.getUri(), HttpRequestType.valueOf(method.getHttpCommandType()), method.getParameterType(), method.isAuth()); configuration.addMapper(httpStatement); logger.info("网关服务注册映射 系统:{} 接口:{} 方法:{}", system.getSystemId(), interfaceVO.getInterfaceId(), method.getMethodId()); } } } }
@Override public void onApplicationEvent(ContextClosedEvent contextClosedEvent) { try { if (gatewaySocketServerChannel.isActive()) { logger.info("应用容器关闭,Api网关服务关闭。localAddress:{}", gatewaySocketServerChannel.localAddress()); gatewaySocketServerChannel.close(); } } catch (Exception e) { logger.error("应用容器关闭,Api网关服务关闭失败", e); } }
public void receiveMessage(Object message) { logger.info("【事件通知】接收注册中心推送消息 message:{}", message); addMappers(message.toString().substring(1, message.toString().length() - 1)); }
}
|
将注册映射逻辑抽取出来作为单独的方法,通过这样的方式,现在我们拉取接口一方面是网关助手系统也就是网关引擎启动的时候进行拉取,另外一种是已经在运行时候,通过事件的通知进行拉取。
注意:这里的 pullApplicationSystemRichInfo 增加了参数 systemId,也就是 网关中心的 queryApplicationSystemRichInfo 方法增加了参数 systemId,
也就增加了一下逻辑
1 2 3 4 5 6
| if ("".equals(systemId)) { systemIdList = configManageRepository.queryGatewayDistributionSystemIdList(gatewayId); } else { systemIdList.add(systemId); }
|
第一次网关引擎初始化启动时,未指定 systemId,注册当前网关下的所有系统接口,之后就通过事件通知指定了 systemId,只注册指定系统下的接口。
网关SDK
事件通知注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| package cn.ray.gateway.sdk.domain.service;
import cn.hutool.http.HttpUtil; import cn.ray.gateway.sdk.GatewayException; import cn.ray.gateway.sdk.common.Result; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.HashMap; import java.util.Map;
public class GatewayCenterService {
private Logger logger = LoggerFactory.getLogger(GatewayCenterService.class);
public void doRegisterApplication(String address, String systemId, String systemName, String systemType, String systemRegistry) { Map<String, Object> paramMap = new HashMap<>(); paramMap.put("systemId", systemId); paramMap.put("systemName", systemName); paramMap.put("systemType", systemType); paramMap.put("systemRegistry", systemRegistry); String resultStr; try { resultStr = HttpUtil.post(address + "/wg/admin/register/registerApplication", paramMap, 2000); } catch (Exception e) { logger.error("应用服务注册异常,链接资源不可用:{}", address + "/wg/admin/register/registerApplication"); throw e; } Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() { }); logger.info("向网关中心注册应用服务 systemId:{} systemName:{} 注册结果:{}", systemId, systemName, resultStr); if (!"0000".equals(result.getCode()) && !"0003".equals(result.getCode())) { throw new GatewayException("注册应用服务异常 [systemId:" + systemId + "] 、[systemRegistry:" + systemRegistry + "]"); } }
public void doRegisterApplicationInterface(String address, String systemId, String interfaceId, String interfaceName, String interfaceVersion) { Map<String, Object> paramMap = new HashMap<>(); paramMap.put("systemId", systemId); paramMap.put("interfaceId", interfaceId); paramMap.put("interfaceName", interfaceName); paramMap.put("interfaceVersion", interfaceVersion); String resultStr; try { resultStr = HttpUtil.post(address + "/wg/admin/register/registerApplicationInterface", paramMap, 2000); } catch (Exception e) { logger.error("应用服务接口注册异常,链接资源不可用:{}", address + "/wg/admin/register/registerApplicationInterface"); throw e; } Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() { }); logger.info("向网关中心注册应用接口服务 systemId:{} interfaceId:{} interfaceName:{} 注册结果:{}", systemId, interfaceId, interfaceName, resultStr); if (!"0000".equals(result.getCode()) && !"0003".equals(result.getCode())) { throw new GatewayException("向网关中心注册应用接口服务异常 [systemId:" + systemId + "] 、[interfaceId:" + interfaceId + "]"); } }
public void doRegisterApplicationInterfaceMethod(String address, String systemId, String interfaceId, String methodId, String methodName, String parameterType, String uri, String httpCommandType, Integer auth) { Map<String, Object> paramMap = new HashMap<>(); paramMap.put("systemId", systemId); paramMap.put("interfaceId", interfaceId); paramMap.put("methodId", methodId); paramMap.put("methodName", methodName); paramMap.put("parameterType", parameterType); paramMap.put("uri", uri); paramMap.put("httpCommandType", httpCommandType); paramMap.put("auth", auth);
String resultStr; try { resultStr = HttpUtil.post(address + "/wg/admin/register/registerApplicationInterfaceMethod", paramMap, 2000); } catch (Exception e) { logger.error("应用服务接口注册方法异常,链接资源不可用:{}", address + "/wg/admin/register/registerApplicationInterfaceMethod"); throw e; } Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() { }); logger.info("向网关中心注册应用接口方法服务 systemId:{} interfaceId:{} methodId:{} 注册结果:{}", systemId, interfaceId, methodId, resultStr); if (!"0000".equals(result.getCode()) && !"0003".equals(result.getCode())) throw new GatewayException("向网关中心注册应用接口方法服务异常 [systemId:" + systemId + "] 、[interfaceId:" + interfaceId + "]、[methodId:]" + methodId + "]"); }
public void doRegisterEvent(String address, String systemId) { Map<String, Object> paramMap = new HashMap<>(); paramMap.put("systemId", systemId); String resultStr; try { resultStr = HttpUtil.post(address + "/wg/admin/register/registerEvent", paramMap, 2000); } catch (Exception e) { logger.error("应用信息注册完成通知异常,链接资源不可用:{}", address + "/wg/admin/register/registerEvent"); throw e; } Result<Boolean> result = JSON.parseObject(resultStr, new TypeReference<Result<Boolean>>() { }); logger.info("应用信息注册完成通知 systemId:{} 注册结果:{}", systemId, resultStr); if (!"0000".equals(result.getCode())) throw new GatewayException("向网关中心注册完成通知异常 [systemId:" + systemId + "] "); }
}
|
应用服务注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| package cn.ray.gateway.sdk.application;
import cn.ray.gateway.sdk.GatewayException; import cn.ray.gateway.sdk.annotation.ApiProducerClazz; import cn.ray.gateway.sdk.annotation.ApiProducerMethod; import cn.ray.gateway.sdk.config.GatewaySDKServiceProperties; import cn.ray.gateway.sdk.domain.service.GatewayCenterService; import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor;
import java.lang.reflect.Method;
public class GatewaySDKApplication implements BeanPostProcessor {
private Logger logger = LoggerFactory.getLogger(GatewaySDKApplication.class);
private GatewaySDKServiceProperties properties;
private GatewayCenterService gatewayCenterService;
public GatewaySDKApplication(GatewaySDKServiceProperties properties, GatewayCenterService gatewayCenterService) { this.properties = properties; this.gatewayCenterService = gatewayCenterService; }
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { ApiProducerClazz apiProducerClazz = bean.getClass().getAnnotation(ApiProducerClazz.class); if (null == apiProducerClazz) { return bean; } logger.info("\n应用注册:系统信息 \nsystemId: {} \nsystemName: {} \nsystemType: {} \nsystemRegistry: {}", properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry()); gatewayCenterService.doRegisterApplication(properties.getAddress(), properties.getSystemId(), properties.getSystemName(), "RPC", properties.getSystemRegistry());
Class<?>[] interfaces = bean.getClass().getInterfaces(); if (interfaces.length != 1) { throw new GatewayException(bean.getClass().getName() + "interfaces not one this is " + JSON.toJSONString(interfaces)); } String interfaceId = interfaces[0].getName(); logger.info("\n应用注册:接口信息 \nsystemId: {} \ninterfaceId: {} \ninterfaceName: {} \ninterfaceVersion: {}", properties.getSystemId(), interfaceId, apiProducerClazz.interfaceName(), apiProducerClazz.interfaceVersion()); gatewayCenterService.doRegisterApplicationInterface( properties.getAddress(), properties.getSystemId(), interfaceId, apiProducerClazz.interfaceName(), apiProducerClazz.interfaceVersion() );
Method[] methods = bean.getClass().getMethods(); for (Method method : methods) { ApiProducerMethod apiProducerMethod = method.getAnnotation(ApiProducerMethod.class); if (null == apiProducerMethod) { continue; } Class<?>[] parameterTypes = method.getParameterTypes(); StringBuilder parameters = new StringBuilder(); for (Class<?> clazz : parameterTypes) { parameters.append(clazz.getName()).append(","); } String parameterType = parameters.toString().substring(0,parameters.toString().lastIndexOf(",")); logger.info("\n应用注册:方法信息 \nsystemId: {} \ninterfaceId: {} \nmethodId: {} \nmethodName: {} \nparameterType: {} \nuri: {} \nhttpCommandType: {} \nauth: {}", properties.getSystemId(), bean.getClass().getName(), method.getName(), apiProducerMethod.methodName(), parameterType, apiProducerMethod.uri(), apiProducerMethod.httpRequestType(), apiProducerMethod.auth()); gatewayCenterService.doRegisterApplicationInterfaceMethod( properties.getAddress(), properties.getSystemId(), interfaceId, method.getName(), apiProducerMethod.methodName(), parameterType, apiProducerMethod.uri(), apiProducerMethod.httpRequestType(), apiProducerMethod.auth() ); }
gatewayCenterService.doRegisterEvent(properties.getAddress(), properties.getSystemId());
return bean; } }
|
这里后续可以在 sdk 注册接口时,向注册中心查询判断是否该接口已经注册,存在于数据库中,如果是新注册就将该接口的信息推送给注册中心,注册中心再将该消息发布,对应的网关引擎监听到该信息后,根据接口信息向网关通信组件 core 中的 Configuration 注册映射。
测试
启动 Docker 容器,启动 zookeeper 注册中心、启动 Redis 服务
清空数据库
1 2 3
| DELETE FROM application_ system; DELETE FROM application_interface; DELETE FROM application_ interface_method;
|
为的就是在 api-gateway-engine 启动的时候,不让它拉取到任何接口信息,当后续注册新的接口信息时候,再拉取。
启动 api-gateway-center 注册中心,保证后续的流程可以启动。
打包最新 api-gateway-assist-05,部署 api-gateway-engine 镜像文件到 Docker 容器。
启动 api-gateway-test。
观察 Docker 中 api-gateway-engine 的日志信息,是否有映射完成操作。
RPC 应用服务(SDK)未启动:
api-gateway-engine
api-gateway-center
RPC 应用服务(SDK)启动后:
api-gateway-test
api-gateway-center
api-gateway-engine
Postman
GET
POST
思考
大致流程
- 网关引擎初始化启动时,拉取注册中心数据库中已有的所有系统应用接口信息,完成
Configuration.addMapper
注册,并根据当前网关ID监听注册中心。
- 当有一个新的 RPC 应用服务加入网关时,将其带有注解
@ApiProducerClazz
、@ApiProducerMethod
的类和方法的信息持久化到网关中心的数据库中,之后调用 doRegisterEvent
事件发布接口,通知网关中心发布消息。
- 网关中心接收到发布请求后,通过
redisMessageTemplate.convertAndSend(gatewayId,systemId);
发布消息。
- 网关引擎运行中,监听到消息发布,接收到网关中心推送过来的systemId,之后根据 systemId 拉取该系统下所有的接口信息,并根据
configuration.addMapper(httpStatement);
完成通信组件 core 的配置注册。
Redis消息订阅发布
Redis一般都是用于缓存较多,这里记录一下用于事件推送的方法
Redis事件发布端
RedisTemplate#convertAndSend:发布消息,指明接收方 Topic 通信信道以及消息内容。
Redis 监听器
RedisConnectionFactory:负责设置连接参数,Redis服务地址。
RedisConnectionFactory 是 Spring Data Redis 提供的一个接口,用于获取 Redis 连接的工厂类。它可以通过配置来创建实例,或者通过自动配置的方式从应用程序的上下文中获取。
在 Spring Boot 应用程序中,可以通过配置文件或者编程方式来配置 Redis 连接工厂。通常,可以在 application.properties 或 application.yml 文件中配置 Redis 连接信息,例如 Redis 的主机名、端口号、密码等。Spring Boot 会自动读取这些配置,并根据配置创建一个 RedisConnectionFactory 的实例。
另外,还可以通过编程方式来配置 RedisConnectionFactory。在这种情况下,可以在 Spring 配置类中手动创建 RedisConnectionFactory 对象,并设置连接相关的属性,例如主机名、端口号、密码等。
总结起来,RedisConnectionFactory 可以通过配置文件或者编程方式进行配置,从而创建一个可以用于获取 Redis 连接的工厂实例。具体的配置方式可以根据项目的需求和使用的框架来确定。
在 assist 中注入连接工厂 Bean 并修改配置,从注册中心拉取 Redis 的IP端口信息(properties),创建 Jedis 客户端连接(不需要在 assist 重新配置 Redis 服务地址,减少用户的操作,也更方便统一配置)。
RedisMessageListenerContainer:注入消息监听器容器,需要设置连接工厂和监听器适配器。并将消息通信 Topic 与监听器适配器绑定。
MessageListenerAdapter:指明消息处理委托对象,以及消息处理方法(最终发布方的消息会被该方法接收)。
面试中可能会问到的问题
Redis 发布订阅(Publish/Subscribe)与消息队列 MQ 的区别?
Redis发布订阅(Publish/Subscribe)和消息队列都是用于实现消息传递的机制,但它们的实现方式和应用场景略有不同。
Redis发布订阅模式是一种广播机制,它允许订阅者(subscriber)接收到发布者(publisher)发送的消息。发布者将消息发送到指定的频道(channel),而订阅者则可以订阅一个或多个频道,以接收该频道中的所有消息。这种模式适用于消息的广播场景,如实时聊天、新闻推送等。
消息队列则是一种点对点(P2P)的消息传递方式,它将消息发送到队列中,然后由消费者(consumer)从队列中取出并处理该消息。消息队列可以保证消息的可靠性传递,同时也可以实现消息的异步处理,从而提高系统的吞吐量和可伸缩性。消息队列适用于异步处理、任务调度等场景。
因此,Redis发布订阅和消息队列虽然都可以实现消息传递,但在应用场景和实现方式上略有不同,需要根据实际需求选择合适的机制。
那 Redis 发布订阅有什么局限性?
Redis发布订阅模式的局限性主要包括以下几点:
- 可靠性:Redis发布订阅模式不保证消息的可靠性,因为订阅者只能接收到自己订阅的消息,而无法保证消息是否已经被正确地传递给所有的订阅者,也无法保证消息是否已经被正确地处理。
- 消息持久化:Redis发布订阅模式不支持消息的持久化,即当订阅者断开连接时,它将无法接收到在其离线期间发送的消息。
- 扩展性:Redis发布订阅模式不适合处理大量的消息,因为它只能在单个Redis实例中进行处理,无法进行水平扩展。
- 消息顺序:Redis发布订阅模式无法保证消息的顺序,因为它是异步的,消息的传递顺序可能与发布的顺序不一致。
因此,在实际应用中,如果需要保证消息的可靠性、持久化、顺序和扩展性,可以考虑使用消息队列等其他的解决方案。
那什么场景选择 Redis 发布订阅更好?为什么选择 Redis 发布订阅做网关注册的通知?
Redis发布订阅模式适用于以下场景:
- 实时性要求高:Redis发布订阅模式是异步的,消息的传递速度非常快,适合处理需要实时性的场景,如在线聊天、实时通知等。
- 简单的消息传递:Redis发布订阅模式非常适合处理简单的消息传递,如事件通知、状态更新等。
- 低延迟的数据处理:Redis发布订阅模式具有低延迟的特点,适合需要快速处理数据的场景,如实时数据分析、实时数据监控等。
- 简单的数据结构:Redis发布订阅模式的数据结构非常简单,易于使用和维护,适合处理简单的数据结构。
总的来说,Redis发布订阅模式适合处理实时性要求高、简单的消息传递、低延迟的数据处理、简单的数据结构等场景。如果需要处理复杂的消息传递、保证消息的可靠性、持久化、顺序和扩展性,可以考虑使用消息队列等其他的解决方案。
Redis 发布订阅为什么实时性高,延迟低?
Redis发布订阅模式具有实时性高、延迟低的特点,主要原因有以下几点:
- 异步通信:Redis发布订阅模式采用异步通信方式,发布者将消息发布到Redis服务器后,不需要等待订阅者接收消息的响应,即可继续处理其他任务,这样可以大大提高消息的传递速度。
- 内存存储:Redis是一种基于内存存储的NoSQL数据库,数据的读写非常快,可以在微秒级别内完成,这样可以大大缩短消息的传递时间。
- 单线程模型:Redis采用单线程模型,避免了多线程之间的线程切换和锁竞争,这样可以减少消息传递的延迟。
- 网络协议:Redis采用高效的网络协议,如RESP协议,可以大大提高消息的传递效率。
综上所述,Redis发布订阅模式具有实时性高、延迟低的特点,这主要得益于Redis采用了异步通信、内存存储、单线程模型和高效的网络协议等优化措施。