Redisson分布式锁的可重入、重试、续约机制原理
1. Redisson介绍
基于Redis的setnx实现的分布式锁存在下面的问题:
- 重入问题:重入问题是指获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,就会导致死锁。所以可重入锁他的主要意义是防止死锁,Java中synchronized和Lock锁都是可重入的。
- 不可重试:获取锁失败的话不会自动重新尝试获取锁
- 超时释放:使用setnx实现的分布式锁通过添加过期时间可以防止死锁,但是如果线程堵塞的时间过长,会导致锁超时释放,可能会导致安全隐患。
- 主从一致性:如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
Redisson的概念:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
2. Redisson可重入锁的原理
在Lock锁中,底层有一个被voaltile修饰的state变量来记录重入的次数,如果当前没有人持有锁,则state=0,当有人获取到锁时,state设置为1,如果持有锁的人再次获取到同一把锁,则state的计数会加1。如果锁被释放了一次,则state的计数减1,直到state=0说明锁已经全部释放掉,无人持有。synchronized的底层逻辑也是类似。
在Redisson中,使用一个hash结构来存储锁,其中key表示该锁是否存在,field标识线程的持有者,value为锁的重入次数。
在RLock.tryLock()方法中判断锁的lua表达式如下:
"if (redis.call('exists', KEYS[1]) == 0) then " + -- 判断锁是否已经存在 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + -- 如果锁不存在 创建一把新的锁并设置重入次数为1 "redis.call('pexpire', KEYS[1], ARGV[1]); " + -- 设置锁的过期时间 "return nil; " + "end; " + -- 如果锁已经存在 继续判断这把锁的field和自身的线程标识是否一致 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + -- 锁的重入次数+1 "redis.call('pexpire', KEYS[1], ARGV[1]); " + -- 刷新锁的超时时间 "return nil; " + "end; " + -- 锁的field与自身线程标识不一致,说明锁已经被别人持有 返回锁的剩余时间 "return redis.call('pttl', KEYS[1]);"其中的三个参数含义如下:
- KEYS[1]:锁的名称
- ARGV[1]:设置的锁的过期时间 默认为30秒
- ARGV[2]:id + “:” + threadId, 锁的field
3. Redisson重试的原理
redisson在尝试获取锁的时候,如果传了时间参数,就不会在获取锁失败时立即返回失败,而是会进行重试。
- waitTime:锁的最大重试时间
- leaseTime:锁的释放时间,默认为-1, 如果设置为-1,会触发看门狗机制,每过internalLockLeaseTime / 3 秒会续期,将锁设置为internalLockLeaseTime秒过期, internalLockLeaseTime默认为30,可通过setLockWatchdogTimeout()方法自定义
- unit:时间单位
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); // 尝试获取锁 获取失败会返回锁的ttl 成功返回null Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired 获取锁成功 直接返回 无需重试 if (ttl == null) { return true; } // 获取锁失败判断一下设置的等待时间是否还有剩余 time -= System.currentTimeMillis() - current; // 剩余时间小于0 则说明等待超时 不需要再重试 直接返回获取锁失败 if (time acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); // 订阅拿到锁的线程,该线程释放锁后会发布通知,其他锁得到消息就可以开始抢锁 RFuture // 如果time时间耗尽还未等到锁释放的消息 则尝试取消任务 if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) - { if (e == null) { // 取消任务失败则取消订阅任务 unsubscribe(subscribeFuture, threadId); } }); } // 返回获取锁失败的消息 acquireFailed(waitTime, unit, threadId); return false; } try { // 走到这里说明在超时时间内等到了锁释放的信号 // 判断设定的等待时间是否还有剩余 time -= System.currentTimeMillis() - current; if (time // 等待时间已经耗尽 直接返回获取锁失败的结果 acquireFailed(waitTime, unit, threadId); return false; } // 循环尝试获取锁 while (true) { long currentTime = System.currentTimeMillis(); // 尝试获取锁 获取失败会返回锁的ttl 成功返回null ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { // 获取到锁 直接返回 return true; } // 获取锁失败 再次判断等待时间是否还有剩余 time -= System.currentTimeMillis() - currentTime; if (time // 等待时间已经耗尽 直接返回获取锁失败的结果 acquireFailed(waitTime, unit, threadId); return false; } // 等待时间还有剩余 继续尝试获取锁 // waiting for message currentTime = System.currentTimeMillis(); if (ttl = 0 && ttl 在成功获取锁后,通过异步操作定期更新锁的超时时间,确保锁在使用期间不会过期。通过 scheduleExpirationRenewal方法调度续约任务,而 renewExpiration 方法负责执行异步续约操作。递归调用 renewExpiration在每次续约成功后继续下一次续约。
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!




