Java进阶-Zookeeper

分布式基本概念

系统高可用

  1. 概念:所谓的系统高可用,主要是指两个方面
    1. 系统的健壮性, 不允许系统出现单点故障
      1. 要部署多台服务器,任何一台服务器宕机,还有其他服务器正常使用
    2. 系统的处理能力, 可以提高系统的处理能力, 保证系统的运行效率
  2. 集群:
    1. 集群, 主要是原来使用的是一台服务器处理, 现在使用多台服务器保障系统的运行
    2. 集群又分为:主备模式、主从模式、高可用模式
      1. 主备集群: 主要有一个主要节点提供服务, 另外的节点主要是出于备份状态, 平时不提供工作, 一旦主节点出现问题, 备份节点启动运行, 提供正常的服务
        1. 比如:有2台Nginx服务器用来做转发,正常只有一台服务器处于active,另外一台处于备份状态,一旦第一条出现故障,自动切换到第二台,第二台处于active
      2. 主从集群: 集群中的节点都提供服务, 但是每台服务器的角色可能不一样, 比如配置数据库的读写分离, 主数据库可能是写操作, 对于实时性要求不高的读操作就使用从数据库
        1. 即根据业务划分,不同业务访问不同的服务器
        2. 上面的例子注意:主服务器需要即时将数据同步给从服务器,因此实时性不高
      3. 高可用集群: 集群中的节点提供的功能是一样的, 所有的节点没有主从之分,主要是提高系统的高可用, 避免单点故障

分布式

  1. 分布式是系统部署方式
  2. 比如我们的业务系统, 部署一个业务系统需要的环境(应用服务(Tomcat)+数据库服务(MySQL)),
  3. 如果我们把Tomcat和数据库MySQL服务部署在同一台服务器, 我们称之为单机部署, 这样Tomcat和MySQL之间的网络开销可以忽略(直接走127.0.0.1不会消耗网络)
  4. 如果我们把Tomcat和数据库MySQL服务部署在不同的服务器, 我们称之为分布式应用, 因为应用服务器和数据库服务器之间需要走网络通信, 当然, 我们把所有需要走网络这种部署方式称之为”分布式应用”

微服务

  1. 微服务一定是分布式, 但是分布式不一定是微服务
  2. 所谓”微服务” 指的是我们系统的一个 架构设计方式
  3. 以前我们把所有的功能都放到一个项目(应用中), 这种方式我们称之为单体应用
  4. 随着项目开发的功能变多, 架构变强, 我们需要根据模块来进行划分, 每个模块之间通过服务之间的网络调用, 我们称之为微服务架构
  5. 即一个项目有n个功能块,将这个n个功能块拆分,根据每个功能块的使用量进行分别部署,各个功能块之前用网络调用
  6. 主要好处:
    1. 模块化部署,避免一个宕机,导致整个系统不可用
    2. 可以单独针对高并发的场景(秒杀)扩容,添加机器
    3. 方便开发人员独立开发,互不干扰
  7. 注意: 各个模块之前可以同步调用,也可以异步调用
    1. 如果用异步,就用消息队列
    2. 如果用同步,就用Dubbo、springcloud

图1

分布式的使用

  1. 分布式在大数据时代非常重要而且基础的一个概念, 我们在大数据时代, 面临的主要问题有
    1. 海量数据的存储 —> 分布式存储
    2. 海量数据的运算 —-> 分布式计算
    3. 高并发的请求 —–> 分布式系统
  2. 大数据主要研究的是:分布式存储、分布式计算
  3. 分布式存储
    1. 大数据通常使用HBase+HDFS数据库来进行大量数据的分布式存储,可以动态挂n个节点访问数据库
  4. 分布式计算
    1. 分而治之(map、reduce)
      1. 强调的是把一个大的计算任务分发成多个小的计算任务, 并行运算, 最终把结果进行汇总
    2. 对于分布式计算, 我们强调的是移动运算, 而不是移动数据,运算向数据靠近
  5. 分布式系统

分布式协调服务

  1. 所谓分布式:主要是指我们的一个整个的应用是由部署在多个机器上的服务去统一完成,对于部署在多个机器上的应用,通常是有不同的角色,比如有Master和Slave角色(主从)的区分,对于Zookeeper集群,也会有多个节点,主要包括leader节点和follower节点
  2. 所谓协调服务: 主要是指我们的Zookeeper去在我们的分布式系统中充当一个协调者的角色,帮助我们的具体的业务系统之间的相互协调,保证系统的正常运行
  3. 如果客户端直接访问服务器出现问题:
    1. 客户端不知道哪个服务器可以访问
    2. 如果一个服务器宕机,不能自动切换其他正常的服务器
  4. Zookeeper能力
    1. 一个服务器正常启动需要到Zookeeper去注册
    2. 一个服务器宕机,Zookeeper可以自动将当前客户端的访问协调到其他正常的服务器
  5. 正常流程如下图:
    1. 客户端先到Zookeeper去发现服务器,Zookeeper协调到服务,比如server01
    2. 客户端在通过发现的服务器直接去访问server01

    图1

  6. 注意: Zookeeper的作用仅仅是用来协调,并没有充当发送者

Zookeeper基础

