第四章 将连接(RPC/HTTP/其它)抽象为数据源

拆解Dubbo、HTTP调用,抽象为数据源服务,便于后续功能的扩展和使用

设计

4-设计

API 网关对 RPC 接口的泛化调用,类似于 ORM 框架对数据库的调用。因此可以把 RPC 抽象成一种连接资源,并对其做数据源管理和池化实现。这样既方便扩展新的连接方式(例如各厂商的 RPC 框架或 HTTP 服务),也可以通过 SPI 的方式自定义连接资源,以适应不同场景的诉求。这也是将连接抽象为数据源的设计目的。

实现

Mapping部分:

  • 无变动。

DataSource部分:

  • Connection:连接接口,提供 execute 执行方法
  • DataSource:数据源接口,提供获取连接方法
  • DataSourceFactory:数据源工厂,用来新建数据源
  • DataSourceType:枚举,不同的数据源类型,目前有 Dubbo 和 HTTP
  • UnpooledDataSource:数据源的实现类,并根据不同的数据源类型获取连接
  • UnpooledDataSourceFactory:为数据源提供配置,新建数据源
  • DubboConnection/HTTPConnection:连接的实现类,根据请求调用方法

Session部分:

  • DefaultGatewaySession修改了get方法,参数 uri 变为 methodName。Session现在绑定数据源,根据数据源获取连接来执行对应的方法调用。

Socket部分:

  • 无变动。

4-实现

按照JDBC的模型结构,设计 API 网关中的数据源实现,提供 Connection 连接接口,每一个具体的服务 Dubbo、HTTP 都实现 Connection,再由 DataSource 数据源接口来管理。

服务连接

Connection

1
2
3
4
5
6
7
8
9
10
11
package cn.ray.gateway.datasource;

/**
 * Connection abstraction: a single entry point for executing a call
 * against a backing service.
 *
 * @author Ray
 * @date 2023/5/15 15:28
 */
public interface Connection {

    /**
     * Executes the named method through this connection.
     *
     * @param methodName     name of the method to invoke
     * @param parameterTypes type names of the method parameters
     * @param parameterNames names of the method parameters
     * @param args           argument values passed to the method
     * @return whatever the implementation's invocation produces
     */
    Object execute(
            String methodName,
            String[] parameterTypes,
            String[] parameterNames,
            Object[] args);
}

DubboConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.ray.gateway.datasource.connection;

import cn.ray.gateway.datasource.Connection;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;

/**
 * {@link Connection} implementation backed by a Dubbo {@link GenericService}.
 *
 * @author Ray
 * @date 2023/5/15 15:33
 */
public class DubboConnection implements Connection {

    /** Generic service obtained from the reference cache; target of every {@link #execute} call. */
    private final GenericService genericService;

    /**
     * Starts the Dubbo bootstrap with the given application and registry, then
     * looks up the generic service for the given reference.
     *
     * @param applicationConfig Dubbo application configuration
     * @param registryConfig    Dubbo registry configuration
     * @param referenceConfig   reference describing the generic service to obtain
     */
    public DubboConnection(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ReferenceConfig<GenericService> referenceConfig) {
        // Build and start the Dubbo service. The registry is registered exactly once;
        // the original chain repeated .registry(registryConfig) by mistake.
        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
        bootstrap.application(applicationConfig).registry(registryConfig).start();
        // Obtain the generic-invocation service through the reference cache.
        ReferenceConfigCache cache = ReferenceConfigCache.getCache();
        genericService = cache.get(referenceConfig);
    }

    /**
     * Performs a Dubbo generic invocation.
     * See https://dubbo.apache.org/zh/docsv2.7/user/examples/generic-reference/
     *
     * @param methodName     name of the remote method
     * @param parameterTypes type names of the remote method's parameters
     * @param parameterNames parameter names; not used by this implementation
     * @param args           argument values
     * @return the value returned by {@code GenericService.$invoke}
     */
    @Override
    public Object execute(String methodName, String[] parameterTypes, String[] parameterNames, Object[] args) {
        return genericService.$invoke(methodName, parameterTypes, args);
    }
}
  • 把之前处于Session的get方法当中泛化调用的初始化和执行逻辑提取出来,单独变成一个Dubbo连接,之后与数据源绑定。

数据源管理

DataSource

1
2
3
4
5
6
7
8
9
10
11
package cn.ray.gateway.datasource;

/**
 * Data source abstraction: hands out a {@link Connection} for whichever
 * kind of data source the implementation represents.
 *
 * @author Ray
 * @date 2023/5/15 15:30
 */
public interface DataSource {

    /**
     * Obtains a connection from this data source.
     *
     * @return a connection provided by this data source
     */
    Connection getConnection();
}

DataSourceFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package cn.ray.gateway.datasource;

import cn.ray.gateway.session.Configuration;

/**
 * Factory for {@link DataSource} instances.
 *
 * @author Ray
 * @date 2023/5/15 15:52
 */
public interface DataSourceFactory {

    /**
     * Supplies the settings the factory uses for its data source.
     *
     * @param configuration gateway configuration
     * @param uri           URI associated with the data source
     */
    void setProperties(Configuration configuration, String uri);

