运用 Netty 对 HTTP 请求信息的协议转换,构建网关会话服务,简单响应 HTTP 请求信息到页面。
设计
HTTP 请求到 API 网关,网关再去调用到对应的RPC服务,那么这样一个流程一次请求,可以把它抽象为是做了一次会话操作。
- 之所以称之为会话,是因为一次 HTTP 请求,就要完成;建立连接、协议转换、方法映射、泛化调用、返回结果等一系列操作。而在这些操作过程中的各类行为处理。
- 此外之所以要单独创建出一个 api-gateway-core 的工程,是因为我们要把这个工程独立于各种容器,例如它并不是直接与 SpringBoot 一起开发,那样会让组件失去灵活性。它的存在更应该像是一个 ORM 框架,可以独立使用,谁也都可以结合。
实现
BaseHandler 数据处理器基础类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package cn.ray.gateway.session;
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;
public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, T t) throws Exception { session(channelHandlerContext,channelHandlerContext.channel(),t); }
protected abstract void session(ChannelHandlerContext ctx, final Channel channel, T request); }
|
这是一个Java代码片段,表示一个抽象类BaseHandler
,它扩展了SimpleChannelInboundHandler
类。
这个类的目的是处理类型为T
的传入网络消息(即请求)并对其进行处理。在该类中,channelRead0
方法被覆盖实现,该方法在每次接收到消息时被调用。它将接收到的消息t
传递给一个名为session
的抽象方法,该方法在子类中实现。
该抽象方法session
接收一个ChannelHandlerContext
对象和一个Channel
对象,这些对象表示了通信的上下文和通信通道。此外,它还接收一个类型为T
的请求对象,该对象是从网络上接收到的实际请求。
因为BaseHandler
是一个抽象类,所以不能直接实例化它。相反,开发者应该创建一个继承自该类的具体子类,并实现session
方法来处理具体的业务逻辑。
SessionServerHandler 会话服务处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package cn.ray.gateway.session.handlers;
import cn.ray.gateway.session.BaseHandler; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class SessionServerHandler extends BaseHandler<FullHttpRequest> {
private final Logger logger = LoggerFactory.getLogger(SessionServerHandler.class);
@Override protected void session(ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) { logger.info("网关接收请求 ===> uri: {} , method: {}",request.uri(),request.method());
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.content().writeBytes(JSON.toJSONBytes("访问路径被网关管理了 URI: " + request.uri(), SerializerFeature.PrettyFormat)); HttpHeaders heads = response.headers(); heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8"); heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*"); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE"); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
channel.writeAndFlush(response); } }
|
这段Java代码是一个实现了BaseHandler
抽象类的具体子类SessionServerHandler
。
该类的作用是处理类型为FullHttpRequest
的HTTP请求,它覆盖了BaseHandler
中的抽象方法session
,实现了具体的业务逻辑。
在session
方法中,首先使用Logger打印了请求的URI和请求方法。接下来,构造了一个HTTP响应,并对响应进行了相应的设置。
具体来说,响应的状态行设置为HTTP/1.1 200 OK;响应的内容设置为一个JSON字符串,表示当前的访问路径被网关管理了;响应的头部信息中设置了内容类型为application/json;charset=UTF-8,响应体的长度,配置了长连接,以及配置了跨域访问。
最后,使用channel.writeAndFlush
方法将响应写入通信通道并刷新缓冲区,以便发送到客户端。
该类的实现方式符合Netty框架的设计思想,通过继承BaseHandler
抽象类,实现了基于Netty的高性能网络编程。
SessionChannelInitializer 会话channel初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package cn.ray.gateway.session;
import cn.ray.gateway.session.handlers.SessionServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder;
public class SessionChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpRequestDecoder()); pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(new HttpObjectAggregator(1024*1024));
pipeline.addLast(new SessionServerHandler()); } }
|
SessionServer 网关会话服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package cn.ray.gateway.session;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.util.concurrent.Callable;
public class SessionServer implements Callable<Channel> {
private final Logger logger = LoggerFactory.getLogger(SessionServer.class);
private final EventLoopGroup boss = new NioEventLoopGroup(1);
private final EventLoopGroup worker = new NioEventLoopGroup();
private Channel channel;
@Override public Channel call() throws Exception { ChannelFuture channelFuture = null;
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childHandler(new SessionChannelInitializer()); channelFuture = bootstrap.bind(new InetSocketAddress(7397)) .syncUninterruptibly(); this.channel = channelFuture.channel(); } catch (Exception e) { logger.error("socket server start error.", e); } finally { if (null!=channelFuture && channelFuture.isSuccess()) { logger.info("socket server start done."); } else { logger.error("socket server start error."); } } return channel; } }
|
这段Java代码定义了一个名为SessionServer
的类,它实现了Callable<Channel>
接口,用于在后台线程中启动Netty服务器,并返回已绑定的通道。
在该类中,定义了一个Logger对象logger
,用于记录日志信息。
该类中还定义了两个EventLoopGroup
对象,分别为boss
和worker
,用于处理服务器端的IO事件和业务逻辑。
在call()
方法中,首先定义了一个ChannelFuture
对象channelFuture
,用于表示服务器启动的结果。接下来,创建了一个ServerBootstrap
对象,用于启动Netty服务器,并进行相应的配置。其中,指定了boss
和worker
线程组、使用NioServerSocketChannel
作为服务器端的Channel类型、设置SO_BACKLOG选项为128,最后将SessionChannelInitializer
作为子处理器添加到ServerBootstrap
中。
通过调用bootstrap.bind(new InetSocketAddress(7397)).syncUninterruptibly()
方法,绑定了服务器监听的地址和端口,并阻塞当前线程直到服务器启动完成。启动成功后,将已绑定的通道赋值给成员变量channel
。
在finally
块中,根据channelFuture
的结果打印相应的日志信息,记录服务器启动的成功或失败。
最后,call()
方法返回已绑定的通道channel
,以便后续使用。
测试
ApiTest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package cn.ray.gateway.test;
import cn.ray.gateway.session.SessionServer; import io.netty.channel.Channel; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future;
public class ApiTest {
private final Logger logger = LoggerFactory.getLogger(ApiTest.class);
@Test public void test() throws ExecutionException, InterruptedException { SessionServer server = new SessionServer();
Future<Channel> future = Executors.newFixedThreadPool(2).submit(server);
Channel channel = future.get();
if (null==channel) { logger.error("netty server start error, because channel is null!"); }
while (!channel.isActive()) { logger.info("NettyServer 启动服务..."); Thread.sleep(500); } logger.info("NettyServer 启动服务成功 {}",channel.localAddress());
Thread.sleep(Long.MAX_VALUE); } }
|
这段代码是一个测试方法 test()
,主要流程如下:
- 创建一个
SessionServer
的实例 server
,它实现了 Callable 接口,表示可以通过 ExecutorService.submit(Callable<T>)
方法将 server
对象提交到一个线程池中执行,执行结果可以通过 Future 对象获取。
- 创建一个线程池,大小为2,通过
Executors.newFixedThreadPool(2)
创建。
- 将
server
对象提交到线程池中执行,得到一个 Future<Channel>
对象 future
。
- 调用
future.get()
方法阻塞当前线程,等待执行结果,此时 server
对象的 call()
方法在另一个线程中执行,完成后将返回一个 Channel
对象,表示绑定的服务器通道。
- 获取到
Channel
对象 channel
后,判断它是否为 null
,如果是,则输出错误日志。
- 如果
channel
不为 null
,则等待它变为激活状态,即可接受客户端连接。
- 输出启动成功的日志信息,包括本地地址。
- 最后,阻塞当前线程,等待程序退出,因为
Thread.sleep(Long.MAX_VALUE)
会一直休眠,直到线程被中断或程序结束。
测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| 18:56:41.462 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework 18:56:41.469 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16 18:56:41.484 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 18:56:41.484 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 18:56:41.505 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false 18:56:41.506 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8 18:56:41.507 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.storeFence: available 18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 18:56:41.508 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available 18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true 18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9 18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available 18:56:41.509 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available 18:56:41.510 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: /var/folders/vx/_jdzk2c53_v080k5ww8s7_780000gn/T (java.io.tmpdir) 18:56:41.510 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model) 18:56:41.511 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: MacOS 18:56:41.512 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3817865216 bytes 18:56:41.512 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1 18:56:41.516 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available 18:56:41.516 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false 18:56:41.518 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false 18:56:41.518 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512 18:56:41.526 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available 18:56:41.562 [pool-1-thread-1] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 8415 (auto-detected) 18:56:41.565 [pool-1-thread-1] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false 18:56:41.565 [pool-1-thread-1] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false 18:56:41.567 [pool-1-thread-1] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0) 18:56:41.567 [pool-1-thread-1] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128 18:56:41.572 [pool-1-thread-1] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: fc:e2:6c:ff:fe:14:27:ef (auto-detected) 18:56:41.581 [pool-1-thread-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple 18:56:41.581 [pool-1-thread-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 9 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 4194304 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: false 18:56:41.605 [pool-1-thread-1] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023 18:56:41.615 [pool-1-thread-1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled 18:56:41.615 [pool-1-thread-1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0 18:56:41.615 [pool-1-thread-1] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384 18:56:41.627 [pool-1-thread-1] INFO cn.ray.gateway.session.SessionServer - socket server start done. 18:56:41.628 [main] INFO cn.ray.gateway.test.ApiTest - NettyServer 启动服务成功 /0:0:0:0:0:0:0:0:7397 18:56:49.433 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true 18:56:49.434 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true 18:56:49.434 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@4e12148f 18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096 18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8 18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32 18:56:49.497 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false 18:56:49.511 [nioEventLoopGroup-3-1] INFO cn.ray.gateway.session.handlers.SessionServerHandler - 网关接收请求 ===> uri: /query , method: GET
|