应用场景

  1. 服务器在线感知
    1. 所有服务器启动的时候,都向Zookeeper中的一个指定目录写入一个数据/server/serverxxx
    2. 客户端连接Zookeeper,获取到/servers/目录下面所有的可用的服务信息
    3. 客户端根据设置的负载均衡策略去选择某个服务进行请求
    4. 客户端监听/servers这个目录下面的数据改变,如果Zookeeper的这个目录下的数据发生改变,那么会及时的通知客户端数据已经发生改变
    5. 客户端收到Zookeeper的通知,及时的去获取最新的数据
      1. 也就是说客户端不是每次请求都要去Zookeeper取获取服务信息
    6. 当应用程序宕机,在Zookeeper中会及时的删除对应的服务信息
      1. 通过心跳机制实现,服务器每个几秒钟向Zookeeper通知一下,一旦停止接收到服务器的心跳,则证明宕机
  2. 主从(备)协调
    1. 对于我们的集群环境中的多个机器,其中一台是处于活跃状态,可以正常的提供对应的服务,另外的服务器是出于备份状态,只有当活跃状态的机器出现问题,不能提供服务的时候,才会把备份状态的机器切换为活跃状态
    2. 实现步骤
      1. 首先两台服务器Server01、Server02启动,并且同时都往Zookeeper进行注册,写入的信息/server/server01,server02,同时绑定对应的值改变事件
      2. 两台服务器都判定一下自己写入的数据,是否是第一条记录,如果是,直接作为活动节点(主节点),如果不是,则作为备份节点(从节点)
      3. 当Zookeeper中的节点信息发生改变(新的节点加入、存在的节点移除),通知所有的已经绑定值改变事件的客户端,去更新最新的数据
      4. 当所有的服务器收到一个值改变事件,再去判断一下自己节点是否是第一个节点,如果是,把状态修改为活动状态,否则,设置为备份状态。
  3. 配置管理
    1. 在我们的大型的应用中, 对于一个系统的配置会有很多需要配置的参数,比如说数据库的配置, Tomcat的线程数的配置等等, 如果我们不使用统一的配置管理中心的话, 需要在每个应用服务去进行一个单独的配置,这样操作比较麻烦,而且还容易出错,我们可以使用一个统一的配置管理
    2. 提供一个配置管理程序(web页面系统), 用于向Zookeeper中写入对应的数据, 主要包括属性名称和属性值
    3. 所有的服务启动的时候都去读取zookeeper中的配置信息,加载到应用程序,完成系统的正常启动
  4. 名称服务
    1. 命名服务就是指通过指定的名字来获取资源或者服务的地址。Zookeeper会在自己的文件系统上(树结构的文件系统)创建一个以路径为名称的节点,它可以指向提供的服务的地址,远程对象等。
    2. 简单来说使用Zookeeper做命名服务就是用路径作为名字,路径上的数据就是其名字指向的实体。
    3. 即:当客户端调用后台服务接口时,先到达zookeeper,zookeeper中已命名了所有的路径名称,zookeeper会自动协调分配的目的主机。zookeeper根据请求路径名称协调调用目的主机。
  5. 分布式锁
    1. 在分布式系统的架构设计中,有时候需要保证分布式系统中的对于某些接口的原子性的操作, 我们需要控制在同一时刻只能有一个应用程序可以正常操作,其他的程序必须等待在操作的程序完成以后才可以正常的操作数据
    2. 通俗来说,n个服务器的某个接口都要访问这台服务器生成订单id的接口,此时就会造成并发,要保证n个请求获取的值不一样,此时就要用到分布式锁
    3. 实现步骤:
      1. 所有需要访问生成ID的接口的服务的应用都去Zookeeper的指定目录生成一个自己的数据/lock/serverxxx
      2. 所有的业务系统都判断一下自己生成的服务地址是否是所有的地址列表中的最小的一个, 如果是最小的一个,则可以正常访问接口,否则,处于等待状态
      3. 当获得锁对象的应用访问完生成ID的接口以后, 我们需要删除自己在Zookeeper中的服务列表,然后Zookeeper通知所有绑定该事件的订单服务器
      4. 其他的应用再去判断自己的数据是否是Zookeeper中的地址列表中的最小的一个数据,如果是的话,即可以获取到锁对象,开始资源对象的访问

Zookeeper的安装

  1. 背景:
    1. 假设有3台服务器,ip地址分别为192.168.26、27、28.100
    2. 3台服务器分别都要做下面的操作,其中一个是leader,其他两台是follower
  2. 安装步骤
    1. 安装JDK:因为Zookeeper使用Java写的,因此依赖JDK环境
    2. 下载安装包:https://zookeeper.apache.org/releases.html
      1. 下载 Apache ZooKeeper 3.5.9(asc, sha512),下载完后是apache-zookeeper-3.6.3-bin.tar.gz
    3. 解压到Linux服务器的指定目录
      1. 解压包到指定目录下:tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C /usr/local/
    4. 修改配置文件
      1. 在安装目录conf文件下,打开zoo_sample.cfg,将该文件中的内容拷贝一份到一个新的文件zoo.cfg中

         //zoo_sample.cfg中内容如下,其他都是注释
         #心跳时间:默认ms
         tickTime=2000
         #集群启动的时候时间限制,10*2000=20s
         initLimit=10
         # 同步数据的时间 10s,这几台zookeeper主从服务器之间同步数据
         syncLimit=5
         # 数据存放的目录
         dataDir=/tmp/zookeeper
         # 客户端连接zookeeper的端口
         clientPort=2181
        
      2. 在配置文件zoo.cf中添加如下内容

         # 修改数据存放的目录,存放到项目的data目录下面
         dataDir=/usr/local/apache-zookeeper-3.6.3-bin/data
         # 集群中的服务列表,1,2,3代表的是一个服务id,需要和myid(dataDir目录下)文件中的内容一致,而且要在1-154之间,规定集群中的服务器,就是这个Zookeeper要部署到3台服务器上,一个主,2个从
         # 2888 数据同步端口,三台机器之前数据同步通信用2888 3888数据选举端口,一个宕机,其他几台要通信选举一个leader,通过3888通信
         server.1=192.168.26.100:2888:3888
         server.2=192.168.27.100:2888:3888
         server.3=192.168.28.100:2888:3888
        
    5. 创建一个标志文件myid
      1. 在dataDir目录下新建一个myid文件夹,里面存放当前这台服务器的id
      2. cd /usr/local/apache-zookeeper-3.6.3-bin/data
      3. echo 1 > myid,其他两台服务器分别设置为2、3
    6. 启动服务器
      1. /usr/local/apache-zookeeper-3.6.3-bin/zkServer.sh start conf/zoo.cfg

         zkServer.sh start  conf/zoo.cfg # 启动命令
         zkServer.sh status conf/zoo.cfg # 查看状态命令
         zkServer.sh stop conf/zoo.cfg # 停止服务命令
         zkServer.sh restart conf/zoo.cfg # 重启服务命令
        

