irpas技术客

Redisson分布式锁_lisin-lee-cooper_redisson分布式锁

网络 2009

一.什么是锁

锁主要是用来实现资源共享同步,只有获取到了锁才能访问该同步代码,否则等待其他线程使用结束释放锁。

二.redis实现分布式锁主要步骤

1.指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value; 2.当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。 3.设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。 4.当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁 。

三.Redisson实现分布式锁原理

加锁是通过lua脚本实现

if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"

加锁

@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 尝试获取锁 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败. time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); /** * 订阅锁释放事件,并通过 await 方法阻塞等待锁释放, ,一旦锁释放会发消息通知待等待的线程进行竞争. * 当 this.await 返回 false,等待时间已经超出获取锁最大等待时间,取消订阅并返回失败. * 当 this.await 返回 true,进入循环尝试获取锁. */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败. time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } /** * 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁 * 获取锁成功,则立马返回 true, * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环 */ while (true) { long currentTime = System.currentTimeMillis(); // 再次尝试获取锁 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 超过最大等待时间则返回 false 结束循环,获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } /** * 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息): */ currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { //则就在wait time 时间范围内等待可以通过信号量 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); } }

续锁

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }

Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。Watch Dog前提是没有设置锁释放时间

锁释放

protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 判断锁 key 是否存在 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 将该客户端对应的锁的 hash 结构的 value 值递减为 0 后再进行删除 // 然后再向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }

1.删除锁(这里注意可重入锁,可重入锁重入数减1)。 2.广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。 3.取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除,并且 cancel 掉 Netty 的那个定时任务线程。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #redisson分布式锁 #key #作为锁标记存入 #redis #中指定一个 #唯一的用户标识