第三章 分治处理会话流程

通过分治重构会话流程,细化网络协议、会话、绑定、映射和泛化调用的关系。

设计

3-设计.png

因为我们要继续扩展新的功能,而上一章功能实现的逻辑分层结构不易于扩展,所以需要进行重构处理。重构点包括;

  1. 拆分session包下会话业务逻辑部分和网络通信部分,做功能实现的隔离。
  2. 拆分 bind 包下代理和对RPC接口的泛化调用,这里你可以把RPC当做一种可连接资源,而这种连接资源也不是只有 RPC 一种,同时也因为 RPC 的泛化调用是一种通用方法,井不需要与逻辑绑定,所以它也应该被拆分出来。

实现

Mapping 模块:

  • MapperMethod:switch处理不同的请求,调用GatewaySession中的方法执行具体内容(目前只提供了 get 接口方法实现)
  • MapperProxy:延续之前的处理方式,这块是一个Interceptor,用来调用执行后续方法,从直接调用 dubbo 变成了调用 MapperedMethod,也就是根据请求类型从 GatewaySession 执行对应的方法
  • MapperProxyFactory:同之前的 GenericReferenceProxyFactory 代理对象工厂,利用 cglib 回调对象 MapperProxy 生成 IGenericReference 代理对象
  • MapperRegistry:添加映射,注册对应 uri 的 MapperProxyFactory ,并提供通过MapperProxyFactory.new 生成 IGenericReference 代理对象并返回的方法
  • HttpRequestType:枚举类,对应不同的请求类型
  • HttpStatement:一个HTTP请求的封装类,包含对应的应用名称、接口名、方法名、uri 以及对应的请求类型

Session模块:

  • Configuration:通过 Map 缓存添加、获取 HttpStatement:addHttpStatement 、 getHttpStatement ;并通过 addMapper 由 MapperRegistry 添加泛化调用工厂,并在之后通过 getMapper 由 MapperRegistry 中Map缓存的对应 uri 的泛化调用工厂生成并获取泛化调用代理对象
  • GatewaySession:网关会话接口,提供get方法、getMapper方法、getConfiguration方法供不同的协议或服务实现
  • GatewaySessionFactory:泛化调用会话工厂接口
  • DefaultGatewaySession:实现了GatewaySession 网关会话接口,这里主要是用于实现 RPC 泛化调用
  • DefaultGatewaySessionFactory:泛化调用会话工厂,用于在 Netty Handler 中创建 DefaultGatewaySession ,开启网关会话,并执行对应的映射调用

Socket模块:

  • 这一块就是沿袭之前的netty处理,只是把其中调用服务的逻辑拆分出去,依赖 gatewaySession 去调用

3-实现

  • 整个工程结构分治设计包括;bind(绑定)、mapping(映射)、session(会话)、socket(网络)这4大块。
  • 整个调用流程以 socket 网络处理协议转换后,获取会话 session, 从 session 中得到映射器对象,并根据 HTTP 请求的 GET/POST 调用到不同的方法上。
  • 这样的拆分可以方便在网络层做鉴权、限流、熔断、鉴权等功能,它们可以与 session 会话逻辑拆分。
  • 另外一层关系是 bind 绑定层中,把 RPC 的泛化调用拆分出来,这里可以把 RPC 当成一种资源来看待,拆分后更有易于后续的池化、扩展和管理。

MapperMethod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cn.ray.gateway.bind;

import cn.ray.gateway.mapping.HttpRequestType;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.GatewaySession;

import java.lang.reflect.Method;

/**
* @author Ray
* @date 2023/5/13 17:04
* @description
*/
public class MapperMethod {

private String uri;

private final HttpRequestType httpRequestType;

public MapperMethod(String uri, Method method, Configuration configuration) {
this.uri = uri;
this.httpRequestType = configuration.getHttpStatement(uri).getHttpRequestType();
}

public Object execute(GatewaySession session, Object args) {
Object result = null;
switch (httpRequestType) {
case GET:
result = session.get(uri, args);
break;
case POST:
break;
case PUT:
break;
case DELETE:
break;
default:
throw new RuntimeException("Unknown execution method for: " + httpRequestType);
}
return result;
}
}

不同类型的 HTTP 请求做不同的逻辑处理

MapperProxy 映射代理调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.ray.gateway.bind;

import cn.ray.gateway.session.GatewaySession;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import java.lang.reflect.Method;

/**
* @author Ray
* @date 2023/5/12 17:48
* @description 映射代理调用
*/
public class MapperProxy implements MethodInterceptor {

private GatewaySession gatewaySession;

private final String uri;

public MapperProxy(GatewaySession gatewaySession, String uri) {
this.gatewaySession = gatewaySession;
this.uri = uri;
}

@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
MapperMethod linkMethod = new MapperMethod(uri, method, gatewaySession.getConfiguration());
return linkMethod.execute(gatewaySession, objects);
}
}

简化映射器代理,将原有的 RPC 泛化调用拆分。这个类中只完成代理部分,并调用映射器方法完成逻辑处理,达到处理不同服务(HTTP/RPC/其它)的调用方法。

GatewaySession 处理网关 HTTP 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package cn.ray.gateway.session;

import cn.ray.gateway.bind.IGenericReference;

/**
* @author Ray
* @date 2023/5/13 17:01
* @description 处理网关 HTTP 请求
*/
public interface GatewaySession {

Object get(String uri, Object parameter);

IGenericReference getMapper(String uri);

Configuration getConfiguration();

}

