拆解Dubbo、HTTP调用,抽象为数据源服务,便于后续功能的扩展和使用
设计
API 网关对 RPC 接口的泛化调用,类似于 ORM 框架对数据库的调用。所以也可以把 RPC 抽象成一种连接资源,做数据源的管理和池化的实现。这样既方便我们扩展新的连接方式(比如各类厂商的 RPC 框架,或者 HTTP 服务等),还可以通过 SPI 的方式进行自定义连接资源扩展,以适应不同场景的诉求。这也是把连接抽象为数据源的设计目的。
实现 Mapping部分:
DataSource部分:
Connection:连接接口,提供 execute 执行方法
DataSource:数据源接口,提供获取连接方法
DataSourceFactory:数据源工厂,用来新建数据源
DataSourceType:枚举,不同的数据源类型,目前有 Dubbo 和 HTTP
UnpooledDataSource:数据源的实现类,并根据不同的数据源类型获取连接
UnpooledDataSourceFactory:为数据源提供配置,新建数据源;
DubboConnection/HTTPConnection:连接的实现类,根据请求调用方法
Session部分:
DefaultGatewaySession修改了get方法,参数 uri 变为 methodName。Session现在绑定数据源,根据数据源获取连接来执行对应的方法调用。
Socket部分:
按照 JDBC 的模型结构,设计 API 网关中的数据源实现:提供 Connection 连接接口,每一个具体的服务(Dubbo、HTTP)都实现 Connection,再由 DataSource 数据源接口来管理。
// Service connection — Connection
package cn.ray.gateway.datasource;

/**
 * A connection to a backing service on which a method call can be executed.
 */
public interface Connection {

    /**
     * Executes a method call over this connection.
     *
     * @param methodName     name of the method to call
     * @param parameterTypes type names of the method parameters
     * @param parameterNames names of the method parameters
     * @param args           argument values passed to the method
     * @return the result of the call
     */
    Object execute(String methodName, String[] parameterTypes, String[] parameterNames, Object[] args);

}
DubboConnection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package cn.ray.gateway.datasource.connection;import cn.ray.gateway.datasource.Connection;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.config.bootstrap.DubboBootstrap;import org.apache.dubbo.config.utils.ReferenceConfigCache;import org.apache.dubbo.rpc.service.GenericService;public class DubboConnection implements Connection { private final GenericService genericService; public DubboConnection (ApplicationConfig applicationConfig, RegistryConfig registryConfig, ReferenceConfig<GenericService> referenceConfig) { DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(applicationConfig).registry(registryConfig).registry(registryConfig).start(); ReferenceConfigCache cache = ReferenceConfigCache.getCache(); genericService = cache.get(referenceConfig); } @Override public Object execute (String methodName, String[] parameterTypes, String[] parameterNames, Object[] args) { return genericService.$invoke(methodName, parameterTypes, args); } }
把之前处于Session的get方法当中泛化调用的初始化和执行逻辑提取出来,单独变成一个Dubbo连接,之后与数据源绑定。
数据源管理 DataSource 1 2 3 4 5 6 7 8 9 10 11 package cn.ray.gateway.datasource;public interface DataSource { Connection getConnection () ; }
DataSourceFactory 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package cn.ray.gateway.datasource;import cn.ray.gateway.session.Configuration;public interface DataSourceFactory { void setProperties (Configuration configuration, String uri) ; DataSource getDataSource () ; }
UnpooledDataSource 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package cn.ray.gateway.datasource.unpooled;import cn.ray.gateway.datasource.Connection;import cn.ray.gateway.datasource.DataSource;import cn.ray.gateway.datasource.DataSourceType;import cn.ray.gateway.datasource.connection.DubboConnection;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.rpc.service.GenericService;public class UnpooledDataSource implements DataSource { private Configuration configuration; private HttpStatement httpStatement; private DataSourceType dataSourceType; @Override public Connection getConnection () { switch (dataSourceType) { case HTTP: break ; case Dubbo: String application = httpStatement.getApplication(); String interfaceName = httpStatement.getInterfaceName(); ApplicationConfig applicationConfig = configuration.getApplicationConfig(application); RegistryConfig registryConfig = configuration.getRegistryConfig(application); ReferenceConfig<GenericService> referenceConfig = configuration.getReferenceConfig(interfaceName); return new DubboConnection (applicationConfig, registryConfig, referenceConfig); default : break ; } throw new RuntimeException ("DataSourceType:" + dataSourceType + "没有对应的数据源实现" ); } public void setConfiguration (Configuration configuration) { this .configuration = configuration; } public void setHttpStatement (HttpStatement httpStatement) { this .httpStatement = httpStatement; } public void setDataSourceType (DataSourceType dataSourceType) { this .dataSourceType = dataSourceType; } }
UnpooledDataSourceFactory 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package cn.ray.gateway.datasource.unpooled;import cn.ray.gateway.datasource.DataSource;import cn.ray.gateway.datasource.DataSourceFactory;import cn.ray.gateway.datasource.DataSourceType;import cn.ray.gateway.session.Configuration;public class UnpooledDataSourceFactory implements DataSourceFactory { private UnpooledDataSource dataSource; public UnpooledDataSourceFactory () { this .dataSource = new UnpooledDataSource (); } @Override public void setProperties (Configuration configuration, String uri) { this .dataSource.setConfiguration(configuration); this .dataSource.setHttpStatement(configuration.getHttpStatement(uri)); this .dataSource.setDataSourceType(DataSourceType.Dubbo); } @Override public DataSource getDataSource () { return dataSource; } }
会话使用数据源 DefaultGatewaySessionFactory 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package cn.ray.gateway.session.defaults;import cn.ray.gateway.datasource.DataSource;import cn.ray.gateway.datasource.DataSourceFactory;import cn.ray.gateway.datasource.unpooled.UnpooledDataSourceFactory;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GatewaySession;import cn.ray.gateway.session.GatewaySessionFactory;public class DefaultGatewaySessionFactory implements GatewaySessionFactory { private final Configuration configuration; public DefaultGatewaySessionFactory (Configuration configuration) { this .configuration = configuration; } @Override public GatewaySession openSession (String uri) { DataSourceFactory dataSourceFactory = new UnpooledDataSourceFactory (); dataSourceFactory.setProperties(configuration,uri); DataSource dataSource = dataSourceFactory.getDataSource(); return new DefaultGatewaySession (configuration, uri, dataSource); } }
DefaultGatewaySession 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package cn.ray.gateway.session.defaults;import cn.ray.gateway.bind.IGenericReference;import cn.ray.gateway.datasource.Connection;import cn.ray.gateway.datasource.DataSource;import cn.ray.gateway.mapping.HttpStatement;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GatewaySession;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.config.bootstrap.DubboBootstrap;import org.apache.dubbo.config.utils.ReferenceConfigCache;import org.apache.dubbo.rpc.service.GenericService;public class DefaultGatewaySession implements GatewaySession { private Configuration configuration; private String uri; private DataSource dataSource; public DefaultGatewaySession (Configuration configuration, String uri, DataSource dataSource) { this .configuration = configuration; this .uri = uri; this .dataSource = dataSource; } @Override public Object get (String methodName, Object parameter) { Connection connection = dataSource.getConnection(); return connection.execute(methodName, new String [] {"java.lang.String" }, new String []{"name" }, new Object []{parameter}); } @Override public IGenericReference getMapper () { return configuration.getMapper(uri, this ); } @Override public Configuration getConfiguration () { return configuration; } }
测试 RPC 服务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package cn.ray.gateway.interfaces;import cn.ray.gateway.rpc.IActivityBooth;import cn.ray.gateway.rpc.dto.XReq;import com.alibaba.fastjson.JSON;import org.apache.dubbo.config.annotation.Service;@Service(version = "1.0.0") public class ActivityBooth implements IActivityBooth { @Override public String sayHi (String str) { return "hi " + str + " by api-gateway-test" ; } @Override public String insert (XReq req) { return "hi " + JSON.toJSONString(req) + " by api-gateway-test" ; } @Override public String test (String str, XReq req) { return "hi " + str + JSON.toJSONString(req) + " by api-gateway-test" ; } }
// Start the gateway
package cn.ray.gateway.test;

