Netty优化

扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下

1
2
3
4
5
6
7
8
9
10
11
// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口

1
2
3
4
5
6
7
8
9
public interface Serializer {

// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);

// 序列化方法
<T> byte[] serialize(T object);

}

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中

后续可更改为策略模式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
enum Algorithm implements Serializer {
JDK{
@Override
public <T> T deSerialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败");
}
}

@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败");
}
}
},

JSON {
@Override
public <T> T deSerialize(Class<T> clazz, byte[] bytes) {
String json = new String(bytes,StandardCharsets.UTF_8);
return new Gson().fromJson(json,clazz);
}

@Override
public <T> byte[] serialize(T object) {
String json = new Gson().toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}

增加配置类和配置文件,实现序列化的动态配置

SerializeConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class SerializeConfig {
static Properties properties;

static {
try (InputStream in = SerializeConfig.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}

public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}

public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.JDK;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}

application.properties

1
serializer.algorithm = JSON

修改编解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

private final Logger logger = LoggerFactory.getLogger(MessageCodecSharable.class);

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, List<Object> list) throws Exception {
ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
// 1. 3 字节 魔数
byteBuf.writeBytes("Ray".getBytes());
// 2. 1 字节 版本号
byteBuf.writeByte(1);
// 3. 1 字节 序列化算法 jdk 0 , json 1
byteBuf.writeByte(SerializeConfig.getSerializerAlgorithm().ordinal());
// 4. 1 字节 消息类型
byteBuf.writeByte(message.getMessageType());
// 5. 4 字节 序列号
byteBuf.writeInt(message.getSequenceId());
// 无意义,对齐填充
byteBuf.writeByte(0xff);
byteBuf.writeByte(0xff);
// 6. 序列化: 获取内容的字节数组
byte[] bytes = SerializeConfig.getSerializerAlgorithm().serialize(message);
// 7. 长度
byteBuf.writeInt(bytes.length);
// 8. 内容
byteBuf.writeBytes(bytes);
list.add(byteBuf);
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
String magicNum = byteBuf.readBytes(3).toString(StandardCharsets.UTF_8);
byte version = byteBuf.readByte();
byte serializerType = byteBuf.readByte(); // 0 JDK 1 Json
byte messageType = byteBuf.readByte();
int sequenceId = byteBuf.readInt();
byteBuf.readBytes(2);
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes,0,length);
// 确定反序列化算法
// TODO 后期使用 策略模式 代替枚举
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];
// 获取具体的消息类型
Class<?> messageClass = Message.getMessageClass(messageType);
Object message = algorithm.deSerialize(messageClass, bytes);
logger.info("magicNum: {}, version: {}, serializerType: {}, messageType: {}, sequenceId: {}, contentLength: {}", magicNum, version, serializerType, messageType, sequenceId, length);
logger.info("content: {}", message);
list.add(message);
}
}

参数调优

CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TestConnectionTimeout {

private static final Logger logger = LoggerFactory.getLogger(TestConnectionTimeout.class);

public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
// 1. 客户端通过 .option() 方法配置参数 给 SocketChannel 配置参数
// 2. 服务器端
// new ServerBootstrap().option() // 是给 ServerSocketChannel 配置参数
// new ServerBootstrap().childOption() // 给 SocketChannel 配置参数
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300) // 300ms超时
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync(); // 断点1
} catch (Exception e) {
e.printStackTrace();
logger.debug("connect timeout");
} finally {
group.shutdownGracefully();
}
}
}

另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

本质上是通过 EventLoop 执行一个定时任务,在设定的超时时间后执行任务创建异常并通过Promise传递给主线程,如果成功连接则会取消该定时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// ...
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// ...
}

SO_BACKLOG

  • 属于 ServerSocketChannal 参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sequenceDiagram

participant c as client
participant s as server
participant sq as syns queue
participant aq as accept queue

