第一章 HTTP请求会话协议处理

运用 Netty 对 HTTP 请求信息的协议转换,构建网关会话服务,简单响应 HTTP 请求信息到页面。

设计

1-设计

HTTP 请求到 API 网关,网关再去调用到对应的RPC服务,那么这样一个流程一次请求,可以把它抽象为是做了一次会话操作。

  • 之所以称之为会话,是因为一次 HTTP 请求,就要完成;建立连接、协议转换、方法映射、泛化调用、返回结果等一系列操作。而在这些操作过程中的各类行为处理。
  • 此外之所以要单独创建出一个 api-gateway-core 的工程,是因为我们要把这个工程独立于各种容器,例如它并不是直接与 SpringBoot 一起开发,那样会让组件失去灵活性。它的存在更应该像是一个 ORM 框架,可以独立使用,谁也都可以结合。

实现

1-实现

BaseHandler 数据处理器基础类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.ray.gateway.session;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
* @author Ray
* @date 2023/5/11 15:51
* @description 数据处理器基础类
*/
public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {

/**
* SimpleChannelInboundHandler是Netty提供的一个专门用于处理传入消息的抽象类
* 它继承自ChannelInboundHandlerAdapter
* 并提供了一些方便的功能,如自动释放接收到的消息和处理异常等。
* 通过继承SimpleChannelInboundHandler,我们可以重写它的channelRead0方法来处理接收到的传入消息
* 而不必关心消息的释放和异常处理等底层细节
* 同时,该类还提供了一些钩子方法(如channelActive和channelInactive)
* 可以在处理消息的同时进行其他操作,如打印日志、更新状态等
* 因此,继承SimpleChannelInboundHandler的用意是为了方便地处理传入的消息,并提高代码的可读性和可维护性
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, T t) throws Exception {
session(channelHandlerContext,channelHandlerContext.channel(),t);
}

protected abstract void session(ChannelHandlerContext ctx, final Channel channel, T request);
}

这是一个Java代码片段,表示一个抽象类BaseHandler,它扩展了SimpleChannelInboundHandler类。

这个类的目的是处理类型为T的传入网络消息(即请求)并对其进行处理。在该类中,channelRead0方法被覆盖实现,该方法在每次接收到消息时被调用。它将接收到的消息t传递给一个名为session的抽象方法,该方法在子类中实现。

该抽象方法session接收一个ChannelHandlerContext对象和一个Channel对象,这些对象表示了通信的上下文和通信通道。此外,它还接收一个类型为T的请求对象,该对象是从网络上接收到的实际请求。

因为BaseHandler是一个抽象类,所以不能直接实例化它。相反,开发者应该创建一个继承自该类的具体子类,并实现session方法来处理具体的业务逻辑。

SessionServerHandler 会话服务处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.ray.gateway.session.handlers;

import cn.ray.gateway.session.BaseHandler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Ray
* @date 2023/5/11 15:55
* @description 会话服务处理器
*/
public class SessionServerHandler extends BaseHandler<FullHttpRequest> {

private final Logger logger = LoggerFactory.getLogger(SessionServerHandler.class);

@Override
protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) {
logger.info("网关接收请求 ===> uri: {} , method: {}",request.uri(),request.method());

// 返回信息处理
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
// 返回信息控制
response.content().writeBytes(JSON.toJSONBytes("访问路径被网关管理了 URI: " + request.uri(), SerializerFeature.PrettyFormat));
// 头部信息设置
HttpHeaders heads = response.headers();
// 返回内容类型
heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8");
// 响应体的长度
heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 配置长连接
heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
// 配置跨域访问
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE");
heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");

channel.writeAndFlush(response);
}
}

这段Java代码是一个实现了BaseHandler抽象类的具体子类SessionServerHandler

该类的作用是处理类型为FullHttpRequest的HTTP请求,它覆盖了BaseHandler中的抽象方法session,实现了具体的业务逻辑。

