irpas技术客

Redis、ZooKeeper实现分布式锁_宇智波波奶茶_redis zookeeper分布式锁

未知 7024

1分布式锁

锁:就是对代码块背后的资源进行锁定,在同一时间段只允许有一个线程访问修改

常用的线程安全机制:

1、sychronized ?? jvm 自带的锁, 可以重入,没有超时时间,不能被外部释放 ? jdk 8对sychronized? 优化,性能更加友好 ? 线程间通信? sychronized? 和 wait() notify() 配合使用 2、lock(ReentrantLock可重入锁) lock 是基于java 代码实现的乐观锁 ,底层用到cas + aqs ? 可以重入,由超时机制,可以被外部释放 使用起来更加灵活,可以避免死锁 线程间通信 使用 Condition? 中的 await() signal() 两个程序配合使用? 3、cas 1.读取a 得值(旧值) 2.比较并替换 ? cas(旧值,要修改的值) ? ??? 如果旧值 和 此时此刻 的值 相等 ,则将 要修改的值 赋值给该变量a 3.修改变量a? 的值 为要修改的值 ? 如果旧值和当前值不相等,说明被其他线程修改过 ? 原子类: 线程的安全的自增 自减? 底层就是 cas 机制 AutomicInteger AutomicFloat Automic* ??? 4、ThreadLocal ThreadLocal 就是线程副本,会为每一个线程对该变量存储一个副本,每个线程都访问自己的变量 ?

5、volatile:不可以保证线程安全 只能保证 数据的可见性 ,禁止指令重排序

接下来咱们在一个例子中体会分布式锁

2.例子

秒杀例子

秒杀商品:

1.判断库存是否足够

2.库存充裕 则

生成订单,扣减库存

3.返回结构,生成订单