Zookeeper的核心工作机制

  1. ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby(论文)一个开源的实现.雅虎的一个开源产品. Apache的顶级开源项目。谷歌写的论文,雅虎实现,现在属于Apache
  2. 它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
  3. ZooKeeper特性
    1. Zookeeper:一个leader,多个follower组成的集群(3个节点 最佳实践)
    2. 全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
      1. 疑问:如果leader还没有把数据同步到其他节点,那么client请求其他节点,是不是读取不到数据?
      2. 不会,client的请求会被阻塞,直到leader同步过来之后
    3. 分布式更新、写,只能由leader实施。
    4. 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
    5. 数据更新原子性,一次数据更新要么成功(半数以上节点成功),要么失败
    6. 实时性,在一定时间范围内,client能读到最新数据(毫秒级别)
  4. zookeeper数据结构
    1. 层次化的目录结构,命名符合常规文件系统规范
      1. 可以简单理解为一个数据库
      2. 是由一个节点一个节点Node组成的层级关系
      3. 对于每个Node节点,都可以存放数据
    2. 每个节点(文件夹)在zookeeper中叫做znode,并且其有一个唯一的路径标识
    3. 节点Znode可以包含数据(只能存储很小量的数据,<1M)和子节点(但是EPHEMERAL类型的节点不能有子节点)
    4. 客户端应用可以在节点上设置监视器,监听节点的数据、节点变化
    5. 每个节点就相当于一个路径,比如:/service/service001

      图1

  5. 节点类型
    1. Znode有四种形式的目录节点

       PERSISTENT  持久化节点
       PERSISTENT_SEQUENTIAL   持久化顺序节点
       EPHEMERAL   临时节点
       EPHEMERAL_SEQUENTIAL 临时节点顺序节点
      
    2. 创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
    3. 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
    4. 短暂(ephemeral): 断开连接自己删除
    5. 持久(persistent) : 断开连接不删除

Zookeeper的基本操作命令

  1. 服务器的启动和监控

     zkServer.sh  start | stop | restart | status
     jps   查看java进程 QuorumPeerMain
     netstat -natl  查看是否正常
     netstat -ntpl  查看是否正常
    
  2. 客户端连接

     zkCli.sh -server localhost:2181
     如果是连接当前本机地址, 可以直接使用: zkCli.sh
     help: 查看命令, 可以查看具体可以执行的命令
    
  3. 创建节点
    1. create [-s] [-e] [-c] [-t ttl] path [data] [acl]
      1. -s: 顺序节点 默认为否
      2. -e: 临时节点 默认为否
      3. -t ttl 创建一个持久化节点或者一个持久化顺序节点的时候, 指定存活时间
      4. path: 路径 目录
      5. data: 设置路径下存放的数据
    2. 举例:

       //创建持久化节点
       create /pNod pNode
       //创建临时节点,临时节点在会话关闭以后, 会自动删除,临时节点不允许有子节点
       create -e /eNode eNode
       //创建顺序节点,对于顺序节点, 是允许进行节点重复创建, 默认会在节点后面添加一个全局唯一的顺序ID
       create /lock
       create -s  /lock/getId
       create -e -s  /lock/getId2
      
  4. 查看节点
    1. 查看节点信息
      1. ls [-s] [-w] [-R] path
      2. [-s] : 显示统计信息
      3. [-w]: 查看事件信息
      4. [-R]: 显示递归目录
    2. 获取节点数据
      1. get [-s] [-w] path
        1. -s: 显示统计信息
        2. -w: 获取事件信息
      2. 举例,输入 get -s /pNod

         pNode # 获取到的节点数据
         cZxid = 0x600000010  # 创建节点的事务Id
         ctime = Tue Apr 21 16:23:31 CST 2020   #创建时间
         mZxid = 0x600000010 # 修改的事务ID
         mtime = Tue Apr 21 16:23:31 CST 2020 #修改时间
         pZxid = 0x600000010 
         cversion = 0 # 当前版本信息
         dataVersion = 0 #数据版本信息
         aclVersion = 0 #权限版本信息
         ephemeralOwner = 0x0  #临时节点的会话ID
         dataLength = 5 # 数据长度
         numChildren = 0 # 子节点个数
        
  5. 修改节点数据
    1. set [-s] [-v version] path data
      1. -s 设置过程显示节点的状态信息
      2. -v: 使用CAS设置数据, 使用国歌声, 可以使用stat从dataVersion中找到版本
    2. 示例

       set -v 0 /node2 node3
       set /node2 node3
      
  6. 删除节点
    1. 单个节点删除
      1. delete [-v version] path
        1. -v: 在并发的时候, 使用乐观锁进行删除
      2. delete /node2
    2. 删除所有节点(递归删除)
      1. deleteall /pNod
  7. 绑定事件
    1. 绑定一次事件、绑定永久的事件
    2. addWatch [-m mode] path
      1. mode通常值为:PERSISTENT(当前节点)、PERSISTENT_RECURSIVE(当前节点以及子节点)
      2. 默认的模式是绑定当前节点以及子节点的所有的事件
    3. 常见的事件有:
      1. NodeDataChanged: 节点数据改变事件
      2. NodeChildrenChanged: 子节点数量改变事件
      3. NodeCreated: 节点创建时间
    4. 使用
      1. addWatch -m PERSISTENT /cNode
      2. 如果对于数据的获取:一般绑定的是NodeDataChanged
      3. 如果对于获取子节点列表: 一般绑定的是NodeChildrenChanged

