irpas技术客

Redis_6 8

irpas 4816

Redis 1.Redis快速入门 1.1 什么是Redis

redis是一款非关系型(NoSql)键值对数据库

1.2 SQL 和 NOSQL的区别

1.3 Redis安装

1.Redis基于C语言编写 所以需要安装redis所需要的gcc依赖

yum install -y gcc tcl

2.下载redis安装包到指定目录并解压

3.进入redis目录 执行编译命令

make && make install

4.指定redis以后台方式启动 并开机自启

修改redis.conf配置文件 (先拷贝一份)

# 允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0 bind 0.0.0.0 # 守护进程,修改为yes后即可后台运行 daemonize yes # 密码,设置后访问Redis必须输入密码 requirepass 123456

其它常见配置

# 监听的端口 port 6379 # 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录 dir . # 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15 databases 1 # 设置redis能够使用的最大内存 maxmemory 512mb # 日志文件,默认为空,不记录日志,可以指定日志文件名 logfile "redis.log"

redis命令

- redis-cli:是redis提供的命令行客户端 - redis-server:是redis的服务端启动脚本 - redis-sentinel:是redis的哨兵启动脚本

通过配置文件设置开机自启 首先新建一个系统服务文件

vi /etc/systemd/system/redis.service 内容如下 [Unit] Description=redis-server After=network.target [Service] Type=forking ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf PrivateTmp=true [Install] WantedBy=multi-user.target

重载系统服务 并设置开机自启

systemctl daemon-reload systemctl enable redis systemctl start redis

Redis 可视化工具

https://github.com/lework/RedisDesktopManager-Windows/releases