session方法中,首先使用Logger打印了请求的URI和请求方法。接下来,构造了一个HTTP响应,并对响应进行了相应的设置。

具体来说,响应的状态行设置为HTTP/1.1 200 OK;响应的内容设置为一个JSON字符串,表示当前的访问路径被网关管理了;响应的头部信息中设置了内容类型为application/json;charset=UTF-8,响应体的长度,配置了长连接,以及配置了跨域访问。

最后,使用channel.writeAndFlush方法将响应写入通信通道并刷新缓冲区,以便发送到客户端。

该类的实现方式符合Netty框架的设计思想,通过继承BaseHandler抽象类,实现了基于Netty的高性能网络编程。

SessionChannelInitializer 会话channel初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.ray.gateway.session;

import cn.ray.gateway.session.handlers.SessionServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

/**
* @author Ray
* @date 2023/5/11 16:09
* @description 会话 channel 初始化,添加处理器
*/
public class SessionChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// HttpRequestDecoder、HttpResponseEncoder 是 Netty 本身提供的HTTP编解器
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpResponseEncoder());

// HttpObjectAggregator 用于处理除了 GET 请求外的 POST 请求时候的对象信息,否则只有上面的信息,是拿不到 POST 请求的
pipeline.addLast(new HttpObjectAggregator(1024*1024));

// SessionServerHandler 是我们自己实现的会话处理,用于拿到HTTP网络请求后,处理我们自己需要的业务逻辑
pipeline.addLast(new SessionServerHandler());
}
}

SessionServer 网关会话服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package cn.ray.gateway.session;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Callable;

/**
* @author Ray
* @date 2023/5/11 16:12
* @description 网关会话服务
*/
public class SessionServer implements Callable<Channel> {

private final Logger logger = LoggerFactory.getLogger(SessionServer.class);

private final EventLoopGroup boss = new NioEventLoopGroup(1);

private final EventLoopGroup worker = new NioEventLoopGroup();

private Channel channel;

@Override
public Channel call() throws Exception {
ChannelFuture channelFuture = null;

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
// 设置服务器Socket的连接队列大小为128,即服务器可以同时接受128个等待处理的客户端连接请求。
.option(ChannelOption.SO_BACKLOG,128)
.childHandler(new SessionChannelInitializer());
channelFuture = bootstrap.bind(new InetSocketAddress(7397))
.syncUninterruptibly();
// 阻塞当前线程直到服务器启动完成
// syncUninterruptibly()方法不会响应线程中断
// 因此即使在等待操作完成的过程中,线程被中断也不会中止等待
// 如果需要能够响应线程中断,可以使用.sync()方法,它会抛出InterruptedException异常,需要进行相应的处理。
this.channel = channelFuture.channel();
} catch (Exception e) {
logger.error("socket server start error.", e);
} finally {
if (null!=channelFuture && channelFuture.isSuccess()) {
logger.info("socket server start done.");
} else {
logger.error("socket server start error.");
}
}
return channel;
}
}

这段Java代码定义了一个名为SessionServer的类,它实现了Callable<Channel>接口,用于在后台线程中启动Netty服务器,并返回已绑定的通道。

在该类中,定义了一个Logger对象logger,用于记录日志信息。

该类中还定义了两个EventLoopGroup对象,分别为bossworker,用于处理服务器端的IO事件和业务逻辑。

call()方法中,首先定义了一个ChannelFuture对象channelFuture,用于表示服务器启动的结果。接下来,创建了一个ServerBootstrap对象,用于启动Netty服务器,并进行相应的配置。其中,指定了bossworker线程组、使用NioServerSocketChannel作为服务器端的Channel类型、设置SO_BACKLOG选项为128,最后将SessionChannelInitializer作为子处理器添加到ServerBootstrap中。

通过调用bootstrap.bind(new InetSocketAddress(7397)).syncUninterruptibly()方法,绑定了服务器监听的地址和端口,并阻塞当前线程直到服务器启动完成。启动成功后,将已绑定的通道赋值给成员变量channel

