分布式锁 — Redisson 全面解析!

前言

分布式锁主要是解决集群,分布式下数据一致性的问题。在单机的环境下,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatileReentrantLocksynchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。

分布式锁的实现主要有以下方式:

  • 基于数据库

  • 基于分布式协调系统

  • 基于缓存

    • 基于redis命令。如:setnx等操作

    • 基于redis Lua脚本能力(本文介绍的实现方式 redisson)

一、Redisson的使用

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例

引入maven依赖

<!-- maven版本号根据项目版本自行调整 -->
<!--redis-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.0</version>
</dependency>
<!--使用redisson作为分布式锁-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.8</version>
</dependency>

yml配置

spring:redis:# Redis数据库索引(默认为0)database: 0# Redis服务器地址host: 127.0.0.1# Redis服务器连接端口port: 6379# Redis服务器链接密码(默认为空)password:jedis:pool:# 连接池最大链接数(负值表示没有限制)max-active: 20# 连接池最大阻塞等待时间(负值表示没有限制)max-wait: -1# 链接池中最大空闲链接max-idle: 10# 连接池中最小空闲链接min-idle: 0# 链接超市时间(毫秒)timeout: 1000

Redisson配置

@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")private String redisHost;@Value("${spring.redis.password}")private String redisPassword;@Value("${spring.redis.port}")private String port;@Bean@ConditionalOnMissingBeanpublic RedissonClient redissonClient() {Config config = new Config();//单机模式  依次设置redis地址和密码System.out.println(redisHost);// config.useSingleServer().setAddress("redis://" + redisHost + ":" + port).setPassword(redisPassword);// 没有配置redis密码config.useSingleServer().setAddress("redis://" + redisHost + ":" + port);return Redisson.create(config);}
}

测试用例

package com.example.aopdemo;import com.example.aopdemo.springbootaopdemo.SpringBootDemoxzApplication;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;/*** @ClassName RedissonTest* @Description Redisson测试用例* @Author 阿Q* @Date 2022/11/26*/
@Slf4j
@SpringBootTest(classes = SpringBootDemoxzApplication.class)
public class RedissonTest {@Resourceprivate RedissonClient redissonClient;@Resourceprivate ThreadPoolTaskExecutor executor;// redisson分布式锁的keyprivate static final String LOCK_TEST_KEY = "redisson:lock:test";int n = 500;/*** 分布式锁测试用例*/@Testpublic void lockTest() {// 利用 循环+多线程 模仿高并发请求for (int i = 0; i < 10; i++) {executor.execute(() -> {// 这里获取公平锁,遵循先进先出原则,方便测试RLock fairLock = redissonClient.getFairLock(LOCK_TEST_KEY);try {// 尝试加锁// waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败// leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)boolean lock = fairLock.tryLock(3000, 30, TimeUnit.MILLISECONDS);if (lock){log.info("线程:" + Thread.currentThread().getName() + "获得了锁");log.info("剩余数量:{}", --n);}} catch (InterruptedException e) {e.printStackTrace();} finally {log.info("线程:" + Thread.currentThread().getName() + "准备释放锁");// 注意,无论出现任何情况,都要主动解锁fairLock.unlock();}});}try {// ->_-> 这里使当前方法占用的线程休息10秒,不要立即结束Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}
}

二、Redisson源码分析

redisson这个框架的实现依赖了Lua脚本和Netty,以及各种Future及FutureListener的异步、同步操作转换,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能

简单了解Lua脚本(Redisson 版本 3.16.8)

想要真正读懂redisson底层的加锁解锁实现,基本的lua脚本还是要了解一下的,这里作者就不深入了(毕竟我也是门外汉),大家有兴趣的可以去深入一下,这里只针对锁的实现来解释一下。另外,搜索公众号Linux就该这样学后台回复“猴子”,获取一份惊喜礼包。

加锁脚本
  • KEYS[1] 锁的名字

  • ARGV[1] 锁自动失效时间(毫秒,默认30s(看门狗续期时长))

  • ARGV[2] hash子项的key(uuid+threadId)

