Redisson分布式锁的可重入、重试、续约机制原理

2024-07-13 1358阅读

1. Redisson介绍

基于Redis的setnx实现的分布式锁存在下面的问题:

  • 重入问题:重入问题是指获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,就会导致死锁。所以可重入锁他的主要意义是防止死锁,Java中synchronized和Lock锁都是可重入的。
  • 不可重试:获取锁失败的话不会自动重新尝试获取锁
  • 超时释放:使用setnx实现的分布式锁通过添加过期时间可以防止死锁,但是如果线程堵塞的时间过长,会导致锁超时释放,可能会导致安全隐患。
  • 主从一致性:如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

    Redisson分布式锁的可重入、重试、续约机制原理

    Redisson的概念:

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

    Redisson分布式锁的可重入、重试、续约机制原理

    2. Redisson可重入锁的原理

    在Lock锁中,底层有一个被voaltile修饰的state变量来记录重入的次数,如果当前没有人持有锁,则state=0,当有人获取到锁时,state设置为1,如果持有锁的人再次获取到同一把锁,则state的计数会加1。如果锁被释放了一次,则state的计数减1,直到state=0说明锁已经全部释放掉,无人持有。synchronized的底层逻辑也是类似。

    在Redisson中,使用一个hash结构来存储锁,其中key表示该锁是否存在,field标识线程的持有者,value为锁的重入次数。

    在RLock.tryLock()方法中判断锁的lua表达式如下:

    "if (redis.call('exists', KEYS[1]) == 0) then " +   -- 判断锁是否已经存在
      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  -- 如果锁不存在  创建一把新的锁并设置重入次数为1
      "redis.call('pexpire', KEYS[1], ARGV[1]); " +     -- 设置锁的过期时间
      "return nil; " +
    "end; " +  
    -- 如果锁已经存在 继续判断这把锁的field和自身的线程标识是否一致
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  -- 锁的重入次数+1
      "redis.call('pexpire', KEYS[1], ARGV[1]); " +     -- 刷新锁的超时时间
      "return nil; " +
    "end; " +
    -- 锁的field与自身线程标识不一致,说明锁已经被别人持有 返回锁的剩余时间
    "return redis.call('pttl', KEYS[1]);"
    

    其中的三个参数含义如下:

    • KEYS[1]:锁的名称
    • ARGV[1]:设置的锁的过期时间 默认为30秒
    • ARGV[2]:id + “:” + threadId, 锁的field

      Redisson分布式锁的可重入、重试、续约机制原理

      3. Redisson重试的原理

      redisson在尝试获取锁的时候,如果传了时间参数,就不会在获取锁失败时立即返回失败,而是会进行重试。

      Redisson分布式锁的可重入、重试、续约机制原理

      • waitTime:锁的最大重试时间
      • leaseTime:锁的释放时间,默认为-1, 如果设置为-1,会触发看门狗机制,每过internalLockLeaseTime / 3 秒会续期,将锁设置为internalLockLeaseTime秒过期, internalLockLeaseTime默认为30,可通过setLockWatchdogTimeout()方法自定义
      • unit:时间单位
        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            // 尝试获取锁 获取失败会返回锁的ttl 成功返回null
            Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired  获取锁成功 直接返回 无需重试
            if (ttl == null) {
                return true;
            }
            // 获取锁失败判断一下设置的等待时间是否还有剩余
            time -= System.currentTimeMillis() - current;
            // 剩余时间小于0 则说明等待超时 不需要再重试 直接返回获取锁失败
            if (time 
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            
            current = System.currentTimeMillis();
            // 订阅拿到锁的线程,该线程释放锁后会发布通知,其他锁得到消息就可以开始抢锁
            RFuture
                // 如果time时间耗尽还未等到锁释放的消息 则尝试取消任务
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) - {
                        if (e == null) {
                            // 取消任务失败则取消订阅任务
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                // 返回获取锁失败的消息
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            try {
                // 走到这里说明在超时时间内等到了锁释放的信号
                // 判断设定的等待时间是否还有剩余
                time -= System.currentTimeMillis() - current;
                if (time 
                    // 等待时间已经耗尽 直接返回获取锁失败的结果
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
                // 循环尝试获取锁
                while (true) {
                    long currentTime = System.currentTimeMillis();
                    // 尝试获取锁 获取失败会返回锁的ttl 成功返回null
                    ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        // 获取到锁 直接返回
                        return true;
                    }
                    // 获取锁失败 再次判断等待时间是否还有剩余
                    time -= System.currentTimeMillis() - currentTime;
                    if (time 
                        // 等待时间已经耗尽 直接返回获取锁失败的结果
                        acquireFailed(waitTime, unit, threadId);
                        return false;
                    }
                    // 等待时间还有剩余 继续尝试获取锁
                    // waiting for message
                    currentTime = System.currentTimeMillis();
                    if (ttl = 0 && ttl  
         
         

        ​ 在成功获取锁后,通过异步操作定期更新锁的超时时间,确保锁在使用期间不会过期。通过 scheduleExpirationRenewal方法调度续约任务,而 renewExpiration 方法负责执行异步续约操作。递归调用 renewExpiration在每次续约成功后继续下一次续约。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]