    /**
     * Returns the data source provided by this factory.
     *
     * @return the data source
     */
    DataSource getDataSource();
}

UnpooledDataSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package cn.ray.gateway.datasource.unpooled;

import cn.ray.gateway.datasource.Connection;
import cn.ray.gateway.datasource.DataSource;
import cn.ray.gateway.datasource.DataSourceType;
import cn.ray.gateway.datasource.connection.DubboConnection;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.service.GenericService;

/**
 * Non-pooled data source: builds a connection on each request, chosen by the
 * configured {@link DataSourceType}.
 *
 * @author Ray
 * @date 2023/5/15 15:45
 */
public class UnpooledDataSource implements DataSource {

    private Configuration configuration;

    private HttpStatement httpStatement;

    private DataSourceType dataSourceType;

    /**
     * Creates a connection matching the configured data source type.
     *
     * @return a new {@link DubboConnection} when the type is {@code Dubbo}
     * @throws RuntimeException when the type has no connection implementation
     *                          (currently every type other than {@code Dubbo}, including {@code HTTP})
     */
    @Override
    public Connection getConnection() {
        switch (dataSourceType) {
            case Dubbo:
                return newDubboConnection();
            case HTTP: // TODO: HTTP connections are not implemented yet
            default:
                break;
        }
        throw new RuntimeException("DataSourceType:" + dataSourceType + "没有对应的数据源实现");
    }

    /** Looks up the Dubbo configs for the current statement and wraps them in a connection. */
    private Connection newDubboConnection() {
        // Lookup keys taken from the HTTP statement
        String applicationName = httpStatement.getApplication();
        String serviceInterface = httpStatement.getInterfaceName();

        // Resolve the Dubbo configuration objects held by the gateway configuration
        ApplicationConfig appCfg = configuration.getApplicationConfig(applicationName);
        RegistryConfig regCfg = configuration.getRegistryConfig(applicationName);
        ReferenceConfig<GenericService> refCfg = configuration.getReferenceConfig(serviceInterface);

        return new DubboConnection(appCfg, regCfg, refCfg);
    }

    /** @param configuration gateway configuration used to resolve Dubbo configs */
    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    /** @param httpStatement statement providing the application and interface names */
    public void setHttpStatement(HttpStatement httpStatement) {
        this.httpStatement = httpStatement;
    }

    /** @param dataSourceType type that selects which connection to create */
    public void setDataSourceType(DataSourceType dataSourceType) {
        this.dataSourceType = dataSourceType;
    }
}

UnpooledDataSourceFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package cn.ray.gateway.datasource.unpooled;

import cn.ray.gateway.datasource.DataSource;
import cn.ray.gateway.datasource.DataSourceFactory;
import cn.ray.gateway.datasource.DataSourceType;
import cn.ray.gateway.session.Configuration;

/**
 * Factory that creates and configures a single {@link UnpooledDataSource}.
 *
 * @author Ray
 * @date 2023/5/15 15:56
 */
public class UnpooledDataSourceFactory implements DataSourceFactory {

    /** The data source instance this factory configures and returns. */
    private UnpooledDataSource dataSource;

    /** Creates the factory together with its empty, not-yet-configured data source. */
    public UnpooledDataSourceFactory() {
        this.dataSource = new UnpooledDataSource();
    }

    /**
     * Configures the data source from the gateway configuration and the
     * HTTP statement registered for {@code uri}. The type is always set to
     * {@code DataSourceType.Dubbo}.
     *
     * @param configuration gateway configuration
     * @param uri           URI used to look up the HTTP statement
     */
    @Override
    public void setProperties(Configuration configuration, String uri) {
        UnpooledDataSource target = this.dataSource;
        target.setConfiguration(configuration);
        target.setHttpStatement(configuration.getHttpStatement(uri));
        target.setDataSourceType(DataSourceType.Dubbo);
    }

    /**
     * @return the data source held by this factory
     */
    @Override
    public DataSource getDataSource() {
        return this.dataSource;
    }
}

会话使用数据源

DefaultGatewaySessionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.ray.gateway.session.defaults;

import cn.ray.gateway.datasource.DataSource;
import cn.ray.gateway.datasource.DataSourceFactory;
import cn.ray.gateway.datasource.unpooled.UnpooledDataSourceFactory;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.GatewaySession;
import cn.ray.gateway.session.GatewaySessionFactory;

/**
 * Session factory for generic invocation.
 *
 * @author Ray
 * @date 2023/5/12 22:25
 */
public class DefaultGatewaySessionFactory implements GatewaySessionFactory {

    private final Configuration configuration;

