为网关接口绑定对应的 RPC 服务,建立代理关系封装 RPC 泛化调用。这样调用网关接口就会调用到对应的 RPC 服务接口上并返回对应的数据。
设计 对于中间件框架的实现,往往都离不开代理 的操作,因为需要使用代理用逻辑封装行为。例如你原本是通过硬编码调用一个接口,那么现在因为所有的行为都被提炼到中间件中控制,那么对于接口的调用就不在一个而是一堆。而对于这样一堆接口的调用,它们是有共性的,比如可以统一使用 Java 反射 获取到方法名、入参、出参等等信息,再根据这些信息做代理逻辑包装,让每一个接口的调用都被中间件处理。
那么本章要实现的就是把来自网关的HTTP请求,转换到RPC调用上,这里就涉及到了RPC所提供的泛化调用,按照对应的泛化调用的逻辑,提供对应的接口和方法以及入参信息就可以拿到最终的结果。
文档:dubbo 泛化调用
如何把 HTTP 地址中的接口方法与 RPC 对应的服务建立起一种关联关系,这样才能满足在调用 HTTP 网关接口时,调用到对应的 RPC 服务上。
HTTP 经过网关调用到 RPC 中间的执行逻辑就是把两个模块用绑定的方式建立起连接,生成一个代理对象。代理对象中包装的就是执行网关接口泛化调用的参数准备和执行以及返回结果的操作。
这里的第一个知识点是泛化调用 ,它是 RPC 接口设计中提供的一种反射调用机制,不需要硬编码调用接口,只需要提供接口的方法名称、入参信息,即可调用到对应的 RPC 接口服务。
这里的第二个知识点是代理包装 ,虽然 RPC 框架提供了泛化调用,也就是说这里可以拿到网络协议转换的 HTTP 请求信息以后,就能直接调用到 RPC 接口。但这样的操作方式不太方便使用,存在硬编码的风险 ,后续不好迭代升级,也不好扩展其他的接口。因为每一个RPC的实现,泛化调用的方法名称还是有所不同的,另外是扩展非 RPC 框架的逻辑,也不方便处理 。所以这里需要单独提供一个代理包装逻辑。
这里的第三个知识点是 Cglib ,因为有第二个知识点中代理操作的存在,我们就需要选择一种方式来做代理处理,而 Cglib 可以满足我们自定义创建接口的方式进行代理 ,同时又可以让一个代理类有多个接口 。注意:多个接口的意思是,一个接口是用于标准的描述,在于使用上。另外一个接口是自主生成的,生成的是 RPC 描述性接口,相当于自主生成了class字节码。
实现
完成泛化调用服务
如何实现不同的RPCService都通过统一的接口来实现调用:dubbo泛化调用对用户是透明的,服务提供 IGenericReference 接口来实现统一调用。不同的 RPC Server,都能够通过 IGenericReference 来实现调用。
将 RPC Service的配置参数加载到configuration类中:应用服务、注册中心、泛化服务等。
将 GenericReference 加载到注册中心 Registry 类
这里用 IGenericReference 泛化调用接口的实现类代替 RPCService 的 GenericService 实例。用代理工厂实现 GenericReference 实现类(就是被代理的对象实例)的加载。
对注册中心 Registry 类中的 IGenericReference 进行代理
IGenericReference接口没有实现类,用Cglilb动态代理进行实现,返回一个被代理对象实例。在代理方法中实现 dubbo 泛化调用。
Netty 启动服务
利用SessionFactoryBuilder => SessionFactoy 启动 sessionServer。
sessionHandler 利用 sessionContguration 进行泛化调用。
IGenericReference 统一泛化调用接口 1 2 3 4 5 6 7 8 9 10 11 12 package cn.ray.gateway.bind;public interface IGenericReference { String $invoke(String args); }
GenericReferenceProxy 泛化调用静态代理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package cn.ray.gateway.bind;import net.sf.cglib.proxy.MethodInterceptor;import net.sf.cglib.proxy.MethodProxy;import org.apache.dubbo.rpc.service.GenericService;import java.lang.reflect.Method;public class GenericReferenceProxy implements MethodInterceptor { private final GenericService genericService; private final String methodName; public GenericReferenceProxy (GenericService genericService, String methodName) { this .genericService = genericService; this .methodName = methodName; } @Override public Object intercept (Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { Class<?>[] parameterTypes = method.getParameterTypes(); String[] parameters = new String [parameterTypes.length]; for (int i = 0 ; i < parameterTypes.length; i++) { parameters[i] = parameterTypes[i].getName(); } return genericService.$invoke(methodName,parameters,objects); } }
该类实现了 Cglib 的 MethodInterceptor 接口,用于代理泛化调用服务。
其中,GenericReferenceProxy类的构造方法需要传入泛化调用服务 GenericService 和泛化调用方法的名称 methodName 。
在intercept()方法中,首先获取要调用方法的参数类型 parameterTypes ,将其转化为参数名称的数组 parameters 。
接着,调用 genericService 的 $invoke() 方法,传入要调用的方法名称 methodName 、参数类型数组 parameters 和方法参数数组 objects ,完成泛化调用。
这里的$invoke()方法是Dubbo提供的一个通用泛化调用方法,可以根据传入的方法名和参数类型进行调用。具体文档可以参考:https://dubbo.apache.org/zh/docsv2.7/user/examples/generic-reference/
GenericReferenceProxyFactory 泛化调用静态代理工厂 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package cn.ray.gateway.bind;import net.sf.cglib.core.Signature;import net.sf.cglib.proxy.Enhancer;import net.sf.cglib.proxy.InterfaceMaker;import org.apache.dubbo.rpc.service.GenericService;import org.objectweb.asm.Type;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;public class GenericReferenceProxyFactory { private final GenericService genericService; private final Map<String,IGenericReference> genericReferenceCache = new ConcurrentHashMap <>(); public GenericReferenceProxyFactory (GenericService genericService) { this .genericService = genericService; } public IGenericReference newInstance (String methodName) { return genericReferenceCache.computeIfAbsent(methodName,k -> { GenericReferenceProxy genericReferenceProxy = new GenericReferenceProxy (genericService, methodName); InterfaceMaker interfaceMaker = new InterfaceMaker (); interfaceMaker.add(new Signature (methodName, Type.getType(String.class),new Type []{Type.getType(String.class)}),null ); Class<?> interfaceClass = interfaceMaker.create(); Enhancer enhancer = new Enhancer (); enhancer.setSuperclass(Object.class); enhancer.setInterfaces(new Class []{IGenericReference.class,interfaceClass}); enhancer.setCallback(genericReferenceProxy); return (IGenericReference) enhancer.create(); }); } }
这段代码是泛化调用静态代理工厂,主要用于创建并缓存泛化调用静态代理对象,其中包括泛化调用服务 GenericService
和泛化调用方法名称 methodName
。它提供了一个 newInstance
方法,可以根据不同的泛化调用方法名称创建对应的代理对象。
在 newInstance
方法中,首先使用了 computeIfAbsent
方法来从缓存中获取对应的静态代理对象。如果没有,则根据泛化调用方法名称创建 GenericReferenceProxy
对象,并使用 InterfaceMaker
类创建接口 interfaceClass
,目前这里只硬编码创建了返回值 String ,参数 String 的接口,然后使用 Enhancer
类创建代理对象,将 IGenericReference
接口和 interfaceClass
接口设置为代理对象的实现接口,并设置代理对象的回调方法为 GenericReferenceProxy
对象。最后返回创建的代理对象。
注意:这里创建好的代理对象是 GenericService
的代理对象并实现了IGenericReference
和 interfaceClass
两个接口,即IGenericReference
.$invoke 方法将被 GenericService
重写。
这样就实现了根据不同的泛化调用方法名称创建对应的静态代理对象,并且利用缓存来避免重复创建对象,提高了代码的性能。
Configuration 会话生命周期配置项 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package cn.ray.gateway.session;import cn.ray.gateway.bind.GenericReferenceRegistry;import cn.ray.gateway.bind.IGenericReference;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.rpc.service.GenericService;import java.util.HashMap;import java.util.Map;public class Configuration { private final GenericReferenceRegistry registry = new GenericReferenceRegistry (this ); private final Map<String, ApplicationConfig> applicationConfigMap = new HashMap <>(); private final Map<String, RegistryConfig> registryConfigMap = new HashMap <>(); private final Map<String, ReferenceConfig<GenericService>> referenceConfigMap = new HashMap <>(); public Configuration () { ApplicationConfig application = new ApplicationConfig (); application.setName("api-gateway-test" ); application.setQosEnable(false ); RegistryConfig registry = new RegistryConfig (); registry.setAddress("zookeeper://127.0.0.1:2181" ); registry.setRegister(false ); ReferenceConfig<GenericService> reference = new ReferenceConfig <>(); reference.setInterface("cn.ray.gateway.rpc.IActivityBooth" ); reference.setVersion("1.0.0" ); reference.setGeneric("true" ); applicationConfigMap.put("api-gateway-test" ,application); registryConfigMap.put("api-gateway-test" ,registry); referenceConfigMap.put("cn.ray.gateway.rpc.IActivityBooth" ,reference); } public ApplicationConfig getApplicationConfig (String applicationName) { return applicationConfigMap.get(applicationName); } public RegistryConfig getRegistryConfig (String applicationName) { return registryConfigMap.get(applicationName); } public ReferenceConfig<GenericService> getReferenceConfig (String interfaceName) { return referenceConfigMap.get(interfaceName); } public void addGenericReference (String application, String interfaceName, String methodName) { registry.addGenericReference(application, interfaceName, methodName); } public IGenericReference getGenericReference (String methodName) { return registry.getGenericReference(methodName); } }
GenericReferenceRegistry 泛化调用服务注册中心 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package cn.ray.gateway.bind;import cn.ray.gateway.session.Configuration;import org.apache.dubbo.config.ApplicationConfig;import org.apache.dubbo.config.ReferenceConfig;import org.apache.dubbo.config.RegistryConfig;import org.apache.dubbo.config.bootstrap.DubboBootstrap;import org.apache.dubbo.config.utils.ReferenceConfigCache;import org.apache.dubbo.rpc.service.GenericService;import java.util.HashMap;import java.util.Map;public class GenericReferenceRegistry { private final Configuration configuration; public GenericReferenceRegistry (Configuration configuration) { this .configuration = configuration; } private final Map<String, GenericReferenceProxyFactory> knownGenericReferences = new HashMap <>(); public void addGenericReference (String application, String interfaceName, String methodName) { ApplicationConfig applicationConfig = configuration.getApplicationConfig(application); RegistryConfig registryConfig = configuration.getRegistryConfig(application); ReferenceConfig<GenericService> referenceConfig = configuration.getReferenceConfig(interfaceName); DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(applicationConfig).registry(registryConfig).reference(referenceConfig).start(); ReferenceConfigCache cache = ReferenceConfigCache.getCache(); GenericService genericService = cache.get(referenceConfig); knownGenericReferences.put(methodName,new GenericReferenceProxyFactory (genericService)); } public IGenericReference getGenericReference (String methodName) { GenericReferenceProxyFactory genericReferenceProxyFactory = knownGenericReferences.get(methodName); if (null ==genericReferenceProxyFactory) { throw new RuntimeException ("Type " + methodName + " is not known to the GenericReferenceRegistry." ); } return genericReferenceProxyFactory.newInstance(methodName); } }
这段代码是一个泛化调用服务的注册中心,用于注册和获取泛化调用服务的接口方法。它依赖于Configuration
,该类提供了服务的配置信息。
当一个新的泛化调用服务接口方法被注册时,addGenericReference
方法被调用,传入三个参数:服务应用名、接口名和方法名。通过调用configuration
中的方法,获取服务的应用、注册和泛化调用配置对象,然后使用这些配置创建一个新的DubboBootstrap
实例并启动它。接下来,使用ReferenceConfigCache
获取泛化调用服务,并将其保存在一个GenericReferenceProxyFactory
对象中,用于生成泛化调用服务的代理对象。最后,将GenericReferenceProxyFactory
对象添加到knownGenericReferences
缓存中,以便在需要时进行检索。
当需要调用已注册的泛化调用服务接口方法时,getGenericReference
方法被调用,传入一个方法名。它会从knownGenericReferences
缓存中获取与方法名对应的GenericReferenceProxyFactory
对象,然后使用它来生成一个新的泛化调用服务的代理对象,并返回该对象。如果未找到与方法名对应的GenericReferenceProxyFactory
对象,则会抛出一个RuntimeException
异常。
注意:ReferenceConfigCache 是 Dubbo 中提供的一个缓存类,它可以缓存已经创建的ReferenceConfig对象,以提高性能和避免重复创建对象。
在 Dubbo 中,每次使用 ReferenceConfig 创建服务代理对象时,都会进行大量的初始化操作,包括协议、注册中心、集群等等。如果每次都重新创建ReferenceConfig对象,这些初始化操作都需要再次执行,浪费了大量时间和资源。因此,Dubbo 提供了 ReferenceConfigCache 缓存,将 ReferenceConfig 对象缓存起来,下次再使用时直接从缓存中获取即可,无需再次初始化。这样可以提高性能,避免资源浪费。
IGenericReferenceSessionFactory 泛化调用会话工厂接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package cn.ray.gateway.session;import io.netty.channel.Channel;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public interface IGenericReferenceSessionFactory { Future<Channel> openSession () throws ExecutionException, InterruptedException; }
GenericReferenceSessionFactory 泛化调用会话工厂 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package cn.ray.gateway.session.defaults;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.IGenericReferenceSessionFactory;import cn.ray.gateway.session.SessionServer;import io.netty.channel.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class GenericReferenceSessionFactory implements IGenericReferenceSessionFactory { private final Logger logger = LoggerFactory.getLogger(GenericReferenceSessionFactory.class); private final Configuration configuration; public GenericReferenceSessionFactory (Configuration configuration) { this .configuration = configuration; } @Override public Future<Channel> openSession () throws ExecutionException, InterruptedException { SessionServer server = new SessionServer (configuration); Future<Channel> future = Executors.newFixedThreadPool(2 ).submit(server); Channel channel = future.get(); if (null == channel) throw new RuntimeException ("netty server start error channel is null" ); while (!channel.isActive()) { logger.info("netty server gateway start Ing ..." ); Thread.sleep(500 ); } logger.info("netty server gateway start Done! {}" , channel.localAddress()); return future; } }
SessionServerHandler 会话服务处理器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package cn.ray.gateway.session.handlers;import cn.ray.gateway.bind.IGenericReference;import cn.ray.gateway.session.BaseHandler;import cn.ray.gateway.session.Configuration;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.serializer.SerializerFeature;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.http.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class SessionServerHandler extends BaseHandler <FullHttpRequest> { private final Logger logger = LoggerFactory.getLogger(SessionServerHandler.class); private final Configuration configuration; public SessionServerHandler (Configuration configuration) { this .configuration = configuration; } @Override protected void session (ChannelHandlerContext ctx, Channel channel, FullHttpRequest request) { logger.info("网关接收请求 ===> uri: {} , method: {}" ,request.uri(),request.method()); String methodName = request.uri().substring(1 ); if (methodName.equals("favicon.ico" )) return ; DefaultFullHttpResponse response = new DefaultFullHttpResponse (HttpVersion.HTTP_1_1, HttpResponseStatus.OK); IGenericReference genericReference = configuration.getGenericReference("sayHi" ); String result = genericReference.$invoke("test" + " " + System.currentTimeMillis()); response.content().writeBytes(JSON.toJSONBytes(result, SerializerFeature.PrettyFormat)); HttpHeaders heads = response.headers(); heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=UTF-8" ); heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "*" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE" ); heads.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true" ); channel.writeAndFlush(response); } }
GenericReferenceSessionFactoryBuilder 会话工厂建造类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package cn.ray.gateway.session;import cn.ray.gateway.session.defaults.GenericReferenceSessionFactory;import io.netty.channel.Channel;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class GenericReferenceSessionFactoryBuilder { public Future<Channel> build (Configuration configuration) { IGenericReferenceSessionFactory genericReferenceSessionFactory = new GenericReferenceSessionFactory (configuration); try { return genericReferenceSessionFactory.openSession(); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException (e); } } }
测试 IActivityBooth RPC接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package cn.ray.gateway.rpc;import cn.ray.gateway.rpc.dto.XReq;public interface IActivityBooth { String sayHi (String str) ; String insert (XReq req) ; String test (String str, XReq req) ; }
ActivityBooth RPC实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package cn.ray.gateway.interfaces;import cn.ray.gateway.rpc.IActivityBooth;import cn.ray.gateway.rpc.dto.XReq;import com.alibaba.fastjson.JSON;import org.apache.dubbo.config.annotation.Service;@Service(version = "1.0.0") public class ActivityBooth implements IActivityBooth { @Override public String sayHi (String str) { return "hi " + str + " by api-gateway-test" ; } @Override public String insert (XReq req) { return "hi " + JSON.toJSONString(req) + " by api-gateway-test" ; } @Override public String test (String str, XReq req) { return "hi " + str + JSON.toJSONString(req) + " by api-gateway-test" ; } }
启动网关 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package cn.ray.gateway.test;import cn.ray.gateway.session.Configuration;import cn.ray.gateway.session.GenericReferenceSessionFactoryBuilder;import cn.ray.gateway.session.SessionServer;import io.netty.channel.Channel;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class ApiTest { private final Logger logger = LoggerFactory.getLogger(ApiTest.class); @Test public void test_GenericReference () throws InterruptedException, ExecutionException { Configuration configuration = new Configuration (); configuration.addGenericReference("api-gateway-test" , "cn.ray.gateway.rpc.IActivityBooth" , "sayHi" ); GenericReferenceSessionFactoryBuilder builder = new GenericReferenceSessionFactoryBuilder (); Future<Channel> future = builder.build(configuration); logger.info("服务启动完成 {}" , future.get().id()); Thread.sleep(Long.MAX_VALUE); } }
测试结果
问题与总结
方法名一映射一泛化调用服务的问题
将泛化调用注册到 Registry ,是根据 application+interface ,定位到某个RPC服务。然后绑定 methodName 和该 RPC 服务对应的泛化调用服务。
只根据 methodName 获取泛化调用服务时,没法避免同名的情况。例如两个不同的RPC服务提供了同名方法。就无法辨别?
目前IGenericReference只实现了参数和返回值均为单个String的方法,如何实现泛化调用多个不同参数和返回值的方法?
缓存问题
每次进行add操作,绑定 methodName 和泛化调用服务,都要重头启动dulbbo服务,获得 ReferenceConfigCache,再根据 key 获取genericService,减少启动次数?
思路一:缓存 ReferenceConfgCache ,是否还需要重新启动dubbo服务start方法?这一块对dubbo服务启动不太清楚。
思路二:缓存 genericService 。interface 对应某个RPC服务的genericService,绑定该服务下的任意 method 都可以利用已经加载好的genericService。
代理问题
Cglib代理,实现了两个代理接口
1、lGenericReference 泛化调用接口
2、根据RPC方法创建的接口
此处接口2的意义在哪?如果不显式地用方法名进行方法调用,完全可以不实现接口2。
泛化调用静态代理工厂GenericReferenceProxyFactory里面的genericReferenceCache没感觉有太大的作用。
因为代理工厂在创建保存的时候就是根据methodName一一映射的
(GenericReferenceRegistry.addGenericReference),然后 genericReferenceCache 又是根据 methodName 映射的代理类,按理说一个代理工厂的 genericReferenceCache 里面只会有一个GenericReference,此处的作用我理解的好像就是类似单例缓存了一个Method绑定的IGenericReference,那么可以不用Map直接使用单例模式吗?