Java分布式锁之数据库实现

Wesley13
• 阅读 484

之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。

首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。

现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。以下详细讲解了数据库的交易同步锁和交易重试补偿锁的实现。

一、数据库的设计

数据库锁表的表结构如下:

 
field type comment
ID bigint 主键
OUTER\_SERIAL\_NO varchar 流水号
CUST\_NO char 客户号
SOURCE\_CODE varchar 锁操作
THREAD\_NO varchar 线程号
STATUS char 锁状态
REMARK varchar 备注
CREATED\_AT timestamp 创建时间
UPDATED\_AT timestamp 更新时间

作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性

流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)

客户号:客户的唯一标识

锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作

线程号:当前操作线程的线程号,比如取当前线程的uuid

锁状态:P处理中,F失败,Y成功

二、代码设计

代码的目录结构如下: 

Java分布式锁之数据库实现

主要贴一下锁操作的核心代码实现:

锁接口定义:DbLockManager.java

Java分布式锁之数据库实现 Java分布式锁之数据库实现 View Code

锁接口实现类:DbLockManagerImpl.java

Java分布式锁之数据库实现 Java分布式锁之数据库实现
/\*\* \* \* 数据库锁实现<br> \* \* @author fugaoyang \* \*/ @Service public class DbLockManagerImpl implements DbLockManager {

</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">final</span> Logger LOG = LoggerFactory.getLogger(<span style="color: #0000ff">this</span><span style="color: #000000">.getClass());

@Autowired
</span><span style="color: #0000ff">private</span><span style="color: #000000"> DbSyncLockMapper lockMapper;

@Transactional
</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> lock(String outerSerialNo, String custNo, LockSource source) {

    </span><span style="color: #0000ff">boolean</span> isLock = <span style="color: #0000ff">false</span><span style="color: #000000">;
    TradeSyncLock lock </span>= <span style="color: #0000ff">null</span><span style="color: #000000">;
    </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
        lock </span>=<span style="color: #000000"> lockMapper.find(outerSerialNo, custNo, source.getCode());

        </span><span style="color: #0000ff">if</span> (<span style="color: #0000ff">null</span> ==<span style="color: #000000"> lock) {
            lock </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> TradeSyncLock();
            createLock(lock, outerSerialNo, custNo, source);

            </span><span style="color: #0000ff">int</span> num =<span style="color: #000000"> lockMapper.insert(lock);
            </span><span style="color: #0000ff">if</span> (num == 1<span style="color: #000000">) {
                isLock </span>= <span style="color: #0000ff">true</span><span style="color: #000000">;
            }

            LOG.info(ThreadLogUtils.getLogPrefix() </span>+ "加入锁,客户号[{}],锁类型[{}]"<span style="color: #000000">, custNo, source.getCode());
            </span><span style="color: #0000ff">return</span><span style="color: #000000"> isLock;
        }

        </span><span style="color: #008000">//</span><span style="color: #008000"> 根据交易类型进行加锁</span>
        isLock =<span style="color: #000000"> switchSynsLock(lock, source);
        LOG.info(ThreadLogUtils.getLogPrefix() </span>+ "更新锁,客户号[{}],锁类型[{}]"<span style="color: #000000">, custNo, source.getCode());

    } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) {
        LOG.error(ThreadLogUtils.getLogPrefix() </span>+ "交易加锁异常, 客户号:" +<span style="color: #000000"> custNo, e);
    }
    </span><span style="color: #0000ff">return</span><span style="color: #000000"> isLock;
}

@Transactional
</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) {

    </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
        TradeSyncLock lock </span>=<span style="color: #000000"> lockMapper.find(outerSerialNo, custNo, source.getCode());

        </span><span style="color: #0000ff">if</span> (<span style="color: #0000ff">null</span> !=<span style="color: #000000"> lock) {
            lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(),
                    ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid());
        }

        LOG.info(ThreadLogUtils.getLogPrefix() </span>+ "释放锁,客户号[{}],锁类型[{}]"<span style="color: #000000">, custNo, source.getCode());
    } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) {
        LOG.error(ThreadLogUtils.getLogPrefix() </span>+ "释放锁异常, 客户号:{}"<span style="color: #000000">, custNo, e);
    }
}