DefaultGatewaySession 默认(RPC)网关会话实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package cn.ray.gateway.session.defaults;

import cn.ray.gateway.bind.IGenericReference;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.GatewaySession;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;

/**
* @author Ray
* @date 2023/5/13 17:56
* @description 默认网关会话实现类
*/
public class DefaultGatewaySession implements GatewaySession {

private Configuration configuration;

public DefaultGatewaySession(Configuration configuration) {
this.configuration = configuration;
}

@Override
public Object get(String uri, Object parameter) {
// TODO 后续放在执行器中进行处理

// 配置信息
HttpStatement httpStatement = configuration.getHttpStatement(uri);
String application = httpStatement.getApplication();
String interfaceName = httpStatement.getInterfaceName();

// 获取基础服务(创建成本较高,内存存放获取)
ApplicationConfig applicationConfig = configuration.getApplicationConfig(application);
RegistryConfig registryConfig = configuration.getRegistryConfig(application);
ReferenceConfig<GenericService> referenceConfig = configuration.getReferenceConfig(interfaceName);
// 构建Dubbo服务
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(applicationConfig).registry(registryConfig).registry(registryConfig).start();
// 获取泛化调用服务
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
GenericService genericService = cache.get(referenceConfig);

return genericService.$invoke(httpStatement.getMethodName(), new String[]{"java.lang.String"}, new Object[]{"Ray"});
}

@Override
public IGenericReference getMapper(String uri) {
return configuration.getMapper(uri, this);
}

@Override
public Configuration getConfiguration() {
return configuration;
}
}

GatewayServerHandler 会话服务处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package cn.ray.gateway.socket.handlers;

import cn.ray.gateway.bind.IGenericReference;
import cn.ray.gateway.session.GatewaySession;
import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;
import cn.ray.gateway.socket.BaseHandler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Ray
* @date 2023/5/11 15:55
* @description 会话服务处理器
*/
public class GatewayServerHandler extends BaseHandler<FullHttpRequest> {

private final Logger logger = LoggerFactory.getLogger(GatewayServerHandler.class);

private final DefaultGatewaySessionFactory gatewaySessionFactory;

public GatewayServerHandler(DefaultGatewaySessionFactory gatewaySessionFactory) {
this.gatewaySessionFactory = gatewaySessionFactory;
}

@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 ===> uri: {} , method: {}",request.uri(),request.method());

// 返回信息控制:简单处理
String methodName = request.uri().substring(1);
if (methodName.equals("favicon.ico")) return;
// 返回信息处理
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);

// 服务泛化调用
GatewaySession gatewaySession = gatewaySessionFactory.openSession();
IGenericReference reference = gatewaySession.getMapper(request.uri());
String result = reference.$invoke("test") + " " + System.currentTimeMillis();

// 设置回写数据
response.content().writeBytes(JSON.toJSONBytes(result, SerializerFeature.PrettyFormat));
// 头部信息设置
HttpHeaders heads = response.headers();
// 返回内容类型
heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8");
// 响应体的长度
heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 配置长连接
heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
// 配置跨域访问
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");

channel.writeAndFlush(response);
}
}

测试

RPC 服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.ray.gateway.interfaces;

import cn.ray.gateway.rpc.IActivityBooth;
import cn.ray.gateway.rpc.dto.XReq;
import com.alibaba.fastjson.JSON;
import org.apache.dubbo.config.annotation.Service;

/**
* @author Ray
* @date 2023/5/12 22:36
* @description
*/
@Service(version = "1.0.0")
public class ActivityBooth implements IActivityBooth {

@Override
public String sayHi(String str) {
return "hi " + str + " by api-gateway-test";
}

@Override
public String insert(XReq req) {
return "hi " + JSON.toJSONString(req) + " by api-gateway-test";
}

@Override
public String test(String str, XReq req) {
return "hi " + str + JSON.toJSONString(req) + " by api-gateway-test";
}
}

启动网关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package cn.ray.gateway.test;

import cn.ray.gateway.mapping.HttpRequestType;
import cn.ray.gateway.mapping.HttpStatement;
import cn.ray.gateway.session.Configuration;
import cn.ray.gateway.session.defaults.DefaultGatewaySessionFactory;
import cn.ray.gateway.socket.GatewaySocketServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* @author Ray
* @date 2023/5/11 16:26
* @description
*/
public class ApiTest {

private final Logger logger = LoggerFactory.getLogger(ApiTest.class);

@Test
public void test_GenericReference() throws InterruptedException, ExecutionException {
// 1. 创建配置信息加载注册
Configuration configuration = new Configuration();
HttpStatement httpStatement = new HttpStatement(
"api-gateway-test",
"cn.ray.gateway.rpc.IActivityBooth",
"sayHi",
"/wg/activity/sayHi",
HttpRequestType.GET);
configuration.addMapper(httpStatement);

// 2. 基于配置构建会话工厂
DefaultGatewaySessionFactory gatewaySessionFactory = new DefaultGatewaySessionFactory(configuration);

// 3. 创建启动网关网络服务
GatewaySocketServer server = new GatewaySocketServer(gatewaySessionFactory);

Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();

if (null == channel) throw new RuntimeException("netty server start error channel is null");

while (!channel.isActive()) {
logger.info("netty server gateway start Ing ...");
Thread.sleep(500);
}
logger.info("netty server gateway start Done! {}", channel.localAddress());

Thread.sleep(Long.MAX_VALUE);
}
}

测试结果

3-测试-1

3-测试-2