irpas技术客

Redis分布式锁_飞翔荷兰号_redis分布式锁

网络投稿 4480

一、Redis锁 多个客户端,通过watch一个键-值,然后开启事务如果在开启事务的期间,watch的值没有被其他客户端修改过,则执行成功如果在开启事务的期间,watch的值被其他客户端修改了,则执行失败 set name erick set age 10 watch name age # 监控对应的属性 multi set address shanxi set year 2022 exec # 如果事务在执行期间,没有其他客户端去操作被watch的值,则A客户端的事物可以执行成功 # 如果想解除监控,unwatch # watch必须在开启事务之前 二、分布式锁 在集群模式下,synchronized只能保证单个JVM内部的线程互斥,不能保证跨JVM的互斥 1. 单个JVM

2. 多个JVM

3. 分布式锁 满足分布式系统或集群模式下多进程可见并互斥的锁 # 分布式锁特点 1. 多进程可见: 必须多个jvm都能去访问到该锁资源 2. 互斥: 锁资源必须是互斥 3. 高可用: 锁的稳定性要得到保证 4. 高性能: 加锁本来就会降低系统性能,如何保证 5. 安全性: 锁假如无法释放怎么办 三、Redis分布式锁 1. 基础版本 单线程保证一定只有一个线程来获取锁 # 场景一: 假如锁匙放失败怎么半? 1. 获取: SETNX k v 2. 执行业务 3. 释放锁 DEL k # 场景二: 1. 获取锁,并添加过期时间 SET K V EX 10 NX 2. 执行业务 3. 释放锁

