irpas技术客

redis_bugmakers_

未知 3764

一.安装redis

1、安装 C 语言的编译环境

yum install centos-release-scl scl-utils-build yum install -y devtoolset-8-toolchain scl enable devtoolset-8 bash

2、通过 wget 下载

wget https://download.redis.io/releases/redis-6.2.6.tar.gz // 下载路径:/opt

3.解压至当前目录

tar -zxvf redis-6.2.6.tar.gz

4.解压完成后进入目录

cd redis-6.2.6

5.在当前目录下执行 make

make && make install

默认安装在 /usr/local/bin redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何 redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲 redis-check-dump:修复有问题的dump.rdb文件 redis-sentinel:Redis集群使用 redis-server:Redis服务器启动命令 redis-cli:客户端,操作入口

前台启动:***/usr/local/bin*** 目录下启动 redis

redis-server(前台启动)

后台启动:

安装 redis 的目录 /opt/redis-6.2.6 中将 redis.conf 复制到任意一个文件夹下 cp redis.conf /etc/redis.conf // 将redis.conf复制到/etc/下

-修改 /etc/redis.conf 配置文件

vim redis.conf# daemonize no 修改为 daemonize yes /usr/local/bin 目录下启动 redis redis-server /etc/redis.conf 关闭 redis kill 进程 命令 shutdown

二、redis相关知识 单线程+多路IO复用 默认端口号:6379 2.1、 key的操作

keys * 查看当前库所有keyset key value 设置key值与valueexists key 判断key是否存在type key 查看key是什么类型del key 删除指定的key数据unlink key 根据value选择非阻塞删除expire key 10 10秒钟:为给定的key设置过期时间ttl key 查看还有多少秒过期,-1表示永不过期,-2表示已过期

2.2、库的选择:

select 命令切换数据库dbsize 查看当前数据库的key数量flushdb 清空当前库flushall 通杀全部库

2.3、string字符串

一个key对应一个value二进制安全的,即可包含任何数据value最多可以是512m

参数设置:

set key value 设置key值get key 查询key值append key value 将给定的value追加到原值末尾strlen key 获取值的长度setnx key value 只有在key不存在的时候,设置key值incr key 将key值存储的数字增1,只对数字值操作,如果为空,新增值为1decr key 将key值存储的数字减1,只对数字值操作,如果为空,新增值为1incrby/decrby key <步长> 将key值存储的数字增减如步长

补充: 原子操作 不会被打断,从开始到结束 单线程不会被打断

补充额外的字符串参数:

mset key value key value…同时设置一个或者多个key-valuemget key key…同时获取一个或多个valuemsetnx key value key value…同时设置一个或者多个key-value.当且仅当所有给定key都不存在getrange key <起始位置> <结束位置> 获取key的起始位置和结束位置的值setrange key <起始位置> value 将value的值覆盖起始位置开始setex key <> value 设置键值的同时,设置过期时间getset key value 用新值换旧值

2.4、list列表(quickList)

lpush/rpush key value value…从左或者右插入一个或者多个值(头插与尾插)lpop/rpop key 从左或者右吐出一个或者多个值(值在键在,值都没,键都没)rpoplpush key1 key2 从key1列表右边吐出一个值,插入到key2的左边lrange key start stop 按照索引下标获取元素(从左到右)lrange key 0 -1 获取所有值lindex key index 按照索引下标获得元素llen key 获取列表长度linsert key before/after value newvalue 在value的前面插入一个新值lrem key n value 从左边删除n个value值lset key index value 在列表key中的下标index中修改值value

2.5、set集合 字典,哈希表 自动排重且为无序的 常用命令:

sadd key value value… 将一个或者多个member元素加入集合key中,已经存在的member元素被忽略smembers key 取出该集合的所有值sismember key value 判断该集合key是否含有该值scard key 返回该集合的元素个数srem key value value 删除集合中的某个元素spop key 随机从集合中取出一个元素srandmember key n 随即从该集合中取出n个值,不会从集合中删除smove <一个集合a><一个集合b>value 将一个集合a的某个value移动到另一个集合bsinter key1 key2 返回两个集合的交集元素sunion key1 key2 返回两个集合的并集元素sdiff key1 key2 返回两个集合的差集元素(key1有的,key2没有)

2.6、hash哈希 数据结构:当field-value长度较短且个数较少时,使用ziplist(压缩列表),否则使用hashtable 键值对集合,特别适合用于存储对象类型 常用命令:

hset key field value 给key集合中的filed键赋值valuehget key1 field 集合field取出valuehmset key1 field1 value1 field2 value2 批量设置hash的值hexists key1 field 查看哈希表key中,给定域field是否存在hkeys key 列出该hash集合的所有fieldhvals key 列出该hash集合的所有valuehincrby key field increment 为哈希表key中的域field的值加上增量1 -1hsetnx key field value 将哈希表key中的域field的值设置为value,当且仅当域field不存在

2.7、zset 数据结构:map+跳跃表 没有重复元素的字符串集合,按照相关的分数进行排名,排名从低到高,排名可重复

有序集合 zset 与普通集合 set 非常相似,是一个没有重复元素的字符串集合

常用命令:

zadd key score1 value1 score2 value2 将一个或多个member元素及其score值加入到有序key中zrange key start stop (withscores) 返回有序集key,下标在start与stop之间的元素,带withscores,可以让分数一起和值返回到结果集。zrangebyscore key min max(withscores) 返回有序集key,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score的值递增次序排列zrevrangebyscore key max min (withscores)同上,改为从大到小排列zincrby key increment value 为元素的score加上增量zrem key value 删除该集合下,指定值的元素zcount key min max 统计该集合,分数区间内的元素个数zrank key value 返回该值在集合中的排名,从0开始

三、配置文件 redis.conf 3.1、Units

