Skip to content

Redisson分布式锁

为什么要使用分布式锁

分布式锁是一种在分布式系统中用于协调多个节点对共享资源访问的机制。使用分布式锁的原因主要包括:

  • 避免数据不一致:在分布式系统中,多个节点可能同时对同一数据进行操作,如果没有适当的同步机制,可能会导致数据不一致。

  • 防止资源竞争:多个节点可能会同时请求同一个资源,如数据库记录、文件等,如果没有锁机制,可能会导致资源的并发访问冲突。

  • 实现任务的串行执行:在某些情况下,需要确保任务按照特定的顺序执行,分布式锁可以保证任务在分布式环境中的串行化。

  • 提高系统的可靠性:通过锁机制,可以防止系统在高并发情况下出现不可预测的错误,从而提高系统的稳定性和可靠性。

  • 实现限流和计数:分布式锁可以用来控制对某些资源的访问频率,实现限流功能,或者在分布式环境中进行计数。

  • 解决分布式事务问题:在分布式系统中,事务的一致性和隔离性更加难以保证,分布式锁可以作为实现分布式事务的一种手段。

Redisson 是一个基于 Java 的 Redis 客户端库,它提供了包括分布式锁在内的多种分布式数据结构和服务。Redisson 的分布式锁具有以下特点:

  • 高性能:利用 Redis 的高性能特性,Redisson 提供了快速的锁获取和释放操作。
  • 可重入:支持同一线程多次获取同一锁,确保代码的可重入性。
  • 公平锁:支持公平锁,按照请求锁的顺序来分配锁,避免饥饿问题。
  • 锁超时:可以设置锁的超时时间,防止死锁的发生。
  • 锁续期:在锁的持有期间,可以自动续期,以应对长时间运行的任务。
  • 锁的可中断获取:支持尝试获取锁,并且可以设置超时时间,避免长时间等待。

Redission原理

tryLock加锁机制

  • 线程去获取锁,获取成功则执行lua脚本,保存数据到redis数据库。
  • 如果获取失败: 一直通过while循环尝试获取锁(可自定义等待时间,超时后返回失败), 获取成功后,执行lua脚本,保存数据到redis数据库。
java
/**
 * @param leaseTime 锁的过期时间, 注意只有设置为小于0时才会让看门狗机制生效(默认30秒,不够2/3秒后执行续约)
 */
@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            // 进程已经抢到锁了
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
    }

使用lua脚本保证执行的原子性。这是一个可重入锁的实现方式,如果redis.call('exists', KEYS[1]) == 0就说明这把锁还没有被别的进程占用,设置key和等待时间进行加锁。相反, 被别的线程占有时会返回一个等待时间。

java
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

粤ICP备20009776号