Redis缓存穿透、缓存雪崩、并发问题分析与解决方案

Stella981
• 阅读 280

(一)缓存和数据库间数据一致性问题

分布式环境下(单机就不用说了)非常容易出现缓存和数据库间的数据一致性问题,针对这一点的话,只能说,如果你的项目对缓存的要求是强一致性的,那么请不要使用缓存。我们只能采取合适的策略来降低缓存和数据库间数据不一致的概率,而无法保证两者间的强一致性。合适的策略包括 合适的缓存更新策略,更新数据库后要及时更新缓存、缓存失败时增加重试机制,例如MQ模式的消息队列。

(二)缓存击穿问题

缓存击穿表示恶意用户模拟请求很多缓存中不存在的数据,由于缓存中都没有,导致这些请求短时间内直接落在了数据库上,导致数据库异常。这个我们在实际项目就遇到了,有些抢购活动、秒杀活动的接口API被大量的恶意用户刷,导致短时间内数据库宕机了,好在数据库是多主多从的,hold住了。

解决方案的话:

1、使用互斥锁排队

业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。

一、加锁操作

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

private static final String LOCKED_SUCCESS = "OK";
private static final String NX = "NX";
private static final String EXPIRE_TIME = "PX";
 
/**
* 获取锁
*
* @param jedis   redis客户端
* @param lockKey    锁的key
* @param uniqueId   请求标识
* @param expireTime 过期时间
* @return 是否获取锁
*/
public static boolean tryDistributedLock(Jedis jedis, String lockKey, String uniqueId, long expireTime) {
    String result = jedis.set(lockKey, uniqueId, NX, EXPIRE_TIME, expireTime);
    return LOCKED_SUCCESS.equals(result);
}

在低版本的redis中是没有这个set方法的,至于为什么这个简单的set方法能够保证前面提到的分布式锁原则呢?看下这个set的源码参数

/**
* Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1
* GB).
* @param key  唯一标识key
* @param value   存储的值value
* @param nxxx  可选项:NX、XX  其中NX表示当key不存在时才set值,XX表示当key存在时才set值
* @param expx  过期时间单位,可选项:EX|PX 其中EX为seconds,PX为milliseconds
* @param time  过期时间,单位取上一个参数
* @return Status code reply
*/
public String set(final String key, final String value, final String nxxx, final String expx,
    final long time) {
  checkIsInMultiOrPipeline();
  client.set(key, value, nxxx, expx, time);
  return client.getStatusCodeReply();
}

如上的tryDistributedLock就可以实现简单的redis分布式锁了(此set方法的原子性)

1、set方法中nxx参数为NX,表示当key不存在时才会set值,保证了互斥性;

2、set值的同时设置过期时间(过期后del此key),客户端宕机或网络延迟时不会一直持有锁,避免了死锁发生;

3、set方法中的value,比如UUID之类的,用来表示当前请求客户端的唯一性标识;

4、因为是redis单例,暂时没有考虑容错性;

常见的错误分布式加锁实现

private static final Long RELEASE_SUCCESS = 1L;
 
/**
* 释放锁
*
* @param jedis     redis客户端
* @param lockKey   锁的key
* @param uniqueId 请求标识
* @return 是否释放
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String uniqueId) {
    String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(luaScript, Collections.singletonList(lockKey), Collections.singletonList(uniqueId));
    return RELEASE_SUCCESS.equals(result);
}

二、解锁操作

private static final Long RELEASE_SUCCESS = 1L;
 
/**
* 释放锁
*
* @param jedis     redis客户端
* @param lockKey   锁的key
* @param uniqueId 请求标识
* @return 是否释放
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String uniqueId) {
    String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(luaScript, Collections.singletonList(lockKey), Collections.singletonList(uniqueId));
    return RELEASE_SUCCESS.equals(result);
}

不难看出使用Lua脚本告诉redis,如果这个key存在,且其存储的值和指定的value一致才可以删除这个key,从而释放锁,这样也保证了分布式锁的几个原则. 常见的错误释放锁会直接del这个key,没有考虑当前锁的拥有者,不符合分布式锁原则的有始有终原则;

如果不想用上面的Lua脚本,也可以用如下代码:

public static void releaseLock(Jedis jedis,String lockKey,String uniqueId){
      if(uniqueId.equals(jedis.get(lockKey))){
          jedis.del(lockKey);
      }
}

public String getWithLock( String key, Jedis jedis, String lockKey, String uniqueId, long expireTime )
{
    /* 通过key获取value */
    String value = redisService.get( key );
    if ( StringUtil.isEmpty( value ) )
    {
        /*
         * 分布式锁,详细可以参考https://blog.csdn.net/fanrenxiang/article/details/79803037
         * 封装的tryDistributedLock包括setnx和expire两个功能,在低版本的redis中不支持
         */
        try {
            boolean locked = redisService.tryDistributedLock( jedis, lockKey, uniqueId, expireTime );
            if ( locked )
            {
                value = userService.getById( key );
                redisService.set( key, value );
                redisService.del( lockKey );
                return(value);
            } else {
                /* 其它线程进来了没获取到锁便等待50ms后重试 */
                Thread.sleep( 50 );
                getWithLock( key, jedis, lockKey, uniqueId, expireTime );
            }
        } catch ( Exception e ) {
            log.error( "getWithLock exception=" + e );
            return(value);
        } finally {
            redisService.releaseDistributedLock( jedis, lockKey, uniqueId );
        }
    }
    return(value);
}