</span><span style="color: #008000">/**</span><span style="color: #008000">
 * 匹配加锁
 </span><span style="color: #008000">*/</span>
<span style="color: #0000ff">private</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> switchSynsLock(TradeSyncLock lock, LockSource source) {
    </span><span style="color: #0000ff">boolean</span> isLock = <span style="color: #0000ff">false</span><span style="color: #000000">;

    </span><span style="color: #0000ff">switch</span><span style="color: #000000"> (source) {
    </span><span style="color: #0000ff">case</span><span style="color: #000000"> WITHDRAW:
        ;
        isLock </span>=<span style="color: #000000"> tradeSynsLock(lock);
        </span><span style="color: #0000ff">break</span><span style="color: #000000">;
    </span><span style="color: #0000ff">case</span><span style="color: #000000"> WITHDRAW_RETRY:
        ;
        isLock </span>=<span style="color: #000000"> retrySynsLock(lock);
        </span><span style="color: #0000ff">break</span><span style="color: #000000">;
    </span><span style="color: #0000ff">default</span><span style="color: #000000">:
        ;
    }
    </span><span style="color: #0000ff">return</span><span style="color: #000000"> isLock;
}

</span><span style="color: #008000">/**</span><span style="color: #008000">
 * 交易同步锁
 </span><span style="color: #008000">*/</span>
<span style="color: #0000ff">private</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> tradeSynsLock(TradeSyncLock lock) {
    </span><span style="color: #008000">//</span><span style="color: #008000"> 处理中的不加锁,即不执行交易操作</span>
    <span style="color: #0000ff">if</span><span style="color: #000000"> (LockStatus.P.getName().equals(lock.getStatus())) {
        </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">;
    }

    </span><span style="color: #0000ff">int</span> num =<span style="color: #000000"> lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(),
            ThreadLogUtils.getCurrThreadUuid(), </span><span style="color: #0000ff">null</span><span style="color: #000000">);
    </span><span style="color: #0000ff">if</span> (num == 1<span style="color: #000000">) {
        </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">true</span><span style="color: #000000">;
    }
    </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">;
}

</span><span style="color: #008000">/**</span><span style="color: #008000">
 * 补偿同步锁
 </span><span style="color: #008000">*/</span>
<span style="color: #0000ff">private</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> retrySynsLock(TradeSyncLock lock) {
    </span><span style="color: #008000">//</span><span style="color: #008000"> 处理中或处理完成的不加锁,即不执行补偿操作</span>
    <span style="color: #0000ff">if</span> (LockStatus.P.getName().equals(lock.getStatus()) ||<span style="color: #000000"> LockStatus.S.getName().equals(lock.getStatus())) {
        </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">;
    }

    </span><span style="color: #0000ff">int</span> num =<span style="color: #000000"> lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(),
            ThreadLogUtils.getCurrThreadUuid(), </span><span style="color: #0000ff">null</span><span style="color: #000000">);
    </span><span style="color: #0000ff">if</span> (num == 1<span style="color: #000000">) {
        </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">true</span><span style="color: #000000">;
    }
    </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">;
}

</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">void</span><span style="color: #000000"> createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) {
    lock.setOuterSerialNo(outerSerialNo);
    lock.setCustNo(custNo);
    lock.setSourceCode(source.getCode());
    lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid());
    lock.setStatus(LockStatus.P.getName());
    lock.setRemark(source.getDesc());
}

}

View Code

获取当前线程号以及打印uuid工具类ThreadLogUtils.Java