Zookeeper应用

Java客户端API

  1. 常见的jar包使用
    1. 自带的 zkclient
      1. apache自带原生的,没有任何封装
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.6.0</version>
       </dependency>
      
    2. Apache 开源的 Curator : 企业项目中常用
      1. 内部封装了分布式锁、事务管理等丰富功能
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-framework</artifactId>
           <version>4.3.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-client</artifactId>
           <version>4.3.0</version>
       </dependency>
      
    3. Apache 开源的ZkClient(com.101tec): 已经不更新
  2. 这里的本质就是使用java的API来操作上面讲的基本操作命令,用代码来替代命令
  3. 代码举例
    1. 快速新建一个SpringBoot项目
      1. 新建项目文件夹JavaAdvanced,打开IDEA->Create New Project
      2. 选择Spring Initializr->next,配置下面参数,然后选择next

         Group: com.zh.demo
         Artifact: 01-zookeeper-api-demo
         Java Version: 8
         packge: com.zh.demo.zookeeperapidemo
        
      3. 点击Developer Tools->勾选Lombok,点击Web勾选Spring Web->顶部有个SpringBoot,选择版本2.5.4
      4. 点击next->finish
      5. pom文件添加zookeeper依赖

         <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
             <version>3.6.3</version>
         </dependency>
        
    2. 启动之前安装的3台zookeeper服务器集群(ZK集群)
    3. 使用步骤
      1. 在入口类Application中连接ZK集群

         @SpringBootApplication
         public class Application {
             //ZK集群的地址,端口,多个用逗号隔开
             private  static  String ZK_SERVER_ADDR="192.168.26.100:2181,192.168.27.100:2181,192.168.28.100:2181";
             //设置连接会话超时时长
             private static  int SESSION_TIMEOUT=30000;
                    
             public static void main(String[] args) {
                 SpringApplication.run(Application.class, args);
             }
             //创建一个zookeeper的连接
             @Bean //创建之后放到容器
             public ZooKeeper zooKeeper() throws Exception{
                 // 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
                 ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
                     @Override
                     public void process(WatchedEvent event) {
                         System.out.println("event = " + event);
                         if(event.getState()== Event.KeeperState.SyncConnected){
                             System.out.println("zookeeper客户端连接成功");
                         }
                     }
                 });
                 return zooKeeper;
             }
                    
         }
        
      2. 创建一个ZKApiController封装各种API专门用于操作ZK集群

         @RestController
         public class ZKApiController {
                    
             @Autowired
             private ZooKeeper zooKeeper;
                    
                    
             /**
              * 创建一个节点
              * @param path  节点路径
              * @param data  节点数据
              * @param type  节点类型
              * @return
              * @throws Exception
              * 比如传参:/servers/server001  db_server   PERSISTENT
              * PERSISTENT_SEQUENTIAL:永久顺序节点  PERSISTENT 永久节点
              */
             @RequestMapping("createNode")
             public String createNode(String path,String data,String type) throws Exception{
                 String result = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.valueOf(type));
                 return result;
             }
                    
             /**
              *获取节点数据
              * @param path 节点的名称
              * @return
              * @throws Exception
              * 比如传参:/servers/server001 则获取到结果db_server
              */
             @RequestMapping("getData")
             public String getData(String path) throws Exception{
                 //1 先去查询版本信息  如果没有, 返回的是一个null
                 Stat stat = zooKeeper.exists(path, false);
                 //同步获取数据
                 byte[] data = zooKeeper.getData(path, false, stat);
                 System.out.println("new String(data) = " + new String(data));
                 return new String(data);
             }
                    
             /**
              * 异步获取数据,不经常用,因为节点数据本来就不大
              * @param path
              * @return
              * @throws Exception
              */
             @RequestMapping("getDataAsync")
             public String getDataAsync(String path) throws Exception{
                 //1 先去查询版本信息
                 Stat stat = zooKeeper.exists(path, false);
                 //异步获取数据,通过callback获取数据
                 zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
                     @Override
                     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                         System.out.println("异步处理回调数据");
                         System.out.println("收到的数据:"+new String(data));
                         System.out.println("ctx = " + ctx);
                     }
                 }, "测试数据");
                    
                 return "异步获取数据";
             }
                    
             /**
              * 获取子节点
              * @param path 需要获取子节点的路径
              * @return
              * @throws Exception
              */
             @RequestMapping("getChildren")
             public List<String> getChildren(String path) throws Exception{
                 List<String> children = zooKeeper.getChildren(path, false);
                 return children;
             }
                    
                    
             /**
              * 删除节点
              * @param path 要删除的路径节点
              * @return
              * @throws Exception
              */
             @RequestMapping("delete")
             public String delete(String path) throws Exception{
                 Stat stat = zooKeeper.exists(path, false);
                 if(stat!=null){
                     zooKeeper.delete(path,stat.getVersion());
                 }
                 return "删除成功";
             }
                    
                    
             /**
              * 节点更新: 指的是更新数据
              * @param path 需要更新的节点
              * @param data 更新的数据
              * @return
              * @throws Exception
              */
             @RequestMapping("update")
             public String update(String path,String data) throws Exception{
                 Stat stat = zooKeeper.exists(path, false);
                 if(stat!=null){
                     zooKeeper.setData(path,data.getBytes(),stat.getVersion());
                 }
                 return "更新成功";
             }
                    
             /**
              * 绑定事件,一次性事件
              * 如果绑定事件的get、set方法(getData getChildren),绑定的就是数据改变事件
              * 如果调用的方法是ls->getChildren,绑定的事件就是子节点改变事件(增加、减少)
              * 绑定的事件是一次性的,通知完会移除事件
              * @param path 需要绑定事件的节点
              * @return
              * @throws Exception
              */
             @RequestMapping("addWatch1")
             public String addWatch1(String path) throws Exception{
                 Stat stat = zooKeeper.exists(path, false);
                 //定义一个监视器对象
                 Watcher watcher = new Watcher() {
                     @Override
                     public void process(WatchedEvent event) { // 数据改变事件,而且还是一次性
                         System.out.println("事件类型:" + event.getType());
                         System.out.println("数据发生改变, 请及时更新");
                         try {
                             //事件触发完,重新绑定,因为事件是一次性的,下次触发还能响应事件
                             byte[] data = zooKeeper.getData(path, this, stat);
                             System.out.println("更新后的数据:"+new String(data));
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
                     }
                 };
                 //给getData绑定事件监听
                 zooKeeper.getData(path, watcher, stat);
                 return "success";
             }
                    
             /**
              * 绑定永久事件
              * 监听:数据变化事件  子节点改变事件
              * @param path
              * @return
              * @throws Exception
              */
             @RequestMapping("addWatch2")
             public String addWatch2(String path) throws Exception{
                 Stat stat = zooKeeper.exists(path, false);
                 //只是获取数据, 没有绑定事件
                 byte[] data = zooKeeper.getData(path, null, stat);
                 System.out.println("获取到数据:"+new String(data));
                 //绑定永久的事件  --> 1 数据变化事件  2  子节点改变事件
                 zooKeeper.addWatch(path, new Watcher() {
                     @Override
                     public void process(WatchedEvent event) {
                         System.out.println("event = " + event);
                         if(event.getType()== Event.EventType.NodeDataChanged){
                             try {
                                 //重新获取数据
                                 Stat stat = zooKeeper.exists(path, false);
                                 //只是获取数据, 没有绑定事件
                                 byte[] data = zooKeeper.getData(path, null, stat);
                                 System.out.println("更新的数据:"+new String(data));
                             } catch (Exception e) {
                                 e.printStackTrace();
                             }
                         }else if(event.getType() == Event.EventType.NodeChildrenChanged){
                             //重新获取子节点列表
                             System.out.println("子节点数据发生改变");
                         }
                     }
                 }, AddWatchMode.PERSISTENT);
                 return "success";
             }
         }
        

