Java进阶-分布式框架通信核心基础-手动实现RPC
单体应用与分布式项目简介
- 单体应用项目
- 项目的各个层都放在一个项目中,各个层之间的调用直接是通过方法接口调用,无需网络传输,无延迟
- 应用场景:对于单体应用项目来说, 开发起来比较方便, 服务的调用都是在本地调用,性能方面比较好,特别适用于中小型的一些项目: 比如小说类的App, 母婴类的App
- 分布式应用项目
- 项目的各个层分别部署在不同的服务器,各个层之间调用通过网络传输RPC框架来调用
- 特点:
- 项目规模比较大, 用户数量比较多, 业务功能比较复杂
- 分布式项目采用模块化开发, 对于开发人员来说, 只需要了解负责的模块即可
- 对于分布式项目部署来说, 采用模块化部署, 万一一个模块出现问题, 并不会直接影响其他模块调用
- 对于分布式项目会引入网络, 也会引入分库分表的操作, 所以系统的复杂性会明显提高
- 使用场景:对于分布式项目常用的一般是大型的App, 比如说社交类的App, 旅游类的App, 直播类的App, 在线教育类的App等等
项目简介
- 项目分三个层product-api层、product-web层、product-service层,然后web层与service层之间的调用通过网络传输来实现,这个网络传输由自己编写实现
- 即自己手写一套RPC框架,来实现各个层之间的网络通信数据传送
- 具体图如下:

初始化项目
- 创建父模块08-product-rpc-v1
- 新建一个项目(new Module)-》选择Maven->next->name: 08-product-rpc-v1 Atrifact coordinates展开:groupid: com.zh.demo Atrifactid:product-rpc-v1 -> finished
-
由于该项目仅仅是用来管理子项目的,因此在pom.xml文件中添加
<packaging>pom</packaging> - 删除src文件夹,只留pom文件
- 创建product-api模块:点击08-product-rpc-v1,右击->new->Module->Maven->next->name:product-api->Atrifact coordinates展开:groupid: com.zh.demo.v1->finish
- 创建product-web模块:点击08-product-rpc-v1,右击->new->Module->Spring Initializr(因为这个项目需要部署,因此需要启动,直接使用springboot)->group:com.zh.demo.v1、artifact:product-web-v1、java version:8->next->Develop Tools 勾选Lombok 、Web勾选Springweb->next->注意:content root 路径要在product-web下(…/08-product-rpc-v1/product-web-v1) ->finish
- 同理创建product-server模块
product-api模块(api层)
- 创建包名:com.zh.demo.rpc.v1
-
创建domain的product类
package com.zh.demo.rpc.v1.domain; /** * 产品信息 */ @Setter @Getter @ToString @AllArgsConstructor @NoArgsConstructor public class Product implements Serializable { private Long id;//id private String sn;//产品编号 private String name;//产品名称 private BigDecimal price;//产品价格 } -
创建service的IProductService类
package com.zh.demo.rpc.v1.service; import com.zh.demo.rpc.v1.domain.Product; /** * 定义产品接口的操作规范 */ public interface IProductService { /** * 保存产品 * @param product */ void save(Product product); /** * 根据产品id删除产品 * @param productId */ void deleteById(Long productId); /** * 修改产品信息 * @param product */ void update(Product product); /** * 根据产品id获取到产品信息 * @param productId * @return */ Product get(Long productId); }
product-web模块(control层)
- 将包名也修改为
com.zh.demo.rpc.v1 -
在pom文件中导入product-api模块
<dependency> <groupId>com.zh.demo.v1</groupId> <artifactId>product-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> -
创建ProductController类
package com.zh.demo.rpc.v1.web; import com.zh.demo.rpc.v1.domain.Product; import com.zh.demo.rpc.v1.service.IProductService; @RestController @RequestMapping("product") public class ProductController { @Autowired private IProductService productService; @RequestMapping("save") public String save(Product product) throws Exception{ productService.save(product); return "success"; } @RequestMapping("get") public Product get(Long id) throws Exception{ Product product = productService.get(id); return product; } }
product-server模块(service层)
- 将包名也修改为
com.zh.demo.rpc.v1 - 在pom文件中导入product-api模块
-
新增实现类
package com.zh.demo.rpc.v1.service.impl; import com.zh.demo.rpc.v1.domain.Product; import com.zh.demo.rpc.v1.service.IProductService; import org.springframework.stereotype.Component; import java.math.BigDecimal; @Service("IProductService") public class ProductServiceImpl implements IProductService { @Override public void save(Product product) { System.out.println("产品保存成功: "+product); } @Override public void deleteById(Long productId) { System.out.println("产品删除成功: "+ productId); } @Override public void update(Product product) { System.out.println("产品修改成功: "+ product); } @Override public Product get(Long productId) { System.out.println("产品获取成功"); return new Product(1L,"001","笔记本电脑",BigDecimal.TEN); } }
问题剖析
- 从上面来看service、control都依赖api层,但是control层如何拿到service层的实现呢?
-
比如:ProductController中的注入
@Autowired private IProductService productService; - 这里是拿不到IProductService接口的实现对象的,因此需要做一个网络类到service层去获取对应的接口实现类
手动实现各个层的网络通信
api层封装通用的请求、响应结果类
-
请求类RpcRequest
package com.zh.demo.rpc.v1.common; @Setter @NoArgsConstructor @AllArgsConstructor @ToString @Getter //RPC通信的数据请求规则 public class RpcRequest implements Serializable { // 请求消息的消息Id private String requestId; // 请求的具体的类名(接口名称) private String className; // 请求的具体的方法名称 private String methodName; // 请求的方法参数类型列表 private Class<?>[] parameterTypes; // 请求的方法参数列表 private Object[] parameters; } -
响应类RpcResponse
package com.zh.demo.rpc.v1.common; @NoArgsConstructor @AllArgsConstructor @ToString @Setter @Getter //RPC通信消息的响应数据规则 public class RpcResponse implements Serializable { //响应的消息id private String responseId; //请求的消息id private String requestId; // 响应的消息是否成功 private boolean success; // 响应的数据结果 private Object result; // 如果有异常信息,在该对象中记录异常信息 private Throwable throwable; }
service层改造
-
提供一个网络监听服务,用来响应当前模块的所有接口的实现类RpcServer
package com.zh.demo.rpc.v1; @Slf4j @Data @NoArgsConstructor @AllArgsConstructor public class RpcServer { //启动一个网络通信服务需要的端口 private int port ; //通过容器对象获取容器中的bean private ApplicationContext applicationContext; //服务启动类--> 启动一个ServerSocket //PostConstruct这个注解可以让这个方法在调用完构造函数后自动执行 @PostConstruct public void startup(){ try { //1 创建一个socket通信 ServerSocket serverSocket = new ServerSocket(port); System.out.println("服务端启动成功, 等待客户端连接....."); //2 接收客户端请求 while(true){ Socket socket = serverSocket.accept();//等待客户端连接 如果没有客户端连接, 会阻塞在这里 try( ObjectInputStream ois =new ObjectInputStream(socket.getInputStream()); ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()) ){ //读取内容 RpcRequest rpcRequest = (RpcRequest) ois.readObject(); System.out.println("服务端收到客户端的请求: obj = " + rpcRequest); //调用业务方法执行代码 //1 通过类名找到对应的Bean对象,在容器中查找 String className = rpcRequest.getClassName();//IProductService Object bean = applicationContext.getBean(className); Class<?> clazz = bean.getClass();//字节码对象 //2 通过反射调用类的对应方法 String methodName = rpcRequest.getMethodName(); Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); Object[] parameters = rpcRequest.getParameters(); Method method = null;//方法对象 Object result = null;//反射的返回结果 if(parameterTypes==null || parameterTypes.length==0){ method=clazz.getMethod(methodName); result =method.invoke(bean); }else{ method = clazz.getMethod(methodName, parameterTypes); result=method.invoke(bean,parameters); } //发送数据到客户端 RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setSuccess(true); rpcResponse.setResult(result); oos.writeObject(rpcResponse); oos.flush();//刷新缓冲区域 }catch (Exception e){ e.printStackTrace(); log.error(e.getMessage()); } } } catch (IOException e) { e.printStackTrace(); } } } -
在ProductServerApplication启动当前模块时,启动网络服务RpcServer
@SpringBootApplication public class ProductServerApplication { public static void main(String[] args) { SpringApplication.run(ProductServerApplication.class, args); } @Bean public RpcServer rpcServer(ApplicationContext applicationContext){ RpcServer rpcServer = new RpcServer(9000,applicationContext); return rpcServer; } }
control层改造
-
转门封装一个通用类,用于到service层获取接口对应的实现类的对象—RpcClient
package com.zh.demo.rpc.v1; @Slf4j @Data @NoArgsConstructor @AllArgsConstructor public class RpcClient { //远程网络通信的IP地址 private String host; //远程网络通信的端口 private int port; public Object send(RpcRequest rpcRequest){ //1 连接服务端 try( Socket socket = new Socket(host, port); ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()) ) { //1 通过输出流返送数据到服务端--> 对象输出流 // 调用一个方法 找到对应的字节码对象 通过一个名称 找到对应的方法 调用方法的参数类型, 实际参数列表 oos.writeObject(rpcRequest); oos.flush(); //2 接收服务端数据 RpcResponse rpcResponse = (RpcResponse) ois.readObject(); System.out.println("服务端响应给客户端的请求结果: rpcResponse = " + rpcResponse); return rpcResponse.getResult(); }catch (Exception e){ e.printStackTrace(); log.error(e.getMessage()); } return null; } } - 提供一个代理,转门用来代理某个接口实例,该实例通过网络通信去service层获取实际的调用结果–RpcProxy
- web层拿到的bean,其实就是productService接口的一个代理对象
- 当web层用这个代理对象(bean)去调用方法时,代理对象会通过远程网络通信,将方法名、参数等,传递给service层,然后service层用实际的impl执行方法,然后将执行结果响应给web层
@Data @NoArgsConstructor @AllArgsConstructor public class RpcProxy { private RpcClient rpcClient; //通用的代理对象,可以代理任何接口 public <T> T getProxy(Class<T> interfaceClass){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),//类加载器 new Class<?>[]{interfaceClass}, new InvocationHandler() { //proxy 代理对象 method 代理的方法 args 实际参数列表 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 1 创建客户端连接 // 2 构造请求参数 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(interfaceClass.getSimpleName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); rpcRequest.setParameterTypes(method.getParameterTypes()); // 3 发送请求 // 4 返回请求结果 Object result = rpcClient.send(rpcRequest); return result; } } ); } } -
product-web模块启动时,应到到service层获取到所有当前模块所需要的接口实现类对应的实例对象
@SpringBootApplication public class ProductWebApplication { public static void main(String[] args) { SpringApplication.run(ProductWebApplication.class, args); } @Bean public RpcClient rpcClient(){ RpcClient client = new RpcClient("127.0.0.1", 9000); return client; } @Bean public RpcProxy rpcProxy(RpcClient rpcClient){ return new RpcProxy(rpcClient); } @Bean public IProductService productService(RpcProxy rpcProxy){ // 因为本地没有对应的实现类的字节码对象, 无法实例化对象 // 这里仅仅是一个IProductService的代理对象proxy IProductService productService=rpcProxy.getProxy(IProductService.class) ; return productService; } }- 这样,ProductController注入的productService实例,就能获取到值了
测试
- 启动product-server模块-ProductServerApplication
- 启动product-web模块-ProductWebApplication
-
浏览器访问:
http://localhost:8080/product/get?id=100,响应结果如下{ "id": 1, "sn": "001", "name": "笔记本电脑", "price": 10 }
原理总结
- 首先使用socket创建2个模块(请求层、响应层)的网络连接
- 请求层网络请求传输的数据是一个对象,这个对象封装了要获取的类名、方法名称、参数等信息
- 响应层根据请求的数据采用java的反射来调用相应的方法,返回对应的对象
问题分析
- 我们封装的网络通信模块放在了product-web层,那么是不是应该将网络通信模块独立出来?任何模块使用直接引入即可?
抽取网络通信模块
项目初始化
- 创建父模块09-product-rpc-v2
- 新建一个项目(new Module)-》选择Maven->next->name: 09-product-rpc-v2 Atrifact coordinates展开:groupid: com.zh.rpc.v2 Atrifactid:product-rpc-v2 -> finished
-
由于该项目仅仅是用来管理子项目的,因此在pom.xml文件中添加
<packaging>pom</packaging> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> </dependencies> - 删除src文件夹,只留pom文件
- 创建product-blz-v2业务模块
- 右击09-product-rpc-v2-》new Module-》选择Maven->next->name: product-blz-v2 -> finished
-
由于该项目仅仅是用来管理子项目的,因此在pom.xml文件中添加
<packaging>pom</packaging> - 删除src文件夹,只留pom文件
- 创建rpc-sys-v2系统模块:步骤跟2一样,也是一个只有pom的父模块
product-blz-v2业务模块
- 分别新建三个模块,一个maven,2个springboot项目
- product-common模块,maven项目,创建方法略
- 新建包名
com.zh.rpc.v2 - 把Product、IProductService 2个代码拖入
- 新建包名
- product-service模块,springboot项目,创建方法略
- 将ProductServiceImpl类拖入
-
pom文件添加依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>product-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
- product-web模块,springboot项目,创建方法略
- ProductController类拖入
- pom文件添加依赖product-common,同上
rpc-sys-v2系统模块
- 该模块下也要分3块,分别是rpc-common、rpc-client、rpc-server
- rpc-common模块,maven项目,创建方法略,设置包名为
com.zh.rpc.v2- 将之前的RpcRequest、RpcResponse两个类拖入
- rpc-client模块,maven项目,创建方法略,设置包名为
com.zh.rpc.v2-
pom文件添加依赖
<dependencies> <dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> -
添加jdk1.8依赖
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> -
将RpcClient、RpcProxy2个类拖入项目即可
-
- rpc-server模块,springboot项目,创建方法略,设置包名为
com.zh.rpc.v2- pom文件添加依赖rpc-common,同上
-
将RpcServer这个类拖入,因为这个类中用到了这个类ApplicationContext,因此需要添加下面依赖
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.6.RELEASE</version> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.2.6.RELEASE</version> <optional>true</optional> </dependency> - 添加jdk1.8依赖同上
-
由于RpcServer中处理网络请求,应该是用多线程技术,因此将RpcServer改造如下
package com.zh.rpc.v2; /*当前类负责接收请求*/ @Slf4j @Data @NoArgsConstructor @AllArgsConstructor public class RpcServer { private ExecutorService executorService= Executors.newCachedThreadPool(); private int port ; private ApplicationContext applicationContext; public RpcServer(int port, ApplicationContext applicationContext) { this.port = port; this.applicationContext = applicationContext; } //服务启动类--> 启动一个ServerSocket //PostConstruct这个注解可以让这个方法在调用完构造函数后自动执行 @PostConstruct public void startup(){ try { //1 创建一个socket通信 ServerSocket serverSocket = new ServerSocket(port); System.out.println("服务端启动成功, 等待客户端连接....."); //2 接收客户端请求 while(true){ Socket socket = serverSocket.accept();//等待客户端连接 如果没有客户端连接, 会阻塞在这里 //处理请求--> 使用多线程技术--> 线程池技术 executorService.submit(new RpcServerHandler(socket,applicationContext)); } } catch (IOException e) { e.printStackTrace(); } } } -
封装RpcServerHandler类,专门用于处理网络请求
/*封装一个类专门处理网络请求*/ @Setter @Getter @NoArgsConstructor public class RpcServerHandler implements Runnable { private Socket socket; private ApplicationContext applicationContext; public RpcServerHandler(Socket socket,ApplicationContext applicationContext) { this.socket = socket; this.applicationContext=applicationContext; } @Override public void run() { try( ObjectInputStream ois =new ObjectInputStream(socket.getInputStream()); ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()) ){ //读取内容 RpcRequest rpcRequest = (RpcRequest) ois.readObject(); System.out.println("服务端收到客户端的请求: obj = " + rpcRequest); //调用业务方法执行代码 //1 通过类名找到对应的Bean对象,在容器中查找 String className = rpcRequest.getClassName();//IProductService Object bean = applicationContext.getBean(className); Class<?> clazz = bean.getClass();//字节码对象 //2 通过反射调用类的对应方法 String methodName = rpcRequest.getMethodName(); Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); Object[] parameters = rpcRequest.getParameters(); Method method = null;//方法对象 Object result = null;//反射的返回结果 if(parameterTypes==null || parameterTypes.length==0){ method=clazz.getMethod(methodName); result =method.invoke(bean); }else{ method = clazz.getMethod(methodName, parameterTypes); result=method.invoke(bean,parameters); } //发送数据到客户端 RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setSuccess(true); rpcResponse.setResult(result); oos.writeObject(rpcResponse); oos.flush();//刷新缓冲区域 }catch (Exception e){ e.printStackTrace(); } } }
product-blz-v2业务模块使用rpc-sys-v2系统模块
- product-service模块
-
pom文件添加依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-server</artifactId> <version>1.0-SNAPSHOT</version> <scope>compile</scope> </dependency> -
ProductServiceApplication,启动网络通信服务
package com.zh.rpc.v2; @SpringBootApplication public class ProductServiceApplication { public static void main(String[] args) { SpringApplication.run(ProductServiceApplication.class, args); } @Bean public RpcServer rpcServer(ApplicationContext applicationContext){ RpcServer rpcServer = new RpcServer(9000, applicationContext); return rpcServer; } } -
启动这个类,即启动服务端
-
- product-web模块
-
pom文件添加依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-client</artifactId> <version>1.0-SNAPSHOT</version> </dependency> -
改造ProductWebApplication
package com.zh.rpc.v2; @SpringBootApplication public class ProductWebApplication { public static void main(String[] args) { SpringApplication.run(ProductWebApplication.class, args); } @Bean public RpcClient rpcClient(){ RpcClient rpcClient = new RpcClient("127.0.0.1",9000); return rpcClient; } @Bean public RpcProxy rpcProxy(RpcClient rpcClient){ return new RpcProxy(rpcClient); } @Bean public IProductService productService(RpcProxy rpcProxy){ return rpcProxy.getProxy(IProductService.class); } } -
运行该模块
-
- 浏览器访问:
http://localhost:8080/product/get?id=100,响应正常
注册中心的封装集成
-
上面讲到,product-web模块进行访问product-service模块的时候,连接地址如下
RpcClient rpcClient = new RpcClient("127.0.0.1",9000); - 问题
- 这个地址是写死固定的,不能动态修改
- 如果product-service模块部署了n个服务器,那么,是不是应该根据某个策略进行动态分配ip地址?
- 集成注册中心
- 为什么需要注册中心? 如果没有注册中心,会有如下结果:
- 客户端在调用服务端的时候必须指定服务端的一个具体的地址, 只能从一个服务端进行调用
- 如果有多个服务端启动, 并且提供服务的时候, 客户端并不能完成对服务端的请求负载
- 如果有一个服务端出现问题, 在调用的时候没法切换到别的服务端进行调用
- 注册中心的主要功能
- 针对服务端来说, 我们在服务端启动完成以后, 需要往注册中心注册自己的地址信息(在指定的目录下面)
- 针对客户端来说, 我们在每次调用的时候, 根据一定的负载策略, 从注册中心获取一个可用的地址发送远程调用请求
- 为什么需要注册中心? 如果没有注册中心,会有如下结果:
服务提供者product-service模块
-
添加pom依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-server</artifactId> <version>1.0-SNAPSHOT</version> <scope>compile</scope> </dependency> -
ProductServiceApplication改造如下
//1.初始化注册中心zk,用于服务提供者向注册中心注册服务 @Bean public RpcRegistry rpcRegistry(){ RpcRegistry rpcRegistry = new RpcRegistry("127.0.0.1:2181"); return rpcRegistry; } //2. 向注册中心zk注册当前服务;注意这个端口6000,可不是当前服务的端口,是当前网络通信服务的端口!!! @Bean public RpcServer rpcServer(ApplicationContext applicationContext,RpcRegistry rpcRegistry){ //"127.0.0.1":这个地址通常应该是域名或者远程地址,将当前服务器的地址注册到注册中心,即zookeeper RpcServer rpcServer = new RpcServer("127.0.0.1",9000, applicationContext,rpcRegistry); return rpcServer; }
消费者product-web模块
-
添加pom依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-client</artifactId> <version>1.0-SNAPSHOT</version> </dependency> -
ProductWebApplication
// 1. 初始化注册中心,消费者通过注册中心发现可用服务 @Bean public RpcDiscover rpcDiscover() throws Exception { RpcDiscover rpcDiscover = new RpcDiscover("127.0.0.1:2181"); return rpcDiscover; } //2. 初始化客户端网络工具,用于向服务断提供网络请求 @Bean public RpcClient rpcClient(RpcDiscover rpcDiscover){ RpcClient rpcClient = new RpcClient(rpcDiscover); return rpcClient; } //3. 封装代理对象工具,将客户端的接口封装成一个代理,到服务端获取 @Bean public RpcProxy rpcProxy(RpcClient rpcClient){ return new RpcProxy(rpcClient); } //4. 客户端将要获取的impl对象存储到容器中 @Bean public IProductService productService(RpcProxy rpcProxy){ return rpcProxy.getProxy(IProductService.class); }
注册中心模块rpc-registry
- 注册中心要提供2个功能,服务提供者的注册功能、消费者的查询功能
- maven项目,创建方法略,设置包名为
com.zh.rpc.v2 -
pom文件添加zookpeer
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> - 添加jdk1.8依赖
-
注册中心常量使用类Constant
/*该类是一个注册中心的常量配置文件*/ package com.zh.rpc.v2; public interface Constant { //定义客户端连接session会话超时时间,单位为毫秒,该值的设置和zkServer设置的心跳时间有关系 int SESSION_TIMEOUT=4000; // 定义用于保存rpc通信服务端的地址信息的目录 String REGISTRY_PATH="/rpc"; // 定义数据存放的具体目录 String DATA_PATH=REGISTRY_PATH+"/data"; } -
服务端往注册中心注册类RpcRegistry
package com.zh.rpc.v2; /*该类是用于服务端给zooKeeper注册的类*/ @AllArgsConstructor @NoArgsConstructor @Getter @Setter public class RpcRegistry { //zkServer的地址信息 private String registryAddress; //zk客户端程序 private ZooKeeper zooKeeper; public RpcRegistry(String registryAddress) { this.registryAddress = registryAddress; } public void createNode(String data) throws Exception{ //创建一个客户端程序, 对于注册可以不用监听事件 zooKeeper= new ZooKeeper(registryAddress, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { } }); if(zooKeeper!=null){ try{ //判断注册的目录是否存在 Stat stat = zooKeeper.exists(REGISTRY_PATH, false); if(stat==null){ //如果不存在, 创建一个持久的节点目录 zooKeeper.create(REGISTRY_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } //创建一个临时的序列节点,并且保存数据信息 zooKeeper.create(DATA_PATH,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); }catch (Exception e){ e.printStackTrace(); } }else{ } } } -
客户端(消费者)发现可用RPC服务的类
package com.zh.rpc.v2; /*该类是用于发现zooKeeper中哪些服务器可以使用*/ @Setter @Getter //地址发现,用于实时的获取最新的RPC服务信息 public class RpcDiscover { //服务端地址 zkServer的地址 private String registryAddress; //获取到的所有提供服务的服务器列表 volatile 保证多线程的可见性 private volatile List<String> dataList=new ArrayList<>(); private ZooKeeper zooKeeper=null; //初始化zkClient客户端 public RpcDiscover(String registryAddress) throws Exception { this.registryAddress = registryAddress; zooKeeper=new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getType()==Event.EventType.NodeChildrenChanged){ //监听zkServer的服务器列表变化 watchNode(); } } }); //获取节点相关数据 watchNode(); } // 从dataList列表随机获取一个可用的服务端的地址信息给rpc-client public String discover(){ int size=dataList.size(); if(size>0){ int index= new Random().nextInt(size); return dataList.get(index); } throw new RuntimeException("没有找到对应的服务器"); } //监听服务端的列表信息 private void watchNode(){ try{ //获取子节点信息 List<String> nodeList = zooKeeper.getChildren(Constant.REGISTRY_PATH, true); List<String> dataList=new ArrayList<>(); for (String node : nodeList) { byte[] bytes = zooKeeper.getData(Constant.REGISTRY_PATH + "/" + node, false, null); dataList.add(new String(bytes)); } this.dataList=dataList; }catch (Exception e){ e.printStackTrace(); } } }
消息通信服务端模块rpc-server改造
-
添加pom依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-registry</artifactId> <version>1.0-SNAPSHOT</version> </dependency> -
RpcServer改造
public class RpcServer { private ExecutorService executorService= Executors.newCachedThreadPool(); private int port ; private String hostname; private ApplicationContext applicationContext; private RpcRegistry rpcRegistry; public RpcServer(String hostname,int port, ApplicationContext applicationContext,RpcRegistry rpcRegistry) { this.hostname=hostname; this.port = port; this.applicationContext = applicationContext; this.rpcRegistry=rpcRegistry; } //服务启动类--> 启动一个ServerSocket //PostConstruct这个注解可以让这个方法在调用完构造函数后自动执行 @PostConstruct public void startup() throws Exception { try { //1 创建一个socket通信 ServerSocket serverSocket = new ServerSocket(port); System.out.println("服务端启动成功, 等待客户端连接....."); //2 往注册中心zookeeper中注册对应的地址性 host, port--> 地址信息 //注意看!!!:这里往zk中注册的可是当前网络服务的host+port可不是当前业务服务的host+port String addrName=null;//192.168.48.1:9000 addrName=hostname+":"+port; System.out.println("addrName = " + addrName); //此时zk中的临时节点是:/rpc/data/192.168.48.1:9000 rpcRegistry.createNode(addrName); System.out.println("服务端启动成功, 等待客户端连接....."); //2 接收客户端请求 while(true){ Socket socket = serverSocket.accept();//等待客户端连接 如果没有客户端连接, 会阻塞在这里 //处理请求--> 使用多线程技术--> 线程池技术 executorService.submit(new RpcServerHandler(socket,applicationContext)); } } catch (IOException e) { e.printStackTrace(); } } }
消息通信客户端模块rpc-client改造
-
添加pom依赖
<dependency> <groupId>com.zh.rpc.v2</groupId> <artifactId>rpc-registry</artifactId> <version>1.0-SNAPSHOT</version> </dependency> -
RpcClient添加从注册中心获取可用服务代码
public class RpcClient { private String host; private int port; private RpcDiscover rpcDiscover; public RpcClient(RpcDiscover rpcDiscover) { this.rpcDiscover = rpcDiscover; } public Object send(RpcRequest rpcRequest){ //1 从zookeeper注册中心获取可用的server层服务器地址 String serverAddr = rpcDiscover.discover(); String[] args = serverAddr.split(":"); host=args[0]; port=Integer.valueOf(args[1]); //2 连接服务端 try( Socket socket = new Socket(host, port); ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()) ) { //1 通过输出流返送数据到服务端--> 对象输出流 // 调用一个方法 找到对应的字节码对象 通过一个名称 找到对应的方法 调用方法的参数类型, 实际参数列表 oos.writeObject(rpcRequest); oos.flush(); //2 接收服务端数据 RpcResponse rpcResponse = (RpcResponse) ois.readObject(); System.out.println("服务端响应给客户端的请求结果: rpcResponse = " + rpcResponse); return rpcResponse.getResult(); }catch (Exception e){ e.printStackTrace(); } return null; } }
测试
- 浏览器访问:
http://localhost:8080/product/get?id=100,响应正常 - 注意: 一定要先启动ZooKeeper
总结!!!
- 生产者服务启动后会伴随启动一个对应的网络通信服务,IP相同,端口不同(9000)
- 网络服务的初始化与启动需要生产者服务传递它容器对象(applicationContext),因此网络服务当收到消费者的请求时,可以直接拿到生产者服务容器中的所有对象
- 往ZK注册中心注册的是网络通信服务的 IP+端口,并非 生产者服务 的,即zk关注的是每个生产者服务伴随的网络通信服务的状态。
- 消费者服务按照一定的负载均衡策略 通过zk注册中心获取生产者集群中的某个网络通信服务IP+端口,然后用这个地址进行网络通信。
- RPC并不是将消费者服务引用的接口(IProductService)对应的生产者服务容器中的impl实例对象以远程传输的方式给消费者,而是消费者首先拿到的是一个代理对象(RpcProxy);当消费者服务使用这个对象调用方法时,这个代理对象每次通过网络传输去到生产者服务容器中拿到真正的实例,执行具体方法,然后将执行结果通过网络传输返回给消费者。