1.4 Redis数据结构的使用![

1.4.1 String常用命令 1.String字符串类型

2.Hash 哈希类型 ----------> 无序字典

3.List 列表 ------------> 双向链表结构

4.Set 无序集合

5.Zest有序集合 -----常用于排行榜

1.5 Redis客户端

SpringDataRedis快速入门

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https://spring.io/projects/spring-data-redis 提供了对不同Redis客户端的整合(Lettuce和Jedis) 提供了RedisTemplate统一API来操作Redis 支持Redis的发布订阅模型 支持Redis哨兵和Redis集群 支持基于Lettuce的响应式编程 支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化 支持基于Redis的JDKCollection实现

1.导入依赖

// redis依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> // 连接池依赖 <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>

? 2.配置文件

spring: redis: host: port: 6379 password: lettuce: pool: max-active: 8 #最大连接 max-idle: 8 #最大空闲连接 min-idle: 0 #最小空闲连接 max-wait: 100 #连接等待时间 @SpringBootTest public class JedisTest { @Autowired private RedisTemplate redisTemplate; @Test void testString() { // 插入String类型数据 redisTemplate.opsForValue().set("name", "abc"); // 读取一条String类型数据 Object name = redisTemplate.opsForValue().get("name"); System.out.println("name = " + name); } }

得到的结果却是字节形式?

因为RedisTemplate可以接收到任意Object作为写入Redis,但采用的是JDK序列化,

方案一 : 自定义序列化方式

@Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)throws UnknownHostException { // 创建Template RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); // 设置连接工厂 redisTemplate.setConnectionFactory(redisConnectionFactory); // 设置序列化工具 GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); // key和 hashKey采用 string序列化 redisTemplate.setKeySerializer(RedisSerializer.string()); redisTemplate.setHashKeySerializer(RedisSerializer.string()); // value和 hashValue采用 JSON序列化 redisTemplate.setValueSerializer(jsonRedisSerializer); redisTemplate.setHashValueSerializer(jsonRedisSerializer); return redisTemplate; }

弊端:会将类的class类型写入json结果中存入redis 带来额外的内存开销

方案二 : 使用String序列化器,要求只能存储String类型的key和value。当需要存储Java对象时,手动完成对象的序列化和反序列化。

@SpringBootTest public class JedisTest { @Autowired private RedisTemplate redisTemplate; private static final ObjectMapper mapper = new ObjectMapper(); @Test void testString() throws Exception { User user = new User("ab", 22); // 序列化 String json = mapper.writeValueAsString(user); redisTemplate.opsForValue().set("user", json); // 反序列化 String o = (String) redisTemplate.opsForValue().get("user"); User user1 = mapper.readValue(o, User.class); System.out.println(user1); } } 2.Redis实战篇 2.1 redis常见问题

缓存穿透:指用户请求的数据缓存中和数据库都没有,这样缓存永远不会生效,请求都会打到数据库从而导致数据库压力过大而宕机

解决方案:

增加id的复杂度,给数据做好基础格式校验缓存null值引入布隆过滤器(数据库的数据会通过某种哈希算法得到哈希值 然后进行二进制转换存储到过滤器中)

缓存雪崩:指同一时间大量key失效,或者redis宕机,导致大量请求到数据库带来数据库的巨大压力

解决方案:

key随机设置过期时间利用redis集群提高服务的可用性给缓存业务做限流处理给业务添加多级缓存

缓存击穿:被高并发访问并且缓存重建业务较复杂的key突然失效了,无数请求在瞬间给数据库进行巨大冲击

解决方案:

互斥锁

优点: 没有额外的内存消耗 保证一致性 实现简单 缺点: 线程需要等待 性能受影响 可能会有死锁风险

代码实现

String key = RedisConstants.CACHE_SHOP + id; String shop = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shop)) { return JSONUtil.toBean(shop, Shop.class); } // 不为空 返回错误信息 if (shop != null) { return null; } // 获取每个店铺的锁 String lockKey = RedisConstants.LOCK_SHOP + id; Shop shopDo = null; try { boolean b = tryLock(lockKey); if (!b) { // 获取锁失败 重试 Thread.sleep(50); queryWithMutex(id); } // 成功 查询数据库 shopDo = getById(id); if (Objects.isNull(shopDo)) { // 防止缓存穿透 stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP + shopDo.getId(), "", 1, TimeUnit.MINUTES); return null; } stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP + shopDo.getId(), JSONUtil.toJsonStr(shopDo), 30, TimeUnit.MINUTES); } catch (InterruptedException e) { throw new RuntimeException(); } finally { // 释放锁 unLock(lockKey); } return shopDo;

逻辑过期

优点:线程无需等待,性能较好 缺点:不保证一致性 有额外内存消耗 实现复杂

代码实现

String key = RedisConstants.CACHE_SHOP + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(shopJson)) { return null; } RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); if (redisData.getExpiration().isAfter(LocalDateTime.now())) { // 缓存时间在当前时间之后 未过期 直接返回 return shop; } // 过期 获取互斥锁 String lockKey = RedisConstants.LOCK_SHOP + id; boolean lock = tryLock(lockKey); if (lock) { // 开启独立线程查询数据 cache_rebuild_executor.submit(() -> { try { // 设置数据到缓存中 this.saveRedisShop(id, 20L); } catch (Exception e) { throw new RuntimeException(); } finally { unLock(lockKey); } }); } return shop; 2.2 基于Redis生成分布式全局唯一ID LocalDateTime localDateTime = LocalDateTime.now(); long nowSecond = localDateTime.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; // 获取当前时间 String date = localDateTime.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 自增长 long count = redisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); // 拼接并返回 return timestamp << 32 | count; 2.3 Redisson实现分布式锁 @Resource private RedissonClient redissonClient; void testRedisson() throws InterruptedException { // 获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("redissonLock"); // 尝试获取锁 获取锁的最大等待时间(期间会重试),锁到时间自动释放 boolean tryLock = lock.tryLock(1, 10, TimeUnit.SECONDS); if (tryLock) { try { System.out.println("执行业务"); } finally { // 释放锁 lock.unlock(); } } } 2.4 Redis基于Stream实现消息队列

Stream 是Redis 5.0 引入的新的数据类型,可以实现一个功能非常完善的消息队列

127.0.0.1:6379> xadd s1 * name aaa "1648951747995-0"

127.0.0.1:6379> XREAD count 1 block 0 streams s1 $ 1) 1) "s1" 2) 1) 1) "1648951747995-0" 2) 1) "name" 2) "aaa" (7.19s)