import cn.ray.gateway.mapping.HttpRequestType;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;
import cn.ray.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Manual test that registers one mapping, starts the gateway socket server, and
 * then keeps the JVM alive.
 */
public class ApiTest {

    private final Logger logger = LoggerFactory.getLogger(ApiTest.class);

    /**
     * Registers the {@code sayHi} mapping, starts the server on a worker thread,
     * waits until its channel is active, then sleeps indefinitely.
     */
    @Test
    public void test_GenericReference() throws InterruptedException, ExecutionException {
        // 1. Register the HTTP-to-RPC mapping in the configuration.
        Configuration config = new Configuration();
        HttpStatement sayHiStatement = new HttpStatement(
                "api-gateway-test",
                "cn.ray.gateway.rpc.IActivityBooth",
                "sayHi",
                "/wg/activity/sayHi",
                HttpRequestType.GET);
        config.addMapper(sayHiStatement);

        // 2. Build the session factory and the socket server that uses it.
        DefaultGatewaySessionFactory sessionFactory = new DefaultGatewaySessionFactory(config);
        GatewaySocketServer socketServer = new GatewaySocketServer(sessionFactory);

        // 3. Start the server on a worker thread and wait for its channel.
        Future<Channel> startup = Executors.newFixedThreadPool(2).submit(socketServer);
        Channel serverChannel = startup.get();
        if (null == serverChannel) {
            throw new RuntimeException("netty server start error channel is null");
        }

        // 4. Poll until the channel reports active.
        while (!serverChannel.isActive()) {
            logger.info("netty server gateway start Ing ...");
            Thread.sleep(500);
        }
        logger.info("netty server gateway start Done! {}", serverChannel.localAddress());

        // Keep the test alive so the server keeps running.
        Thread.sleep(Long.MAX_VALUE);
    }

}
测试结果