ZK应用实战

图1

  1. 上图分析:
    1. 秒杀服务相当于服务器,采用分布式部署;订单服务相当于客户端,向秒杀服务请求数据
    2. ZK集群就相当于上面部署的三个SK主机,分别运行
    3. 主要分为3个方面
      1. 部署好ZK集群,并启动
      2. 服务器集群启动,向ZK集群注册信息
      3. 客户端向ZK集群获取服务列表
    4. 服务器集群启动,向ZK集群注册信息
      1. 连接Zookeeper注册中心
      2. 创建一个目录节点(临时的顺序节点servers/crmserver0001
        1. 之所以用临时节点的目的就是能够自动删除
      3. 当有节点增加的时候,注册临时顺序节点即可
      4. 当有服务器退出的时候,会自动删除临时节点
    5. 客户端向ZK集群获取服务列表
      1. 连接Zookeeper注册中心
      2. 获取指定目录下面的子节点列表(/servers)
      3. 注册(添加)子列表改变事件(持久性事件)
      4. 当前/servers下面的子节点发生改变,即时的通知客户端
      5. 把获取的数据缓存到本地列表
  2. 秒杀服务代码示例
    1. 跟之前一样,新建一个springboot项目02-zookeeper-seckill-server
    2. Application代码

       @SpringBootApplication
       public class Application {
           //ZK集群的地址,端口,多个用逗号隔开
           private  static  String ZK_SERVER_ADDR="192.168.26.100:2181,192.168.27.100:2181,192.168.28.100:2181";
           //设置连接会话超时时长
           private static  int SESSION_TIMEOUT=30000;
              
           //要注册的节点路径
           private static  String PATH="/servers";
           private static  String SUB_PATH="/seckillServer";
              
           @Value("${server.host}")
           private String host;
           @Value("${server.port}")
           private String port;
              
           private ZooKeeper zooKeeper;
              
           public static void main(String[] args) {
               SpringApplication.run(Application.class, args);
           }
           //创建一个zookeeper的连接
           @Bean //创建之后放到容器
           public ZooKeeper zooKeeper() throws Exception{
               // 1. 连接Zookeeper注册中心
               zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
                   @Override
                   public void process(WatchedEvent event) {
                       System.out.println("event = " + event);
                       if(event.getState()== Event.KeeperState.SyncConnected){
                           System.out.println("zookeeper服务端连接成功");
                           //2. 创建一个目录节点,启动服务注册节点,注册对应的信息,创建临时顺序节点
                           try {
                               zooKeeper.create(PATH+SUB_PATH, (host+":"+port).getBytes(),
                                       ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
                           System.out.println(1234);
                       }
                   }
               });
               return zooKeeper;
           }
              
       }
      
    3. application.properties

       server.port=8888
       server.host=192.168.48.1
      
    4. 服务器一旦停止会自动删除节点,因为是顺时节点
  3. 订单服务代码示例
    1. 跟之前一样,新建一个springboot项目03-zookeeper-order-server,作为客户端,通过ZK集群访问秒杀服务集群
    2. Application代码举例:

       @SpringBootApplication
       public class Application {
           //ZK集群的地址,端口,多个用逗号隔开
           private  static  String ZK_SERVER_ADDR="192.168.26.100:2181,192.168.27.100:2181,192.168.28.100:2181";
           private static  int SESSION_TIMEOUT=30000;
           private static  String PATH="/servers";
              
           public static List<String> addrList;
              
           // volatile: 保证在多线程之间的变量的可见性,就是当这个实例在子线程使用时要用volatile修饰
           private volatile ZooKeeper zooKeeper;
              
              
           public static void main(String[] args) {
               SpringApplication.run(Application.class, args);
           }
           @Bean
           public ZooKeeper zooKeeper() throws Exception{
               // 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
               zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
                   @Override
                   public void process(WatchedEvent event) {
                       System.out.println("event = " + event);
                       if(event.getState()== Event.KeeperState.SyncConnected){
                           System.out.println("zookeeper客户端连接成功");
                           try {
                               //1 获取对应的地址列表
                               getData();
                               //2 绑定永久的事件监听
                               zooKeeper.addWatch(PATH, new Watcher() {
                                   @Override
                                   public void process(WatchedEvent event) {// 事件监听是开启另外的线程处理
                                       try {
                                           getData();
                                       } catch (Exception e) {
                                           e.printStackTrace();
                                       }
                                   }
                               }, AddWatchMode.PERSISTENT);
              
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
                       }
                   }
                   //获取数据
                   private void getData() throws KeeperException, InterruptedException {
                       List<String> serverAddr = zooKeeper.getChildren(PATH, null);
                       List<String> tempList=new ArrayList<>();
                       for (String path : serverAddr) {
                           //获取节点路径数据
                           byte[] data = zooKeeper.getData(PATH+"/"+path, null, new Stat());
                           String addrInfo = new String(data);
                           // 把数据添加到临时列表
                           tempList.add(addrInfo);
                       }
                       addrList=tempList;
                       System.out.println("获取到秒杀服务的最新地址\n"+addrList);
                   }
               });
               return zooKeeper;
           }
       }
      

