通过分治重构会话流程,细化网络协议、会话、绑定、映射和泛化调用的关系。
设计
因为我们要继续扩展新的功能,而上一章功能实现的逻辑分层结构不易于扩展,所以需要进行重构处理 。重构点包括;
拆分session包下会话业务逻辑部分和网络通信部分,做功能实现的隔离。
拆分 bind 包下代理和对RPC接口的泛化调用,这里你可以把RPC当做一种可连接资源,而这种连接资源也不是只有 RPC 一种,同时也因为 RPC 的泛化调用是一种通用方法,井不需要与逻辑绑定,所以它也应该被拆分出来。
实现 Mapping 模块:
MapperMethod:switch处理不同的请求,调用GatewaySession中的方法执行具体内容(目前只提供了 get 接口方法实现)
MapperProxy:延续之前的处理方式,这块是一个Interceptor,用来调用执行后续方法,从直接调用 dubbo 变成了调用 MapperedMethod,也就是根据请求类型从 GatewaySession 执行对应的方法
MapperProxyFactory:同之前的 GenericReferenceProxyFactory 代理对象工厂,利用 cglib 回调对象 MapperProxy 生成 IGenericReference 代理对象
MapperRegistry:添加映射,注册对应 uri 的 MapperProxyFactory ,并提供通过MapperProxyFactory.new 生成 IGenericReference 代理对象并返回的方法
HttpRequestType:枚举类,对应不同的请求类型
HttpStatement:一个HTTP请求的封装类,包含对应的应用名称、接口名、方法名、uri 以及对应的请求类型
Session模块:
Configuration:通过 Map 缓存添加、获取 HttpStatement:addHttpStatement 、 getHttpStatement ;并通过 addMapper 由 MapperRegistry 添加泛化调用工厂,并在之后通过 getMapper 由 MapperRegistry 中Map缓存的对应 uri 的泛化调用工厂生成并获取泛化调用代理对象
GatewaySession:网关会话接口,提供get方法、getMapper方法、getConfiguration方法供不同的协议或服务实现
GatewaySessionFactory:泛化调用会话工厂接口
DefaultGatewaySession:实现了GatewaySession 网关会话接口,这里主要是用于实现 RPC 泛化调用
DefaultGatewaySessionFactory:泛化调用会话工厂,用于在 Netty Handler 中创建 DefaultGatewaySession ,开启网关会话,并执行对应的映射调用
Socket模块:
这一块就是沿袭之前的netty处理,只是把其中调用服务的逻辑拆分出去,依赖 gatewaySession 去调用
整个工程结构分治设计包括;bind(绑定)、mapping(映射)、session(会话)、socket(网络)这4大块。
整个调用流程以 socket 网络处理协议转换后,获取会话 session, 从 session 中得到映射器对象,并根据 HTTP 请求的 GET/POST 调用到不同的方法上。
这样的拆分可以方便在网络层做鉴权、限流、熔断、鉴权等功能,它们可以与 session 会话逻辑拆分。
另外一层关系是 bind 绑定层中,把 RPC 的泛化调用拆分出来,这里可以把 RPC 当成一种资源来看待,拆分后更有易于后续的池化、扩展和管理。
MapperMethod 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package cn.ray.gateway.bind;import cn.ray.gateway.mapping.HttpRequestType;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GatewaySession;import java.lang.reflect.Method;public class MapperMethod { private String uri; private final HttpRequestType httpRequestType; public MapperMethod (String uri, Method method, Configuration configuration) { this .uri = uri; this .httpRequestType = configuration.getHttpStatement(uri).getHttpRequestType(); } public Object execute (GatewaySession session, Object args) { Object result = null ; switch (httpRequestType) { case GET: result = session.get(uri, args); break ; case POST: break ; case PUT: break ; case DELETE: break ; default : throw new RuntimeException ("Unknown execution method for: " + httpRequestType); } return result; } }
不同类型的 HTTP 请求做不同的逻辑处理
MapperProxy 映射代理调用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package cn.ray.gateway.bind;import cn.ray.gateway.session.GatewaySession;import net.sf.cglib.proxy.MethodInterceptor;import net.sf.cglib.proxy.MethodProxy;import java.lang.reflect.Method;public class MapperProxy implements MethodInterceptor { private GatewaySession gatewaySession; private final String uri; public MapperProxy (GatewaySession gatewaySession, String uri) { this .gatewaySession = gatewaySession; this .uri = uri; } @Override public Object intercept (Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { MapperMethod linkMethod = new MapperMethod (uri, method, gatewaySession.getConfiguration()); return linkMethod.execute(gatewaySession, objects); } }
简化映射器代理,将原有的 RPC 泛化调用拆分。这个类中只完成代理部分,并调用映射器方法完成逻辑处理,达到处理不同服务(HTTP/RPC/其它)的调用方法。
GatewaySession 处理网关 HTTP 请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package cn.ray.gateway.session;import cn.ray.gateway.bind.IGenericReference;public interface GatewaySession { Object get (String uri, Object parameter) ; IGenericReference getMapper (String uri) ; Configuration getConfiguration () ; }
DefaultGatewaySession 默认(RPC)网关会话实现类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package cn.ray.gateway.session.defaults;import cn.ray.gateway.bind.IGenericReference;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GatewaySession;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.config.bootstrap.DubboBootstrap;import org.apache.dubbo.config.utils.ReferenceConfigCache;import org.apache.dubbo.rpc.service.GenericService;public class DefaultGatewaySession implements GatewaySession { private Configuration configuration; public DefaultGatewaySession (Configuration configuration) { this .configuration = configuration; } @Override public Object get (String uri, Object parameter) { HttpStatement httpStatement = configuration.getHttpStatement(uri); String application = httpStatement.getApplication(); String interfaceName = httpStatement.getInterfaceName(); ApplicationConfig applicationConfig = configuration.getApplicationConfig(application); RegistryConfig registryConfig = configuration.getRegistryConfig(application); ReferenceConfig<GenericService> referenceConfig = configuration.getReferenceConfig(interfaceName); DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(applicationConfig).registry(registryConfig).registry(registryConfig).start(); ReferenceConfigCache cache = ReferenceConfigCache.getCache(); GenericService genericService = cache.get(referenceConfig); return genericService.$invoke(httpStatement.getMethodName(), new String []{"java.lang.String" }, new Object []{"Ray" }); } @Override public IGenericReference getMapper (String uri) { return configuration.getMapper(uri, this ); } @Override public Configuration getConfiguration () { return configuration; } }
GatewayServerHandler 会话服务处理器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package cn.ray.gateway.socket.handlers;import cn.ray.gateway.bind.IGenericReference;import cn.ray.gateway.session.GatewaySession;import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;import cn.ray.gateway.socket.BaseHandler;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.serializer.SerializerFeature;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.http.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class GatewayServerHandler extends BaseHandler <FullHttpRequest> { private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class); private final DefaultGatewaySessionFactory gatewaySessionFactory; public GatewayServerHandler (DefaultGatewaySessionFactory gatewaySessionFactory) { this .gatewaySessionFactory = gatewaySessionFactory; } @Override protected void session (ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) { logger.info("网关接收请求 ===> uri: {} , method: {}" ,request.uri(),request.method()); String methodName = request.uri().substring(1 ); if (methodName.equals("favicon.ico" )) return ; DefaultFullHttpResponse response = new DefaultFullHttpResponse (HttpVersion.HTTP_1_1, HttpResponseStatus.OK); GatewaySession gatewaySession = gatewaySessionFactory.openSession(); IGenericReference reference = gatewaySession.getMapper(request.uri()); String result = reference.$invoke("test" ) + " " + System.currentTimeMillis(); response.content().writeBytes(JSON.toJSONBytes(result, SerializerFeature.PrettyFormat)); HttpHeaders heads = response.headers(); heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8" ); heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true" ); channel.writeAndFlush(response); } }
测试 RPC 服务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package cn.ray.gateway.interfaces;import cn.ray.gateway.rpc.IActivityBooth;import cn.ray.gateway.rpc.dto.XReq;import com.alibaba.fastjson.JSON;import org.apache.dubbo.config.annotation.Service;@Service(version = "1.0.0") public class ActivityBooth implements IActivityBooth { @Override public String sayHi (String str) { return "hi " + str + " by api-gateway-test" ; } @Override public String insert (XReq req) { return "hi " + JSON.toJSONString(req) + " by api-gateway-test" ; } @Override public String test (String str, XReq req) { return "hi " + str + JSON.toJSONString(req) + " by api-gateway-test" ; } }
启动网关 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package cn.ray.gateway.test;import cn.ray.gateway.mapping.HttpRequestType;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;import cn.ray.gateway.socket.GatewaySocketServer;import io.netty.channel.Channel;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class ApiTest { private final Logger logger = LoggerFactory.getLogger(ApiTest.class); @Test public void test_GenericReference () throws InterruptedException, ExecutionException { Configuration configuration = new Configuration (); HttpStatement httpStatement = new HttpStatement ( "api-gateway-test" , "cn.ray.gateway.rpc.IActivityBooth" , "sayHi" , "/wg/activity/sayHi" , HttpRequestType.GET); configuration.addMapper(httpStatement); DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory (configuration); GatewaySocketServer server = new GatewaySocketServer (gatewaySessionFactory); Future<Channel> future = Executors.newFixedThreadPool(2 ).submit(server); Channel channel = future.get(); if (null == channel) throw new RuntimeException ("netty server start error channel is null" ); while (!channel.isActive()) { logger.info("netty server gateway start Ing ..." ); Thread.sleep(500 ); } logger.info("netty server gateway start Done! {}" , channel.localAddress()); Thread.sleep(Long.MAX_VALUE); } }
测试结果