Java分布式锁之数据库实现 Java分布式锁之数据库实现
/\*\* \* \* 线程处理<br> \* \* @author fugaoyang \* \*/ public class ThreadLogUtils {

</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLogUtils instance = <span style="color: #0000ff">null</span><span style="color: #000000">;

</span><span style="color: #0000ff">private</span><span style="color: #000000"> ThreadLogUtils() {
    setInstance(</span><span style="color: #0000ff">this</span><span style="color: #000000">);
}

</span><span style="color: #008000">//</span><span style="color: #008000"> 初始化标志</span>
<span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">final</span> Object __noop = <span style="color: #0000ff">new</span><span style="color: #000000"> Object();
</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLocal&lt;Object&gt; __flag = <span style="color: #0000ff">new</span> InheritableThreadLocal&lt;Object&gt;<span style="color: #000000">() {
    @Override
    </span><span style="color: #0000ff">protected</span><span style="color: #000000"> Object initialValue() {
        </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">null</span><span style="color: #000000">;
    }
};

</span><span style="color: #008000">//</span><span style="color: #008000"> 当前线程的UUID信息,主要用于打印日志;</span>
<span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLocal&lt;String&gt; currLogUuid = <span style="color: #0000ff">new</span> InheritableThreadLocal&lt;String&gt;<span style="color: #000000">() {
    @Override
    </span><span style="color: #0000ff">protected</span><span style="color: #000000"> String initialValue() {
        </span><span style="color: #0000ff">return</span> UUID.randomUUID().toString()<span style="color: #008000">/*</span><span style="color: #008000"> .toUpperCase() </span><span style="color: #008000">*/</span><span style="color: #000000">;
    }
};

</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLocal&lt;String&gt; currThreadUuid = <span style="color: #0000ff">new</span> ThreadLocal&lt;String&gt;<span style="color: #000000">() {
    @Override
    </span><span style="color: #0000ff">protected</span><span style="color: #000000"> String initialValue() {
        </span><span style="color: #0000ff">return</span><span style="color: #000000"> UUIDGenerator.getUuid();
    }
};

</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> clear(Boolean isNew) {
    </span><span style="color: #0000ff">if</span><span style="color: #000000"> (isNew) {

        currLogUuid.remove();

        __flag.remove();

        currThreadUuid.remove();

    }
}

</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> String getCurrLogUuid() {
    </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">isInitialized()) {
        </span><span style="color: #0000ff">throw</span> <span style="color: #0000ff">new</span> IllegalStateException("TLS未初始化"<span style="color: #000000">);
    }

    </span><span style="color: #0000ff">return</span><span style="color: #000000"> currLogUuid.get();
}

</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> String getCurrThreadUuid() {
    </span><span style="color: #0000ff">return</span><span style="color: #000000"> currThreadUuid.get();
}

</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> clearCurrThreadUuid() {
    currThreadUuid.remove();
}

</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> String getLogPrefix() {
    </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">isInitialized()) {
        </span><span style="color: #0000ff">return</span> ""<span style="color: #000000">;
    }

    </span><span style="color: #0000ff">return</span> "&lt;uuid=" + getCurrLogUuid() + "&gt;"<span style="color: #000000">;
}

</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> isInitialized() {
    </span><span style="color: #0000ff">return</span> __flag.get() != <span style="color: #0000ff">null</span><span style="color: #000000">;
}

</span><span style="color: #008000">/**</span><span style="color: #008000">
 * 初始化上下文,如果已经初始化则返回false,否则返回true&lt;br/&gt;
 *
 * </span><span style="color: #808080">@return</span>
 <span style="color: #008000">*/</span>
<span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> initialize() {
    </span><span style="color: #0000ff">if</span><span style="color: #000000"> (isInitialized()) {
        </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">;
    }

    __flag.set(__noop);
    </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">true</span><span style="color: #000000">;
}

</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> setInstance(ThreadLogUtils instance) {
    ThreadLogUtils.instance </span>=<span style="color: #000000"> instance;
}

</span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> ThreadLogUtils getInstance() {
    </span><span style="color: #0000ff">return</span><span style="color: #000000"> instance;
}

}

View Code

两种锁的实现的大致思路如下:

1.交易同步锁

当一个客户来取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;

当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;

当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;

2.交易重试补偿锁

1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;

2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。

上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。

后续可以优化的部分:

1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。

2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。

3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。

 

因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~

完整示例代码后续会更新到github。

 

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
2年前
Redis分布式锁的正确实现方式
前言分布式锁一般有三种实现方式:1.数据库乐观锁;2.基于Redis的分布式锁;3.基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。
Stella981 Stella981
2年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
京东云开发者 京东云开发者
5个月前
线上SQL超时场景分析-MySQL超时之间隙锁 | 京东物流技术团队
前言之前遇到过一个由MySQL间隙锁引发线上sql执行超时的场景,记录一下。背景说明分布式事务消息表:业务上使用消息表的方式,依赖本地事务,实现了一套分布式事务方案消息表名:mqmessages数据量:3000多万索引:createtime和statuss
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这