这样做思路比较清晰,也从一定程度上减轻数据库压力,但是锁机制使得逻辑的复杂度增加,吞吐量也降低了,有点治标不治本。

2、布隆过滤器(推荐)

bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。布隆过滤器的关键就在于hash算法和容器大小,下面先来简单的实现下看看效果,我这里用guava实现的布隆过滤器:

<dependencies >
 < dependency >
 < groupId > com.google.guava</ groupId>
 < artifactId > guava</ artifactId>
 < version > 23.0 < / version >
 < / dependency >
 < / dependencies >
 public class BloomFilterTest {
     private static final int capacity    = 1000000;
     private static final int key        = 999998;
     private static BloomFilter<Integer> bloomFilter = BloomFilter.create( Funnels.integerFunnel(), capacity );
     static {
         for ( int i = 0; i < capacity; i++ )
         {
             bloomFilter.put( i );
         }
     }
     public static void main( String[] args )
     {
 /*返回计算机最精确的时间,单位微妙*/
         long start = System.nanoTime();
         if ( bloomFilter.mightContain( key ) )
         {
             System.out.println( "成功过滤到" + key );
         }
         long end = System.nanoTime();
         System.out.println( "布隆过滤器消耗时间:" + (end - start) );
         int sum = 0;
         for ( int i = capacity + 20000; i < capacity + 30000; i++ )
         {
             if ( bloomFilter.mightContain( i ) )
             {
                 sum = sum + 1;
             }
         }
         System.out.println( "错判率为:" + sum );
     }
 }
 成 功过滤到999998
 布 隆过滤器消 耗 时间 : 215518
 错 判率 为 : 318

可以看到,100w个数据中只消耗了约0.2毫秒就匹配到了key,速度足够快。然后模拟了1w个不存在于布隆过滤器中的key,匹配错误率为318/10000,也就是说,出错率大概为3%,跟踪下BloomFilter的源码发现默认的容错率就是0.03:

public static < T > BloomFilter<T> create( Funnel<T> funnel, int expectedInsertions /* n */ )
{
    return(create( funnel, expectedInsertions, 0.03 ) ); /* FYI, for 3%, we always get 5 hash functions */
}

要注意的是,布隆过滤器不支持删除操作。用在这边解决缓存穿透问题就是:

public String getByKey( String key )
{
    /* 通过key获取value */
    String value = redisService.get( key );
    if ( StringUtil.isEmpty( value ) )
    {
        if ( bloomFilter.mightContain( key ) )
        {
            value = userService.getById( key );
            redisService.set( key, value );
            return(value);
        } else {
            return(null);
        }
    }
    return(value);
}

(三)缓存雪崩问题

缓存在同一时间内大量键过期(失效),接着来的一大波请求瞬间都落在了数据库中导致连接异常。

解决方案:

1、也是像解决缓存穿透一样加锁排队,实现同上;

2、建立备份缓存,缓存A和缓存B,A设置超时时间,B不设值超时时间,先从A读缓存,A没有读B,并且更新A缓存和B缓存;

public String getByKey( String keyA, String keyB )
{
    String value = redisService.get( keyA );
    if ( StringUtil.isEmpty( value ) )
    {
        value = redisService.get( keyB );
        String newValue = getFromDbById();
        redisService.set( keyA, newValue, 31, TimeUnit.DAYS );
        redisService.set( keyB, newValue );
    }
    return(value);
}
点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这