--如果锁不存在
if (redis.call('exists', KEYS[1]) == 0) then
--重入次数初始为0后加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--设锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表加锁成功
return nil;
--结束符
end;
--如果加锁的进程已存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--重入次数加一
redis.call('hincrby', KEYS[1], ARGV[2], 1);
--更新锁的过期时间(毫秒)
redis.call('pexpire', KEYS[1], ARGV[1]);
--返回null-代表重入成功
return nil;
--结束符
end;
--返回锁的剩余时间(毫秒)-代表加锁失败
return redis.call('pttl', KEYS[1]);

图片

结论:当且仅当返回nil,才表示加锁成功;

解锁脚本

  • KEYS[1] 锁的名字

  • KEYS[2] 发布订阅的channel=redisson_lock__channel:{lock_name}

  • ARGV[1] 发布订阅中解锁消息=0

  • ARGV[2] 看门狗续期时间

  • ARGV[3] hash子项的key=uuid+threadId

--如果锁不存在
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
--返回null-代表解锁成功
return nil;
end;
--重入次数减一
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
--如果重入次数不为0,对锁进行续期(使用看门狗的续期时间,默认续期30s)
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
--返回0-代表锁的重入次数减一,解锁成功
return 0;
--否则重入次数<=0
else
--删除key
redis.call('del', KEYS[1]);
--向channel中发布删除key的消息
redis.call('publish', KEYS[2], ARGV[1]);
--返回1-代表锁被删除,解锁成功
return 1;
end;
return nil;

图片

结论:当且仅当返回1,才表示当前请求真正解锁;

看门口续期lua脚本

io.netty.util.TimerTask每10秒执行一次(30(续期时间)/3)

  • KEYS[1] 锁的名字

  • ARGV[1]

--自己加的锁存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
--续期
redis.call('pexpire', KEYS[1], ARGV[1]);
--1代表续期成功
return 1;
end;
--自己加的锁不存在,后续不需要再续期
return 0;

源码鉴赏

加锁逻辑

// tryLock 是Redisson加锁的核心代码,在这里,我们基本可以了解加锁的整个逻辑流程
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 获取锁能容忍的最大等待时长long time = unit.toMillis(waitTime);// 获取当前系统时间 - 节点一long current = System.currentTimeMillis();// 获取当前线程idlong threadId = Thread.currentThread().getId();// 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;}// 剩余等待时长 =   最大等待时长-(当前时间-节点一)time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();// 【核心点2】订阅解锁消息CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {// 以阻塞的方式获取订阅结果,最大等待时常timesubscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);} catch (ExecutionException | TimeoutException e) {// 判断异步任务是否不存在if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {// 异步任务出现异常,取消订阅if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {// 剩余等待时长time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 循环获取锁      while (true) {long currentTime = System.currentTimeMillis();// 再次获取锁,成功则返回ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}currentTime = System.currentTimeMillis();// 【核心点3】阻塞等待信号量唤醒或者超时,接收到订阅时唤醒// 使用的是Semaphore#tryAcquire()// 判断 锁的占有时间(ttl)是否小于等待时间  if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 因为是同步操作,所以无论加锁成功或失败,都取消订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}
}

