基于redis的分布式锁实现

NoSQL专家
• 阅读 72533

基于redis的分布式锁实现

随着业务越来越复杂,应用服务都会朝着分布式、集群方向部署,而分布式CAP原则告诉我们,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。

很多场景中,需要使用分布式事务、分布式锁等技术来保证数据最终一致性。有的时候,我们需要保证某一方法同一时刻只能被一个线程执行。
在单机(单进程)环境中,JAVA提供了很多并发相关API,但在多机(多进程)环境中就无能为力了。

对于分布式锁,最好能够满足以下几点

可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行
这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好

针对分布式锁,目前有以下几种实现方案(From: http://www.hollischuang.com/a...

基于数据库实现分布式锁
基于缓存实现分布式锁
基于zookeeper实现分布式锁

对于第一种(基于数据库)及第三种(基于zookeeper)的实现方式可以参考博文http://www.hollischuang.com/a...,本篇文章介绍如何基于redis实现分布式锁

首先奉上源码 https://github.com/manerfan/m...

分布式同步锁实现

实现思路

锁的实现主要基于redis的SETNX命令(SETNX详细解释参考这里),我们来看SETNX的解释

SETNX key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

返回值:
设置成功,返回 1 。
设置失败,返回 0 。

使用SETNX完成同步锁的流程及事项如下:

  1. 使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
  2. 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间
  3. 释放锁,使用DEL命令将锁数据删除

实现过程

创建同步锁实现类

/**
 * 同步锁
 *
 * @property key Redis key
 * @property stringRedisTemplate RedisTemplate
 * @property expire Redis TTL/秒
 * @property safetyTime 安全时间/秒
 */
class SyncLock(
        private val key: String,
        private val stringRedisTemplate: StringRedisTemplate,
        private val expire: Long,
        private val safetyTime: Long
)

key reids中的key,对应java api synchronized的对象
expire reids中key的过期时间
safetyTime 下文介绍其作用

实现锁的获取功能

private val value: String get() = Thread.currentThread().name

/**
 * 尝试获取锁(立即返回)
 *
 * @return 是否获取成功
 */
fun tryLock(): Boolean {
    val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false
    if (locked) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
    }
    return locked
}

这里使用setIfAbsent函数(对应SETNX命令)尝试设置key的值为value(当前线程id+线程名),若成功则同时设置key的过期时间并返回true,否则返回false

实现带超时时间的锁获取功能

private val waitMillisPer: Long = 10

/**
 * 尝试获取锁,并至多等待timeout时长
 *
 * @param timeout 超时时长
 * @param unit 时间单位
 *
 * @return 是否获取成功
 */
fun tryLock(timeout: Long, unit: TimeUnit): Boolean {
    val waitMax = unit.toMillis(timeout)
    var waitAlready: Long = 0

    while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
        Thread.sleep(waitMillisPer)
        waitAlready += waitMillisPer
    }

    if (waitAlready < waitMax) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
        return true
    }
    return false
}

这里使用while循环不断尝试锁的获取,并至多尝试timeout时长,在timeout时间内若成功则同时设置key的过期时间并返回true,否则返回false

其实以上两种tryLock函数还是有一种可能便是,在调用setIfAbsent后、调用expire之前若服务出现异常,也将导致该锁(key)无法释放(过期或删除),使得其他线程/进程再无法获取锁而进入死循环,为了避免此问题的产生,我们引入了safetyTime
该参数的作用为,从获取锁开始直到safetyTime时长,若仍未获取成功则认为某一线程/进程出现异常导致数据不正确,此时强制获取,其实现如下

实现带保护功能的锁获取功能

/**
 * 获取锁
 */
fun lock() {
    val waitMax = TimeUnit.SECONDS.toMillis(safetyTime)
    var waitAlready: Long = 0

    while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
        Thread.sleep(waitMillisPer)
        waitAlready += waitMillisPer
    }

    // stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
    stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS)
}

这里同样使用while循环不断尝试锁的获取,但至多等待safetyTime时长,最终不论是否成功,均使用SETEX 命令将key设置为当前先线程对应的value,并同时设置该key的过期时间

实现锁的释放功能

/**
 * 释放锁
 */
fun unLock() {
    stringRedisTemplate.opsForValue()[key]?.let {
        if (it == value) {
            stringRedisTemplate.delete(key)
        }
    }
}

锁的释放使用DEL命令删除key,但需要注意的是,释放锁时只能释放本线程持有的锁
若expire设置不合理,如expire设置为10秒,结果在获取锁后线程运行了20秒,该锁有可能已经被其他线程强制获取,即该key代表的锁已经不是当前线程所持有的锁,此时便不能冒然删除该key,而只能释放本线程持有的锁。


集成spring boot

为了更好的与spring集成,我们创建一个工厂类来辅助创建同步锁实例

/**
 * SyncLock同步锁工厂类
 */
@Component
class SyncLockFactory {
    @Autowired
    private lateinit var stringRedisTemplate: StringRedisTemplate

    private val syncLockMap = mutableMapOf<String, SyncLock>()

    /**
     * 创建SyncLock
     *
     * @param key Redis key
     * @param expire Redis TTL/秒,默认10秒
     * @param safetyTime 安全时间/秒,为了防止程序异常导致死锁,在此时间后强制拿锁,默认 expire * 5 秒
     */
    @Synchronized
    fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock {
        if (!syncLockMap.containsKey(key)) {
            syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime)
        }
        return syncLockMap[key]!!
    }
}

在spring框架下可以更方便的使用

@Component
class SomeLogic: InitializingBean {
  @Autowired
  lateinit var syncLockFactory: SyncLockFactory
  
  lateinit var syncLock

  override fun afterPropertiesSet() {
    syncLock = syncLockFactory.build("lock:some:name", 10)
  }

  fun someFun() {
    syncLock.lock()
    try {
      // some logic
    } finally {
      syncLock.unlock()
    }
  }
}

注解的实现

借助spring aop框架,我们可以将SyncLock的使用进一步简化

创建注解类

/**
 * 同步锁注解
 *
 * @property key Redis key
 * @property expire Redis TTL/秒,默认10秒
 */
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class SyncLockable(
        val key: String,
        val expire: Long = 10
)

实现AOP

/**
 * 同步锁注解处理
 */
@Aspect
@Component
class SyncLockHandle {
    @Autowired
    private lateinit var syncLockFactory: SyncLockFactory

    /**
     * 在方法上执行同步锁
     */
    @Around("@annotation(syncLockable)")
    fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? {
        val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire)

        try {
            lock.lock()
            return jp.proceed()
        } finally {
            lock.unLock()
        }
    }
}

如此一来,我们便可以按照如下方式使用SyncLock

@Component
class SomeLogic {
    @SyncLockable("lock:some:name", 10)
    fun someFun() {
        // some logic
    }
}

是不是显得更加方便!


基于redis的分布式锁实现

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Stella981 Stella981
4年前
Redis分布式锁的正确实现方式
前言分布式锁一般有三种实现方式:1.数据库乐观锁;2.基于Redis的分布式锁;3.基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。
Stella981 Stella981
4年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
4年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Easter79 Easter79
4年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
图解Redis和Zookeeper分布式锁 | 京东云技术团队
使用Redis还是Zookeeper来实现分布式锁,最终还是要基于业务来决定,可以参考以下两种情况:(1)如果业务并发量很大,Redis分布式锁高效的读写性能更能支持高并发(2)如果业务要求锁的强一致性,那么使用Zookeeper可能是更好的选择
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这