    /**
     * @param configuration gateway configuration passed to every session opened by this factory
     */
    public DefaultGatewaySessionFactory(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * Opens a session for the given URI, bound to a freshly configured data source.
     *
     * @param uri request URI the session serves
     * @return a new {@link DefaultGatewaySession}
     */
    @Override
    public GatewaySession openSession(String uri) {
        // Dubbo and HTTP are treated as connection resources behind a data source
        DataSourceFactory factory = new UnpooledDataSourceFactory();
        factory.setProperties(configuration, uri);
        return new DefaultGatewaySession(configuration, uri, factory.getDataSource());
    }
}

DefaultGatewaySession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package cn.ray.gateway.session.defaults;

import cn.ray.gateway.bind.IGenericReference;
import cn.ray.gateway.datasource.Connection;
import cn.ray.gateway.datasource.DataSource;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.GatewaySession;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;

/**
 * Default gateway session implementation.
 *
 * @author Ray
 * @date 2023/5/13 17:56
 */
public class DefaultGatewaySession implements GatewaySession {

    private Configuration configuration;

    private String uri;

    private DataSource dataSource;

    /**
     * @param configuration gateway configuration
     * @param uri           URI this session is bound to
     * @param dataSource    data source that supplies connections for calls
     */
    public DefaultGatewaySession(Configuration configuration, String uri, DataSource dataSource) {
        this.configuration = configuration;
        this.uri = uri;
        this.dataSource = dataSource;
    }

    /**
     * Invokes {@code methodName} through a connection taken from the data source.
     * The call is always made with a single {@code java.lang.String} parameter
     * named {@code name}.
     *
     * @param methodName name of the method to invoke
     * @param parameter  the single argument value
     * @return the result of the connection's {@code execute} call
     */
    @Override
    public Object get(String methodName, Object parameter) {
        Connection connection = dataSource.getConnection();
        String[] parameterTypes = {"java.lang.String"};
        String[] parameterNames = {"name"};
        Object[] args = {parameter};
        return connection.execute(methodName, parameterTypes, parameterNames, args);
    }

    /**
     * @return the mapper the configuration provides for this session's URI
     */
    @Override
    public IGenericReference getMapper() {
        return configuration.getMapper(uri, this);
    }

    /**
     * @return the configuration this session was created with
     */
    @Override
    public Configuration getConfiguration() {
        return configuration;
    }
}

测试

RPC 服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.ray.gateway.interfaces;

import cn.ray.gateway.rpc.IActivityBooth;
import cn.ray.gateway.rpc.dto.XReq;
import com.alibaba.fastjson.JSON;
import org.apache.dubbo.config.annotation.Service;

/**
 * Test RPC service: each method returns its input wrapped in a fixed greeting.
 *
 * @author Ray
 * @date 2023/5/12 22:36
 */
@Service(version = "1.0.0")
public class ActivityBooth implements IActivityBooth {

    /** Returns the greeting for a plain string argument. */
    @Override
    public String sayHi(String str) {
        return greet(str);
    }

    /** Returns the greeting for the JSON form of {@code req}. */
    @Override
    public String insert(XReq req) {
        return greet(JSON.toJSONString(req));
    }

    /** Returns the greeting for {@code str} immediately followed by the JSON form of {@code req}. */
    @Override
    public String test(String str, XReq req) {
        return greet(str + JSON.toJSONString(req));
    }

    /** Wraps the payload between the fixed prefix and suffix shared by all replies. */
    private String greet(String payload) {
        return "hi " + payload + " by api-gateway-test";
    }
}

启动网关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package cn.ray.gateway.test;

import cn.ray.gateway.mapping.HttpRequestType;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;
import cn.ray.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Start-up test for the gateway: registers one mapping, starts the socket
 * server and then keeps the test thread asleep.
 *
 * @author Ray
 * @date 2023/5/11 16:26
 */
public class ApiTest {

    private final Logger logger = LoggerFactory.getLogger(ApiTest.class);

    /**
     * Registers the {@code sayHi} mapping, boots the gateway server and sleeps
     * so the server can be exercised by hand.
     *
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     * @throws ExecutionException   if the server start-up task fails
     */
    @Test
    public void test_GenericReference() throws InterruptedException, ExecutionException {
        // 1. Create the configuration and register the mapping
        Configuration configuration = new Configuration();
        HttpStatement sayHiStatement = new HttpStatement(
                "api-gateway-test",
                "cn.ray.gateway.rpc.IActivityBooth",
                "sayHi",
                "/wg/activity/sayHi",
                HttpRequestType.GET);
        configuration.addMapper(sayHiStatement);

        // 2. Build the session factory from the configuration
        DefaultGatewaySessionFactory sessionFactory = new DefaultGatewaySessionFactory(configuration);

        // 3. Create and start the gateway network server
        GatewaySocketServer socketServer = new GatewaySocketServer(sessionFactory);

        Future<Channel> startup = Executors.newFixedThreadPool(2).submit(socketServer);
        Channel channel = startup.get();

        if (null == channel) {
            throw new RuntimeException("netty server start error channel is null");
        }

        // Poll until the channel reports itself active
        while (!channel.isActive()) {
            logger.info("netty server gateway start Ing ...");
            Thread.sleep(500);
        }
        logger.info("netty server gateway start Done! {}", channel.localAddress());

        // Sleep so the test method does not return
        Thread.sleep(Long.MAX_VALUE);
    }
}

测试结果

4-测试-1

4-测试-2