接下来,我们再一起看一下 tryAcquire()

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// get方法就是以线程阻塞的方式获取结果,这里不再展示,有兴趣的朋友可以自行查看源码return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
/*** 异步的方式尝试获取锁*/
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// 占有时间等于 -1 表示会一直持有锁,直到业务进行完成,主动解锁(这里就显示出了finally的重要性)if (leaseTime != -1) {// 【核心点4】这里就是直接使用lua脚本ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}
/*** 兄弟们看到这里应该就恍然大悟了吧,redisson最底层就是lua脚本的直接调用* 这里就不作说明了,上面的lua脚本分析已经说的很明白了*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

解锁逻辑

/**
* 解锁逻辑
*/
@Override
public void unlock() {try {// 相信看来上面的逻辑,兄弟们应该已经很熟悉redisson作者的编码习惯了,照例get还是以线程阻塞的方式获取结果get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}
}
// 异步解锁
@Override
public RFuture<Void> unlockAsync(long threadId) {// 调用异步解锁方法// unlockInnerAsync() 是抽象方法,所有的实现方法都是直接调用lua脚本,这里不做展示,感兴趣的同学可以自行查看// 调用真正的实现取决于上面我们创建什么样的lock对象        RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/588308.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

某眼实时票房接口获取

某眼实时票房接口获取 前言解决方案1.找到veri.js2.找到signKey所在位置3.分析它所处的这个函数的内容4.index参数的获取5.signKey参数的获取运行结果关键代码另一种思路票房接口:https://piaofang.maoyan.com/dashboard-ajax https://piaofang.maoyan.com/dashboard 实时票房…

每日一题(leetcode169):多数元素-哈希、随机、分治

哈希&#xff1a; class Solution { public:int majorityElement(vector<int>& nums) {int lennums.size();unordered_map<int,int> map;for (int i0;i<len;i){if(map.find(nums[i])map.end()){map[nums[i]]1;}else{map[nums[i]];}}int seqlen/2;int ansnu…

物联网行业中,我们如何选择数据库?

在当今数字化潮流中&#xff0c;我们面对的不仅是海量数据&#xff0c;更是时间的涟漪。从生产线的传感器到金融市场的交易记录&#xff0c;时间序列数据成为了理解事物演变和趋势的关键。在面对这样庞大而动态的数据流时&#xff0c;我们需要深入了解一种强大的工具——时序数…

1.Git是用来干嘛的

本文章学习于【GeekHour】一小时Git教程&#xff0c;来自bilibili Git就是一个文件管理系统&#xff0c;这样说吧&#xff0c;当多个人同时在操作一个文件的同时&#xff0c;很容易造成紊乱&#xff0c;git就是保证文件不紊乱产生的 包括集中式管理系统和分布式管理系统 听懂…

Vue3使用组件的计算属性代替v-for和v-if

Vue.js3组件的方法-CSDN博客 使用Vue3组件的计算属性-CSDN博客 Vue3组件计算属性的get和set方法-CSDN博客 Vue3组件计算属性的缓存-CSDN博客 在业务逻辑处理中&#xff0c;一般会使用v-for指令渲染列表的内容&#xff0c;有时也会使用v-if指令的条件判断过滤列表中不满足条…

剑指Offer题目笔记24(集合的组合、排序)

面试题79&#xff1a; 问题&#xff1a; ​ 输入一个不含重复数字的数据集合&#xff0c;找出它的所有子集。 解决方案&#xff1a; ​ 使用回溯法。子集就是从一个集合中选出若干元素。如果集合中包含n个元素&#xff0c;那么生成子集可以分为n步&#xff0c;每一步从集合中…

托管式 Kubernetes 服务,加速现代化云基础设施升级

降本提效&#xff0c;是创新开发的永恒话题。过去10年中&#xff0c;开发者纷纷拥抱容器技术以提高部署效率&#xff0c;降低运维负担。随着像 Docker 这类容器引擎使用量的不断增长&#xff0c;作为 Docker 管理系统的 Kubernetes&#xff08;简称 K8s&#xff09;顺势而出&am…

【Node.js从基础到高级运用】二十一、使用child_process模块创建子进程

引言 在Node.js中&#xff0c;child_process模块是一个提供了创建和管理子进程的能力的核心模块。通过使用child_process模块&#xff0c;Node.js可以执行系统命令、运行其他脚本或应用程序&#xff0c;实现与Node.js进程的并行处理。 child_process模块提供了几种创建子进程的…

Ant Design Vue中的table与pagination的联合使用

效果&#xff1a; 代码&#xff1a; <a-table:dataSource"dataSource":columns"columns":pagination"pagination"change"handleTableChange":scroll"{ x: 100%, y: 600 }"> </a-table> export default defin…

Lua 和 Love 2d 教程 二十一点朴克牌 (上篇lua源码)

GitCode - 开发者的代码家园 Lua版完整原码 规则 庄家和玩家各发两张牌。庄家的第一张牌对玩家是隐藏的。 玩家可以拿牌&#xff08;即拿另一张牌&#xff09;或 停牌&#xff08;即停止拿牌&#xff09;。 如果玩家手牌的总价值超过 21&#xff0c;那么他们就爆掉了。 面牌…

30道Java经典面试题总结

1、JDK 和 JRE 有什么区别&#xff1f; JDK&#xff08;Java Development Kit&#xff09;&#xff0c;Java 开发工具包 JRE&#xff08;Java Runtime Environment&#xff09;&#xff0c;Java 运行环境 JDK 中包含 JRE&#xff0c;JDK 中有一个名为 jre 的目录&#xff0c…

MyBatis 解决上篇的参数绑定问题以及XML方式交互

前言 上文:MyBatis 初识简单操作-CSDN博客 上篇文章我们谈到的Spring中如何使用注解对Mysql进行交互 但是我们发现我们返回出来的数据明显有问题 我们发现后面三个字段的信息明显没有展示出来 下面我们来谈谈解决方案 解决方案 这里的原因本质上是因为mysql中和对象中的字段属性…

社交互动:探讨Facebook对用户互动的影响

在当今数字化时代&#xff0c;社交网络已经成为了人们日常生活中不可或缺的一部分。而作为最著名的社交网络平台之一&#xff0c;Facebook不仅连接了全球数十亿用户&#xff0c;还对用户的社交互动产生了深远的影响。本文将深入探讨Facebook对用户互动的影响&#xff0c;以及它…

C刊级 | Matlab实现GWO-BiTCN-BiGRU-Attention灰狼算法优化双向时间卷积双向门控循环单元融合注意力机制多变量回归预测

C刊级 | Matlab实现GWO-BiTCN-BiGRU-Attention灰狼算法优化双向时间卷积双向门控循环单元融合注意力机制多变量回归预测 目录 C刊级 | Matlab实现GWO-BiTCN-BiGRU-Attention灰狼算法优化双向时间卷积双向门控循环单元融合注意力机制多变量回归预测效果一览基本介绍程序设计参考…

HarmonyOS入门-ArkTS学习(一)

1. 什么是ArkTS语言 学习之前&#xff0c;我们先初步了解下什么是ArkTS 官方指南这样介绍&#xff1a; ArkTS是TS的超集&#xff0c;ArkTS定义了声明式UI描述、自定义组件和动态扩展UI元素的能力&#xff0c;再配合ArkUI开发框架中的系统组件及其相关的事件方法、属性方法等共…

矩阵空间秩1矩阵小世界图

文章目录 1. 矩阵空间2. 微分方程3. 秩为1的矩阵4. 图 1. 矩阵空间 我们以3X3的矩阵空间 M 为例来说明相关情况。目前矩阵空间M中只关心两类计算&#xff0c;矩阵加法和矩阵数乘。 对称矩阵-子空间-有6个3X3的对称矩阵&#xff0c;所以为6维矩阵空间上三角矩阵-子空间-有6个3…

使用docker-tc对host容器进行限流

docker-tc是一个github开源项目&#xff0c;项目地址是https://github.com/lukaszlach/docker-tc。 运行docker-tc docker run -d \ --name docker-tc \ --network host \ --cap-add NET_ADMIN \ --restart always \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var…

STM32-03基于HAL库(CubeMX+MDK+Proteus)输入检测案例(按键控制LED)

文章目录 一、功能需求分析二、Proteus绘制电路原理图三、STMCubeMX 配置引脚及模式&#xff0c;生成代码四、MDK打开生成项目&#xff0c;编写HAL库的按键检测代码五、运行仿真程序&#xff0c;调试代码 一、功能需求分析 搭建完成开发STM32开发环境之后&#xff0c;开始GPIO…

备战蓝桥杯---刷杂题1

1.来个小定理&#xff08;上次DP的青蛙过河用过&#xff09; 事实上&#xff0c;假如他们的gcd&#xff01;1,那么P,q都可以表示成gcd的倍数&#xff0c;因此假如一个数不是gcd的倍数就不可以表示&#xff0c;若互质由裴蜀定理大于一定时一定可以表示出。 事实上为&#xff08…

解密AI人工智能的整体分层架构:探索智能科技的未来之路

随着人工智能技术的迅猛发展&#xff0c;AI已经渗透到我们生活的方方面面。而支撑AI人工智能系统运作的核心是其整体分层架构。本文将深入探讨AI人工智能的整体分层架构&#xff0c;揭示其中的奥秘&#xff0c;探索智能科技的未来之路。 ### AI人工智能整体分层架构的重要性 …