? ? import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; ? import javax.xml.ws.soap.Addressing; ? @RestController public class GoodsController { ? ? ? @Autowired ? private StringRedisTemplate redisTemplate; ? ? /** ? ? * 初始化 库存 订单 ? ? * ? ? * 将库存和 存储到 redis ? ? * @param goods ? ? * @return ? ? */ ? @RequestMapping("/init") ? public String init(String goods){ ? ? ? ? // 初始化库存 ? ? ? redisTemplate.opsForValue().set("stack_"+goods, 1000+""); ? ? ? ? // 初始化订单 ? ? ? redisTemplate.opsForValue().set("order_"+goods, 0+""); ? ? ? ? ? return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+ ? ? ? ? ? ? ? "--订单:"+ redisTemplate.opsForValue().get("order_"+goods); ? } ? ? ? /** ? ? * 秒杀商品 ? ? * @param goods ? ? * @return ? ? */ ? @RequestMapping("/killGoods") ? public synchronized String killGoods(String goods){ ? ? ? ? String stackStr = redisTemplate.opsForValue().get("stack_"+goods); ? ? ? ? int stack = Integer.valueOf(stackStr); ? ? ? ? // 1.判断库存 ? ? ? if (stack<1){// 说明没有库存 ? ? ? ? ? ? return "很遗憾商品已售罄"+goods; ? ? ? } ? ? ? ? // 生成订单 订单加1 自增1 ? ? ? redisTemplate.opsForValue().increment("order_"+goods); ? ? ? ? try { ? ? ? ? ? Thread.sleep(10); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? e.printStackTrace(); ? ? ? } ? ? ? ? redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+""); ? ? ? ? ? return "当前抢购商品成功"+goods; ? ? } ? ? ? /** ? ? * 获取当前订单商品 状态 ? ? * @param goods ? ? * @return ? ? */ ? @GetMapping("/getState") ? public String getState(String goods){ ? ? ? return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+ ? ? ? ? ? ? ? "--订单:"+ redisTemplate.opsForValue().get("order_"+goods); ? ? ? } }

?

获取订单状态

?

秒杀

?

使用工具进行秒杀

下载ab压力测试

ab -n 请求数 -c 并发数 访问的路径 ab ? -n 5000 -c 20 http://localhost:8080/killGoods?goods=p50

?

?

可以使用 synchronized 解决线程安全

?

?

多台机器同时访问

synchronized无法解决多应用线程问题

?

?

3.使用分布式锁解决线程安全

使用redis 解决分布式线程安全问题 1.引入redis 锁

导入依赖

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.4.RELEASE</version> </parent> <dependencies> <!--springBoot 相关 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 引入redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies>

?application.properties的创建 spring.redis.host=8.130.166.101 spring.redis.port=6379 ? import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; ? import java.util.concurrent.TimeUnit; ? @Component public class RedisLock { ? ? ? @Autowired ? public StringRedisTemplate redisTemplate; ? ? /** ? ? * 尝试 获取锁 ? ? * @param key ? ? * @param value ? ? * @param time ? ? * @return ? ? */ ? public boolean lock(String key,String value,int time){ ? ? ? // 通过 setnx 来判断key 的标记位是否存在 ? ? // 如果存在说明别人已经 持有锁 ? ? // ? ? 不存在 创建,并获得锁 ? ? return ? redisTemplate.opsForValue().setIfAbsent(key,value,time, TimeUnit.SECONDS); ? } ? ? ? /** ? ? * 释放锁 ? ? * @param key ? ? * @return ? ? */ ? public boolean unlock(String key){ ? ? ? return ? redisTemplate.delete(key); ? } ? } ? 2.使用 ? ? @Autowired ? private RedisLock redisLock; ? ? @RequestMapping("/redisKillGoods") ? public synchronized String redisKillGoods(String goods){ ? ? ? ? ? if (redisLock.lock("lock_"+goods,"0",5)){// 拿到锁 ? ? ? ? ? ? String stackStr = redisTemplate.opsForValue().get("stack_"+goods); ? ? ? ? ? int stack = Integer.valueOf(stackStr); ? ? ? ? ? ? // 1.判断库存 ? ? ? ? ? if (stack<1){// 说明没有库存 ? ? ? ? ? ? ? ? // 释放锁 ? ? ? ? ? ? ? redisLock.unlock("lock_"+goods); ? ? ? ? ? ? ? ? return "很遗憾商品已售罄"+goods; ? ? ? ? ? } ? ? ? ? ? ? // 生成订单 订单加1 自增1 ? ? ? ? ? redisTemplate.opsForValue().increment("order_"+goods); ? ? ? ? ? ? try { ? ? ? ? ? ? ? Thread.sleep(10); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ? redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+""); ? ? ? ? ? ? // 释放锁 ? ? ? ? ? redisLock.unlock("lock_"+goods); ? ? ? ? ? ? return "当前抢购商品成功"+goods; ? ? ? }else { ? ? ? ? ? ? return "很遗憾没有抢到宝贝"+goods; ? ? ? } ? ? ? }

使用redis setnx 完成分布式锁缺陷

1.setnx 本身是两条命令 a.set值 b.配置其过期时间

2.如果redis 是主从模式,有可能造成 主从数据延迟问题

3.setnx 有时间限制,如果在使用锁期间,key时间过期了,也会造成安全问题?

解决方案:

1.使用 redis 调用lua 脚本操作 setnx 保证原子性 操作复杂 ? 2.我们使用 redlock (红锁) 我们通过多个key 保证一个分布式锁,通过投票决定是否获取锁,配置的key还拥有看门狗机制(可以查看key是否即将过期,如果快要过期,可以续命) 解决 遇到到 1,2,3 问题 ?

使用zookeeper 实现分布式锁

1.引入zk 依赖

? ? ? ? <dependency> ? ? ? ? ? <groupId>org.apache.zookeeper</groupId> ? ? ? ? ? <artifactId>zookeeper</artifactId> ? ? ? ? ? <version>3.6.0</version> ? ? ? ? ? <exclusions> ? ? ? ? ? ? ? <exclusion> ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api</artifactId> ? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId> ? ? ? ? ? ? ? </exclusion> ? ? ? ? ? ? ? <exclusion> ? ? ? ? ? ? ? ? ? <artifactId>slf4j-log4j12</artifactId> ? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId> ? ? ? ? ? ? ? </exclusion> ? ? ? ? ? ? ? <exclusion> ? ? ? ? ? ? ? ? ? <artifactId>log4j</artifactId> ? ? ? ? ? ? ? ? ? <groupId>log4j</groupId> ? ? ? ? ? ? ? </exclusion> ? ? ? ? ? </exclusions> ? ? ? </dependency> ? ? ? ? <dependency> ? ? ? ? ? <groupId>org.apache.curator</groupId> ? ? ? ? ? <artifactId>curator-recipes</artifactId> ? ? ? ? ? <version>4.0.1</version> ? ? ? ? ? <exclusions> ? ? ? ? ? ? ? <exclusion> ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api</artifactId> ? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId> ? ? ? ? ? ? ? </exclusion> ? ? ? ? ? </exclusions> ? ? ? </dependency> ? 2.容器加入zk ? import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; ? @Configuration public class ZkConfig { ? ? ? /** ? ? * 在容器中加入CuratorFramework ? ? * ? ? * CuratorFramework zk 客户端 ? ? * @return ? ? */ ? @Bean ? public CuratorFramework getCuratorFramework(){ ? ? ? ? // 当客户端 和 服务端 连接异常时,会发起重试 每隔2s 重试1次,总共试 3次 ? ? ? RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000,3); ? ? ? ? CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() ? ? ? ? ? ? ? .retryPolicy(retryPolicy).connectString("192.168.12.145:2181;192.168.12.145:2182;192.168.12.145:2183").build(); ? ? ? ? ? curatorFramework.start(); ? ? ? ? return curatorFramework; ? } ? ? ? } ? 3.使用zk 分布式锁 ? @Autowired ? private CuratorFramework curatorFramework; ? ? /** ? ? * 基于zk 实现的分布式锁 ? ? * @param goods ? ? * @return ? ? */ ? @RequestMapping("/zkKillGoods") ? public synchronized String zkKillGoods(String goods) throws Exception { ? ? ? ? // 封装zk 的分布式锁 相当于 jvm synchronized 中的监视器 ? ? ? InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework,"/"+goods); ? ? ? ? // interProcessMutex.acquire(5, TimeUnit.SECONDS) 去获取锁,若果没有获取到,就等待5s,还没有则放弃 ? ? ? if (interProcessMutex.acquire(5, TimeUnit.SECONDS)){// 拿到锁 ? ? ? ? ? ? String stackStr = redisTemplate.opsForValue().get("stack_"+goods); ? ? ? ? ? int stack = Integer.valueOf(stackStr); ? ? ? ? ? ? // 1.判断库存 ? ? ? ? ? if (stack<1){// 说明没有库存 ? ? ? ? ? ? ? ? // 释放锁 ? ? ? ? ? ? ? interProcessMutex.release(); ? ? ? ? ? ? ? ? return "很遗憾商品已售罄"+goods; ? ? ? ? ? } ? ? ? ? ? ? // 生成订单 订单加1 自增1 ? ? ? ? ? redisTemplate.opsForValue().increment("order_"+goods); ? ? ? ? ? ? try { ? ? ? ? ? ? ? Thread.sleep(10); ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? } ? ? ? ? ? ? redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+""); ? ? ? ? ? ? // 释放锁 ? ? ? ? ? interProcessMutex.release(); ? ? ? ? ? ? return "当前抢购商品成功"+goods; ? ? ? }else { ? ? ? ? ? ? return "很遗憾没有抢到宝贝"+goods; ? ? ? } ? ? ? }

?

总结:

使用zk 作为分布锁,比较可靠,但是效率太低,一般适用于并发量不大的场合 如果并发量太高,zk 就是最大的性能瓶颈


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #redis #zookeeper分布式锁 #JVM #自带的锁 #jdk #8对sychronized