扩展序列化算法 序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
1 2 3 4 5 6 7 8 9 10 11 byte [] body = new byte [bodyLength];byteByf.readBytes(body); ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (body));Message message = (Message) in.readObject();message.setSequenceId(sequenceId); ByteArrayOutputStream out = new ByteArrayOutputStream ();new ObjectOutputStream (out).writeObject(message);byte [] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
/**
 * Strategy for converting message bodies to and from their wire representation.
 *
 * <p>The method is spelled {@code deSerialize} because that is the name the
 * {@code Algorithm} implementations override and the codec calls.
 */
public interface Serializer {

    /**
     * Rebuilds an object from its serialized form.
     *
     * @param clazz the type the caller expects back
     * @param bytes the serialized payload
     * @param <T>   the expected type
     * @return the reconstructed object
     */
    <T> T deSerialize(Class<T> clazz, byte[] bytes);

    /**
     * Converts an object into bytes suitable for transmission.
     *
     * @param object the object to serialize
     * @param <T>    the object's type
     * @return the serialized payload
     */
    <T> byte[] serialize(T object);
}
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
后续可更改为策略模式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 enum Algorithm implements Serializer { JDK{ @Override public <T> T deSerialize (Class<T> clazz, byte [] bytes) { try { ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (bytes)); return (T) ois.readObject(); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException ("反序列化失败" ); } } @Override public <T> byte [] serialize(T object) { try { ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos); oos.writeObject(object); return bos.toByteArray(); } catch (IOException e) { throw new RuntimeException ("序列化失败" ); } } }, JSON { @Override public <T> T deSerialize (Class<T> clazz, byte [] bytes) { String json = new String (bytes,StandardCharsets.UTF_8); return new Gson ().fromJson(json,clazz); } @Override public <T> byte [] serialize(T object) { String json = new Gson ().toJson(object); return json.getBytes(StandardCharsets.UTF_8); } } }
增加配置类和配置文件,实现序列化的动态配置
SerializeConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public abstract class SerializeConfig { static Properties properties; static { try (InputStream in = SerializeConfig.class.getResourceAsStream("/application.properties" )) { properties = new Properties (); properties.load(in); } catch (IOException e) { throw new ExceptionInInitializerError (e); } } public static int getServerPort () { String value = properties.getProperty("server.port" ); if (value == null ) { return 8080 ; } else { return Integer.parseInt(value); } } public static Serializer.Algorithm getSerializerAlgorithm () { String value = properties.getProperty("serializer.algorithm" ); if (value == null ) { return Serializer.Algorithm.JDK; } else { return Serializer.Algorithm.valueOf(value); } } }
application.properties
1 serializer.algorithm = JSON
修改编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @ChannelHandler .Sharablepublic class MessageCodecSharable extends MessageToMessageCodec <ByteBuf, Message> { private final Logger logger = LoggerFactory.getLogger(MessageCodecSharable.class); @Override protected void encode (ChannelHandlerContext channelHandlerContext, Message message, List<Object> list) throws Exception { ByteBuf byteBuf = channelHandlerContext.alloc().buffer(); byteBuf.writeBytes("Ray" .getBytes()); byteBuf.writeByte(1 ); byteBuf.writeByte(SerializeConfig.getSerializerAlgorithm().ordinal()); byteBuf.writeByte(message.getMessageType()); byteBuf.writeInt(message.getSequenceId()); byteBuf.writeByte(0xff ); byteBuf.writeByte(0xff ); byte [] bytes = SerializeConfig.getSerializerAlgorithm().serialize(message); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes); list.add(byteBuf); } @Override protected void decode (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { String magicNum = byteBuf.readBytes(3 ).toString(StandardCharsets.UTF_8); byte version = byteBuf.readByte(); byte serializerType = byteBuf.readByte(); byte messageType = byteBuf.readByte(); int sequenceId = byteBuf.readInt(); byteBuf.readBytes(2 ); int length = byteBuf.readInt(); byte [] bytes = new byte [length]; byteBuf.readBytes(bytes,0 ,length); Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType]; Class<?> messageClass = Message.getMessageClass(messageType); Object message = algorithm.deSerialize(messageClass, bytes); logger.info("magicNum: {}, version: {}, serializerType: {}, messageType: {}, sequenceId: {}, contentLength: {}" , magicNum, version, serializerType, messageType, sequenceId, length); logger.info("content: {}" , message); list.add(message); } }
参数调优 CONNECT_TIMEOUT_MILLIS
属于 SocketChannel 参数
用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class TestConnectionTimeout { private static final Logger logger = LoggerFactory.getLogger(TestConnectionTimeout.class); public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300 ) .channel(NioSocketChannel.class) .handler(new LoggingHandler ()); ChannelFuture future = bootstrap.connect("127.0.0.1" , 8080 ); future.sync().channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); logger.debug("connect timeout" ); } finally { group.shutdownGracefully(); } } }
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
本质上是通过 EventLoop 执行一个定时任务,在设定的超时时间后执行任务创建异常并通过Promise传递给主线程,如果成功连接则会取消该定时任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public final void connect ( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0 ) { connectTimeoutFuture = eventLoop().schedule(new Runnable () { @Override public void run () { ChannelPromise connectPromise = AbstractNioChannel.this .connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException ("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } }
SO_BACKLOG
属于 ServerSocketChannel 参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 sequenceDiagram participant c as client participant s as server participant sq as syns queue participant aq as accept queue s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SENT,server 收到,状态改变为 SYN_RCVD,并将该请求放入 sync queue 队列
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
sync queue - 半连接队列
大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog
指定,在 syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
accept queue - 全连接队列
其大小通过 /proc/sys/net/core/somaxconn
指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
netty 中
可以通过 option(ChannelOption.SO_BACKLOG, 值)
来设置大小
可以通过下面源码查看默认大小
1 2 3 4 5 6 public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private volatile int backlog = NetUtil.SOMAXCONN; }
ulimit -n
属于操作系统参数
表示一个进程可同时打开的文件描述符(fd)的数量
TCP_NODELAY
属于 SocketChannel 参数
默认false,开启 Nagle 算法(尽可能发送足够大的数据,小的数据延迟发送,等到足够大,比如达到MSS(最大段长度)才发送)
SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannel 参数,发送缓冲区
SO_RCVBUF 既可用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上),接收缓冲区
ALLOCATOR
属于 SocketChannel 参数
用来分配 ByteBuf, ctx.alloc()
RCVBUF_ALLOCATOR
属于 SocketChannel 参数
控制 netty 接收缓冲区大小
对 io 数据的操作,直接内存比堆内存效率高
Handler内部对数据的处理可以分配堆内存或直接使用直接内存
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
简易 RPC 框架 准备工作 为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public abstract class Message implements Serializable { public static final int RPC_MESSAGE_TYPE_REQUEST = 101 ; public static final int RPC_MESSAGE_TYPE_RESPONSE = 102 ; static { messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class RpcRequestMessage extends Message { private String interfaceName; private String methodName; private Class<?> returnType; private Class[] parameterTypes; private Object[] parameterValue; public RpcRequestMessage (int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super .setSequenceId(sequenceId); this .interfaceName = interfaceName; this .methodName = methodName; this .returnType = returnType; this .parameterTypes = parameterTypes; this .parameterValue = parameterValue; } public String getInterfaceName () { return interfaceName; } public void setInterfaceName (String interfaceName) { this .interfaceName = interfaceName; } public String getMethodName () { return methodName; } public void setMethodName (String methodName) { this .methodName = methodName; } public Class<?> getReturnType() { return returnType; } public void setReturnType (Class<?> returnType) { this .returnType = returnType; } public Class[] getParameterTypes() { return parameterTypes; } public void setParameterTypes (Class[] parameterTypes) { this .parameterTypes = parameterTypes; } public Object[] getParameterValue() { return parameterValue; } public void setParameterValue (Object[] parameterValue) { this .parameterValue = parameterValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_REQUEST; } }
响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class RpcResponseMessage extends Message { private Object returnValue; private Exception exceptionValue; public Object getReturnValue () { return returnValue; } public void setReturnValue (Object returnValue) { this .returnValue = returnValue; } public Exception getExceptionValue () { return exceptionValue; } public void setExceptionValue (Exception exceptionValue) { this .exceptionValue = exceptionValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_RESPONSE; } @Override public String toString () { return "RpcResponseMessage{" + "returnValue=" + returnValue + ", exceptionValue=" + exceptionValue + '}' ; } }
服务端的 Service 获取
/**
 * Registry of service implementations, built once from
 * {@code /application.properties}: every property whose key ends with
 * {@code Service} maps an interface name to the implementation class name.
 */
public class ServicesFactory {
    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = ServicesFactory.class.getResourceAsStream("/application.properties")) {
            if (in == null) {
                // getResourceAsStream returns null when the file is absent; fail with a
                // message that names the file instead of a bare NullPointerException
                throw new ExceptionInInitializerError("application.properties not found on the classpath");
            }
            properties = new Properties();
            properties.load(in);
            for (String name : properties.stringPropertyNames()) {
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    // trim: Properties keeps trailing whitespace in values
                    Class<?> instanceClass = Class.forName(properties.getProperty(name).trim());
                    // Class.newInstance() is deprecated; go through the no-arg constructor
                    map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
                }
            }
        } catch (IOException | ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Looks up the implementation registered for an interface.
     *
     * @param interfaceClass the service interface
     * @param <T>            the interface type
     * @return the registered instance, or {@code null} if none is registered
     * @throws ClassCastException if the registered instance does not implement the interface
     */
    public static <T> T getService(Class<T> interfaceClass) {
        return interfaceClass.cast(map.get(interfaceClass));
    }
}
相关配置 application.properties
1 2 serializer.algorithm = JDK com.netty.server.service.HelloService = com.netty.server.service.HelloServiceImpl
服务器 handler(RpcRequestMessageHandler) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @ChannelHandler .Sharablepublic class RpcRequestMessageHandler extends SimpleChannelInboundHandler <RpcRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage req) throws Exception { RpcResponseMessage response = new RpcResponseMessage (); response.setSequenceId(req.getSequenceId()); try { System.out.println(req.getInterfaceName()); Object service = ServicesFactory.getService(Class.forName(req.getInterfaceName())); Method method = service.getClass().getMethod(req.getMethodName(), req.getParameterTypes()); Object result = method.invoke(service, req.getParameterValue()); response.setReturnValue(result); } catch (Exception e) { e.printStackTrace(); response.setExceptionValue(new Exception ("远程调用出错: " + e.getCause().getMessage())); } finally { ctx.writeAndFlush(response); } } public static void main (String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { RpcRequestMessage requestMessage = new RpcRequestMessage ( 1 , "com.netty.server.service.HelloService" , "sayHello" , String.class, new Class []{String.class}, new Object []{"Ray" } ); Object service = ServicesFactory.getService(Class.forName(requestMessage.getInterfaceName())); Method method = service.getClass().getMethod(requestMessage.getMethodName(), requestMessage.getParameterTypes()); Object invoke = method.invoke(service, requestMessage.getParameterValue()); System.out.println(invoke); } }
客户端 handler(RpcResponseMessageHandler) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { public static Map<Integer, Promise<Object>> promises = new ConcurrentHashMap <>(); private Logger logger = LoggerFactory.getLogger(RpcResponseMessageHandler.class); @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage response) throws Exception { Promise<Object> promise = promises.remove(response.getSequenceId()); if (promise!=null ) { Exception exceptionValue = response.getExceptionValue(); if (exceptionValue!=null ) { promise.setFailure(exceptionValue); } else { promise.setSuccess(response.getReturnValue()); } } } }
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class RpcServer { private static final Logger logger = LoggerFactory.getLogger(RpcServer.class); public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ProtocolFrameDecoder ()) .addLast(LOGGING_HANDLER) .addLast(MESSAGE_CODEC) .addLast(RPC_HANDLER); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { logger.error("server error" ); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
客户端第一版
/**
 * First client version: connects to 127.0.0.1:8080, sends one hand-built
 * {@code RpcRequestMessage} and waits until the channel closes.
 */
public class RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                            .addLast(new ProtocolFrameDecoder())
                            .addLast(LOGGING_HANDLER)
                            .addLast(MESSAGE_CODEC)
                            .addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();

            RpcRequestMessage requestMessage = new RpcRequestMessage(
                    1,
                    "com.netty.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"Ray"}
            );
            ChannelFuture future = channel.writeAndFlush(requestMessage);
            // the write is asynchronous; a failure is only visible through the future
            future.addListener(promise -> {
                if (!promise.isSuccess()) {
                    Throwable cause = promise.cause();
                    logger.error("error", cause);
                }
            });

            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // restore the flag so code further up can see the interruption
            Thread.currentThread().interrupt();
            logger.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
客户端第二版
/**
 * Second client version: keeps one lazily created channel and hands out
 * dynamic proxies that turn interface calls into RPC requests.
 */
public class RpcClientManager {
    // volatile: getChannel() reads this field outside the lock
    private static volatile Channel channel = null;
    private static final Logger logger = LoggerFactory.getLogger(RpcClientManager.class);
    private static final Object LOCK = new Object();

    /**
     * Returns the shared channel, connecting on first use.
     *
     * @return the channel, or {@code null} if the connect attempt was interrupted
     */
    public static Channel getChannel() {
        Channel existing = channel;
        if (existing != null) {
            return existing;
        }
        synchronized (LOCK) {
            if (channel == null) {
                initChannel();
            }
            return channel;
        }
    }

    /** Connects to 127.0.0.1:8080 and publishes the channel once it is established. */
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                        .addLast(new ProtocolFrameDecoder())
                        .addLast(LOGGING_HANDLER)
                        .addLast(MESSAGE_CODEC)
                        .addLast(RPC_HANDLER);
            }
        });

        boolean connected = false;
        try {
            Channel established = bootstrap.connect("127.0.0.1", 8080).sync().channel();
            // release the event loop threads once the connection goes away
            established.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
            channel = established;
            connected = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("client error", e);
        } finally {
            if (!connected) {
                // connect failed or was interrupted: no close listener exists to stop the group
                group.shutdownGracefully();
            }
        }
    }

    public static void main(String[] args) {
        HelloService proxyInstance = getProxyInstance(HelloService.class);
        System.out.println(proxyInstance.sayHello("Ray"));
        System.out.println(proxyInstance.sayHello("JOJO"));
        System.out.println(proxyInstance.sayNo("Ray"));
    }

    /**
     * Creates a proxy whose method calls are sent as {@code RpcRequestMessage}s
     * and block until the matching response arrives.
     *
     * @param clazz the service interface
     * @param <T>   the interface type
     * @return a proxy implementing {@code clazz}
     */
    public static <T> T getProxyInstance(Class<T> clazz) {
        ClassLoader classLoader = clazz.getClassLoader();
        Class<?>[] interfaces = new Class<?>[]{clazz};
        Object proxyInstance = Proxy.newProxyInstance(classLoader, interfaces, ((proxy, method, args) -> {
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage requestMessage = new RpcRequestMessage(
                    sequenceId,
                    clazz.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );

            Channel target = getChannel();
            if (target == null) {
                throw new IllegalStateException("RPC channel is not available");
            }

            // Register the promise BEFORE writing: otherwise a fast response can arrive
            // while the map is still empty, get discarded, and leave await() blocked forever.
            DefaultPromise<Object> promise = new DefaultPromise<>(target.eventLoop());
            RpcResponseMessageHandler.promises.put(sequenceId, promise);

            target.writeAndFlush(requestMessage).addListener(written -> {
                if (!written.isSuccess()) {
                    // no response will ever come for a request that was never sent
                    RpcResponseMessageHandler.promises.remove(sequenceId);
                    promise.tryFailure(written.cause());
                }
            });

            promise.await();
            if (promise.isSuccess()) {
                return promise.getNow();
            } else {
                throw new RuntimeException(promise.cause());
            }
        }));
        return clazz.cast(proxyInstance);
    }
}