分布式锁

  1. 为什么程序中需要锁
    1. 多个任务同时执行, 可以是多线程, 也可以是多进程
    2. 所有的任务都需要对同一资源进行写操作
    3. 对资源的访问是互斥的,对于资源的访问, 不同多个任务同时执行, 同一时间只能一个任务访问资源, 其他的任务处于阻塞状态
  2. 锁的基本概念
    1. 竞争锁
      1. 任务通过竞争获取锁才能对该资源进行操作
      2. 公平竞争: 按照一定的顺序, 先来先执行
      3. 非公平竞争:没有顺序, 不管先后顺序执行
    2. 占有锁:当有一个任务在对资源进行更新时
    3. 任务阻塞: 其他任务都不可以对这个资源进行操作
    4. 释放锁:直到该任务完成更新
  3. 案例:
    1. 一个生产订单的接口,如果有n个线程、进程访问,就会出现资源争夺现象
  4. 本节主要是讲利用ZK的一些特性来实现 多进程访问同一个资源 的分布式锁

情况1:一个进程n个线程访问同一个资源

  1. 生成订单函数,添加synchronized锁即可

     public class OrderIdGenerator {
         private  int count=0; //共享资源
         //生成id
         //synchronized --> jvm 如果是夸jvm, 分布式程序, 多进程之间进行数据的共享--> 分布式锁
         public synchronized String getId(){
             String id = null;
             try {
                 TimeUnit.MILLISECONDS.sleep(10);//模拟网络延迟
                 //SimpleDateFormat 用来把日期和字符串进行相互转换
                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                 count=count+1;//并发的原子性, 有序性, 可见性
                 //format可视化日期, parse: 解析字符
                 id = sdf.format(new Date()) + "-" + count;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return id;
         }
     }
    
  2. 模拟多线程, 调用获取订单id的方法

     public class OrderService  implements  Runnable{
         //用来保存所有的订单id   1 去重操作 2 有序的
         private static Set<String> ids=new HashSet<>();
         // CountDownLatch 并发编程的api 发令抢 主要用于程序等待
         private static CountDownLatch countDownLatch=new CountDownLatch(50);
         //id生成器
         private OrderIdGenerator orderIdGenerator=new OrderIdGenerator();
         public static void main(String[] args) throws Exception{
             OrderService orderService = new OrderService();//i=0
             //开启50个线程
             for(int i=0;i<50;i++){
                 new Thread(orderService).start();
             }
             //等待所有线程执行完成, 在去执行主线程
             countDownLatch.await();//等待状态, countDownLatch 中计数器的值变为0
             System.out.println("获取到的订单的id数量:"+ids.size());
             System.out.println("==============================");
             System.out.println(ids);
         }
         @Override
         public void run() {
             //调用id生成的方法  多线程执行的任务
             String id = orderIdGenerator.getId();
             ids.add(id);
             countDownLatch.countDown();//让计数器的值减一
         }
     }
    
  3. 该方法只能解决同一个应用进程,n个线程访问的情况,无法解决多个应用进程同时访问情况

情况2: n个进程访问一个资源,使用ZK同名节点创建

  1. 实现步骤:n个服务器,访问订单接口
    1. 本质就是n个服务到zk中去创建一个同名节点,因为是同名的节点那么只有一个服务可以创建成功,利用同名节点这个特性,来进行资源的原子性共享
    2. 每个服务去zooKeeper尝试获取(创建)锁资源(节点),获取成功,返回true,反之false
      1. 如果创建/locks/orderIdLock节点成功,返回true,反之false
    3. 如果获取成功
      1. 开始调用生成订单id的服务,可通过远程调用接口
      2. 执行完成,释放锁资源(unlock)
      3. 执行unlock,本质就是删除/locks/orderIdLock节点,其他处于等待状态的节点就会收到监听
    4. 如果获取失败
      1. 对于当前连接,注册一个事件(子节点改变事件getChildren)必须是一次性的事件,避免重复注册
      2. 在事件中进行数据处理,重新再去过去订单id,调用方法
  2. 代码举例:
    1. 各个服务连接ZK集群

       @SpringBootApplication
       public class Application {
           private  static  String ZK_SERVER_ADDR="192.168.26.100:2181,192.168.27.100:2181,192.168.28.100:2181";
           private static  int SESSION_TIMEOUT=30000;
           public static void main(String[] args) {
               SpringApplication.run(Application.class, args);
           }
           //创建一个zookeeper的连接
           @Bean
           public ZooKeeper zooKeeper() throws Exception{
               CountDownLatch countDownLatch = new CountDownLatch(1);//并发等待
               // 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
               ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
                   @Override
                   public void process(WatchedEvent event) {
                       System.out.println("event = " + event);
                       if(event.getState()== Event.KeeperState.SyncConnected){
                           countDownLatch.countDown();//等待连接成功在返回对象
                       }
                   }
               });
       //        zooKeeper.getState(); connecting --> connected
              
               countDownLatch.await();
               return zooKeeper;
           }
       }
      
    2. 模拟各个服务服务访问订单id程序如下

       @RestController
       public class OrderController {
              
           private  RestTemplate restTemplate=new RestTemplate();
              
           @Autowired
           private ZooKeeper zooKeeper;
              
           private String path="/locks";
           private String node="/orderIdLock";
              
           @RequestMapping("createOrder")
           public String createOrder() throws Exception{
               //获取id
               if(tryLock()){ //尝试获取锁
                   //调用业务方法 生成订单编号
                   String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
                   System.out.println("获取到的id:"+id);
                   //释放锁
                   unlock();
               }else {
                   //等待锁
                   waitLock();
               }
               return "success";
           }
           //尝试获取id, 如果获取到了, 返回true, 否则返回false
           //竞争锁资源
           public boolean tryLock(){
               try {
                   zooKeeper.create(path+node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                   return true;
               } catch (Exception e) {
                   e.printStackTrace();
                   return false;
               }
           }
           //释放锁资源
           public void unlock(){
               //删除指定节点
               try {
                   Stat stat = zooKeeper.exists(path + node, false);
                   if(stat!=null){
                       zooKeeper.delete(path+node, stat.getVersion());
                   }
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
              
           //阻塞状态 --等待锁
           public  void waitLock(){
               //绑定监听时间
               try {
                   //绑定的是一次性事件
                   zooKeeper.getChildren(path, new Watcher() {
                       @Override
                       public void process(WatchedEvent event) {
                           if(event.getType()== Event.EventType.NodeChildrenChanged){
                               try {
                                   createOrder();
                               } catch (Exception e) {
                                   e.printStackTrace();
                               }
                           }
                       }
                   });
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       }
      
    3. 模拟订单id生成服务接口如下

       @RestController
       public class OrderIDGeneratorController {
              
           private  int count=0;
              
           @RequestMapping("getId")
           public String getId(){
               String id = null;
               try {
                   TimeUnit.MILLISECONDS.sleep(50);
                   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                   count=count+1;//并发的原子性, 有序性, 可见性
                   id = sdf.format(new Date()) + "-" + count;
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               return id;
           }
       }
      
  3. 本质分析:
    1. 该分布式锁的实现原理就是利用了ZK创建同名节点的特性,zk只能创建一个同名的节点,如果该节点已经存在,其他客户端无法创建该节点
    2. 也就是说,ZK并不是具备分布式锁,而是ZK的一些特性具有原子性,可以实现分布式锁的能力
  4. 使用同名节点弊端:
    1. 一旦任务处理完释放锁,会通知n个监听的服务
    2. n个监听的服务会同时访问zk进行节点创建,比较消耗性能

情况3: n个进程访问一个资源,使用顺序节点

  1. 顺序节点使用原理
    1. 所有的订单服务都去ZK集群创建一个临时顺序节点
    2. 尝试获取锁(trylock)
      1. 判断当前节点是不是第一个节点(最小的节点),如果是,返回true;反之返回false
    3. 获取到锁
      1. 执行任务,调用生成id
      2. 释放锁,删除节点
    4. 没有获取到
      1. 找到当前节点的前一个节点
      2. 在前一个节点绑定一个结点删除事件(这样当一个节点删除之后,只会触发下一个节点的对应服务监听事件,不会触发所有服务监听事件)
      3. 当节点删除之后,触发事件,此时锁已经获取到了
      4. 执行任务,释放锁
  2. 代码举例:

     @RestController
     public class Order02Controller {
        
         private RestTemplate restTemplate = new RestTemplate();
        
         @Autowired
         private ZooKeeper zooKeeper;
        
         private String path = "/locks02";
         private String node = "/orderIdLock";
        
         @RequestMapping("createOrder02")
         public String createOrder() throws Exception {
             //创建一个临时顺序节点   /locks02/orderIdLock0000000001
             String currentPath = zooKeeper.create(this.path + this.node,
                     null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
             // orderIdLock0000000001 字符串处理
             currentPath=currentPath.substring(currentPath.lastIndexOf("/")+1);
             //获取id
             if (tryLock(currentPath)) {
                 //调用业务方法
                 String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
                 System.out.println("获取到的id:" + id);
                 //释放锁
                 unlock(currentPath);
             } else {
                 waitLock(currentPath);
             }
             return "success";
         }
        
         //尝试获取id, 如果获取到了, 返回true, 否则返回false
         //竞争锁资源
         public boolean tryLock(String currentPath) {
             try {
                 //1 获取所有的子节点列表
                 List<String> children = zooKeeper.getChildren(path, false);
                 Collections.sort(children);
                 //2 判断当前的currentPath是否是最小的节点
                 if(StringUtils.pathEquals(currentPath,children.get(0))){
                     return true;
                 }else{
                     return false;
                 }
             } catch (Exception e) {
                 e.printStackTrace();
             }
             return false;
         }
        
         //释放锁资源
         public void unlock(String currentPath) {
             try {
                 Stat stat = zooKeeper.exists(path + "/" + currentPath, false);
                 if(stat!=null){
                     zooKeeper.delete(path + "/" + currentPath,stat.getVersion());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
        
         //阻塞状态
         public void waitLock(String currentPath) {
             try {
                 //1 获取所有的子节点列表
                 List<String> children = zooKeeper.getChildren(path, false);
                 Collections.sort(children);
                 //拿到当前节点的位置
                 int index = children.indexOf(currentPath);
                 if(index>0){
                     String preNode = children.get(index - 1);
                     //对前一个节点绑定监听节点删除事件
                     zooKeeper.getData(path + "/" + preNode, new Watcher() {
                         @Override
                         public void process(WatchedEvent event) {//前一个节点删除
                             if(event.getType()== Event.EventType.NodeDeleted){
                                 //调用业务方法
                                 String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
                                 System.out.println("获取到的id:" + id);
                                 //释放锁
                                 unlock(currentPath);
                             }
                         }
                     }, new Stat());
                 }
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
    
  3. 优点分析:
    1. 该分布式锁实现的原理就是利用zk顺序节点的特性,发现当前服务是否是最小的顺序节点,如果是就访问资源,如果不是就监听上一个节点的删除情况。一旦上个节点删除,当前服务就去访问资源
    2. 由于每个服务都是去监听当前服务对应的上一个节点,那么就意味着,当最小节点删除的时候,只会触发下一个节点对应服务的监听事件,不会所有服务都会触发

知识点补充

  1. 数据库单表数据超过500w性能会下降
  2. 一个Tomcat的真正并发请求能达到350(同时能处理350个并发请求),默认是150个
    1. 打开.../apache-tomcat-9.0.34/conf/server.xml
     <Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
         maxThreads="150" minSpareThreads="4"/>
    
  3. Zookeeper的选举机制:ZK集群有个是leader,其余全是follower,这个leader是如何选举的?
    1. 我们在进行zk集群启动的时候, 集群中会有Leader节点,Follower节点, 一个集群中只会有一个Leader节点, 并且在启动节点的时候Leader并不是固定的, 而是通过一定的选举策略所产生
    2. 在选择Leader节点的时候, 需要进行投票(vote), 其中每个节点都可以进行投票, 并且把字节的投票结果发送给其他的所有节点,其中投票的主要的信息vote 包含两个字段(myid,zxid): 其中myid代表的是服务器节点的id, zxid代表的是选举的全局事务ID
      1. 当节点处于looking状态的时候, 会开始进行投票
      2. 第一次投票的时候, 永远是投自己的票vote(myid,zxid) 发送自己的ID信息
      3. 当收到其他节点发送的投票信息voteN(myidN,zxidN)时候, 会进行vote信息比较
        1. 首先根据zxid进行比较, zxid值最大的准备选择为leader
        2. 如果zxid相等, 那么根据myidN进行比较, 选择myIdN值大的作为leader, 重新发出投票信息
      4. 当有超过半数的server选择相同的server作为leader,那么leader节点选择完成,改变服务器状态
Table of Contents