单位,配置大小单位,开头定义了一些基本的度量单位,只支持 bytes,不支持 bit。 大小写不敏感。

3.2、INCLUDES 包含,多实例的情况可以把公用的配置文件提取出来。

3.3、NETWORK 网络相关配置。

bind

默认情况 bind=127.0.0.1 只能接受本机的访问请求。

不写的情况下,无限制接受任何 ip 地址的访问。

生产环境肯定要写你应用服务器的地址,服务器是需要远程访问的,所以需要将其注释掉。

如果开启了protected-mode,那么在没有设定 bind ip 且没有设密码的情况下,Redis 只允许接受本机的响应。 protected-mode

将本机访问保护模式设置 no。 port

端口号,默认 6379。 tcp-backlog

设置 tcp 的 backlog,backlog 其实是一个连接队列,backlog 队列总和 = = = 未完成三次握手队列 + + + 已经完成三次握手队列。

在高并发环境下你需要一个高 backlog 值来避免慢客户端连接问题。 timeout

一个空闲的客户端维持多少秒会关闭,0 表示关闭该功能。即永不关闭。 tcp-keepalive

对访问客户端的一种心跳检测,每个 n 秒检测一次。

单位为秒,如果设置为 0,则不会进行 Keepalive 检测,建议设置成 60。 pidfile

存放 pid 文件的位置,每个实例会产生一个不同的 pid 文件。 loglevel

指定日志记录级别,Redis 总共支持四个级别:debug、verbose、notice、warning,默认为 notice。 3.4、SECURITY

安全。 访问密码的查看、设置和取消。 在命令中设置密码,只是临时的。重启 redis 服务器,密码就还原了。 永久设置,需要在配置文件中进行设置。

3.5、LIMITS 限制。

maxclients

设置 redis 同时可以与多少个客户端进行连接。

默认情况下为 10000 个客户端。

如果达到了此限制,redis 则会拒绝新的连接请求,并且向这些连接请求方发出 max number of clients reached 以作回应。

maxmemory

建议必须设置,否则,将内存占满,造成服务器宕机。

设置 redis 可以使用的内存量。一旦到达内存使用上限,redis 将会试图移除内部数据,移除规则可以通过 maxmemory-policy 来指定。

如果 redis 无法根据移除规则来移除内存中的数据,或者设置了不允许移除,那么 redis 则会针对那些需要申请内存的指令返回错误信息,比如 SET、LPUSH 等。

maxmemory-policy

volatile-lru:使用 LRU 算法移除 key,只对设置了过期时间的键(最近最少使用)。

allkeys-lru:在所有集合 key 中,使用 LRU 算法移除 key。

volatile-random:在过期集合中移除随机的 key,只对设置了过期时间的键。

allkeys-random:在所有集合 key 中,移除随机的 key。

volatile-ttl:移除那些 TTL 值最小的 key,即那些最近要过期的 key。

noeviction:不进行移除。针对写操作,只是返回错误信息。

四、发布订阅 Redis 发布订阅( pub/sub )是一种消息通信模式:发送者( pub )发送消息,订阅者( sub )接收消息。

Redis 客户端可以订阅任意数量的频道。

1、 客户端可以订阅频道 2、当给这个频道发布消息后,消息就会发送给订阅的客户端 subscribe channel # 订阅频道

publish channel hello # 频道发送信息

新数据类型 Bitmaps 1.合理使用操作位可以有效地提高内存使用率和开发使用率 2.本身是一个字符串,不是数据类型,数组的每个单元只能存放0和1,数组的下标在Bitmaps叫做偏移量 3.节省空间,一般存储活跃用户比较多

命令参数: 1.设置值

setbit key offset value

第一次初始化bitmaps,如果偏移量比较大,那么整个初始化过程执行会比较慢,还可能会造成redis的堵塞

2.getbit取值

getbit key offset

3.bitcount 统计数值

bitcount key (start end)

redis的setbit设置或清除的是bit位置,而bitcount计算的是byte的位置

4.bitop 复合操作,交并非异或,结果保存在destkey

bitop and(or/not/xor)destkey key

HyperLogLog 1.统计网页中页面访问量 2.只会根据输入元素来计算基数,而不会储存输入元素本身,不能像集合那样,返回输入的各个元素 3.基数估计是在误差可接受的范围内,快速计算(不重复元素的结算)

命令参数: 1.添加指定的元素到hyperloglog中

pfadd key element

列如 pfadd progame “java” 成功则返回1,不成功返回0

2.计算key的近似基数

pfcount key

即这个key的键位添加了多少个不重复元素

3.一个或多个key合并后的结果存在另一个key

pfmerge destkey sourcekey sourcekey

Geographic 提供经纬度设置,查询范围,距离查询等

命令参数: 1.添加地理位置(经度纬度名称) 当坐标超出指定的范围,命令会返回一个错误 已经添加的数据,无法再添加

geoadd key longitude latitude member

例如 geoadd china:city 121.47 31.23 shanghai

2.获取指定地区的坐标值

geopos key member

例如 geopos china:city shanghai

3.获取两个位置之间的直线距离

geodist key member1 member2 (m km ft mi)

4.以给定的经纬度为中心,找出某一半径的内元素

georadius key longitude latitude radius (m km ft mi)

五、Jedis操作Redis 即 Java 操作 Redis。 1、依赖

<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency>

2、连接 Redis

public class JedisDemo { public static void main(String[] args) { Jedis jedis = new Jedis("192.168.57.101", 6379); String pong = jedis.ping(); System.out.println("连接成功:" + pong); jedis.close(); } }

Key

jedis.set("k1", "v1"); jedis.set("k2", "v2"); jedis.set("k3", "v3"); Set<String> keys = jedis.keys("*"); System.out.println(keys.size()); for (String key : keys) { System.out.println(key); } System.out.println(jedis.exists("k1")); System.out.println(jedis.ttl("k1")); System.out.println(jedis.get("k1"));

