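Chapter 6: Introducing an Executor to Encapsulate Service Calls

Extract the whole execution process, in which the Session obtains a connection from the data source and performs the generic invocation through it, into an independent module.

Design

[Figure: 6-Design]

Previously, once an HTTP request had gone through network protocol conversion, it was handed to the session flow. That flow still carried work that does not belong to it: packaging the input and output parameters for calls to the data source (RPC). This logic should be extracted into a dedicated class so that it is easier to manage.

The session is responsible for tying the context together; the executor is responsible for processing the call information sent to the data source.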

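Implementation

Mapping:

  • GenericReference: the return type of invoke changes to Object; it no longer returns only a String.

DataSource:

  • No changes

Session:

  • The RPC execution logic in DefaultGatewaySession is split out into the executor

Executor:

  • Added the Executor interface, which declares the execute method
  • Added the BaseExecutor abstract class, which implements execute from the Executor interface. The parameter-packaging logic that used to live in the session moves here, and execute calls the abstract method doExec so that subclasses can supply different execution logic
  • Added the SimpleExecutor class, which extends BaseExecutor and implements doExec. The logic that used to call the data source connection object DubboConnection from the gateway session moves here
  • Added the GatewayResult class, which wraps the result returned by the service so that results have a uniform shape

Socket:

  • Added the response wrapper ResponseParser; the result-wrapping part of the Handler moves into it.

Type:

  • No changes

[Figure: 6-Implementation]

Executor

Executor interface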

package cn.ray.gateway.executor;

import cn.ray.gateway.executor.result.GatewayResult;
import cn.ray.gateway.mapping.HttpStatement;

import java.util.Map;

/**
 * @author Ray
 * @date 2023/5/17 16:44
 * @description Executor interface
 */
public interface Executor {

    GatewayResult execute(HttpStatement httpStatement, Map<String, Object> parameters);
}

Abstract executor base class (template method pattern)

package cn.ray.gateway.executor;

import cn.ray.gateway.datasource.Connection;
import cn.ray.gateway.executor.result.GatewayResult;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.type.SimpleTypeRegistry;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * @author Ray
 * @date 2023/5/17 16:48
 * @description Abstract executor base class
 */
public abstract class BaseExecutor implements Executor {

    private final Logger logger = LoggerFactory.getLogger(BaseExecutor.class);

    protected Configuration configuration;

    protected Connection connection;

    public BaseExecutor(Configuration configuration, Connection connection) {
        this.configuration = configuration;
        this.connection = connection;
    }

    @Override
    public GatewayResult execute(HttpStatement httpStatement, Map<String, Object> parameters) {
        // Parameter handling; later parameter validation can also be wrapped here.
        String methodName = httpStatement.getMethodName();
        String parameterType = httpStatement.getParameterType();
        String[] parameterTypes = new String[]{parameterType};
        // A simple type is passed as the raw values; any other type is passed as the whole map.
        Object[] args = SimpleTypeRegistry.isSimpleType(parameterType) ? parameters.values().toArray() : new Object[]{parameters};
        logger.info("执行调用 method:{}#{}.{}({}) args:{}", httpStatement.getApplication(), httpStatement.getInterfaceName(), httpStatement.getMethodName(), JSON.toJSONString(parameterTypes), JSON.toJSONString(args));

        // Execute the call and wrap the outcome in a GatewayResult
        try {
            Object data = doExec(methodName, parameterTypes, args);
            return GatewayResult.buildSuccess(data);
        } catch (Exception e) {
            return GatewayResult.buildError(e.getMessage());
        }
    }

    // Template method hook: subclasses decide how the call is actually performed.
    protected abstract Object doExec(String methodName, String[] parameterTypes, Object[] args);
}
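The argument-packing line in execute is easy to misread, so here is a minimal, self-contained sketch of just that decision. It assumes a hypothetical stand-in for SimpleTypeRegistry (the real registry's contents are not shown in this chapter), and the field names uid and name are made up for illustration; they are not taken from XReq.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ArgumentPackingSketch {

    // Hypothetical stand-in for SimpleTypeRegistry; the real set of types may differ.
    private static final Set<String> SIMPLE_TYPES = new HashSet<>(Arrays.asList(
            "java.lang.String", "java.lang.Integer", "java.lang.Long", "java.lang.Boolean"));

    static boolean isSimpleType(String typeName) {
        return SIMPLE_TYPES.contains(typeName);
    }

    // Same shape as the ternary in BaseExecutor.execute:
    // simple type -> the raw parameter values, anything else -> the whole map as one argument.
    static Object[] packArgs(String parameterType, Map<String, Object> parameters) {
        return isSimpleType(parameterType)
                ? parameters.values().toArray()
                : new Object[]{parameters};
    }

    public static void main(String[] args) {
        Map<String, Object> simple = new LinkedHashMap<>();
        simple.put("str", "ray");
        // prints [ray]
        System.out.println(Arrays.toString(packArgs("java.lang.String", simple)));

        Map<String, Object> complex = new LinkedHashMap<>();
        complex.put("uid", "10001");   // hypothetical field
        complex.put("name", "ray");    // hypothetical field
        // prints [{uid=10001, name=ray}]
        System.out.println(Arrays.toString(packArgs("cn.ray.gateway.rpc.dto.XReq", complex)));
    }
}
```

In the second case the map is handed over as a single argument, which matches how a generic invocation can receive an object parameter as a map of its fields.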

SimpleExecutor

package cn.ray.gateway.executor;

import cn.ray.gateway.datasource.Connection;
import cn.ray.gateway.session.Configuration;

/**
 * @author Ray
 * @date 2023/5/17 16:58
 * @description Simple executor
 */
public class SimpleExecutor extends BaseExecutor {

    public SimpleExecutor(Configuration configuration, Connection connection) {
        super(configuration, connection);
    }

    @Override
    protected Object doExec(String methodName, String[] parameterTypes, Object[] args) {
        return connection.execute(
                methodName,
                parameterTypes,
                new String[]{"ignore"},
                args);
    }
}

GatewayResult result wrapper

package cn.ray.gateway.executor.result;

/**
 * @author Ray
 * @date 2023/5/17 16:45
 * @description Uniform wrapper for the result returned by a service call
 */
public class GatewayResult {

    private String code;

    private String message;

    private Object data;

    public GatewayResult(String code, String message, Object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    public static GatewayResult buildSuccess(Object data) {
        return new GatewayResult("0000", "调用成功", data);
    }

    public static GatewayResult buildError(Object data) {
        return new GatewayResult("0001", "调用失败", data);
    }

    public String getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }

    public Object getData() {
        return data;
    }
}
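To see what a caller observes on each path, the following sketch reproduces the try/catch wrapping from BaseExecutor.execute in isolation. Result is a trimmed stand-in for GatewayResult that keeps only the code and data fields, and the failure message "provider unavailable" is invented for the example.

```java
import java.util.concurrent.Callable;

public class ResultWrappingSketch {

    // Trimmed stand-in for GatewayResult, using the same codes as the chapter.
    static final class Result {
        final String code;
        final Object data;

        Result(String code, Object data) {
            this.code = code;
            this.data = data;
        }
    }

    // Runs the call and converts either outcome into a Result,
    // so the caller never sees an exception from the RPC itself.
    static Result call(Callable<Object> rpc) {
        try {
            return new Result("0000", rpc.call());
        } catch (Exception e) {
            return new Result("0001", e.getMessage());
        }
    }

    public static void main(String[] args) {
        Result ok = call(() -> "hi ray by api-gateway-test");
        Result failed = call(() -> {
            throw new IllegalStateException("provider unavailable"); // invented failure
        });
        // prints 0000 hi ray by api-gateway-test
        System.out.println(ok.code + " " + ok.data);
        // prints 0001 provider unavailable
        System.out.println(failed.code + " " + failed.data);
    }
}
```

One consequence of this design is that on failure the data field carries the exception message rather than a payload, so clients should check code before interpreting data.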

Session invocation

Building the executor in DefaultGatewaySessionFactory

package cn.ray.gateway.session.defaults;

import cn.ray.gateway.datasource.DataSource;
import cn.ray.gateway.datasource.DataSourceFactory;
import cn.ray.gateway.datasource.unpooled.UnpooledDataSourceFactory;
import cn.ray.gateway.executor.Executor;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.GatewaySession;
import cn.ray.gateway.session.GatewaySessionFactory;

/**
 * @author Ray
 * @date 2023/5/12 22:25
 * @description Session factory for generic invocation
 */
public class DefaultGatewaySessionFactory implements GatewaySessionFactory {

    private final Configuration configuration;

    public DefaultGatewaySessionFactory(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public GatewaySession openSession(String uri) {
        // Obtain the data source connection info: Dubbo and HTTP are both abstracted as connection resources here
        DataSourceFactory dataSourceFactory = new UnpooledDataSourceFactory();
        dataSourceFactory.setProperties(configuration, uri);
        DataSource dataSource = dataSourceFactory.getDataSource();
        // Create the executor
        Executor executor = configuration.newExecutor(dataSource.getConnection());
        // Create the session
        return new DefaultGatewaySession(configuration, uri, executor);
    }
}

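When a session is opened, the Connection is used to create the corresponding executor, which is then injected into the gateway session DefaultGatewaySession for later use.

Calling the executor in DefaultGatewaySession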

package cn.ray.gateway.session.defaults;

import cn.ray.gateway.bind.IGenericReference;
import cn.ray.gateway.executor.Executor;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.GatewaySession;

import java.util.Map;

/**
 * @author Ray
 * @date 2023/5/13 17:56
 * @description Default gateway session implementation
 */
public class DefaultGatewaySession implements GatewaySession {

    private Configuration configuration;

    private String uri;

    private Executor executor;

    public DefaultGatewaySession(Configuration configuration, String uri, Executor executor) {
        this.configuration = configuration;
        this.uri = uri;
        this.executor = executor;
    }

    @Override
    public Object get(String methodName, Map<String, Object> parameters) {
        HttpStatement httpStatement = configuration.getHttpStatement(uri);
        try {
            // Delegate the RPC call to the executor
            return executor.execute(httpStatement, parameters);
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Error exec get. Cause: " + e, e);
        }
    }

    @Override
    public Object post(String methodName, Map<String, Object> parameters) {
        return get(methodName, parameters);
    }

    @Override
    public IGenericReference getMapper() {
        return configuration.getMapper(uri, this);
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }
}

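When the RPC generic-service proxy object is invoked, it calls the gateway session DefaultGatewaySession to execute the service request. The session in turn calls the executor defined above to complete the request and receive its result, which is returned wrapped in a GatewayResult.

Network invocation

Response wrapper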

package cn.ray.gateway.socket.agreement;

import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.http.*;

import java.nio.charset.StandardCharsets;

/**
 * @author Ray
 * @date 2023/5/17 17:16
 * @description Builds the response returned to the client
 */
public class ResponseParser {

    public static DefaultFullHttpResponse parse(Object result) {
        // Build the response
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        // Write the body; encode explicitly as UTF-8 to match the charset declared in Content-Type
        response.content().writeBytes(JSON.toJSONString(result).getBytes(StandardCharsets.UTF_8));
        // Header settings
        HttpHeaders heads = response.headers();
        // Content type of the response
        heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8");
        // Length of the response body in bytes
        heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Keep the connection alive
        heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        // Cross-origin access settings
        heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
        heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
        heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
        return response;
    }
}
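ResponseParser sets Content-Length from the number of readable bytes in the buffer, not from the length of the JSON string. The difference matters because GatewayResult messages contain Chinese text. The sketch below uses only the JDK and a hand-written JSON fragment (not the actual serialized GatewayResult) to show how far the two counts diverge under UTF-8.

```java
import java.nio.charset.StandardCharsets;

public class ContentLengthSketch {

    // Number of bytes the body occupies on the wire when encoded as UTF-8.
    static int utf8Length(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Hand-written fragment; the escapes spell the success message used by GatewayResult.
        String body = "{\"message\":\"\u8c03\u7528\u6210\u529f\"}";
        // prints 18 (characters)
        System.out.println(body.length());
        // prints 26 (bytes: 14 ASCII characters plus 4 CJK characters at 3 bytes each)
        System.out.println(utf8Length(body));
    }
}
```

A Content-Length computed from the character count would truncate such a body on the client side, which is why taking the length from the encoded buffer is the safer choice.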

Session service handler

package cn.ray.gateway.socket.handlers;

import cn.ray.gateway.bind.IGenericReference;
import cn.ray.gateway.session.GatewaySession;
import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;
import cn.ray.gateway.socket.BaseHandler;
import cn.ray.gateway.socket.agreement.RequestParser;
import cn.ray.gateway.socket.agreement.ResponseParser;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * @author Ray
 * @date 2023/5/11 15:55
 * @description Session service handler
 */
public class GatewayServerHandler extends BaseHandler<FullHttpRequest> {

    private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class);

    private final DefaultGatewaySessionFactory gatewaySessionFactory;

    public GatewayServerHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
        this.gatewaySessionFactory = gatewaySessionFactory;
    }

    @Override
    protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
        logger.info("网关接收请求 ===> uri: {} , method: {}", request.uri(), request.method());

        // 1. Parse the request parameters
        RequestParser requestParser = new RequestParser(request);
        String uri = requestParser.getUri();
        if (uri == null) {
            return;
        }
        Map<String, Object> parameters = requestParser.parse();

        // 2. Call the session service
        GatewaySession gatewaySession = gatewaySessionFactory.openSession(uri);
        IGenericReference reference = gatewaySession.getMapper();
        Object result = reference.$invoke(parameters);

        // 3. Wrap the result and write it back
        DefaultFullHttpResponse response = ResponseParser.parse(result);
        channel.writeAndFlush(response);
    }
}

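The logic in the custom gateway handler GatewayServerHandler is organized into three steps: parsing the request parameters, calling the session service, and wrapping the result and returning it to the client.

The gateway logic is split into independent modules, each responsible for a specific function and tied together through Configuration. This removes the redundant content from the session flow and makes each class's responsibility clearer.

Testing

RPC service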

package cn.ray.gateway.interfaces;

import cn.ray.gateway.rpc.IActivityBooth;
import cn.ray.gateway.rpc.dto.XReq;
import com.alibaba.fastjson.JSON;
import org.apache.dubbo.config.annotation.Service;

/**
 * @author Ray
 * @date 2023/5/12 22:36
 * @description
 */
@Service(version = "1.0.0")
public class ActivityBooth implements IActivityBooth {

    @Override
    public String sayHi(String str) {
        return "hi " + str + " by api-gateway-test";
    }

    @Override
    public String insert(XReq req) {
        return "hi " + JSON.toJSONString(req) + " by api-gateway-test";
    }

    @Override
    public String test(String str, XReq req) {
        return "hi " + str + JSON.toJSONString(req) + " by api-gateway-test";
    }
}

Starting the gateway

package cn.ray.gateway.test;

import cn.ray.gateway.mapping.HttpRequestType;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;
import cn.ray.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author Ray
 * @date 2023/5/11 16:26
 * @description
 */
public class ApiTest {

    private final Logger logger = LoggerFactory.getLogger(ApiTest.class);

    @Test
    public void test_GenericReference() throws InterruptedException, ExecutionException {
        // 1. Create the configuration and register the mappings
        Configuration configuration = new Configuration();

        HttpStatement httpStatement01 = new HttpStatement(
                "api-gateway-test",
                "cn.ray.gateway.rpc.IActivityBooth",
                "sayHi",
                "/wg/activity/sayHi",
                HttpRequestType.GET,
                "java.lang.String");

        HttpStatement httpStatement02 = new HttpStatement(
                "api-gateway-test",
                "cn.ray.gateway.rpc.IActivityBooth",
                "insert",
                "/wg/activity/insert",
                HttpRequestType.POST,
                "cn.ray.gateway.rpc.dto.XReq");

        configuration.addMapper(httpStatement01);
        configuration.addMapper(httpStatement02);

        // 2. Build the session factory from the configuration
        DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);

        // 3. Create and start the gateway network service
        GatewaySocketServer server = new GatewaySocketServer(gatewaySessionFactory);

        Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
        Channel channel = future.get();

        if (null == channel) throw new RuntimeException("netty server start error channel is null");

        while (!channel.isActive()) {
            logger.info("netty server gateway start Ing ...");
            Thread.sleep(500);
        }
        logger.info("netty server gateway start Done! {}", channel.localAddress());

        // Keep the test thread alive so the server stays up for manual requests
        Thread.sleep(Long.MAX_VALUE);
    }
}

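Test results

GET

[Figure: 6-Test-1]

POST

[Figure: 6-Test-2]

Reflections

  • Because Configuration runs through the whole gateway session as its configuration, a method for creating executors can be added to Configuration; changing that method later yields a different Executor.
  • For ResponseParser, the result-wrapping logic can be placed in a static method and called directly wherever it is needed.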
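The first point can be sketched as follows. The body of Configuration.newExecutor is not shown in this chapter, so everything here is an assumption: the nested Connection, Executor, SimpleExecutor and Configuration types are simplified stand-ins with the same names, and the Executor signature is reduced to a method name plus arguments to keep the example short.

```java
public class ExecutorFactorySketch {

    // Stand-in for the data source connection; the four-argument shape follows the call in SimpleExecutor.
    interface Connection {
        Object execute(String method, String[] parameterTypes, String[] parameterNames, Object[] args);
    }

    // Simplified stand-in for the Executor interface (the real one takes an HttpStatement and a Map).
    interface Executor {
        Object execute(String methodName, Object[] args);
    }

    static class SimpleExecutor implements Executor {
        private final Connection connection;

        SimpleExecutor(Connection connection) {
            this.connection = connection;
        }

        @Override
        public Object execute(String methodName, Object[] args) {
            return connection.execute(methodName, new String[]{"java.lang.String"}, new String[]{"ignore"}, args);
        }
    }

    // Assumed shape of the factory method: the single place that decides which Executor is used.
    static class Configuration {
        Executor newExecutor(Connection connection) {
            return new SimpleExecutor(connection);
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        // Fake connection that echoes its first argument instead of calling a real RPC service.
        Connection fake = (method, types, names, callArgs) -> "hi " + callArgs[0];
        Executor executor = configuration.newExecutor(fake);
        // prints hi ray
        System.out.println(executor.execute("sayHi", new Object[]{"ray"}));
    }
}
```

Because callers only ever ask Configuration for an Executor, swapping in a different implementation later would touch newExecutor alone, leaving the session factory and the session unchanged.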