Stream类型消息队列XREAD的特点及缺点

消息可回溯(持久化)一条消息可被多个消费者读取可以阻塞读取

缺点:

会造成消息漏读($在处理最新消息时 后面又连续发了好几条消息 再去读取最新消息时 消息会漏掉)

解决:

127.0.0.1:6379> XGROUP create s1 k1 0 OK

127.0.0.1:6379> XREADGROUP group k1 c2 count 1 block 2000 streams s1 > 1) 1) "s1" 2) 1) 1) "1648953762321-0" 2) 1) "k1" 2) "v3"

确认消息是否被消费过

127.0.0.1:6379> xack s1 k1 1648953760733-0 1648953762321-0 (integer) 2

pending list中查询未被确认的消息

127.0.0.1:6379> XPENDING s1 k1 - + 10

基于Stream实现异步秒杀

private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 在服务启动之前初始化lua脚本 static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } // 创建一个单列线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } private class VoucherOrderHandler implements Runnable { @Override public void run() { String queueName = "stream.orders"; while (true) { try { // 从消息队列中获取消息 List<MapRecord<String, Object, Object>> mapRecordList = stringRedisTemplate.opsForStream() .read(Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed())); // 判断订单消息是否为空 if (CollectionUtil.isEmpty(mapRecordList)) { continue; } // 解析数据 创建订单 MapRecord<String, Object, Object> entries = mapRecordList.get(0); Map<Object, Object> value = entries.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); createVoucherOrder(voucherOrder); // 确定消息 stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", entries.getId()); } catch (Exception e) { log.error("处理订单异常", e); // 从pendlist中查询未消费的消息 handlePendingList(); } } } private void handlePendingList() { String groupName = "stream.orders"; while (true) { try { // 从消息队列中获取消息 List<MapRecord<String, Object, Object>> mapRecordList = stringRedisTemplate.opsForStream() .read(Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(groupName, ReadOffset.from("0"))); // 判断订单消息是否为空 if (CollectionUtil.isEmpty(mapRecordList)) { break; } // 解析数据 创建订单 MapRecord<String, Object, Object> entries = mapRecordList.get(0); Map<Object, Object> value = entries.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); createVoucherOrder(voucherOrder); // 确定消息 stringRedisTemplate.opsForStream().acknowledge(groupName, "g1", entries.getId()); } catch (Exception e) { log.error("处理订单异常", e); } } } } @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getUserId(); Long orderId = redisWorker.nextId("order"); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), orderId.toString() ); int value = result.intValue(); if (value != 0) { // 不为0 代表没有购买资格 return Result.fail(value == 1 ? "库存不足" : "不可重复下单"); } // 返回订单id return Result.ok(orderId); } private void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); RLock lock = redissonClient.getLock("lock:order" + userId); boolean tryLock = lock.tryLock(); // 获取锁 if (!tryLock) { log.error("不允许重复下单"); return; } try { // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("不允许重复下单!"); return; } boolean update = seckillVoucherService.update().setSql("stock = stock -1") .eq("voucher_id", voucherId).gt("stock", 0) .update(); if (!update) { log.error("库存不足"); return; } save(voucherOrder); } finally { lock.unlock(); } } -- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if (tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if (redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0 2.5 GEO数据结构

常见命令

127.0.0.1:6379> GEOADD g1 116.378248 39.865275 x1 116.42803 39.903738 x2 116.322287 39.893729 x3 (integer) 3 // 查询距离 127.0.0.1:6379> GEOADD g1 116.378248 39.865275 x1 116.42803 39.903738 x2 116.322287 39.893729 bjx (integer) 3 // 查询指定范围的数据 127.0.0.1:6379> GEOSEARCH g1 FROMLONLAT 116.378248 39.865275 BYRADIUS 10 km

使用GEO实现附近搜索功能