String

jedis.mset("str1","v1","str2","v2","str3","v3"); System.out.println(jedis.mget("str1","str2","str3"));

List

List<String> list = jedis.lrange("mylist",0,-1); for (String element : list) { System.out.println(element); }

Set

jedis.sadd("orders", "order01"); jedis.sadd("orders", "order02"); jedis.sadd("orders", "order03"); jedis.sadd("orders", "order04"); Set<String> smembers = jedis.smembers("orders"); for (String order : smembers) { System.out.println(order); } jedis.srem("orders", "order02");

Hash

jedis.hset("hash1","userName","lisi"); System.out.println(jedis.hget("hash1","userName")); Map<String,String> map = new HashMap<String,String>(); map.put("telphone","13810169999"); map.put("address","atguigu"); map.put("email","abc@163.com"); jedis.hmset("hash2",map); List<String> result = jedis.hmget("hash2", "telphone","email"); for (String element : result) { System.out.println(element); }

zset

jedis.zadd("zset01", 100d, "z3"); jedis.zadd("zset01", 90d, "l4"); jedis.zadd("zset01", 80d, "w5"); jedis.zadd("zset01", 70d, "z6"); Set<String> zrange = jedis.zrange("zset01", 0, -1); for (String e : zrange) { System.out.println(e); }

Jedis实例-手机验证码 要求: 1、输入手机号,点击发送后随机生成6位数字码,2分钟有效 2、输入验证码,点击验证,返回成功或失败 3、每个手机号每天只能输入3次

思路:

生成随机6位数字验证码:Random 验证码在2分钟内有效:把验证码放到redis里面,设置过期时间120秒 判断验证码是否一致:从redis获取验证码和输入的验证码进行比较 每个手机每天只能发送3次验证码:incr每次发送后+1,大于2的时候,提交不能发送

生成六位的验证码:

//1.生成6位数字验证码 public static String getCode() { Random random = new Random(); String code = ""; for(int i=0;i<6;i++) { int rand = random.nextInt(10); code += rand; } return code; }

验证码只能发送三次: 验证码只能发送三次,通过incr进行

//2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120 public static void verifyCode(String phone) { //连接redis Jedis jedis = new Jedis("172.22.109.205",6379); //拼接key //手机发送次数key String countKey = "VerifyCode"+phone+":count"; //验证码key String codeKey = "VerifyCode"+phone+":code"; //每个手机每天只能发送三次 String count = jedis.get(countKey); if(count == null) { //没有发送次数,第一次发送 //设置发送次数是1 jedis.setex(countKey,24*60*60,"1"); } else if(Integer.parseInt(count)<=2) { //发送次数+1 jedis.incr(countKey); } else if(Integer.parseInt(count)>2) { //发送三次,不能再发送 System.out.println("今天发送次数已经超过三次"); jedis.close(); } //发送验证码放到redis里面 String vcode = getCode(); jedis.setex(codeKey,120,vcode);//120秒 jedis.close(); }

验证码验证相同与否: 验证码的key 与code是否相等

//3 验证码校验 public static void getRedisCode(String phone,String code) { //从redis获取验证码 Jedis jedis = new Jedis("172.22.109.205",6379); //验证码key String codeKey = "VerifyCode"+phone+":code"; String redisCode = jedis.get(codeKey); //判断 if(redisCode.equals(code)) { System.out.println("成功"); }else { System.out.println("失败"); } jedis.close(); }

完整功能代码展示

