将 Session 会话中获取连接源,并通过连接源执行泛化调用的整个执行过程作为独立的模块抽离出来。
设计
此前,HTTP 请求完成网络协议转换后,便进入会话流程的处理。但会话流程中仍有一些内容过于冗余,即调用数据源(RPC)时对入参和出参的封装,它们应该被提取到一个专门的类中处理,这样才能更加方便地管理。
会话的职责是负责串联上下文,执行器的职责是负责对数据源的调用信息处理。
实现 Mapping部分:
GenericReference:invoke返回值变化为 Object ,现在不单单只返回一个 String 了。
DataSource部分:
Session部分:
DefaultGatewaySession中的执行 RPC 逻辑拆分到执行器
Executor部分:
新增 Executor 接口,声明 execute 方法
新增 BaseExecutor 抽象类,实现 Executor 接口的 execute 方法,将原来会话中的参数封装的逻辑移入这里,并调用定义的抽象方法 doExec 实现不同的执行逻辑
新增 SimpleExecutor 类,继承 BaseExecutor 实现抽象方法 doExec ,将原来网关会话中的调用数据源连接对象 DubboConnection 的逻辑移入这里
新增 GatewayResult 类,包装服务返回的结果,便于统一结果
Socket部分:
新增了请求结果封装器 ResponseParser ,把 Handler 当中的结果封装部分移入结果封装器。
Type部分:
执行器 Executor 执行器接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package cn.ray.gateway.executor;import cn.ray.gateway.executor.result.GatewayResult;import cn.ray.gateway.mapping.HttpStatement;import java.util.Map;public interface Executor { GatewayResult execute (HttpStatement httpStatement, Map<String, Object> parameters) ; }
执行器抽象基类(模板模式) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package cn.ray.gateway.executor;import cn.ray.gateway.datasource.Connection;import cn.ray.gateway.executor.result.GatewayResult;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.type.SimpleTypeRegistry;import com.alibaba.fastjson.JSON;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;public abstract class BaseExecutor implements Executor { private final Logger logger = LoggerFactory.getLogger(BaseExecutor.class); protected Configuration configuration; protected Connection connection; public BaseExecutor (Configuration configuration, Connection connection) { this .configuration = configuration; this .connection = connection; } @Override public GatewayResult execute (HttpStatement httpStatement, Map<String, Object> parameters) { String methodName = httpStatement.getMethodName(); String parameterType = httpStatement.getParameterType(); String[] parameterTypes = new String []{parameterType}; Object[] args = SimpleTypeRegistry.isSimpleType(parameterType) ? parameters.values().toArray() : new Object []{parameters}; logger.info("执行调用 method:{}#{}.{}({}) args:{}" , httpStatement.getApplication(), httpStatement.getInterfaceName(), httpStatement.getMethodName(), JSON.toJSONString(parameterTypes), JSON.toJSONString(args)); try { Object data = doExec(methodName, parameterTypes, args); return GatewayResult.buildSuccess(data); } catch (Exception e) { return GatewayResult.buildError(e.getMessage()); } } protected abstract Object doExec (String methodName, String[] parameterTypes, Object[] args) ; }
SimpleExecutor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package cn.ray.gateway.executor;import cn.ray.gateway.datasource.Connection;import cn.ray.gateway.session.Configuration;public class SimpleExecutor extends BaseExecutor { public SimpleExecutor (Configuration configuration, Connection connection) { super (configuration, connection); } @Override protected Object doExec (String methodName, String[] parameterTypes, Object[] args) { return connection.execute( methodName, parameterTypes, new String []{"ignore" }, args); } }
GatewayResult 结果封装 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package cn.ray.gateway.executor.result;public class GatewayResult { private String code; private String message; private Object data; public GatewayResult (String code, String message, Object data) { this .code = code; this .message = message; this .data = data; } public static GatewayResult buildSuccess (Object data) { return new GatewayResult ("0000" ,"调用成功" , data); } public static GatewayResult buildError (Object data) { return new GatewayResult ("0001" ,"调用失败" , data); } public String getCode () { return code; } public String getMessage () { return message; } public Object getData () { return data; } }
会话调用 DefaultGatewaySessionFactory 中构建执行器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package cn.ray.gateway.session.defaults;import cn.ray.gateway.datasource.DataSource;import cn.ray.gateway.datasource.DataSourceFactory;import cn.ray.gateway.datasource.unpooled.UnpooledDataSourceFactory;import cn.ray.gateway.executor.Executor;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GatewaySession;import cn.ray.gateway.session.GatewaySessionFactory;public class DefaultGatewaySessionFactory implements GatewaySessionFactory { private final Configuration configuration; public DefaultGatewaySessionFactory (Configuration configuration) { this .configuration = configuration; } @Override public GatewaySession openSession (String uri) { DataSourceFactory dataSourceFactory = new UnpooledDataSourceFactory (); dataSourceFactory.setProperties(configuration,uri); DataSource dataSource = dataSourceFactory.getDataSource(); Executor executor = configuration.newExecutor(dataSource.getConnection()); return new DefaultGatewaySession (configuration, uri, executor); } }
开启会话时,使用连接 Connection 创建对应的执行器,并将其注入网关会话 DefaultGatewaySession ,便于后续使用。
DefaultGatewaySession 中调用执行器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package cn.ray.gateway.session.defaults;import cn.ray.gateway.bind.IGenericReference;import cn.ray.gateway.datasource.Connection;import cn.ray.gateway.datasource.DataSource;import cn.ray.gateway.executor.Executor;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GatewaySession;import cn.ray.gateway.type.SimpleTypeRegistry;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.config.bootstrap.DubboBootstrap;import org.apache.dubbo.config.utils.ReferenceConfigCache;import org.apache.dubbo.rpc.service.GenericService;import java.util.Map;public class DefaultGatewaySession implements GatewaySession { private Configuration configuration; private String uri; private Executor executor; public DefaultGatewaySession (Configuration configuration, String uri, Executor executor) { this .configuration = configuration; this .uri = uri; this .executor = executor; } @Override public Object get (String methodName, Map<String,Object> parameters) { HttpStatement httpStatement = configuration.getHttpStatement(uri); try { return executor.execute(httpStatement, parameters); } catch (Exception e) { throw new RuntimeException ("Error exec get. Cause: " + e); } } @Override public Object post (String methodName, Map<String, Object> parameters) { return get(methodName, parameters); } @Override public IGenericReference getMapper () { return configuration.getMapper(uri, this ); } @Override public Configuration getConfiguration () { return configuration; } }
当调用 RPC 泛化服务代理对象时,会调用网关会话 DefaultGatewaySession 执行服务请求,此时就调用上面定义的执行器完成服务请求操作,并接收服务请求结果,最后用 GatewayResult 包装返回。
网络调用 请求结果封装器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package cn.ray.gateway.socket.agreement;import com.alibaba.fastjson.JSON;import io.netty.handler.codec.http.*;public class ResponseParser { public static DefaultFullHttpResponse parse (Object result) { DefaultFullHttpResponse response = new DefaultFullHttpResponse (HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.content().writeBytes(JSON.toJSONString(result).getBytes()); HttpHeaders heads = response.headers(); heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8" ); heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true" ); return response; } }
会话服务处理器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package cn.ray.gateway.socket.handlers;import cn.ray.gateway.bind.IGenericReference;import cn.ray.gateway.session.GatewaySession;import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;import cn.ray.gateway.socket.BaseHandler;import cn.ray.gateway.socket.agreement.RequestParser;import cn.ray.gateway.socket.agreement.ResponseParser;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.serializer.SerializerFeature;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.http.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Map;public class GatewayServerHandler extends BaseHandler <FullHttpRequest> { private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class); private final DefaultGatewaySessionFactory gatewaySessionFactory; public GatewayServerHandler (DefaultGatewaySessionFactory gatewaySessionFactory) { this .gatewaySessionFactory = gatewaySessionFactory; } @Override protected void session (ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) { logger.info("网关接收请求 ===> uri: {} , method: {}" , request.uri(), request.method()); RequestParser requestParser = new RequestParser (request); String uri = requestParser.getUri(); if (uri == null ) { return ; } Map<String, Object> parameters = requestParser.parse(); GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri); IGenericReference reference = gatewaySession.getMapper(); Object result = reference.$invoke(parameters); DefaultFullHttpResponse response = ResponseParser.parse(result); channel.writeAndFlush(response); } }
将 GatewayServerHandler 自定义网关处理器中的逻辑封装为 3 个模块:解析请求参数、调用会话服务、封装返回结果并反馈给客户端。
将网关的逻辑分为各个独立的模块,每个模块都负责特定的功能,并通过 Configuration 贯穿、联系起来,解决了会话流程中的冗余内容,让各个类的职责更加清晰。
测试 RPC 服务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package cn.ray.gateway.interfaces;import cn.ray.gateway.rpc.IActivityBooth;import cn.ray.gateway.rpc.dto.XReq;import com.alibaba.fastjson.JSON;import org.apache.dubbo.config.annotation.Service;@Service(version = "1.0.0") public class ActivityBooth implements IActivityBooth { @Override public String sayHi (String str) { return "hi " + str + " by api-gateway-test" ; } @Override public String insert (XReq req) { return "hi " + JSON.toJSONString(req) + " by api-gateway-test" ; } @Override public String test (String str, XReq req) { return "hi " + str + JSON.toJSONString(req) + " by api-gateway-test" ; } }
启动网关 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package cn.ray.gateway.test;import cn.ray.gateway.mapping.HttpRequestType;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;import cn.ray.gateway.socket.GatewaySocketServer;import io.netty.channel.Channel;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class ApiTest { private final Logger logger = LoggerFactory.getLogger(ApiTest.class); @Test public void test_GenericReference () throws InterruptedException, ExecutionException { Configuration configuration = new Configuration (); HttpStatement httpStatement01 = new HttpStatement ( "api-gateway-test" , "cn.ray.gateway.rpc.IActivityBooth" , "sayHi" , "/wg/activity/sayHi" , HttpRequestType.GET, "java.lang.String" ); HttpStatement httpStatement02 = new HttpStatement ( "api-gateway-test" , "cn.ray.gateway.rpc.IActivityBooth" , "insert" , "/wg/activity/insert" , HttpRequestType.POST, "cn.ray.gateway.rpc.dto.XReq" ); configuration.addMapper(httpStatement01); configuration.addMapper(httpStatement02); DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory (configuration); GatewaySocketServer server = new GatewaySocketServer (gatewaySessionFactory); Future<Channel> future = Executors.newFixedThreadPool(2 ).submit(server); Channel channel = future.get(); if (null == channel) throw new RuntimeException ("netty server start error channel is null" ); while (!channel.isActive()) { logger.info("netty server gateway start Ing ..." ); Thread.sleep(500 ); } logger.info("netty server gateway start Done! 
{}" , channel.localAddress()); Thread.sleep(Long.MAX_VALUE); } }
测试结果 GET
POST
思考
由于整个网关会话由 Configuration 作为配置贯穿,可以在 Configuration 中新建创建执行器的方法,后续可以通过修改这个方法以得到不同的 Executor 。
对于 ResponseParser ,可以为对应请求结果封装的逻辑创建一个静态方法,后续需要时直接调用即可。