finally块中,根据channelFuture的结果打印相应的日志信息,记录服务器启动的成功或失败。

最后,call()方法返回已绑定的通道channel,以便后续使用。

测试

ApiTest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cn.ray.gateway.test;

import cn.ray.gateway.session.SessionServer;
import io.netty.channel.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* @author Ray
* @date 2023/5/11 16:26
* @description
*/
public class ApiTest {

private final Logger logger = LoggerFactory.getLogger(ApiTest.class);

@Test
public void test() throws ExecutionException, InterruptedException {
SessionServer server = new SessionServer();

Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);

Channel channel = future.get();

if (null==channel) {
logger.error("netty server start error, because channel is null!");
}

while (!channel.isActive()) {
logger.info("NettyServer 启动服务...");
Thread.sleep(500);
}
logger.info("NettyServer 启动服务成功 {}",channel.localAddress());

Thread.sleep(Long.MAX_VALUE);
}
}

这段代码是一个测试方法 test(),主要流程如下:

  1. 创建一个 SessionServer 的实例 server,它实现了 Callable 接口,表示可以通过 ExecutorService.submit(Callable<T>) 方法将 server 对象提交到一个线程池中执行,执行结果可以通过 Future 对象获取。
  2. 创建一个线程池,大小为2,通过 Executors.newFixedThreadPool(2) 创建。
  3. server 对象提交到线程池中执行,得到一个 Future<Channel> 对象 future
  4. 调用 future.get() 方法阻塞当前线程,等待执行结果,此时 server 对象的 call() 方法在另一个线程中执行,完成后将返回一个 Channel 对象,表示绑定的服务器通道。
  5. 获取到 Channel 对象 channel 后,判断它是否为 null,如果是,则输出错误日志。
  6. 如果 channel 不为 null,则等待它变为激活状态,即可接受客户端连接。
  7. 输出启动成功的日志信息,包括本地地址。
  8. 最后,阻塞当前线程,等待程序退出,因为 Thread.sleep(Long.MAX_VALUE) 会一直休眠,直到线程被中断或程序结束。

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
18:56:41.462 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
18:56:41.469 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
18:56:41.484 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
18:56:41.484 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
18:56:41.505 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
18:56:41.506 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
18:56:41.507 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.storeFence: available
18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available
18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
18:56:41.510 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: /var/folders/vx/_jdzk2c53_v080k5ww8s7_780000gn/T (java.io.tmpdir)
18:56:41.510 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
18:56:41.511 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: MacOS
18:56:41.512 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3817865216 bytes
18:56:41.512 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
18:56:41.516 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
18:56:41.516 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
18:56:41.518 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
18:56:41.518 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
18:56:41.526 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
18:56:41.562 [pool-1-thread-1] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 8415 (auto-detected)
18:56:41.565 [pool-1-thread-1] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
18:56:41.565 [pool-1-thread-1] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
18:56:41.567 [pool-1-thread-1] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
18:56:41.567 [pool-1-thread-1] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
18:56:41.572 [pool-1-thread-1] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: fc:e2:6c:ff:fe:14:27:ef (auto-detected)
18:56:41.581 [pool-1-thread-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
18:56:41.581 [pool-1-thread-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 9
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 4194304
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: false
18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
18:56:41.615 [pool-1-thread-1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
18:56:41.615 [pool-1-thread-1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
18:56:41.615 [pool-1-thread-1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
18:56:41.627 [pool-1-thread-1] INFO cn.ray.gateway.session.SessionServer - socket server start done.
18:56:41.628 [main] INFO cn.ray.gateway.test.ApiTest - NettyServer 启动服务成功 /0:0:0:0:0:0:0:0:7397
18:56:49.433 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
18:56:49.434 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
18:56:49.434 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@4e12148f
18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32
18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false
18:56:49.511 [nioEventLoopGroup-3-1] INFO cn.ray.gateway.session.handlers.SessionServerHandler - 网关接收请求 ===> uri: /query , method: GET

1-测试