package com.erick.redis; import redis.clients.jedis.Jedis; public class Demo01 { public static final String LOCK_NAME = "LOCK"; public static final String LOCK_VALUE = "ERICK"; public static final int EXPIRE_SECS = 5; private static Jedis getJedis() { return new Jedis("60.205.229.31", 6381); } public static void main(String[] args) throws InterruptedException { new Thread(() -> secondLock()).start(); new Thread(() -> secondLock()).start(); } /*场景一: 假如释放锁失败,则后面永远无法执行*/ public static void firstLock() { //1.上锁 Jedis redis = getJedis(); Long lockResult = redis.setnx(LOCK_NAME, LOCK_VALUE); if (1 == lockResult) { // 2. 执行业务 executeBusiness(); // 3. 释放锁 redis.del(LOCK_NAME); } else { // 获取锁失败 System.out.println("Can not get lock"); } } /*场景二: 释放锁失败,通过自动过期来保证*/ public static void secondLock() { Jedis redis = getJedis(); String lockResult = redis.set(LOCK_NAME, LOCK_VALUE, "NX", "EX", EXPIRE_SECS); if ("OK".equalsIgnoreCase(lockResult)) { executeBusiness(); redis.del(LOCK_NAME); } else { System.out.println("Can not get lock"); } } private static void executeBusiness() { System.out.println("Business execution....."); } } 2. 增强版本 上面分布式锁存在问题: 误删,删已失效 解决方法一:设置超时时间远大于业务执行时间,但是会带来性能问题 解决方法二:删除锁的时候要判断,是不是自己的,如果是再删除 UUID 1. 其中key可以用业务名称来表示 2. value用uuid来表示 2.1 删除锁时,先通过value来判断锁是不是自己线程的 2.2 如果是,则删除,如果不是,就不要删除 package com.erick.redis; import redis.clients.jedis.Jedis; import java.util.UUID; public class Demo11 { private static Jedis getJedis() { return new Jedis("60.205.229.31", 6381); } private static String getLockValue() { return UUID.randomUUID().toString(); } private static final String LOCK_KEY = "LOCK"; private static final int EXPIRE_SECS = 5; public static void main(String[] args) { new Thread(() -> firstMethod(LOCK_KEY, getLockValue())).start(); } private static void firstMethod(String lockKey, String lockValue) { Jedis redis = getJedis(); String lockResult = redis.set(lockKey, lockValue, "NX", "EX", EXPIRE_SECS); if ("OK".equalsIgnoreCase(lockResult)) { executeBusiness(); String presentValue = redis.get(lockKey); /*判断是否是自己的,是自己的再删除*/ if (lockValue.equalsIgnoreCase(presentValue)) { redis.del(lockKey); System.out.println("lock deleted"); } } else { System.out.println("Can not get lock"); } } private static void executeBusiness() { System.out.println("Business execution....."); } } 3. Lua脚本 3.1 存在的问题 判断锁是否能释放,和锁真正释放的代码中间,假如存在full gc,那么就会依然出现问题 判断锁是否该释放锁和释放锁,应该做成一个原子性的动作但是redis的事务机制不是强一致性 3.2 Lua脚本 Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令的原子性 # 1. Redis内部函数 redis.call('命令名称','key','其他参数', ......) # 2. 无参数, 0代表key的参数 EVAL "return redis.call('set','name','erick')" 0 # 3. 带参数 EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 age 20 KEYS[1]: redis的key值个数 ARGV[1]: redis的value的值个数 1: 具体包含几个key age: 实际传递的key值 20: 实际传递的value值 获取流程 -- 获取锁中的线程标示,动态传递参数 local keyName = redis.call('get',KEYS[1]) -- 比较线程标示与锁中的是否一直 if (ARGV[1] == keyName) then -- 释放锁 redis.call('del',KEYS[1]) return 1 -- 如果不一致,则返回结果为0 else return 0 end private static boolean deleteLockIfMy(Jedis redis, String lockKey, String lockValue) { /*用lua脚本来保证*/ String luaScript = "-- 获取锁中的线程标示,动态传递参数\n" + "local keyName = redis.call('get',KEYS[1])\n" + "\n" + "-- 比较线程标示与锁中的是否一直\n" + "if (keyName == ARGV[1]) then\n" + " -- 释放锁\n" + " redis.call('del',KEYS[1])\n" + " return 1\n" + " -- 如果不一致,则返回结果为0\n" + "else\n" + " return 0\n" + "end"; /*加载脚本*/ String script = redis.scriptLoad(luaScript); /*向脚本中传递参数*/ Object delResult = redis.evalsha(script, Arrays.asList(lockKey), Arrays.asList(lockValue)); /*上面的结果是Long类型*/ if (delResult.equals(1L)) { return true; } else { return false; } } 4. 存在的问题 某些业务场景,需要对锁有更高的要求极端情况下出现的问题

四、Redisson 一个用来进行分布式锁的工具类org.redisson:redisson:3.16.8 0. 入门案例 package com.erick.redis; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; public class Demo03 { private static final String LOCK_KEY = "COMMERCE-BUSINESS"; /*Redisson的配置类*/ private static RedissonClient redissonClient() { Config config = new Config(); /* Redis 单节点*/ config.useSingleServer().setAddress("redis://60.205.229.31:6381"); return Redisson.create(config); } private static void executeBusiness() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Business executing....."); } public static void main(String[] args) { new Thread(() -> lockMethodWithRetry()).start(); new Thread(() -> lockMethodWithRetry()).start(); } /*基本使用*/ private static void lockMethod() { RedissonClient redissonClient = redissonClient(); /* RLock extends Lock*/ RLock lock = redissonClient.getLock(LOCK_KEY); /*可重入锁: 默认超时时间喂30s*/ if (lock.tryLock()) { try { executeBusiness(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("Lock Released"); } } else { System.out.println("Can not get lock"); } } /*等待超时的锁*/ private static void lockMethodWithRetry() { RedissonClient redissonClient = redissonClient(); /*获取对应的key的锁*/ RLock lock = redissonClient.getLock(LOCK_KEY); // 内部包含 重试机制,通过Redis的发布订阅者模式来实现 /* 参数一:最长等待时间,超时则不再等待 * 参数二:锁超时释放时间 * 参数三:时间单位 */ boolean hasLok = false; try { hasLok = lock.tryLock(6, 20, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if (hasLok) { try { executeBusiness(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("Lock Released"); } } else { System.out.println("Can not get lock"); } } } 1. 可重入 1.1 不可重入锁

1.2 可重入锁 存储的键值对用Hash结构来保存为了保证多条命令的原子性,必须采取lua脚本来做

1.3 Lua脚本

2. 重试机制 通过等待时间结合,发布以及订阅模式来实现不会立即触发重试机制,而是订阅当前锁的使用者发布的消息 3. 锁超时释放 业务执行期间,不断有定时任务去更新过期时间业务执行完毕后,取消定时任务

4 . 主从一致性 4.1 主节点宕机

4.2 联锁 设立多个redis作为主节点只有每个都获取成功的时候,才会去执行 package com.erick.redis; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; public class Test04 { public static void main(String[] args) { businessWithLock(); } private static void businessWithLock() { String lockKey = "BUSINESS"; RedissonClient firstClient = redissonClient01(); RedissonClient secondClient = redissonClient02(); RedissonClient thirdClient = redissonClient03(); RLock firstLock = firstClient.getLock(lockKey); RLock secondLock = secondClient.getLock(lockKey); RLock thirdLock = thirdClient.getLock(lockKey); /*获取到多把锁*/ RLock multiLock = firstClient.getMultiLock(firstLock, secondLock, thirdLock); boolean hasLock = multiLock.tryLock(); try{ if (hasLock) { business(); } else { System.out.println("未获取到锁,业务没有执行"); } }finally { multiLock.unlock(); } } private static void business() { System.out.println("执行业务"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } /*Redis的配置类*/ private static RedissonClient redissonClient01() { Config config = new Config(); config.useSingleServer().setAddress("redis://60.205.229.31:6379"); return Redisson.create(config); } private static RedissonClient redissonClient02() { Config config = new Config(); config.useSingleServer().setAddress("redis://60.205.229.31:6380"); return Redisson.create(config); } private static RedissonClient redissonClient03() { Config config = new Config(); config.useSingleServer().setAddress("redis://60.205.229.31:6381"); return Redisson.create(config); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Redis分布式锁 #Redis锁基础版本获取锁 #1