分布式锁(Redis)
Redis实战篇 | Kyle's Blog (cyborg2077.github.io)
目录
基本原理
实现分布式锁
获取锁和释放锁
Redis分布式锁误删
情况说明 1
问题解决(uuid判别)
情况说明2
问题解决(Redis的Lua脚本)
实现
分布式锁优化-Redisson
Redisson
Redisson入门
基本原理
-
分布式锁:满足分布式系统或集群模式下多线程可见并且可以互斥的锁
-
分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
-
那么分布式锁应该满足一些什么条件呢?
- 可见性:多个线程都能看到相同的结果。
注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
- 互斥:互斥是分布式锁的最基本条件,使得程序串行执行
- 高可用:程序不易崩溃,时时刻刻都保证较高的可用性
- 高性能:由于加锁本身就让性能降低,所以对于分布式锁需要他较高的加锁性能和释放锁性能
- 安全性:安全也是程序中必不可少的一环
实现分布式锁
获取锁和释放锁
- 核心思路
- 我们利用redis的SETNX方法,当有多个线程进入时,我们就利用该方法来获取锁。第一个线程进入时,redis 中就有这个key了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁(返回了0)的线程,等待一定时间之后重试
实现分布式锁时需要实现两个基本方法
- 获取锁
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
- 添加锁和添加锁过期时间需要同时完成(避免添加锁结束了,redis宕机导致添加锁过期时间失败)
2.释放锁
- 手动释放
- 超时释放(如果redis宕机没来得及手动释放锁):获取锁的时候添加一个超时时间
接口
public interface ILock { /** * 尝试获取锁(非阻塞,不会重试获取锁) * @param timeoutSec 锁超时时间 * @return true表示获取锁成功,false表示获取锁失败 */ boolean tryLock(Long timeoutSec); /** * 释放锁 */ void unlock(); }实现类
public class SimpleRedisLock implements ILock { // 锁的前缀 private static final String KEY_PREFIX = "lock:"; // 具体业务名称,将前缀和业务名称拼接起来当作Key private String name; //这里不是@Autowired注入,采用构造器注入,在创建SimpleRedisLock对象时,将RedisTemplate作为参数传入 private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } // 获取锁 @Override public boolean tryLock(Long timeoutSec) { // 获取线程id long threadId = Thread.currentThread().getId(); // 获取锁, 使用setnx方法进行加锁, 同时设置过期时间, 防止死锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, java.util.concurrent.TimeUnit.SECONDS); //自动拆箱可能出现空指针异常,这样写更稳妥 return Boolean.TRUE.equals(success); } @Override public void unlock() { //通过DEL方法删除锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }修改业务代码
@Override public Result seckillVoucher(Long voucherId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper(); //1. 查询优惠券 queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId); SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper); //2. 判断秒杀时间是否开始 if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) { return Result.fail("秒杀还未开始,请耐心等待"); } //3. 判断秒杀时间是否结束 if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) { return Result.fail("秒杀已经结束!"); } //4. 判断库存是否充足 if (seckillVoucher.getStock()Redis分布式锁误删
情况说明 1
TTL过期时间比业务处理时间短导致分布式锁误删。
- 逻辑说明
- 持有锁的线程1在锁的内部出现了阻塞,导致他的锁TTL到期,自动释放
- 此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到
- 但是现在线程1阻塞完了,继续往下执行,要开始释放锁了
- 那么此时就会将属于线程2的锁释放,这就是误删别人锁的情况
- 解决方案就是在每个线程释放锁的时候,都判断一下这个锁是不是自己的,如果不属于自己,则不进行删除操作。
- 假设还是上面的情况,线程1阻塞,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1阻塞完了,继续往下执行,开始删除锁,但是线程1发现这把锁不是自己的,所以不进行删除锁的逻辑,当线程2执行到删除锁的逻辑时,如果TTL还未到期,则判断当前这把锁是自己的,于是删除这把锁
问题解决(uuid判别)
(判断我要释放的锁是不是我获取到的锁)
- 需求:修改之前的分布式锁实现
- 满足:在获取锁的时候存入线程标识(用UUID标识,在一个JVM中,ThreadId一般不会重复,但是我们现在是集群模式,有多个JVM,多个JVM之间可能会出现ThreadId重复的情况),在释放锁的时候先获取锁的线程标识,判断是否与当前线程标识一致
- 如果一致则释放锁
- 如果不一致则不释放锁
- 核心逻辑:在存入锁的时候,放入自己的线程标识,在删除锁的时候,判断当前这把锁是不是自己存入的
- 如果是,则进行删除
- 如果不是,则不进行删除
实现类
public class SimpleRedisLock implements ILock { // 锁的前缀 private static final String KEY_PREFIX = "lock:"; // 具体业务名称,将前缀和业务名称拼接起来当作Key private String name; //这里不是@Autowired注入,采用构造器注入,在创建SimpleRedisLock对象时,将RedisTemplate作为参数传入 private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; // 尝试获取锁 @Override public boolean tryLock(Long timeoutSec) { // 获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } // 释放锁 @Override public void unlock() { // 获取当前线程的标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标识 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标识是否一致 if (threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } } }情况说明2
判断锁标识是自己到释放锁的中间线程阻塞了,导致线程1认为锁还是自己的所以误删。
- 假设线程1已经获取了锁,在判断标识一致之后,准备释放锁的时候,又出现了阻塞(例如JVM垃圾回收机制)
- 于是锁的TTL到期了,自动释放了
- 那么现在线程2趁虚而入,拿到了一把锁
- 但是线程1的逻辑还没执行完,那么线程1就会执行删除锁的逻辑
- 但是在阻塞前线程1已经判断了标识一致,所以现在线程1把线程2的锁给删了
- 那么就相当于判断标识那行代码没有起到作用
- 这就是删锁时的原子性问题
- 因为线程1的拿锁,判断标识,删锁,不是原子操作,所以我们要防止刚刚的情况
问题解决(Redis的Lua脚本) - Redis内置了Lua脚本功能,通过调用函数,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性(保障多条Redis命令能同时完成,不被阻塞打断)。
- Lua是一种编程语言,它的基本语法可以上菜鸟教程看看,链接:Lua 教程 | 菜鸟教程
- 这里重点介绍Redis提供的调用函数,我们可以使用Lua去操作Redis,而且还能保证它的原子性,这样就可以实现拿锁,判断标识,删锁是一个原子性动作了
Redis提供的调用函数语法如下:
例如我们要执行set name Kyle,则脚本是这样
redis.call('set', 'name', 'Kyle')('命令名称','key','其他参数', ...)例如我我们要执行set name David,在执行get name,则脚本如下
## 先执行set name David redis.call('set', 'name', 'David') ## 再执行get name local name = redis.call('get', 'name') ## 返回 return name写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
例如,我们要调用redis.call('set', 'name', 'Kyle') 0这个脚本,语法如下
EVAL "return redis.call('set', 'name', 'Kyle')" 0如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数
注意:在Lua中,数组下标从1开始
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Lucy- 那现在我们来使用Lua脚本来代替我们释放锁的逻辑
原逻辑:
@Override public void unlock() { // 获取当前线程的标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标识 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标识是否一致 if (threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }Lua脚本现在是写死了的,我们可以通过传参的方式来变成动态的Lua脚本
Lua脚本1:
-- 线程标识 local threadId = "UUID-31" -- 锁的key local key = "lock:order:userId" -- 获取锁中线程标识 local id = redis.call('get', key) -- 比较线程标识与锁的标识是否一致 if (threadId == id) then -- 一致则释放锁 del key return redis.call('del', key) end return 0简化Lua脚本1得Lua脚本2:
-- 这里的KEYS[1]就是传入锁的key -- 这里的ARGV[1]就是线程标识 -- 比较锁中的线程标识与线程标识是否一致 if (redis.call('get', KEYS[1]) == ARGV[1]) then -- 一致则释放锁 return redis.call('del', KEYS[1]) end return 0实现
新建一个unlock.lua脚本文件,将Lua脚本2写入:
修改释放锁的代码为:
private static final DefaultRedisScript UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public void unlock() { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }在RedisTemplate中,可以利用execute方法去执行lua脚本
public T execute(RedisScript script, List keys, Object... args) { return this.scriptExecutor.execute(script, keys, args); }分布式锁优化-Redisson
基于setnx实现的分布式锁(刚才的分布式锁)存在以下问题:
- 重入问题
- 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
- 不可重试
- 我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
- 超时释放
- 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患
- 主从一致性
- 如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
Redisson
- Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
- Redis提供了分布式锁的多种多样功能
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock)
- 读写锁(ReadWriteLock)
- 信号量(Semaphore)
- 可过期性信号量(PermitExpirableSemaphore)
- 闭锁(CountDownLatch)
Redisson入门
1.依赖
org.redisson redisson 3.13.62.配置Redisson客户端,在config包下新建RedissonConfig类
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress("redis://101.XXX.XXX.160:6379") .setPassword("root"); return Redisson.create(config); } }3.使用Redisson的分布式锁
@Resource private RedissonClient redissonClient; @Test void testRedisson() throws InterruptedException { //获取可重入锁 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位 boolean success = lock.tryLock(1,10, TimeUnit.SECONDS); //判断获取锁成功 if (success) { try { System.out.println("执行业务"); } finally { //释放锁 lock.unlock(); } } }
- Redis提供了分布式锁的多种多样功能
- 重入问题
- 那现在我们来使用Lua脚本来代替我们释放锁的逻辑
- 这里重点介绍Redis提供的调用函数,我们可以使用Lua去操作Redis,而且还能保证它的原子性,这样就可以实现拿锁,判断标识,删锁是一个原子性动作了
- Lua是一种编程语言,它的基本语法可以上菜鸟教程看看,链接:Lua 教程 | 菜鸟教程
- Redis内置了Lua脚本功能,通过调用函数,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性(保障多条Redis命令能同时完成,不被阻塞打断)。
- 逻辑说明
-