public class PhoneCode { public static void main(String[] args) { //模拟验证码发送 verifyCode("13678765435"); //模拟验证码校验 //getRedisCode("13678765435","4444"); } //3 验证码校验 public static void getRedisCode(String phone,String code) { //从redis获取验证码 Jedis jedis = new Jedis("172.22.109.205",6379); //验证码key String codeKey = "VerifyCode"+phone+":code"; String redisCode = jedis.get(codeKey); //判断 if(redisCode.equals(code)) { System.out.println("成功"); }else { System.out.println("失败"); } jedis.close(); } //2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120 public static void verifyCode(String phone) { //连接redis Jedis jedis = new Jedis("172.22.109.205",6379); //拼接key //手机发送次数key String countKey = "VerifyCode"+phone+":count"; //验证码key String codeKey = "VerifyCode"+phone+":code"; //每个手机每天只能发送三次 String count = jedis.get(countKey); if(count == null) { //没有发送次数,第一次发送 //设置发送次数是1 jedis.setex(countKey,24*60*60,"1"); } else if(Integer.parseInt(count)<=2) { //发送次数+1 jedis.incr(countKey); } else if(Integer.parseInt(count)>2) { //发送三次,不能再发送 System.out.println("今天发送次数已经超过三次"); jedis.close(); return;//超过三次之后就会自动退出不会再发送了,不添加这一行,即使显示发送次数,但还会有验证码接收到 } //发送验证码放到redis里面 String vcode = getCode();//调用生成的验证码 jedis.setex(codeKey,120,vcode);//设置生成的验证码只有120秒的时间 jedis.close(); } //1 生成6位数字验证码,code是验证码 public static String getCode() { Random random = new Random(); String code = ""; for(int i=0;i<6;i++) { int rand = random.nextInt(10); code += rand; } return code; } }

Spring Boot整合Redis 创建一个工程为Spring Initializr 1.整合依赖文件 springboot和连接池

<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- spring2.X集成redis所需common-pool2--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.6.0</version> </dependency>

2.application.properties配置redis配置 主要是配置其ip地址端口号的一些关键信息 其他信息是超时时间,最大连接数等

#Redis服务器地址 spring.redis.host=172.22.109.205 #Redis服务器连接端口 spring.redis.port=6379 #Redis数据库索引(默认为0) spring.redis.database= 0 #连接超时时间(毫秒) spring.redis.timeout=1800000 #连接池最大连接数(使用负值表示没有限制) spring.redis.lettuce.pool.max-active=20 #最大阻塞等待时间(负数表示没限制) spring.redis.lettuce.pool.max-wait=-1 #连接池中的最大空闲连接 spring.redis.lettuce.pool.max-idle=5 #连接池中的最小空闲连接 spring.redis.lettuce.pool.min-idle=0

3.添加redis配置类 @EnableCaching开启缓存类 @Configuration配置类

@EnableCaching @Configuration public class RedisConfig extends CachingConfigurerSupport { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setConnectionFactory(factory); //key序列化方式 template.setKeySerializer(redisSerializer); //value序列化 template.setValueSerializer(jackson2JsonRedisSerializer); //value hashmap序列化 template.setHashValueSerializer(jackson2JsonRedisSerializer); return template; } @Bean public CacheManager cacheManager(RedisConnectionFactory factory) { RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); //解决查询缓存转换异常的问题 ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置序列化(解决乱码的问题),过期时间600秒 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(600)) .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)) .disableCachingNullValues(); RedisCacheManager cacheManager = RedisCacheManager.builder(factory) .cacheDefaults(config) .build(); return cacheManager; } }

4.测试类

通过注入RedisTemplate类 调用这个类的opsForValue().set设置属性,以及opsForValue().get获取属性

@RestController @RequestMapping("/redisTest") public class RedisTestController { @Autowired private RedisTemplate redisTemplate; @GetMapping public String testRedis() { //设置值到redis redisTemplate.opsForValue().set("name","bugmaker"); //从redis获取值 String name = (String)redisTemplate.opsForValue().get("name"); return name; } }

六、事务和锁机制 Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis 事务的主要作用就是串联多个命令防止别的命令插队。 Multi、Exec、Discard

Multi Exec Discard 从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入 Exec 后,Redis 会将之前的命令队列中的命令依次执行。 组队的过程中可以通过 Discard 来放弃组队。 组队成功,提交成功 放弃组队 组队中有命令错误,不会执行(当组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。) 组队中不报错,执行时报错

悲观锁 悲观锁(Pessimistic Lock),即每次去拿数据的时候都认为有其他线程会修改,所以每次在拿数据的时候都会上锁,这样其他线程想要拿到这个数据就会被 block 直到成功拿到锁。(效率低)

乐观锁 乐观锁(Optimistic Lock),即每次去拿数据的时候都认为其他线程不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间有没有其他线程去更新这个数据,可以使用版本号等机制。

乐观锁适用于多读的应用类型,这样可以提高吞吐量。

Redis 就是利用这种 check-and-set 机制实现事务的。

Watch、unwatch 在执行 multi 之前,先执行 watch key1 [key2],可以监视一个(或多个 )key 。如果在事务执行之前这个 key 被其他命令所改动,那么事务将被打断。

取消 WATCH 命令对所有 key 的监视。如果在执行 WATCH 命令之后,EXEC 命令或 DISCARD 命令先被执行,那么就不需要再执行 UNWATCH 。

事务三特性

单独的隔离操作

事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

没有隔离级别的概念

队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。

不保证原子性

事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚 。

事务的秒杀示例 解决计数器和人员记录的事务操作 主要的实现功能是 在redis存入商品数,设定秒杀时间,提供用户秒杀窗口,用户秒杀成功,redis中商品数-1,用户信息也存入redis中(为了相同用户只能秒杀一次)

jsp <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://·/xml/ns/javaee" xmlns:xsi="http://·/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5"> <servlet> <description></description> <display-name>doseckill</display-name> <servlet-name>doseckill</servlet-name> <servlet-class>com.atguigu.SecKillServlet</servlet-class> </servlet> <servlet-mapping> <servlet-name>doseckill</servlet-name> <url-pattern>/doseckill</url-pattern> </servlet-mapping> </web-app> 点击按钮之后,会到一个主页面中 通过一个随机数生成一个用户id,以及获取一个商品的id进行秒杀,秒杀的功能逻辑在其他文件中 /** * 秒杀案例 */ public class SecKillServlet extends HttpServlet { private static final long serialVersionUID = 1L; public SecKillServlet() { super(); } protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { String userid = new Random().nextInt(50000) +"" ; String prodid =request.getParameter("prodid"); //boolean isSuccess=SecKill_redis.doSecKill(userid,prodid); boolean isSuccess= SecKill_redisByScript.doSecKill(userid,prodid); response.getWriter().print(isSuccess); } } 具体秒杀的实现功能如下

传过来的参数有用户账号uid(这个形参是通过上面的random的实参传进来的,保证每个用户都不一样),商品id名称(商品id此处已经在jsp界面上写死了,只有一个商品秒杀)

具体的核心步骤以及思路:

先判断这两个有一个为空,则返回false

通过jedis来连接服务器的redis

用户库存的名称和数量为后台服务器redis给出,通过set 用户名称 数量

判断库存的数量是否为0,为0代表还没开始,结束其jedis连接

在判断用户是不是有所重复,通过java代码模块的sismember,判断现在订单的id与服务器id是否有重复的,有重复就结束它的jedis连接

判断商品数量是否为0,为0就结束jedis的连接

核心代码也就是库存数减1个,用户数加1个,多个用户来秒杀库存

public class SecKill_redis { public static void main(String[] args) { Jedis jedis =new Jedis("172.22.109.205",6379); System.out.println(jedis.ping()); jedis.close(); } //秒杀过程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null) { return false; } //2 连接redis Jedis jedis = new Jedis("172.22.109.205",6379); //3 拼接key // 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey = "sk:"+prodid+":user"; //4 获取库存,如果库存null,秒杀还没有开始 String kc = jedis.get(kcKey); if(kc == null) { System.out.println("秒杀还没有开始,请等待"); jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey, uid)) { System.out.println("已经秒杀成功了,不能重复秒杀"); jedis.close(); return false; } //因为kc为字符串,所以先转换城integer类型的 //6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc)<=0) { System.out.println("秒杀已经结束了"); jedis.close(); return false; } //7.1 库存-1 jedis.decr(kcKey); //7.2 把秒杀成功用户添加清单里面 jedis.sadd(userKey,uid); System.out.println("秒杀成功了.."); jedis.close(); return true; } }

但是这部分代码有个缺陷,是没有同时按下去的,一步一步进行点击不会出错,但是如果遇到了高并发的数据,就会出现bug

高并发测试 通过使用ab工具进行测试

具体下载方式通过联网进行下载 可以使用命令apt install apache2-utils或者是apt install httpd-tools,如果apt-get不能安装,换成yum(系统版本问题)

其具体参数设计,主要有几个比较重要 -n 请求数量 -c 当前请求次数的并发请求 -T 设计的类型,可以是post,get -p 提交的参数

具体的命令格式可以是 下面这个ip地址是window的。

ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://172.22.109.30:8081/Seckill/doseckill

-p 后面的是文件,通过gedit或者vim postfile, 模拟表单提交参数,以&符号结尾,存放当前目,内容:prodid=0101& -T 是前端界面中

之后执行命令会出现超卖的问题 数量会直接显示负值

关于这个问题会出现连接超时的问题以及商品遗留问题 问题解决

解决出现的超卖问题以及连接超时问题以及商品的遗留问题 增加连接池

节省每次连接redis服务带来的消耗,把连接好的实例反复利用

具体其连接参数如下:

MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException;testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;

解决连接超时问题

可以设置一个连接池,进行计数等 具体连接池的代码如下

public class JedisPoolUtil { private static volatile JedisPool jedisPool = null; private JedisPoolUtil() { } public static JedisPool getJedisPoolInstance() { if (null == jedisPool) { synchronized (JedisPoolUtil.class) { if (null == jedisPool) { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(200); poolConfig.setMaxIdle(32); poolConfig.setMaxWaitMillis(100*1000); poolConfig.setBlockWhenExhausted(true); poolConfig.setTestOnBorrow(true); // ping PONG,判断是否还存在 jedisPool = new JedisPool(poolConfig, "172.22.109.205", 6379, 60000 ); } } } return jedisPool; } public static void release(JedisPool jedisPool, Jedis jedis) { if (null != jedis) { jedisPool.returnResource(jedis); } } }

之后再核心主题类,通过创建连接池再来获取数据

//2 连接redis //Jedis jedis = new Jedis("192.168.44.168",6379); //通过连接池得到jedis对象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource();

增加乐观锁 通过设置一个乐观锁 监视并且使用三个常用命令 通过判断版本号,判断是否有更改kckey的混乱使用

具体秒杀的过程通过事务来解决

//加入一个监视的watch jedis.watch(kcKey); //7 秒杀过程 //使用事务 Transaction multi = jedis.multi(); //组队操作 multi.decr(kcKey); multi.sadd(userKey,uid); //执行 List<Object> results = multi.exec(); if(results == null || results.size()==0) { System.out.println("秒杀失败了...."); jedis.close(); return false; } //7.1 库存-1 //jedis.decr(kcKey); //7.2 把秒杀成功用户添加清单里面 //jedis.sadd(userKey,uid); System.out.println("秒杀成功了.."); jedis.close(); return true;

增加lua脚本

增加了乐观锁之后,确实可以解决高并发问题,不会出现超卖的问题 但是由于增加了乐观锁之后,假设一个人买了之后,版本改变了,下一个人都不能买了,所以出现了商品遗留的问题,都卖不出

商品遗留问题

解决该问题通过引入lua脚本 具体lua脚本,嵌入式脚本语言,很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数,提升性能。

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作,但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

利用lua脚本淘汰用户,解决超卖问题。

redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。

通过单线程任务排队的机制解决多个任务的高并发问题 具体lua脚本的逻辑代码如下

local userid=KEYS[1]; local prodid=KEYS[2]; local qtkey="sk:"..prodid..":qt"; local usersKey="sk:"..prodid.":usr'; local userExists=redis.call("sismember",usersKey,userid); if tonumber(userExists)==1 then return 2; end local num= redis.call("get" ,qtkey); if tonumber(num)<=0 then return 0; else redis.call("decr",qtkey); redis.call("sadd",usersKey,userid); end return 1;

与java代码结合在一起如下

public class SecKill_redisByScript { private static final org.slf4j.Logger logger =LoggerFactory.getLogger(SecKill_redisByScript.class) ; public static void main(String[] args) { JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis=jedispool.getResource(); System.out.println(jedis.ping()); Set<HostAndPort> set=new HashSet<HostAndPort>(); // doSecKill("201","sk:0101"); } static String secKillScript ="local userid=KEYS[1];\r\n" + "local prodid=KEYS[2];\r\n" + "local qtkey='sk:'..prodid..\":qt\";\r\n" + "local usersKey='sk:'..prodid..\":usr\";\r\n" + "local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" + "if tonumber(userExists)==1 then \r\n" + " return 2;\r\n" + "end\r\n" + "local num= redis.call(\"get\" ,qtkey);\r\n" + "if tonumber(num)<=0 then \r\n" + " return 0;\r\n" + "else \r\n" + " redis.call(\"decr\",qtkey);\r\n" + " redis.call(\"sadd\",usersKey,userid);\r\n" + "end\r\n" + "return 1" ; static String secKillScript2 = "local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" + " return 1"; public static boolean doSecKill(String uid,String prodid) throws IOException { JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis=jedispool.getResource(); //String sha1= .secKillScript; String sha1= jedis.scriptLoad(secKillScript); Object result= jedis.evalsha(sha1, 2, uid,prodid); String reString=String.valueOf(result); if ("0".equals( reString ) ) { System.err.println("已抢空!!"); }else if("1".equals( reString ) ) { System.out.println("抢购成功!!!!"); }else if("2".equals( reString ) ) { System.err.println("该用户已抢过!!"); }else{ System.err.println("抢购异常!!"); } jedis.close(); return true; } }

七、持久化 RDB 在指定的时间间隔内将内存中的数据集快照写入磁盘, 即 Snapshot 快照,恢复时是将快照文件直接读到内存里。 Redis 会单独创建一个子进程(fork)来进行持久化。

先将数据写入到一个临时文件中,待持久化过程完成后,再将这个临时文件内容覆盖到 dump.rdb。

整个过程中,主进程是不进行任何 IO 操作的,这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那 RDB 方式要比 AOF 方式更加的高效。

RDB 的缺点是最后一次持久化后的数据可能丢失。

Fork

作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程在 Linux 程序中,fork() 会产生一个和父进程完全相同的子进程,但子进程在此后多会 exec 系统调用,出于效率考虑,Linux 中引入了 写时复制技术一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程

配置 dump 文件名字

在 redis.conf 中配置文件名称,默认为 dump.rdb。 dump 保存位置

rdb 文件的保存路径可以修改。默认为 Redis 启动时命令行所在的目录下。 stop-writes-on-bgsave-error

即当 redis 无法写入磁盘,关闭 redis 的写入操作。 rdbcompression

持久化的文件是否进行压缩存储。 rdbchecksum

完整性的检查,即数据是否完整性、准确性。 save

表示写操作的次数。 格式:save 秒 写操作次数

优点

适合大规模的数据恢复;对数据完整性和一致性要求不高更适合使用;节省磁盘空间;恢复速度快。

缺点

Fork 的时候,内存中的数据被克隆了一份,大致 2 倍的膨胀性需要考虑;虽然 Redis 在 fork 时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能;在备份周期在一定间隔时间做一次备份,所以如果 Redis 意外 down 掉的话,就会丢失最后一次快照后的所有修改。

AOF 以日志的形式来记录每个写操作(增量保存),将 Redis 执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,Redis 启动之初会读取该文件重新构建数据,换言之,如果 Redis 重启就会根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

执行流程

客户端的请求写命令会被 append 追加到 AOF 缓冲区内;

AOF 缓冲区根据 AOF 持久化策略 [always,everysec,no] 将操作 sync 同步到磁盘的 AOF 文件中;

AOF 文件大小超过重写策略或手动重写时,会对 AOF 文件 Rewrite 重写,压缩 AOF 文件容量;

Redis 服务重启时,会重新 load 加载 AOF 文件中的写操作达到数据恢复的目的。

AOF 和 RDB 同时开启时,系统默认读取 AOF 的数据(数据不会存在丢失)

配置 AOF 默认不开启 文件名字 AOF 同步频率设置 appendfsync always

? 始终同步,每次 Redis 的写入都会立刻记入日志;

? 性能较差但数据完整性比较好。

appendfsync everysec

? 每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。

appendfsync no

? Redis 不主动进行同步,把同步时机交给操作系统。

Rewrite 压缩

当 AOF 文件的大小超过所设定的阈值时,Redis 就会启动 AOF 文件的内容压缩,只保留可以恢复数据的最小指令集。可以使用命令 bgrewriteaof。

优点

备份机制更稳健,丢失数据概率更低;可读的日志文本,通过操作 AOF 稳健,可以处理误操作。

缺点

比起 RDB 占用更多的磁盘空间;恢复备份速度要慢;每次读写都同步的话,有一定的性能压力;存在个别 Bug,造成不能恢复。

选择

官方推荐两个都启用。 如果对数据不敏感,可以选单独用 RDB。 不建议单独用 AOF,因为可能会出现 Bug。 如果只是做纯内存缓存,可以都不用。

八、主从复制 主机数据更新后根据配置和策略, 自动同步到备机的 master/slaver 机制,Master 以写为主,Slaver 以读为主。

读写分离,性能扩展容灾快速恢复一主多从!

搭建一主两从 1、创建文件目录

mkdir /opt/etc

2、将 redis.conf 复制到当前目录

cp /etc/redis.conf /opt/etc/

3、创建 3 个 redis.conf 配置文件

redis6379.conf redis6380.conf redis6381.conf # redis6379.conf include /opt/etc/redis.conf pidfile /var/run/redis_6379.pid port 6379 dbfilename dump6379.rdb # redis6380.conf include /opt/etc/redis.conf pidfile /var/run/redis_6380.pid port 6380 dbfilename dump6380.rdb # redis6381.conf include /opt/etc/redis.conf pidfile /var/run/redis_6381.pid port 6381 dbfilename dump6381.rdb

4、启动 3 台 redis 服务器

info replication

6、配从不配主

slaveof <ip><port> # 成为某个实例的从服务器

7、再次查看主机运行情况 一主二仆 主机 6379,从机 6380 和 6381。

1、 假设从机 6380 挂掉。

当6380重启后,6380不再是6379的从机,而是作为新的master;当再次把6380作为6379的从机加入后,从机会把数据从头到尾复制。

2、假设主机 6379 挂掉。

6380和6381仍然是6379的从机,不会做任何事;当6379重启后,既然是主服务器。

薪火相传 上一个 slave 可以是下一个 slave 的 master,slave 同样可以接收其他 slave的连接和同步请求,那么该 slave 作为了链条中下一个的 master,可以有效减轻 master 的写压力,去中心化降低风险。

slaveof <ip><port>

中途变更转向:会清除之前的数据,重新建立拷贝最新的。

当某个 slave 宕机,后面的 slave 都没法备份。

即当主机挂掉,从机还是从机,但是无法继续写数据。

反客为主 当一个 master 宕机后,后面的 slave 可以立刻升为 master,其后面的 slave 不用做任何修改。

slaveof no one

哨兵模式 反客为主的自动版,即能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

1、创建 sentinel.conf 文件 /opt/etc/sentinel.conf

2、配置哨兵 sentinel monitor mymaster 172.16.88.169 6379 1

mymaster:监控对象起的服务器名称 1:至少有多少个哨兵同意迁移的数量。

3、启动哨兵 redis-sentinel /opt/etc/sentinel.conf 主机挂掉,会从机选举中产生新的主机。选举的规则。

选举规则

根据优先级别,slave-priority/replica-priority,优先选择优先级靠前的。 根据偏移量,优先选择偏移量大的。 根据 runid,优先选择最小的服务。

复制延时 由于所有的写操作都是先在 master 上操作,然后同步更新到 slave 上,所以从 master 同步到 slave 从机有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,slave 机器数量的增加也会使这个问题更加严重。

复制原理

slave 启动成功连接到 master 后会发送一个 sync 命令(同步命令)。master接到命令启动后台的存盘进程,对数据进行持久化操作,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件(rdb)到 slave,以完成一次完全同步。当主服务进行写操作后,和从服务器进行数据同步。全量复制:而 slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。增量复制:master 继续将新的所有收集到的修改命令依次传给 slave,完成同步。只要是重新连接 master,一次完全同步(全量复制)将被自动执行。

九、集群 容量不够,redis 如何进行扩容?

并发写操作, redis 如何分摊?

主从模式,薪火相传模式,主机宕机,导致 ip 地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

无中心化集群配置( redis3.0 ) Redis 集群实现了对 Redis 的水平扩容,即启动 N 个 Redis 节点,将整个数据库分布存储在这 N 个节点中,每个节点存储总数据的 1/N 。

Redis 集群通过分区(partition)来提供一定程度的可用性(availability),即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

搭建 Redis 集群 1、创建配置文件 以redis6379.conf为例 include /opt/etc/redis.conf pidfile /var/run/redis_6379.pid # 更改 port 6379 # 更改 dbfilename dump6379.rdb # 更改 cluster-enabled yes # 打开集群模式 cluster-config-file nodes-6379.conf # 设置节点配置文件名称,需要更改 cluster-node-timeout 15000 # 设置节点失联事件,超过该时间(ms),集群自动进行主从切换

2、启动 3、将 6 个节点合成一个集群 组合之前请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。

进入redis安装目录 /opt/redis-6.2.6/src

执行 redis-cli --cluster create --cluster-replicas 1 172.16.88.168:6379 172.16.88.168:6380 172.16.88.168:6381 172.16.88.168:6389 172.16.88.168:6390 172.16.88.168:6391

4、采用集群策略连接 redis-cli -c -p PORT cluster nodes # 命令查看集群信息

redis cluster 如何分配这六个节点? 一个集群至少要有三个主节点。

选项 --cluster-replicas 1,表示希望为集群中的每个主节点创建一个从节点。

分配原则尽量保证每个主数据库运行在不同的 IP 地址,每个从库和主库不在一个 IP 地址上。 什么是 slots? 一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个。 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。 集群中的每个节点负责处理一部分插槽。 例如, 如果一个集群可以有主节点, 其中:

节点 A 负责处理 0 号至 5460 号插槽。节点 B 负责处理 5461 号至 10922 号插槽。节点 C 负责处理 10923 号至 16383 号插槽。

如何在集群中录入值?

在 redis-cli 每次录入、查询键值,redis 都会计算出该 key 应该送往的插槽,如果不是该客户端对应服务器的插槽,redis 会报错,并告知应前往的 redis 实例地址和端口。 redis-cli 客户端提供了 –c 参数实现自动重定向。 例如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。

如何查询集群中的值?

每个主机只能查询自己范围内部的插槽。 cluster keyslot <key>:查询某个 key 的 **slot **。 cluster countkeysinslot <slot>:查询某个 slot 是否有值。 CLUSTER GETKEYSINSLOT <slot><count>:返回 count 个 slot 槽中的键。

故障恢复? 如果主节点下线?从节点能否自动升为主节点?注意:15 秒超时。

当 6379 挂掉后,6389 成为新的主机。

主节点恢复后,主从关系会如何?主节点回来变成从机。

当 6379 重启后,6379 成为 6389 的从机。

如果所有某一段插槽的主从节点都宕掉,redis 服务是否还能继续?

如果某一段插槽的主从都挂掉,而 cluster-require-full-coverage=yes,那么 ,整个集群都挂掉。

如果某一段插槽的主从都挂掉,而 cluster-require-full-coverage=no,那么,该插槽数据全都不能使用,也无法存储。

redis.conf 中的参数 cluster-require-full-coverage

优点

实现扩容; 分摊压力; 无中心配置相对简单。

缺点 多键操作是不被支持的; 多键的 Redis 事务是不被支持的。lua 脚本不被支持; 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

集群的 Jedis 开发

public class JedisClusterTest { public static void main(String[] args) { Set<HostAndPort>set =new HashSet<HostAndPort>(); set.add(new HostAndPort("172.16.88.168",6379)); // 任何一个端口 JedisCluster jedisCluster = new JedisCluster(set); jedisCluster.set("k1", "v1"); System.out.println(jedisCluster.get("k1")); } }

十、应用问题 1、缓存穿透 现象

key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。

比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

造成:

应用服务器压力变大。 redis 命中率下降

如何解决

对空值缓存

如果一个查询返回的数据为空(不管是数据是否不存在),仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟。

设置可访问的名单(白名单):

使用 bitmaps 类型定义一个可以访问的名单,名单 id 作为 bitmaps 的偏移量,每次访问和 bitmap 里面的 id 进行比较,如果访问 id 不在 bitmaps 里面,进行拦截,则不允许访问。

采用布隆过滤器

布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。

布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

将所有可能存在的数据哈希到一个足够大的 bitmaps 中,一个一定不存在的数据会被这个 bitmaps 拦截掉,从而避免了对底层存储系统的查询压力。

进行实时监控

当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。

缓存击穿 key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。

数据库访问压力瞬间增大。

redis 中没有出现大量 key 过期,redis 正常运行。

(即某个经常访问的 key 过期,突然有大量访问这个数据)

如何解决

预先设置热门数据

在 redis 高峰访问之前,把一些热门数据提前存入到 redis 里面,加大这些热门数据 key 的时长。

实时调整

现场监控哪些数据热门,实时调整 key 的过期时长。

使用锁

缓存雪崩 key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key。

数据库压力变大。即极少的时间段,查询大量 key 的集中过期情况。

如何解决

构建多级缓存架构

nginx 缓存 + redis 缓存 + 其他缓存(ehcache等)

使用锁或队列:

用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况。

设置过期标志更新缓存:

记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存。

将缓存失效时间分散开:

比如我们可以在原有的失效时间基础上增加一个随机值,比如 1~5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

分布式锁 由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问

也就是在这个机器上了锁,另外一个机器也要可以识别到这个锁,也就是共享锁,都是同一把锁

解决方案如下:

基于数据库实现分布式锁 基于缓存(Redis等) 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

性能:redis最高 可靠性:zookeeper最高

这里,我们就基于redis实现分布式锁

setnx 上锁,通过del 解释 锁一直没有释放,可以通过设置过期时间来自动释放

但是如果上锁之后就断电了

解决方法为

可以边上锁边设置过期时间,通过命令set users 10 nx ex 12,nx为上锁,ex为过期时间 ttl查看过期时间还有多久 java代码

@GetMapping("testLock") public void testLock(){ //1获取锁,setne ,顺便设置过期时间 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123",3,TimeUnit.SECONDS); //2获取锁成功、查询num的值 if(lock){ Object value = redisTemplate.opsForValue().get("num"); //2.1判断num为空return if(StringUtils.isEmpty(value)){ return; } //2.2有值就转成成int int num = Integer.parseInt(value+""); //2.3把redis的num加1 redisTemplate.opsForValue().set("num", ++num); //2.4释放锁,del redisTemplate.delete("lock"); }else{ //3获取锁失败、每隔1秒再获取 try { Thread.sleep(1000); testLock(); } catch (InterruptedException e) { e.printStackTrace(); } } }

UUID防止误删 为此应该多一个判断是否是你的锁,虽然是共享锁,都是一样的,但是可以上锁之后在设置时间,还要给每个用户的这把锁都来一个uuid

@GetMapping("testLock") public void testLock(){ String uuid = UUID.randomUUID().toString(); //1获取锁,setne ,顺便设置过期时间 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3,TimeUnit.SECONDS); //2获取锁成功、查询num的值 if(lock){ ... String lockUuid = (String)redisTemplate.opsForValue().get("lock"); if(uuid.equals(lockUuid)){ //2.4释放锁,del redisTemplate.delete("lock"); } }else{ ... } }

lua脚本保证原子性

问题又来了 如果在判断它的uuid相等之后,正准备解锁,发现又误解他人锁 所以引入lua脚本保证它的原子性 代码如下

@GetMapping("testLockLua") public void testLockLua() { //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中 String uuid = UUID.randomUUID().toString(); //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除! String skuId = "25"; // 访问skuId 为25号的商品 100008348542 String locKey = "lock:" + skuId; // 锁住的是每个商品的数据 // 3 获取锁 Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS); // 第一种: lock 与过期时间中间不写任何的代码。 // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间 // 如果true if (lock) { // 执行的业务逻辑开始 // 获取缓存中的num 数据 Object value = redisTemplate.opsForValue().get("num"); // 如果是空直接返回 if (StringUtils.isEmpty(value)) { return; } // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在! int num = Integer.parseInt(value + ""); // 使num 每次+1 放入缓存 redisTemplate.opsForValue().set("num", String.valueOf(++num)); /*使用lua脚本来锁*/ // 定义lua 脚本 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 使用redis执行lua执行 DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(script); // 设置一下返回值类型 为Long // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型, // 那么返回字符串与0 会有发生错误。 redisScript.setResultType(Long.class); // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。 redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid); } else { // 其他线程等待 try { // 睡眠 Thread.sleep(1000); // 睡醒了之后,调用方法。 testLockLua(); } catch (InterruptedException e) { e.printStackTrace(); } } }

十一、Redis6.0新功能 ACL ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。

? 在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过rename 来调整高危命令比如 flushdb , KEYS* , shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制: (1)接入权限:用户名和密码

(2)可以执行的命令

(3)可以操作的 KEY

acl list命令展现用户权限列表 acl cat,查看添加权限指令类别 acl whoami命令查看当前用户 acl set user命令创建和编辑用户ACL

IO多线程 IO多线程其实指客户端交互部分的网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程

? Redis 6 加入多线程,但跟 Memcached 这种从 IO处理到数据访问多线程的实现模式有些差异。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计是不想因为多线程而变得复杂,需要去控制 key、lua、事务,LPUSH/LPOP 等等的并发问题

另外,多线程IO默认也是不开启的,需要再配置文件中配置

io-threads-do-reads yes io-threads 4


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #redis #一安装redis1安装 #C #语言的编译环境yum #install