Java进阶-Redis进阶

持久化

  1. redis存储在内存中,一旦机器断点、重启数据就没了,如何保证数据的可靠性?
  2. 整体思路:将redis数据进行持久化,在服务器断电之后重新启动,数据还能恢复Redis
  3. 持久化:Redis所有的数据都是保存在内存当中, 对数据的更新将异步或者同步的保存到磁盘上
  4. 持久化作用:避免数据丢失,对数据进行备份,可以还原指定时间的数据
  5. 持久化的方式:
    1. 快照
      1. 将redis中的数据写入到磁盘中,磁盘中的数据是二进制数据,又叫做RDB文件
      2. 一旦redis重启,读取RDB文件,恢复数据
      3. 会默认在/usr/local/redis-6.2.5/data/下生成dump.rdb文件
    2. 写日志
      1. 将写入到redis的命令写入到磁盘中,磁盘中存储的数据是命令,为AOF文件
      2. 一旦redis重启,读取AOF文件,重新执行命令,恢复数据

RDB方式持久化

  1. 触发方式
    1. 执行save(同步)命令(很少用
      1. 如果存在老的rdb文件, 新的文件会替换老的文件
    2. 执行bgsave(异步)命令(很常用
      1. 已使用异步处理使用linux的fork函数
    3. 自动配置,满足什么条件才会自动持久化

       //这里尽管写的是save但是实际是使用bgsave生成文件频繁的生成RDB文件
       //900s内有1个key修改,就会触发bgsave
       save 900 1
       //300s内有10个key修改,就会触发bgsave
       save 300 10
       //60s内有1w个key进行修改,就会触发bgsave
       save 60 10000
      
      1. 注意:自动配置在配置文件中有默认配置redis_6379.conf
  2. 自动触发机制(下面情形会自动触发)
    1. 执行“全量复制”命令
    2. 执行debug reload命令
    3. 执行shutdown命令
  3. 其他相关配置(也是在配置文件redis_6379.conf中设置)

     dbfilename dump_6379.rdb  #设置备份文件名称,默认是dump.rdb
     dir /usr/local/redis-5.0.8/data   #设置要备份目录路径
     stop-writes-on-bgsave-error yes #在出现错误的时候终止rdb备份
     rdbcompression yes  #是否进行压缩,在内存中压缩,减少占用空间
     rdbchecksum yes     # 是否进行检查sum值校验
    
  4. 不足之处
    1. 耗时、耗性能IO性能:将数据频繁写入IO耗性能
    2. 不可控,丢失数据,宕机

AOF方式持久化

  1. AOF的三种触发方式
    1. always
      1. 每条命令都要写到磁盘
      2. 实际流程:redis–》写命令刷新的缓冲区—》每条命令fsync到硬盘—》AOF文件
    2. everysec
      1. 每隔1s写到磁盘
      2. everysec(配置文件默认值):redis——》写命令刷新的缓冲区—》每秒把缓冲区fsync到硬盘–》AOF文件
    3. no
      1. 什么时间写到磁盘,由操作系统决定
      2. edis——》写命令刷新的缓冲区—》操作系统决定,缓冲区fsync到硬盘–》AOF文件
  2. 三种策略的比较
    1. always:不丢失数据 IO开销大,一般的sata盘只有几百TPS
    2. everysec:每秒一次fsync,丢失1秒数据(最常用)
    3. no: 不可控
  3. 配置文件中配置

     appendonly yes  # 是否开启aof文件模式
     appendfilename "appendonly-6379.aof" # aof文件名称
     appendfsync  everysec  # aof策略模式
     dir /redis/data  #数据存储路径
        
     //以下三种配置是aof重写配置
     no-appendfsync-on-rewrite yes  #在重写的时候, 不要执行aof操作
     auto-aof-rewrite-percentage 100  # 自动重写的百分比
     auto-aof-rewrite-min-size     64mb # 重写的大小配置
    
  4. AOF重写
    1. 随着命令的逐步写入,并发量的变大,AOF文件会越来越大,通过优化的命令,来优化这样可以减少磁盘占用量,加速恢复速度
    2. 即当AOF文件达到某个限值时,redis会优化命令,将n条命令优化成1条,重新存储
    3. AOF重写配置

       auto-aof-rewrite-min-size AOF文件重写需要的大小
       auto-aof-rewrite-percentage: AOF文件增长率
       aof_current_size  AOF当前尺寸(单位:字节)
       aof_base_size     AOF上次启动和重写的尺寸(单位:字节)
       自动触发时机(两个条件同时满足)
       aof_current_size>auto-aof-rewrite-min-size:当前尺寸大于重写需要尺寸
       (aof_current_size-aof_base_size)/aof_base_size>auto-aof-rewrite-percentage:(增长率)当前尺寸减去上次重写的尺寸,除以上次重写的尺寸如果大于配置中的增长率
      
  5. RDB和AOF比较
    1. RDB和AOF都有各自的缺点
    2. 因此可以两种混合:混合持久化内容的AOF文件,配置文件做如下配置

       appendonly yes  # 开启oaf模式
       aof-use-rdb-preamble yes # 使用rdb和aof的混合模式
      

Redis高可用

  1. redis为什么要高可用?
    1. 单点故障,服务器宕机,无法访问
    2. 一台服务器的吞吐量有限(11w/s)
    3. 提高存储性能(增加存储量)
  2. 我们在做主从分离的时候,是一台主服务器,多台从服务器.如果主服务器挂了意味我们无法给引用提供服务了.高可用就是我们会有多台主服务器,如果一台主服务器挂了,它下面的从服务器会自动挂载到新的主服务器中.
  3. 三种形式:
    1. 主从形式:只有一台机器是主节点,有多台机器是从节点.如果主节点挂了,应用就挂.
    2. 哨兵形式(Redis2.x版本):在主从模式下,会有一台机器充当哨兵的角色.它去监控其他机器的状态.当主节点挂了会在从节点之间进行选举,看那台机器性能比较好,比较高,然后把指定的一台从节点升级为主节点,实现高可用;下次主节点修复好了,会自动加入进来变成从节点.
    3. 集群模式(Redis3.0之后):可以拥有多主多从,如果其中一台主节点挂了,会选举下面的从节点为主节点,可以做哨兵的这个事情

redis的主从复制

  1. 一个主(master)节点,n个从节点(slave)
  2. 写数据只能是master节点,外部读取数据从slave读取,slave定时的从master同步数据
  3. 即:一个Master可以有多个slave,一个slave只能有一个master,数据流向是单向的, 只能从Master流向Slave
  4. 主从复制的作用
    1. 单机故障
    2. 读写分离
    3. 一主多从
    4. 多副本
  5. 复制的配置
    1. 从服务器slave如何实现复制呢?
    2. 方法一:通过命令直接从master服务器复制

       //直接从6379服务端口Master进行数据复制
       slaveof 127.0.0.1 6379 
       //其他命令: 当前服务器不是任何服务器的从服务器,即这台服务器也是master有master性质,这样能保证这台服务器也可以写数据
       slaveof no one
      
    3. 方法二:配置slave服务器redis配置文件

       //老的方法
       slaveof ip port
       //新方法,等价于上面
       replicaof ip port
       //这台redis服务器只能读取,不能写入
       replica-read-only yes
      
  6. 主从节点的问题
    1. 如果主节点挂了,从节点需要手动做如下操作
      1. 将某个从节点切换为主节点
      2. 设置可以读写
      3. 将其他节点配置连接新的主节点
    2. 解决上面问题的办法-redis的哨兵机制Redis Sentinel

redis的哨兵机制Redis Sentinel

  1. 配置一台redis哨兵服务器,这台服务器专门监听所有redis集群(主节点、从节点)
  2. 外部连接这台哨兵服务器
  3. 哨兵服务器可以监听所有集群中的节点运行情况,可以重新选举主节点、修改其他节点的指向
  4. 如何配置一台哨兵服务器 略
  5. 代码举例:外部如何连接哨兵服务器

     @Test
     public void testfailOverx() throws Exception {
         //redis服务集群的名字
         String masterName="mymaster";
         //哨兵集群的名字
         Set<String> sentinels=new HashSet<>();
         //添加集群redis
         sentinels.add("192.168.48.102:3367");
         sentinels.add("192.168.48.103:3367");
         sentinels.add("192.168.48.104:3367");
         sentinels.add("192.168.48.105:3367");
         JedisSentinelPool pool =new JedisSentinelPool(masterName,sentinels);
         //获取到连接客户端
         while (true){
             Jedis jedis = null;
             try {
                 //要从哨兵服务器获取具体redis的主节点
                 jedis=pool.getResource();
                 jedis.incr("age");
                 System.out.println("age="+jedis.get("age"));
                 TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } finally {
                 if(jedis!=null){
                     jedis.close();
                 }
             }
         }
     }
    
  6. 注意:主从+哨兵由于集群中的主从节点存储的都是一样的数据,因此无法提高数据的存储量

redis集群模式Redis Cluster

  1. 为什么需要使用集群?
    1. 并发量: 如果我们需要的并发量达到100w/s的时候, 单个服务的性能是11w/s, 这个时候我们可能就需要10个服务器集群同时提供服务
    2. 数据量:
      1. 我们需要存储更多的数据的时候, 可以考虑使用Redis集群, 集群中的每个服务器都存放一部分数据
      2. 比如说一个服务存放的数据量是200G, 那么如果需要存放1T的数据的话, 我们需要5台服务器进行集群操作
    3. 网络流量
      1. 单台服务器的网卡的流量是固定的, 比如说一个服务器的网卡是1G, 那么需要达到10G的网络流量速度的话, 我们可以使用10台服务器集群
  2. 数据分布
    1. 既然是集群,那么总量的数据如何分布到各个节点呢?即数据按照什么规则进行分区呢?
    2. 分区规则2种类型:顺序分区、Hash分区
    3. 顺序分区
      1. 比如100条数据,第一个节点前33条,第二个节点中间33条,最后一个结点最后34条
      2. 特点
        1. 分区的规则和业务数据的key是有关联的
        2. 数据散列不均匀:比如每个放30,有3个节点,那么如果有30条数据,后面两个节点没有数据
        3. 支持批量操作
        4. 可以顺序访问
    4. Hash分区
      1. 将每条数据进行hash运算得到值,然后除以节点总数后的余数 (hash(k)%n),根据余数分别放到不同的节点
      2. 特点
        1. 数据分散度高
        2. 键值分布与业务无关
        3. 支持批量操作
        4. 无法顺序访问
  3. 集群特点:多个master、多个slave
  4. 搭建高可用集群需要做哪些事情
    1. 启动多个集群服务器
    2. 使用meet操作,让服务器自动感知
    3. 给每个master服务器指定slot操作槽位
    4. 给master节点指定复制节点(从节点)

搭建集群(承接之前的单机配置)

  1. 集群分布如下图:

    图1

  2. 新建redis_7000.conf
     cd /usr/local/redis-6.2.5/conf
     //将redis.conf 拷贝到该文件夹下,命名为redis_7000.conf
     //vi redis_7000.conf
    
  3. 然后修改配置文件内容如下:

     port 7000   # 设置端口
     daemonize yes  #设置是否以守护进程开启
     dir  /usr/local/redis-5.0.8/data  #设置目录,例如日志,rdb文件等等
     dbfilename dump_7000.dbf  #设置rdb文件名
     logfile 7000.log  # 设置日志文件名
     cluster-enabled yes  #是否开启集群模式
     cluster-config-file  node-7000.conf  #集群启动会自动生成一个配置文件,集群节点的单独配置,保存跟每个服务器之间的相关通信信息
     cluster-node-timeout 15000  #超时时间
     cluster-require-full-coverage no  #用来设置什么情况下集群可以对外提供服务,yes表示只有集群节点都正常才提供对外服务,一般生产环境设置为no
     bind 192.168.48.101  #给当前服务绑定固定IP,即访问ip
     protected-mode no #关闭保护模式
    
  4. 同理,在该目录下创建redis_7001.conf
    1. 可以使用命令行:sed 's/7000/7001/g' redis_7000.conf > redis_7001.conf
    2. 将redis_7000.conf复制命名为redis_7001.conf,然后将redis_7001.conf文件中的7000更换为7001
    3. 然后分别创建redis_7002.conf、redis_7003.conf、redis_7004.conf、redis_7005.conf
    4. 这样conf文件夹下就有6个redis_700x.conf文件
  5. 启动服务

     cd ..  # 返回到redis安装目录下
     # 启动6个服务
     bin/redis-server conf/redis_7000.conf
     bin/redis-server conf/redis_7001.conf
     bin/redis-server conf/redis_7002.conf
     bin/redis-server conf/redis_7003.conf
     bin/redis-server conf/redis_7004.conf
     bin/redis-server conf/redis_7005.conf
    
  6. 查看状态
    1. 查看服务状态

       # 方式1 
       ps -ef |grep redis
       # 方式2
       netstat -ntlp
      
    2. 查看某个节点集群信息

       bin/redis-cli -p 7000 cluster info
      
      1. 会发现这个节点,没有连接其他节点
  7. 节点间通信
    1. 让这6个节点之间相互通信

       bin/redis-cli -h 192.168.48.101 -p 7000  cluster meet 192.168.48.101 7001
       bin/redis-cli -h 192.168.48.101 -p 7000  cluster meet 192.168.48.101 7002
       bin/redis-cli -h 192.168.48.101 -p 7000  cluster meet 192.168.48.101 7003
       bin/redis-cli -h 192.168.48.101 -p 7000  cluster meet 192.168.48.101 7004
       bin/redis-cli -h 192.168.48.101 -p 7000  cluster meet 192.168.48.101 7005
      
    2. 查看集群通信情况

       cluster nodes
      
  8. 连接某个节点:

     bin/redis-cli -c -h 192.168.48.101 -p 7000
    
    1. 通过 exit退出当前节点
  9. 分配slot卡槽
    1. 存储数据总共固定分为16383个(行业规定)卡槽,要给每个节点分配占用卡槽数,否则无法存储数据
    2. 编写分配卡槽脚本 addslots.sh
      1. cd bin 到redis安装目录的bin目录下
      2. vi addslots.sh ,下面复制进去
      3. 设置该文件的可执行权限:chmod 775 addslots.sh
       start=$1
       end=$2
       port=$3 
       for slot in `seq ${start} ${end}`
       do
               redis-cli -h 192.168.48.101 -p ${port} cluster addslots ${slot}
       done
      
      1. $1代表第一个参数
    3. 执行脚本
      1. 给3个主节点分配卡槽
       sh addslots.sh 0 5460 7000
       sh addslots.sh 5461 10922 7001
       sh addslots.sh 10923 16383 7002
      
  10. 配置从节点
    1. 给上面3个主节点配置从节点

       # 这里需要配置一个node节点的id信息
       redis-cli -h 192.168.48.101 -p 7003 cluster replicate ce9a3bf38f490594ea6c6a1c1916b56afd93dc1c
       redis-cli -h 192.168.48.101 -p 7004 cluster replicate 8f89b4cd3f4cd68d1fbb741dbfca638270ad9c0c
       redis-cli -h 192.168.48.101 -p 7005 cluster replicate c33fd209b4003cef50479a1cee4495111d078109
      
    2. 上面需要设置节点id,通过下面命令查所有节点信息

       cluster nodes
      
  11. 查看data下的文件目录,会发现自动生成了node_700x.conf配置文件
  12. redis5之后的集群搭建方式
    1. 直接从第5步之后,一行命令搞定后面所有操作,不需要节点间通信、分配卡槽、配置从节点

       # 无密码
       bin/redis-cli --cluster create 192.168.48.101:7000 192.168.48.101:7001 192.168.48.101:7002 192.168.48.101:7003 192.168.48.101:7004 192.168.48.101:7005 --cluster-replicas 1 
              
       # 有密码
       bin/redis-cli -a 123456 --cluster create ...
      
      1. 执行过程中需要输入yes,然后直接回车即可
    2. 命令行解释:

       --cluster-replicas 1 :后面的1表示主从比例1:1
       --cluster-replicas 2 :后面的1表示主从比例1:2
      
  13. 总结
    1. 以上已经搭建好集群了,如果某个主节点挂了,它的从节点会自动设置为主节点
    2. 从节点会自动从主节点同步数据
  14. 测试
    1. 登录

       # 有密码时的登录方式
       redis-cli -c -h 192.168.48.101 -p 7000 -a 123456
       # 无密码登录方式
       redis-cli -c -h 192.168.48.101 -p 7000
      
    2. 查看集群:

       cluster nodes 或cluster info
      
    3. 集群停止

      1. 查看redis进程 ps -ef|grep redis
      2. 关闭redis进程,使用kill -9 ${进程号},如果需要关闭多个进程,进程号之间空格隔开即可

         kill -9 7000 7001 7002 7003 7004 7005
        
      3. 也可执行以下命令来关闭redis进程

         pkill -9 redis
        

Java操作Redis

单机使用

  1. Jedis 基本使用
    1. 上面已经讲解,略(单个连接,使用后关闭)
  2. JedisPool连接池使用(本质是连接一个redis
    1. 单个连接,使用完关闭,还是比较浪费的,此时可以使用连接池,当前使用完不用功关闭,共享给别人使用
     @Test
     public void testPool() throws Exception {
         //1 创建连接池对象
         //连接池参数配置
         GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
         //最多连接数
         poolConfig.setMaxTotal(10);
         //最小空闲数
         poolConfig.setMinIdle(10);
         //最多空闲数
         poolConfig.setMaxIdle(10);
         //最多等待时间
         poolConfig.setMaxWaitMillis(10000);
         String host="192.168.48.101";
         int port=6379;
         //jedis连接池对象配置,构造参数可以传密码,如果连接池有密码
         JedisPool jedisPool = new JedisPool(poolConfig,host,port);
         //2 获取连接对象
         Jedis jedis=null;
         try {
             jedis = jedisPool.getResource();
             jedis.set("bb","1");
         } catch (Exception exception) {
             exception.printStackTrace();
         } finally {
             if(jedis!=null){
                 //不会真正关闭连接, 把连接对象归还到连接池
                 jedis.close();
             }
         }
     }
    

redis集群使用(重点!!!)

  1. 用户登录场景(详见代码login_app)
    1. 用户根据用户名和密码进行登录
    2. 优先在redis缓存中根据用户名查询用户信息
    3. 如果redis缓存中没有对应的用户信息, 那么从数据库查询用户信息, 并且返回对应的数据, 保存到redis缓存中
    4. 对于缓存到redis中的用户信息设置一个有效时间(7天)
  2. 导入pom依赖

     <!--1.导入redis相关的包-->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-data-redis</artifactId>
     </dependency>
     <!--2. 连接池jar包-->
     <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-pool2</artifactId>
     </dependency>
    
  3. yml配置集群信息

     spring:
       redis:
         #集群模式
         cluster:
           #节点有哪些,多个之间用逗号分隔
           nodes: 192.168.48.101:7000,192.168.48.101:7001,192.168.48.101:7002,192.168.48.101:7003,192.168.48.101:7004,192.168.48.101:7005
         #连接池配置
         lettuce:
           pool:
             max-wait: 15000
             max-idle: 10
             max-active: 10
             min-idle: 10
    
  4. controller

     @Controller
     public class UserController {
        
         @Autowired
         private IUserService userService;
        
         @ResponseBody
         @RequestMapping("login")
         public PageResult login(User user) throws Exception {
             //到数据库中查询
             //User u = userService.queryByName(user.getUsername());
             //直接到缓存中查询
             User u = userService.queryByNameByCache(user.getUsername());
             if(user.getPassword()!=null&& u !=null && user.getPassword().equals(u.getPassword())){
                 return PageResult.success();
             }else{
                 return PageResult.mark("用户名或者密码错误");
             }
         }
     }
    
  5. service层

     public interface IUserService {
         User queryByName(String username);
         void insert(User user);
         User queryByNameByCache(String username) throws Exception ;
     }
     @Service
     public class UserServiceImpl implements IUserService {
        
         @Autowired
         private UserMapper userMapper;
        
         //专门用来获取集群对象,默认已经创建在容器中
         @Autowired
         private StringRedisTemplate redisTemplate;
        
         @Override
         public User queryByName(String username) {
             System.out.println("从数据库查询结果:"+username);
             return userMapper.queryByName(username);
         }
        
         @Override
         public void insert(User user) {
             userMapper.insert(user);
         }
            
         @Override
         public User queryByNameByCache(String username) throws Exception {
             System.out.println("从redis缓存中查询查询结果:"+username);
             //获取集群对象
             ValueOperations<String, String> ops = redisTemplate.opsForValue();
             String value = ops.get("user:" + username);
             ObjectMapper objectMapper = new ObjectMapper();
             User user=null;
             //判断redis的返回结果
             if(StringUtils.isEmpty(value)){
                 // 查询数据库
                 user= queryByName(username);
                 if(user!=null){//添加到缓存中,缓存7天
                     ops.set("user:"+username,objectMapper.writeValueAsString(user),7,TimeUnit.DAYS);
                 }else{ //单个key穿透,查询不到,缓存一个空对象,设置很小的缓存时间
                     ops.set("user:"+username,objectMapper.writeValueAsString(new User()),1,TimeUnit.MINUTES);
                 }
             }else{
                 //将String序列化为对象
                 user = objectMapper.readValue(value,User.class);
             }
             return user;
         }
     }
    
  6. mapper层

     @Mapper
     public interface UserMapper {
        
         @Select("select * from user where username = #{username}")
         public User queryByName(String username);
        
         @Insert("insert into user(username,password) values(#{username},#{password})")
         public void insert(User user);
     }
    

缓存常见问题(面试题,重要!!!)

  1. 穿透
    1. 概念:缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每次都需要到数据库中去查询,造成缓存穿透
    2. 解决方案:
      1. 单个key穿透
        1. 持久层查询不到就缓存空结果,查询时先判断缓存中是否exists(key), 如果有直接返回空,没有则查询后返回对于空值设置一个很小的缓存过期时间
      2. 多个key穿透
        1. 先判断key是否是一个存在的key, 如果key是一个合法的key, 那么就查询数据, 如果是一个不合法的key, 那么就不在查询数据库
        2. 数据量不大, 可以把key 都存放到redis的set集合中
        3. 数据量非常大(上亿级别), 可以考虑使用布隆过滤器来完成
  2. 雪崩
    1. 概念: 如果缓存集中在一个时间段内失效,发生大量的缓存穿透,所有的查询都落到了数据库上,造成数据库服务器雪崩。
      1. 就是说有大量的key在同一时间失效,导致大量的查询直接到数据库了,导致数据库压力很大
    2. 解决方案
      1. 尽量让key的过期时间均匀分布
      2. 控制同一个key的线程数量(读取数据库的线程数量)–> 限流
        1. 常见的限流算法有: 计数算法, 滑动窗口,令牌桶,漏桶
  3. 热点key
    1. 某个key 的访问频率非常频繁,导致Redis服务器的压力剧增, 没法保证Redis可以正常提供服务
    2. 解决方案
      1. 因为是Redis服务器的压力导致瘫痪的,所以解决办法主要集中在提高Redis的qps和控制对Redis的访问
      2. 方式1: 扩容, 添加集群中的从服务器, 提高读数据的能力
      3. 方式2: 考虑使用本地缓存, 把redis中的热点数据直接缓存在本地的服务器的内存中, 减少对redis的一个访问
Table of Contents