s ->> s : bind()
s ->> s : listen()
c ->> c : connect()
c ->> s : 1. SYN
Note left of c : SYN_SEND
s ->> sq : put
Note right of s : SYN_RCVD
s ->> c : 2. SYN + ACK
Note left of c : ESTABLISHED
c ->> s : 3. ACK
sq ->> aq : put
Note right of s : ESTABLISHED
aq -->> s :
s ->> s : accept()
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中

可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

可以通过下面源码查看默认大小

1
2
3
4
5
6
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {

private volatile int backlog = NetUtil.SOMAXCONN;
// ...
}

SO_BACKLOG

ulimit -n

  • 属于操作系统参数

  • 表示一个进程可同时打开的文件描述符(fd)的数量

TCP_NODELAY

  • 属于 SocketChannal 参数
  • 默认false,开启 Nagle 算法(尽可能发送足够大的数据,小的数据延迟发送,等到足够大,比如达到MSS(最大段长度)才发送)

SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数,发送缓冲区
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上),接收缓冲区

ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来分配 ByteBuf, ctx.alloc()

RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 对 io 数据的操作,直接内存比堆内存效率高
  • Handler内部对数据的处理可以分配堆内存或直接使用直接内存
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

简易 RPC 框架

准备工作

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class Message implements Serializable {

// 省略旧的代码

public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

static {
// ...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}

}

请求消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class RpcRequestMessage extends Message {

/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;

public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}

public String getInterfaceName() {
return interfaceName;
}

public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Class<?> getReturnType() {
return returnType;
}

public void setReturnType(Class<?> returnType) {
this.returnType = returnType;
}

public Class[] getParameterTypes() {
return parameterTypes;
}

public void setParameterTypes(Class[] parameterTypes) {
this.parameterTypes = parameterTypes;
}

public Object[] getParameterValue() {
return parameterValue;
}

public void setParameterValue(Object[] parameterValue) {
this.parameterValue = parameterValue;
}

@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}

响应消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class RpcResponseMessage extends Message {

/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;

public Object getReturnValue() {
return returnValue;
}

public void setReturnValue(Object returnValue) {
this.returnValue = returnValue;
}

public Exception getExceptionValue() {
return exceptionValue;
}

public void setExceptionValue(Exception exceptionValue) {
this.exceptionValue = exceptionValue;
}

@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}

@Override
public String toString() {
return "RpcResponseMessage{" +
"returnValue=" + returnValue +
", exceptionValue=" + exceptionValue +
'}';
}
}

服务端的 Service 获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ServicesFactory {

static Properties properties;
static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

static {
try (InputStream in = ServicesFactory.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
Class<?> interfaceClass = Class.forName(name);
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.newInstance());
}
}
} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}

public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}

相关配置 application.properties

1
2
serializer.algorithm = JDK
com.netty.server.service.HelloService = com.netty.server.service.HelloServiceImpl

服务器 handler(RpcRequestMessageHandler)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage req) throws Exception {
RpcResponseMessage response = new RpcResponseMessage();
response.setSequenceId(req.getSequenceId());
try {
System.out.println(req.getInterfaceName());
Object service = ServicesFactory.getService(Class.forName(req.getInterfaceName()));
Method method = service.getClass().getMethod(req.getMethodName(), req.getParameterTypes());
Object result = method.invoke(service, req.getParameterValue());
response.setReturnValue(result);
} catch (Exception e) {
e.printStackTrace();
response.setExceptionValue(new Exception("远程调用出错: " + e.getCause().getMessage()));
} finally {
ctx.writeAndFlush(response);
}
}

public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
RpcRequestMessage requestMessage = new RpcRequestMessage(
1,
"com.netty.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"Ray"}
);

Object service = ServicesFactory.getService(Class.forName(requestMessage.getInterfaceName()));
Method method = service.getClass().getMethod(requestMessage.getMethodName(), requestMessage.getParameterTypes());
Object invoke = method.invoke(service, requestMessage.getParameterValue());
System.out.println(invoke);
}
}

客户端 handler(RpcResponseMessageHandler)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