@Override public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) { if (x == null || y == null) { // 根据类型分页查询 Page<Shop> page = query() .eq("type_id", typeId) .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE)); // 返回数据 return Result.ok(page.getRecords()); } int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE; int end = current * SystemConstants.DEFAULT_PAGE_SIZE; String key = "shop:geo:" + typeId; // 按照距离排序、分页。结果:shopId、distance GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo().search(key, GeoReference.fromCoordinate(x, y), new Distance(5000), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) // 无法进行分页 所以需要手动逻辑分页 ); if (results == null) { return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = results.getContent(); if (content.size() <= from) { return Result.ok(Collections.emptyList()); } // 4.1.截取 from ~ end的部分 List<Long> ids = new ArrayList<>(content.size()); Map<String, Distance> distanceMap = new HashMap<>(content.size()); content.stream().skip(from).forEach(result -> { // 4.2.获取店铺id String shopIdStr = result.getContent().getName(); ids.add(Long.valueOf(shopIdStr)); // 4.3.获取距离 Distance distance = result.getDistance(); distanceMap.put(shopIdStr, distance); }); // 5.根据id查询Shop String idStr = StrUtil.join(",", ids); List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list(); for (Shop shop : shops) { shop.setDistance(distanceMap.get(shop.getId().toString()).getValue()); } // 6.返回 return Result.ok(shops); } 2.6 BitMap数据结构 主要场景签到表

常见命令

实现签到表功能

@Override public Result sign() { Long userId = UserHolder.getUser().getUserId(); LocalDateTime currentTime = LocalDateTime.now(); String datetime = currentTime.format(DateTimeFormatter.ofPattern(":yyyyMM")); // key 用户id+年月 String key = "sign:" + userId + datetime; stringRedisTemplate.opsForValue().setBit(key, currentTime.getDayOfMonth() - 1, true); return Result.ok(); }

实现查看连续签到功能

@Override public Result signCount() { Long userId = UserHolder.getUser().getUserId(); LocalDateTime currentTime = LocalDateTime.now(); String datetime = currentTime.format(DateTimeFormatter.ofPattern(":yyyyMM")); // key 用户id+年月 String key = "sign:" + userId + datetime; // 获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202204 GET u14 0 List<Long> longList = stringRedisTemplate.opsForValue().bitField(key, BitFieldSubCommands.create() .get(BitFieldSubCommands.BitFieldType.unsigned(currentTime.getDayOfMonth())).valueAt(0)); // 没有签到结果 if (CollectionUtil.isEmpty(longList)) { return Result.ok(0); } Long num = longList.get(0); if (num == null || num == 0) { return Result.ok(0); } // 6.循环遍历 int count = 0; while (true) { // 6.1.让这个数字与1做与运算,得到数字的最后一个bit位 // 判断这个bit位是否为0 if ((num & 1) == 0) { // 如果为0,说明未签到,结束 break; } else { // 如果不为0,说明已签到,计数器+1 count++; } // 把数字右移一位,抛弃最后一个bit位,继续下一个bit位 num >>>= 1; } return Result.ok(count); } 3.高级篇 3.1数据丢失问题

1.RDB redis数据备份文件 也叫Redis数据快照 将内存中所有的数据记录到磁盘中,当redis实例重启后 快速从磁盘中读取数据

? RDB 使用bgsave命令 fork主进程得到子进程,子进程共享主进程的内存数据,完成fock后读取内存数据并写入rdb文件

? 具体操作:

? 因为在linux系统中主进程无法直接操作物理内存,所以系统会给每个进程分配一个虚拟内存,虚拟内存和物理内存形成的映射关系表叫页表,子进程会复制主进程的页表从而实现与主进程的数据共享,然后子进程读取内存中的数据并且写入rdb文件中。

2.AOF 命令日志文件

默认策略 everysec 每隔1秒将缓冲区数据写入AOF文件。 因为是记录文件,aof文件会被RDB文件大的多,且aop会记录对同一个key的多次操作,但只有最后一次操作才有意义,所以通过执行bgrewriteaof命令 可以使文件重写,用最少的命令达到相同效果


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #redis #SQL # #NOSQL的区别13