/**
* Map<Integer, Promise<Object>> -> (序列号,用来接收结果的 promise 对象)
*/
public static Map<Integer, Promise<Object>> promises = new ConcurrentHashMap<>();

private Logger logger = LoggerFactory.getLogger(RpcResponseMessageHandler.class);

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage response) throws Exception {
// 获取对应序列号的 空 Promise,存入数据,用完即删,避免占用内存
Promise<Object> promise = promises.remove(response.getSequenceId());
if (promise!=null) {
Exception exceptionValue = response.getExceptionValue();
if (exceptionValue!=null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(response.getReturnValue());
}
}
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class RpcServer {

private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler();
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ProtocolFrameDecoder())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_CODEC)
.addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
logger.error("server error");
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

客户端第一版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class RpcClient {

private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ProtocolFrameDecoder())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_CODEC)
.addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
RpcRequestMessage requestMessage = new RpcRequestMessage(
1,
"com.netty.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"Ray"}
);
ChannelFuture future = channel.writeAndFlush(requestMessage);
future.addListener(promise -> {
if (!promise.isSuccess()){
Throwable cause = promise.cause();
logger.error("error",cause);
}
});
channel.closeFuture().sync();
} catch (InterruptedException e) {
logger.error("client error");
} finally {
group.shutdownGracefully();
}
}
}

客户端第二版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class RpcClientManager {

private static Channel channel = null;

private static Logger logger = LoggerFactory.getLogger(RpcClientManager.class);

private static final Object LOCK = new Object();

// 单例模式,双重校验锁
public static Channel getChannel() {
if (channel!=null) {
return channel;
}
synchronized (LOCK) {
if (channel!=null) {
return channel;
}
initChannel();
return channel;
}
}

private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ProtocolFrameDecoder())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_CODEC)
.addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (InterruptedException e) {
logger.error("client error");
}
}

public static void main(String[] args) {
// RpcRequestMessage requestMessage = new RpcRequestMessage(
// 1,
// "com.netty.server.service.HelloService",
// "sayHello",
// String.class,
// new Class[]{String.class},
// new Object[]{"Ray"}
// );
// getChannel().writeAndFlush(requestMessage);
HelloService proxyInstance = getProxyInstance(HelloService.class);
System.out.println(proxyInstance.sayHello("Ray"));
System.out.println(proxyInstance.sayHello("JOJO"));
System.out.println(proxyInstance.sayNo("Ray"));
}

// 创建代理类,方便用户使用,将 Rpc 消息的组装以及发送操作进行代理
public static <T> T getProxyInstance(Class<T> clazz) {
ClassLoader classLoader = clazz.getClassLoader();
Class<?>[] interfaces = new Class[]{clazz};
Object proxyInstance = Proxy.newProxyInstance(classLoader, interfaces, ((proxy, method, args) -> {
int sequenceId = SequenceIdGenerator.nextId();
// 1. 将方法调用转换为消息对象
RpcRequestMessage requestMessage = new RpcRequestMessage(
sequenceId,
clazz.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);

// 2. 将消息对象发送出去
getChannel().writeAndFlush(requestMessage);

// 这里由于 主线程 是用于显示结果的
// 但 此时 RpcResponseMessageHandler 是处理结果的,也就是说是 NIO 线程 在接收 RPC 返回结果
// 所以想到利用 Promise 来实现不同线程间的数据传输
// 而由于 RPC 远程调用不止一次,只使用一个 Promise 可能造成数据的冲突错乱
// 这里又想到用 序列号 来表示 不同的 Promise
// 通过 Map 映射 (sequenceId,promise) 来实现不同的 RPC 消息 Promise 的独立性

// 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象异步接收结果线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.promises.put(sequenceId,promise);

promise.await(); // 主线程同步阻塞等待返回结果

if (promise.isSuccess()) {
// 调用成功
return promise.getNow();
} else {
// 调用失败
throw new RuntimeException(promise.cause());
}
}));
return (T) proxyInstance;
}
}