irpas技术客

Redis入门到精通保姆级教程 ---> 【爆肝五万字】_YLi_Jing_redis入门到精通

网络投稿 4978

文章目录 1、NoSQL数据库1.1 NoSQL数据库概述1.2 NoSQL使用场景1.3 NoSQL不适用场景1.4 常见的NoSQL数据库1.4.1 Memcache1.4.2 Redis1.4.3 MongoDB 2、Redis的概述和安装2.1 概述2.2 安装2.2.1 Windows版本安装2.2.2 Linux版本安装 2.3:redis基础知识2.3.1: 端口号2.3.2:数据库2.3.3:基础命令2.3.4:线程 3、Redis常用五大数据类型3.1. Redis键(key)3.2 Redis字符串3.2.1: 常用命令 3.3 Redis列表(List)3.3.1:常用命令 3.4 Redis集合(Set)3.4.1:常用命令 3.5 Redis哈希(Hash)3.6: Redis有序集合Zset(sorted set) 4、Redis配置文件介绍4.1:Units单位4.2:INCLUDES包含4.3:NETWORD网路配置4.4:GENERAL通用4.5:限制4.6:SRCURITY安全 5、Redis的发布和订阅5.1:订阅发布消息图5.1:发布订阅命令行实现 6、Redsi新数据类型6.1、Bitmaps简介6.1.1、命令 6.2、HyperLogLog简介6.2.1、命令 6.3、Geospatial简介6.3.1、命令 7、Redis_Jedis测试7.1、步骤7.2、常用API 8、SpringBoot整合1、引入所需依赖2、添加配置3、封装关于redis的工具类4、进行测试 9、Redis事务\_锁机制9.1、Redis的事务定义9.2、multi、exex、discard9.3、悲观锁与乐观锁9.4、Watch监控9.5、事务的三个特性 10、Redis持久化之RDB(Redis DataBase)10.1、什么是RBD10.2、什么是Fork10.3、dump.rdb 11、Redis持久化之AOF(Append Only File)11.1、是什么11.2、配置部分11.3、AOF数据恢复11.4、Rewrite11.5、优缺点 12、Redis主从复制12.1、概念12.2、主从复制配置12.3、常用三招12.3.1、一主二仆12.3.2、薪火相传12.3.3、反客为主 12.4、复制原理12.5、哨兵模式 13、Redis集群13.1、问题13.2、什么是集群13.3、准备开始搭建集群13.4、什么是slots13.5、-c采用集群策略连接,设置的数据会自动切换到相应的写主机13.6、查询集群中的值13.7、故障修复13.8、优缺点 14、Redis应用问题14.1、缓存穿透14.2、缓存击穿14.3、缓存雪崩 15、分布式锁15.1、概念15.2、分布式锁的主流实现方案15.3、使用redis实现分布式锁15.3.1、设置锁和过期时间15.3.2、UUID防止误删15.3.3、LUA保证删除原子性15.3.4、分布式锁的几个必要条件

1、NoSQL数据库 1.1 NoSQL数据库概述

NoSQL(NoSQL = Not Only SQL), 不仅仅是SQL,泛指非关系型的数据库。

NoSQL不依赖业务逻辑方式存储,而已简单的key-value模式存储,因此大大的增加了数据库的扩展能力。

不遵循SQL标准。不支持ACID远超于SQL的性能。 1.2 NoSQL使用场景 对数据高并发的读写海量数据的读写对数据的高可扩展性 1.3 NoSQL不适用场景 需要事物支持复杂关系的查询 1.4 常见的NoSQL数据库 1.4.1 Memcache 很早出现的NoSQL数据库数据都在内存中,一般不持久化支持简单的key-value模式,支持类型单一一般作为缓存数据库辅助持久化数据库 1.4.2 Redis 几乎覆盖了Memcached的绝大部分功能数据都在内存中,支持持久化,主要用作备份恢复除了支持简单的key-value模式,还支持多种数据结构的存储,比如list、set、hash、zset等。一般作为缓存数据库辅助持久化的数据库 1.4.3 MongoDB 高性能、开源、模式自由(schema free)的文档型数据库数据都在内存中,如果内存不足,把不常用的数据保存到硬盘虽然是key-value模式,但是对value(尤其是Json)提供了丰富的查询功能支持二进制数据及大型对象可以根据数据的特点替代 RDBMS ,成为独立的数据库。 或者配合RDBMS,存储特定的数据。 2、Redis的概述和安装 2.1 概述

Redis是什么

Redis: REmote DIctionary Server (远程字典服务器)

是完全开源免费的,用C语言编写的,遵守BSD协议,是一个高性能的(key/value)分布式内存数据库,基于内存运行,并支持持久化的NoSQL数据库,是当前最热门的NoSQL数据库之一,也被人们称为数据库结构服务器

Redis与其他key-value缓存产品有以下三个特点

Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。Redis不仅仅支持简单的key-value类型的数据,同时还提供list、set、zset、hash等数据结构的存储。Redis支持数据的备份,即master-slave(主从)模式的数据备份

Redis能干嘛

内存存储和持久化: redis支持异步将内存中的数据写到硬盘上,同时不影响继续服务发布、订阅消息系统地图信息分析定时器、计数器最新n个数据

特性

数据类型、基本操作和配置

持久化和复制,RDB、AOF

事物的控制

常用网站

https://redis.io/ 官网

http://· 中文网

2.2 安装 2.2.1 Windows版本安装

下载地址: https://gitee.com/resource-download/redis/tree/master/

下载完成后解压后即可。

双击启动redis-server.exe

双击启动redis-cli.exe

测试简单的存值取值

127.0.0.1:6379> set name 'Mr Qi' OK 127.0.0.1:6379> get name "Mr Qi" 127.0.0.1:6379> 2.2.2 Linux版本安装

官网下载地址 https://redis.io/download/

下载完成后放进我们的linux的/opt目录下

进入/opt目录

cd /opt

然后进行解压缩

tar -zxvf redis-6.2.6.tar.gz

安装gcc(gcc是linux下的一个编译程序,是c语言的编译工具)

yum install gcc

然后在解压后的目录下,执行make(将我们的文件编译成C文件)命令,

cd redis-6.2.6 make

执行make可能出现的问题:

emalloc/jemalloc.h: 没有那个文件或目录

我们需要执行以下命令

make distclean

执行完然后再次make即可。

到这里我们就编译完成了,我们要继续进行安装

make install

安装完成后,它的默认安装位置是usr/local/bin目录下,进入目录执行ls命令进行查看.

我们可以看到里面关于redis的有7个文件,各个文件说明如下

redis-benchmark: 性能测试工具

redis-check-aof: 修复有问题的aof文件

redis-check-rdb:修复有问题的rdb文件

redis-server:redis服务器启动命令

redis-cli:客户端操作入口

启动方式一(不推荐):

执行redis-server命令

这种方式有个弊端,就是我们只要关闭窗口,则服务就会自动关闭。

启动方式二(推荐):

我们需要修改redis.conf文件,将里面的daemonize no改为yes,为了不破坏原有文件,我们可以复制一份文件进行修改。

复制一份redis.conf,复制位置可自己指定

进入/opt/redis-6.2.6目录下执行以下命令

# mycopy 你自己放的具体位置,可自行修改 cp redis.conf /mycopy

进入/mycopy目录下修改redis.conf文件,执行命令如下

cd /mycopy vi redis.conf

将对应位置修改为yes即可,退出即可。

进行启动:

进入/usr/local/bin,执行以下命令

redis-server /mycopy/redis.conf

这样redis就在后台启动了,即使关掉窗口redis仍然可以正常使用

我们可以来查看我们的redis进程是否正常启动:

ps -ef | grep redis

启动成功。

使用客户端进行连接:

redis-cli

退出方式:

shutdown exit

到这里,我们的安装及其相关配置工作就完结了

2.3:redis基础知识 2.3.1: 端口号

redis默认端口号为6379:

由来:6379在是手机按键上MERZ对应的号码,而MERZ取自意大利歌女Alessia Merz的名字。MERZ长期以来被Redis作者antirez及其朋友当作愚蠢的代名词。后来Redis作者在开发Redis时就选用了这个端口。

# 在redis.conf配置文件中 port 6379 2.3.2:数据库

redis默认16个数据库,类似数组下标从0开始,默认使用零号数据库。redis.conf里面有默认配置。

# Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT <dbid> where # dbid is a number between 0 and 'databases'-1 databases 16 2.3.3:基础命令

select命令切换数据库

# 不同的库可以存储不一样的数据 127.0.0.1:6379> select 1 OK 127.0.0.1:6379[1]> select 2 OK

dbsize查看当前库的key数量

127.0.0.1:6379[2]> set name 'qi' OK 127.0.0.1:6379[2]> dbsize (integer) 1 127.0.0.1:6379[2]> select 1 OK 127.0.0.1:6379[1]> dbsize (integer) 0

keys *可以查看具体的key

127.0.0.1:6379[2]> keys * 1) "name"

flushdb: 清空当前库

flushall:清空全部库

127.0.0.1:6379[2]> dbsize (integer) 1 127.0.0.1:6379[2]> flushdb OK 127.0.0.1:6379[2]> dbsize (integer) 0 2.3.4:线程

Redis是单线程+多路IO复用技术

IO多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。

3、Redis常用五大数据类型 3.1. Redis键(key) 命令作用keys *查看当前库所有keyexists key判断key是否存在type key查看key是什么类型的del删除指定的key数据expire key 10为key设置过期时间ttl key查询还有多久过期(-1:永不过期,-2:已经过期)
3.2 Redis字符串 String类型是Redis中最基本的类型,一个Redis中字符串value最多可以是512MString是二进制安全的.意味着Redis的String可以包含任何数据,比如jpg图片或者是序列化的对象。一个redis中字符串value最多可以是512M. 3.2.1: 常用命令

set <key> <value>:存值(key如若已存在则覆盖)

127.0.0.1:6379> set name qi OK

get <key>:取值

127.0.0.1:6379> get name "qi"

append <key> <value>将给定的value追加到原值的末尾

127.0.0.1:6379> append name jing (integer) 6 127.0.0.1:6379> get name "qijing"

strlen <key>获取值得长度

127.0.0.1:6379> strlen name (integer) 6

setnx key value:只有key不存在时,设置key的值

# 存在key则设置不成功 127.0.0.1:6379> setnx name zhang (integer) 0 # 不存在这个key,成功设置 127.0.0.1:6379> setnx age 10 (integer) 1

incr <key>: 只能对数字值操作+1,如果为空,新增值为1

# key值不存在,也就是key为空 127.0.0.1:6379> incr k1 (integer) 1 127.0.0.1:6379> get k1 "1" # key不为空 127.0.0.1:6379> set age 10 OK 127.0.0.1:6379> incr age (integer) 11 127.0.0.1:6379> get age "11"

decr <key>:只能对数字值操作-1,如果为空,则为0

127.0.0.1:6379> set age 10 OK 127.0.0.1:6379> decr age (integer) 9 127.0.0.1:6379> get age "9"

incrby <key> <步长> :可以设置每次增加多少

127.0.0.1:6379> set age 10 OK 127.0.0.1:6379> incrby age 10 (integer) 20 127.0.0.1:6379> get age "20"

decrby <key> <步长> :可以设置每次减少多少

127.0.0.1:6379> set age 10 OK 127.0.0.1:6379> decrby age 10 (integer) 0 127.0.0.1:6379> get age "0"

mset <key1> <value1> <key2> <value2>:可以设置多个key-value

127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 OK 127.0.0.1:6379> keys * 1) "k1" 2) "k2" 3) "k3"

同理:msetnx <key1> <value2> <key2> <value2>:给定所有key都 不存在设置key的值,一个失败都失败

mget <key1> <key2>:可以获取多个key的value值

127.0.0.1:6379> mget k1 k2 k3 1) "v1" 2) "v2" 3) "v3"

getrange <key> <起始位置> <结束位置>:获取值的范围,类似于java中的substring

127.0.0.1:6379> set name YangLi OK 127.0.0.1:6379> getrange name 0 3 "Yang"

setrange <key> <起始位置> <value>: 从key的起始位置加入值

127.0.0.1:6379> setrange name 3 aaa (integer) 7 127.0.0.1:6379> get name "123aaa7" 127.0.0.1:6379>

setex <key> <过期时间> <value>: 设置过期时间

127.0.0.1:6379> setex age 10 21 OK 127.0.0.1:6379> ttl age (integer) -2 127.0.0.1:6379> get age (nil)

getset <key> <value>:设置新值获取旧值

127.0.0.1:6379> getset name MaYangLi "qijingjing" 127.0.0.1:6379> get name "MaYangLi" 3.3 Redis列表(List) Redis列表是最简单的字符串列表,按照插入顺序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。它的底层是一个双向链表,对两端操作性很高,通过索引下标的操作中间点的性能很差。 3.3.1:常用命令

lpush/rpush <key> <v1> <v2> <v3>:从左边/右边插入一个或多个值

lrange <key> <开始位置> <结束位置>:按照索引下标获得元素

127.0.0.1:6379> lpush name qijingjing yangli (integer) 2 # 0 到 -1 代表全部元素 127.0.0.1:6379> lrange name 0 -1 1) "yangli" 2) "qijingjing"

lpop/rpop <key>: 从左边/右边取到一个值,取出后即值不存在了

127.0.0.1:6379> rpush name v1 v2 v3 (integer) 3 127.0.0.1:6379> lpop name "v1" 127.0.0.1:6379> rpop name "v3"

rpoplpush <key1> <key2>:从key1列表右边吐出一个值,插到key2列表左边

127.0.0.1:6379> rpush name1 v1 v2 v3 (integer) 3 127.0.0.1:6379> rpush name2 v4 v5 v6 (integer) 3 127.0.0.1:6379> rpoplpush name1 name2 "v3" 127.0.0.1:6379> lrange name1 0 -1 1) "v1" 2) "v2" 127.0.0.1:6379> lrange name2 0 -1 1) "v3" 2) "v4" 3) "v5" 4) "v6"

lindex key <下标>:按照索引下标获得元素(从左到右)

127.0.0.1:6379> rpush name v1 v2 v3 (integer) 3 127.0.0.1:6379> lindex name 0 "v1" 127.0.0.1:6379> lindex name 2 "v3"

llen <key>:获得列表长度

127.0.0.1:6379> rpush name v1 v2 (integer) 2 127.0.0.1:6379> llen name (integer) 2

linsert <key> <before>/<after> <value> <newvalue>:在value的前面/后面插入newvalue插入值

127.0.0.1:6379> rpush age v1 v2 v3 (integer) 3 127.0.0.1:6379> linsert age before v3 v4 (integer) 4 127.0.0.1:6379> lrange age 0 -1 1) "v1" 2) "v2" 3) "v4" 4) "v3"

lrem <key><n><value> :从左边删除n个value(从左到右)

127.0.0.1:6379> rpush k11 v1 v2 v3 v3 v3 v4 v5 (integer) 7 127.0.0.1:6379> lrem k11 2 v3 (integer) 2 127.0.0.1:6379> lrange k11 0 -1 1) "v1" 2) "v2" 3) "v3" 4) "v4" 5) "v5"

lset <key> <index> <value>:在指定下标位置换一个新值

127.0.0.1:6379> rpush key v1 v2 v3 (integer) 3 127.0.0.1:6379> lset key 0 vv OK 127.0.0.1:6379> lrange key 0 -1 1) "vv" 2) "v2" 3) "v3" 3.4 Redis集合(Set)

Redis set 对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择

Redis的Set是string类型的无序集合,它底层其实是一个value为null的hash表

3.4.1:常用命令

sadd <key><value1><value2>:将一个或多个元素加入到集合key中,已经存在的会被忽略

smembers<key>:取出该集合所有值

127.0.0.1:6379> sadd name qi qi a b c (integer) 4 127.0.0.1:6379> smembers name 1) "c" 2) "a" 3) "b" 4) "qi"

sismember<key><value>:判断集合key是否有含value值,有1,没有0

127.0.0.1:6379> sadd name qw df (integer) 2 127.0.0.1:6379> sismember name qw (integer) 1 127.0.0.1:6379>

scard<key>:返回该集合的元素个数。

127.0.0.1:6379> sadd age 1 2 3 (integer) 3 127.0.0.1:6379> scard age (integer) 3

srem<key><value1><value2>:删除集合中的某个元素(没有则不做处理)

127.0.0.1:6379> sadd name a b c d (integer) 1 127.0.0.1:6379> srem name a b (integer) 2 127.0.0.1:6379> smembers name 1) "c" 2) "d"

spop<key>:随机从该集合中吐出一个值

127.0.0.1:6379> spop name "a" 127.0.0.1:6379> smembers name 1) "c" 2) "b"

srandmember<key><n>:随机从该集合中取出n个值,不会从集合中删除。

127.0.0.1:6379> sadd name a b c d e (integer) 5 127.0.0.1:6379> srandmember name 3 1) "c" 2) "a" 3) "e"

smove<source><destination><value>:把一个集合的值移到另一个集合

127.0.0.1:6379> sadd age1 1 2 3 4 5 (integer) 5 127.0.0.1:6379> sadd age2 6 7 8 9 10 (integer) 5 127.0.0.1:6379> smove age1 age2 1 (integer) 1 127.0.0.1:6379> smembers age1 1) "2" 2) "3" 3) "4" 4) "5" 127.0.0.1:6379> smembers age2 1) "1" 2) "6" 3) "7" 4) "8" 5) "9" 6) "10"

sinter<key1><key2>:返回两个集合的交集元素

127.0.0.1:6379> sadd age1 1 2 3 4 (integer) 1 127.0.0.1:6379> sadd age2 1 3 567 (integer) 2 127.0.0.1:6379> sinter age1 age2 1) "1" 2) "3"

sunion<key1><key2>:返回两个集合的并集元素

127.0.0.1:6379> sadd age1 1 2 3 4 (integer) 4 127.0.0.1:6379> sadd age1 2 3 4 56 (integer) 1 127.0.0.1:6379> sunion age1 age2 1) "1" 2) "2" 3) "3" 4) "4" 5) "56"

sdiff<key1><key2>:返回两个集合的差集元素(key1中的,key2中没有的)

127.0.0.1:6379> sadd age1 1 3 5 7 (integer) 4 127.0.0.1:6379> sadd age2 1 2 4 6 7 (integer) 5 127.0.0.1:6379> sdiff age1 age2 1) "3" 2) "5" 3.5 Redis哈希(Hash) Redis hash 是一个键值对的集合Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。存储部分变更的数据,如用户信息等。

hset <key><field><value>:给key集合中的field键赋值value

hget <key><field>:从key集合field取出value。

127.0.0.1:6379> hset user name qijingjing (integer) 1 127.0.0.1:6379> hget user name "qijingjing"

hmset <key1><value1><key2><value2>..:批量设置hash的值

127.0.0.1:6379> hmset user age 1 city beijing OK 127.0.0.1:6379> hget user age "1" 127.0.0.1:6379> hget user city "beijing"

hexists<key1><field>:查看哈希表key中,给定field是否存在

127.0.0.1:6379> hset user name qijingjing (integer) 1 127.0.0.1:6379> hexists user name (integer) 1

hkeys <key>:列出该hash集合所有的filed

127.0.0.1:6379> hset user name qijing age 1 (integer) 2 127.0.0.1:6379> hkeys user 1) "name" 2) "age"

hvals <key>:列出hash集合所有的value

127.0.0.1:6379> hmset user age 1 name qijingjing OK 127.0.0.1:6379> hvals user 1) "1" 2) "qijingjing"

hincrby <key><field><increment>:为哈希表key中的field的值进行加减操作

127.0.0.1:6379> hset user age 1 (integer) 1 127.0.0.1:6379> hincrby user age 10 (integer) 11 127.0.0.1:6379> hincrby user age -1 (integer) 10

hsetnx <key><field><value>:将哈希表key中的field的值设置为value,当且仅当field不存在

127.0.0.1:6379> hset user name qijing (integer) 1 127.0.0.1:6379> hsetnx user age 1 (integer) 1 127.0.0.1:6379> hkeys user 1) "name" 2) "age" 3.6: Redis有序集合Zset(sorted set)

与set相比,sorted set增加了一个权重参数score,使得集合的元素可以按照score进行有序排列,可以用sorted set来做带权重的队列,比如普通消息的score为1,重要消息的score为2,让权重高的先执行即可。可以用于排行榜操作。

zadd <key><score1><value1><score2><value2>...:将一个或多个元素加入到有序集合key中

zrange<key><start><stop>[WITHSCORES]:返回有序集合key中,下标在start到stop之间的元素,带WITHSCORES可以让分数一起和值返回到结果集。

127.0.0.1:6379> zadd age 12 xiaohong 13 xiaowang 10 xiaoli (integer) 3 127.0.0.1:6379> zrange age 0 -1 1) "xiaoli" 2) "xiaohong" 3) "xiaowang"

zrangebyscore key min max [withscores][limit offset count]:返回有序集key中,所有score值介于min和max之间(包括等于min或max)的成员,有序集合成员按score值递增排列

zrevrangebyscore key max min:同上,只是从大到小排序

127.0.0.1:6379> zadd age 12 xiaohong 13 xiaowang 10 xiaoli 16 xiaohei (integer) 4 127.0.0.1:6379> zrangebyscore age 13 17 1) "xiaowang" 2) "xiaohei"

zincrby <key><increment><value>:为元素的score加上增量

127.0.0.1:6379> zadd age 10 xiaohong 11 xiaolan (integer) 2 127.0.0.1:6379> zincrby age 3 xiaohong "13"

zrem<key><value>:删除该集合下指定值的元素

127.0.0.1:6379> zadd age 10 xiaohong 11 xiaolan (integer) 2 127.0.0.1:6379> zrem age xiaohong (integer) 1 127.0.0.1:6379> zrange age 0 -1 1) "xiaolan"

zcount<key><min><max>:统计该集合分数区间的元素个数

127.0.0.1:6379> zadd age 2 zhangsan 4 lisi 6 wangwu 8 zhaoliu (integer) 4 127.0.0.1:6379> zcount age 2 6 (integer) 3

zrank<key><value>:返回该值在集合中的排名,从0开始

127.0.0.1:6379> zadd age 2 zhangsan 4 lisi 6 wangwu 8 laoliu (integer) 4 127.0.0.1:6379> zrank age laoliu (integer) 3 4、Redis配置文件介绍

Redis的配文件位于Redis安装目录下,文件名为redis.conf

4.1:Units单位

1、配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit

2、对大小写不敏感

4.2:INCLUDES包含

和spring配置文件类似,可以通过includes包含,redis.conf可以作为总文件,可以包含其他文件!

4.3:NETWORD网路配置 bind 127.0.0.1 #绑定的ip protected-mode yes #保护模式,no的话支持远程访问 port 6379 #默认端口 4.4:GENERAL通用 timeout 0 #默认永不超时 tcp-keepalive 300 #检测连接是否还活着 daemonize yes #改为yes,开启redis后台启动 logfile "" #日志文件的位置 databases 16 #设置数据库的数目,默认的数据库是零号数据库 loglevel notice #日志级别。(debug记录大量日志信息,适用于开发、测试阶段,verbose:有用的信息,notice: 生产环境,warning:警告信息) 4.5:限制 maxclients 10000 # 设置能连上redis的最大客户端连接数量 maxmemory <bytes> # redis配置的最大内存容量 maxmemory-policy noeviction # maxmemory-policy 内存达到上限的处理策略 #volatile-lru:利用LRU算法移除设置过过期时间的key。 #volatile-random:随机移除设置过过期时间的key。 #volatile-ttl:移除即将过期的key,根据最近过期时间来删除(辅以TTL) #allkeys-lru:利用LRU算法移除任何key。 #allkeys-random:随机移除任何key。 #noeviction:不移除任何key,只是返回一个写错误。 4.6:SRCURITY安全 # 启动redis # 连接客户端 # 获得和设置密码 config get requirepass config set requirepass "123456" #测试ping,发现需要验证 127.0.0.1:6379> ping NOAUTH Authentication required. # 验证 127.0.0.1:6379> auth 123456 OK 127.0.0.1:6379> ping PONG 5、Redis的发布和订阅

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接受消息。 Redis客户端可以订阅任意数量的频道。

5.1:订阅发布消息图

下图展示了频道channel1,以及订阅这个频道的三个客户端–client2、client5、和client1之间的关系:

当有新消息通过PUBLISH命令发送给频道channel1时,这个消息会被发送给订阅它的三个客户端:

5.1:发布订阅命令行实现

1、打开一个客户端订阅一个频道channel1频道

SUBSCRIBE <频道名>:

2、打开另一个客户端,给channel1发布消息hello

publish channell hello:

注意:这里的1是返回订阅该频道的数量

3、然后我们再打开另外一个客户端可以看到发送的消息

6、Redsi新数据类型 6.1、Bitmaps简介

在开发中,可能会遇到这种情况:需要统计用户的某些信息,如活跃或不活跃,登录或者不登录;又如 需要记录用户一年的打卡情况,打卡了是1, 没有打卡是0,如果使用普通的 key/value存储,则要记录 365条记录,如果用户量很大,需要的空间也会很大,所以 Redis 提供了 Bitmap 位图这中数据结构, Bitmap 就是通过操作二进制位来进行记录,即为 0 和 1;如果要记录 365 天的打卡情况,使用 Bitmap 表示的形式大概如下:0101000111000111…,这样有什么好处呢?当然就是节约内存 了,365 天相当于 365 bit,又 1 字节 = 8 bit , 所以相当于使用 46 个字节即可。 BitMap 就是通过一个 bit 位来表示某个元素对应的值或者状态, 其中的 key 就是对应元素本身,实际上 底层也是通过对字符串的操作来实现。Redis 从 2.2 版本之后新增了setbit, getbit, bitcount 等几个 bitmap 相关命令。

6.1.1、命令

setbit key offset value: 设置key的第offset位为value(1或0)

# 案例,记录一周打卡情况 # 周一:1,周二:0,周三:0,周四:1,周五:1,周六:0,周天:0 (1 为打卡,0 为不打卡) 127.0.0.1:6379> setbit sign 0 1 (integer) 0 127.0.0.1:6379> setbit sign 1 0 (integer) 0 127.0.0.1:6379> setbit sign 2 0 (integer) 0 127.0.0.1:6379> setbit sign 3 1 (integer) 0 127.0.0.1:6379> setbit sign 4 1 (integer) 0 127.0.0.1:6379> setbit sign 5 0 (integer) 0 127.0.0.1:6379> setbit sign 6 0 (integer) 0

getbit key offset:获取offset设置的值,未设置的默认返回0

127.0.0.1:6379> getbit sign 3 # 查看周四是否打卡 1 127.0.0.1:6379> getbit sign 6 # 查看周七是否打卡 0

bitcoun key[start,end]:统计key上位为1的个数

# 统计这周打卡的记录,可以看到只有3天是打卡的状态: 127.0.0.1:6379> bitcount sign 3 6.2、HyperLogLog简介

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积 非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。 HyperLogLog则是一种算法,它提供了不精确的去重计数方案。

什么是基数

比如数据集{1,3,5,7,5,7,8},那么这个数据集的基数集为{1,3,5,7,8},基数(不重复元素为5),基数估计就是在误差可接受的范围内,快速计算基数。

6.2.1、命令

pfadd<key><element>[element...]:添加指定元素到HyperLogLog中

pfcount <key>[key…]:计算name的近似基数

pfmerge destkey sourcekey[sourcekey...]:将多个HyperLogLog合并为一个HyperLogLog,并集计算

127.0.0.1:6379> pfadd name qi (integer) 1 127.0.0.1:6379> pfadd name wang (integer) 1 # 重复的加不进去 127.0.0.1:6379> pfadd name qi (integer) 0 127.0.0.1:6379> pfadd name a b c (integer) 1 127.0.0.1:6379> pfcount name (integer) 5 127.0.0.1:6379> pfadd name1 a b c d e e f (integer) 1 127.0.0.1:6379> pfmerge name_result name name1 OK 127.0.0.1:6379> pfcount name_result (integer) 8 6.3、Geospatial简介

Redis3.2中增加了多GEO类型的支持,GEO,Geographic,地理信息的缩写。该类型就是元素的二维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作。

6.3.1、命令

geoadd

解析:

语法 geoadd key longitude latitude member ... # 将给定的空间元素(纬度、经度、名字)添加到指定的键里面。 # 这些数据会以有序集合的形式被储存在键里面,从而使得georadius和georadiusbymember这样的命令可以在之后通过位置查询取得这些元素。 # geoadd命令以标准的x,y格式接受参数,所以用户必须先输入经度,然后再输入纬度。 # geoadd能够记录的坐标是有限的:非常接近两极的区域无法被索引。 # 有效的经度介于-180-180度之间,有效的纬度介于-85.05112878 度至 85.05112878 度之间。, #当用户尝试输入一个超出范围的经度或者纬度时,geoadd命令将返回一个错误。

测试:百度搜索经纬度查询,模拟真实数据

127.0.0.1:6379> geoadd china:city 116.23 40.22 beijing (integer) 1 127.0.0.1:6379> geoadd china:city 121.48 31.40 shanghai 113.88 22.55 shenzhen 120.21 30.20 hangzhou (integer) 3 127.0.0.1:6379> geoadd china:city 106.54 29.40 chongqing 108.93 34.23 xian 114.02 30.58 wuhan (integer) 3

geopos

解析:

# 语法 geopos key member[member...] # 从key里返回所有给定位置元素的位置(经度和维度)

测试:

127.0.0.1:6379> geopos china:city beijing 1) 1) "116.23000055551528931" 2) "40.2200010338739844" 127.0.0.1:6379> geopos china:city shanghai chongqing 1) 1) "121.48000091314315796" 2) "31.40000025319353938" 2) 1) "106.54000014066696167" 2) "29.39999880018641676"

geodist

解析:

# 语法 geodist key member1 member2 [unit] # 返回两个给定位置之间的距离,如果两个位置之间的其中一个不存在,那么命令返回空值。 # 指定单位的参数unit必须是以下单位的其中一个: # m表示单位为米 # km表示单位为千米 # mi表示单位为英里 # ft表示单位为英尺 # 如果用户没有显式地指定单位参数,那么geodist默认使用米作为单位。 #geodist命令在计算距离时会假设地球为完美的球形,在极限情况下,这一假设最大会造成0.5%的误差。 127.0.0.1:6379> geodist china:city beijing shanghai km "1088.7854" 127.0.0.1:6379> geodist china:city wuhan chongqing "732286.2747"

georadius

解析

#语法 georadius key longitude latitude radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count] # 以给定的经纬度为中心, 找出某一半径内的元素

测试:

# china:city 中寻找坐标 100 30 半径为 1000km 的城市 127.0.0.1:6379> georadius china:city 100 30 1000 km 1) "chongqing" 2) "xian" # withdist 返回位置名称和中心距离 127.0.0.1:6379> georadius china:city 100 30 1000 km withdist 1) 1) "chongqing" 2) "635.2850" 2) 1) "xian" 2) "963.3171" # withcoord 返回位置名称和经纬度 127.0.0.1:6379> georadius china:city 100 30 1000 km withcoord 1) 1) "chongqing" 2) 1) "106.54000014066696167" 2) "29.39999880018641676" 2) 1) "xian" 2) 1) "108.92999857664108276" 2) "34.23000121926852302" # withdist withcoord 返回位置名称 距离 和经纬度 count 限定寻找个数 127.0.0.1:6379> georadius china:city 100 30 1000 km withdist withcoord count 1 1) 1) "chongqing" 2) "635.2850" 3) 1) "106.54000014066696167" 2) "29.39999880018641676"

georadiusbymember

解析

#语法 georadiusbymember key member radius m|km|ft|mi [withcoord][withdist] [withhash][asc|desc][count count] # 找出位于指定范围内的元素,中心点是由给定的位置元素决定

测试:

127.0.0.1:6379> georadiusbymember china:city beijing 1000 km 1) "beijing" 2) "xian"

geohash

解析:

#语法 geohash key member [member...] # Redis使用geohash将二维经纬度转换为一维字符串,字符串越长表示位置更精确,两个字符串越相似,表示距离越近。

测试:

127.0.0.1:6379> geohash china:city beijing chongqing 1) "wx4sucu47r0" 2) "wm5z22h53v0" 127.0.0.1:6379> geohash china:city beijing shanghai 1) "wx4sucu47r0" 2) "wtw6sk5n300"

zrem

GEO没有提供删除成员的命令,但是因为GEO的底层实现是zset,所以可以借用zrem命令实现对地理位 置信息的删除

127.0.0.1:6379> zrange china:city 0 -1 #查看所有元素 1) "chongqing" 2) "xian" 3) "shenzhen" 4) "wuhan" 5) "hangzhou" 6) "shanghai" 7) "beijing" 127.0.0.1:6379> zrem china:city beijing # 移除一个元素 (integer) 1 127.0.0.1:6379> zrange china:city 0 -1 1) "chongqing" 2) "xian" 3) "shenzhen" 4) "wuhan" 5) "hangzhou" 6) "shanghai" 7、Redis_Jedis测试 7.1、步骤

1、创建一个普通的maven项目,引入项目所需要jar包

<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>4.2.2</version> </dependency>

2、编写测试类进行测试

public class JedisDemo1 { public static void main(String[] args) { // 创建Jedis对象 Jedis jedis = new Jedis("101.34.254.161", 6379); // 验证密码,没有设置的话忽略即可。 jedis.auth("password"); // 测试 String ping = jedis.ping(); System.out.println(ping); } }

注意:链接不上的,需要将redis.conf文件这两个地方做个变动

#bind 127.0.0.1 -::1 这个需要注释掉 protocted-mode no 这个需要改为no

并且需要在防火墙上添加6379端口,或者彻底关闭防火墙。

并且需要在服务器安全组添加6379端口放行即可。

测试类结果:

PONG 7.2、常用API

基本操作

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); //验证密码,如果没有设置密码这段代码省略 // jedis.auth("password"); jedis.connect(); //连接 jedis.disconnect(); //断开连接 jedis.flushAll(); //清空所有的key } }

对key操作的命令

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); System.out.println("清空数据:" + jedis.flushDB()); System.out.println("判断某个键是否存在:" + jedis.exists("username")); System.out.println("新增<'username','qi'>的键值 对:" + jedis.set("username", "qi")); System.out.println("新增<'password','password'>的键值 对:" + jedis.set("password", "password")); System.out.print("系统中所有的键如下:"); Set<String> keys = jedis.keys("*"); System.out.println(keys); System.out.println("删除键password:" + jedis.del("password")); System.out.println("判断键password是否存在:" + jedis.exists("password")); System.out.println("查看键username所存储的值的类 型:" + jedis.type("username")); System.out.println("随机返回key空间的一个:" + jedis.randomKey()); System.out.println("重命名key:" + jedis.rename("username", "name")); System.out.println("取出改后的name:" + jedis.get("name")); System.out.println("按索引查询:" + jedis.select(0)); System.out.println("删除当前选择数据库中的所有key:" + jedis.flushDB()); System.out.println("返回当前数据库中key的数目:" + jedis.dbSize()); System.out.println("删除所有数据库中的所有key:" + jedis.flushAll()); } }

对String操作的命令

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.flushDB(); System.out.println("===========增加数据==========="); System.out.println(jedis.set("key1", "value1")); System.out.println(jedis.set("key2", "value2")); System.out.println(jedis.set("key3", "value3")); System.out.println("删除键key2:" + jedis.del("key2")); System.out.println("获取键key2:" + jedis.get("key2")); System.out.println("修改key1:" + jedis.set("key1", "value1Changed")); System.out.println("获取key1的值:" + jedis.get("key1")); System.out.println("在key3后面加入值:" + jedis.append("key3", "End")); System.out.println("key3的值:" + jedis.get("key3")); System.out.println("增加多个键值对:" + jedis.mset("key01", "value01", "key02", "value02", "key03", "value03")); System.out.println("获取多个键值对:" + jedis.mget("key01", "key02", "key03")); System.out.println("获取多个键值对:" + jedis.mget("key01", "key02", "key03", "key04")); System.out.println("删除多个键值对:" + jedis.del("key01", "key02")); System.out.println("获取多个键值对:" + jedis.mget("key01", "key02", "key03")); jedis.flushDB(); System.out.println("===========新增键值对防止覆盖原先值=============="); System.out.println(jedis.setnx("key1", "value1")); System.out.println(jedis.setnx("key2", "value2")); System.out.println(jedis.setnx("key2", "value2-new")); System.out.println(jedis.get("key1")); System.out.println(jedis.get("key2")); System.out.println("===========新增键值对并设置有效时间============="); System.out.println(jedis.setex("key3", 2, "value3")); System.out.println(jedis.get("key3")); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(jedis.get("key3")); System.out.println("===========获取原值,更新为新值=========="); System.out.println(jedis.getSet("key2", "key2GetSet")); System.out.println(jedis.get("key2")); System.out.println("获得key2的值的字串:" + jedis.getrange("key2", 2, 4)); } }

对List操作命令

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.flushDB(); System.out.println("===========添加一个list==========="); jedis.lpush("collections", "ArrayList", "Vector", "Stack", "HashMap", "WeakHashMap", "LinkedHashMap"); jedis.lpush("collections", "HashSet"); jedis.lpush("collections", "TreeSet"); jedis.lpush("collections", "TreeMap"); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1));//-1代表倒数第一个元素,-2代表倒数第二个元素,end为-1表示查询全部 System.out.println("collections区间0-3的元素:" + jedis.lrange("collections", 0, 3)); System.out.println("==============================="); // 删除列表指定的值 ,第二个参数为删除的个数(有重复时),后add进去的值先被删,类似于出栈 System.out.println("删除指定元素个数:" + jedis.lrem("collections", 2, "HashMap")); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1)); System.out.println("删除下表0-3区间之外的元素:" + jedis.ltrim("collections", 0, 3)); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1)); System.out.println("collections列表出栈(左端):" + jedis.lpop("collections")); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1)); System.out.println("collections添加元素,从列表右端,与lpush相对应:" + jedis.rpush("collections", "EnumMap")); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1)); System.out.println("collections列表出栈(右端):" + jedis.rpop("collections")); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1)); System.out.println("修改collections指定下标1的内容:" + jedis.lset("collections", 1, "LinkedArrayList")); System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1)); System.out.println("==============================="); System.out.println("collections的长度:" + jedis.llen("collections")); System.out.println("获取collections下标为2的元素:" + jedis.lindex("collections", 2)); System.out.println("==============================="); jedis.lpush("sortedList", "3", "6", "2", "0", "7", "4"); System.out.println("sortedList排序前:" + jedis.lrange("sortedList", 0, -1)); System.out.println(jedis.sort("sortedList")); System.out.println("sortedList排序后:" + jedis.lrange("sortedList", 0, -1)); } }

对set操作命令

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.flushDB(); System.out.println("============向集合中添加元素(不重复============"); System.out.println(jedis.sadd("eleSet", "e1", "e2", "e4", "e3", "e0", "e8", "e7", "e5")); System.out.println(jedis.sadd("eleSet", "e6")); System.out.println(jedis.sadd("eleSet", "e6")); System.out.println("eleSet的所有元素为:" + jedis.smembers("eleSet")); System.out.println("删除一个元素e0:" + jedis.srem("eleSet", "e0")); System.out.println("eleSet的所有元素为:" + jedis.smembers("eleSet")); System.out.println("删除两个元素e7和e6:" + jedis.srem("eleSet", "e7", "e6")); System.out.println("eleSet的所有元素为:" + jedis.smembers("eleSet")); System.out.println("随机的移除集合中的一个元素:" + jedis.spop("eleSet")); System.out.println("随机的移除集合中的一个元素:" + jedis.spop("eleSet")); System.out.println("eleSet的所有元素为:" + jedis.smembers("eleSet")); System.out.println("eleSet中包含元素的个数:" + jedis.scard("eleSet")); System.out.println("e3是否在eleSet中:" + jedis.sismember("eleSet", "e3")); System.out.println("e1是否在eleSet中:" + jedis.sismember("eleSet", "e1")); System.out.println("e1是否在eleSet中:" + jedis.sismember("eleSet", "e5")); System.out.println("================================="); System.out.println(jedis.sadd("eleSet1", "e1", "e2", "e4", "e3", "e0", "e8", "e7", "e5")); System.out.println(jedis.sadd("eleSet2", "e1", "e2", "e4", "e3", "e0", "e8")); System.out.println("将eleSet1中删除e1并存入eleSet3中:" + jedis.smove("eleSet1", "eleSet3", "e1"));//移到集合元素 System.out.println("将eleSet1中删除e2并存入eleSet3中:" + jedis.smove("eleSet1", "eleSet3", "e2")); System.out.println("eleSet1中的元素:" + jedis.smembers("eleSet1")); System.out.println("eleSet3中的元素:" + jedis.smembers("eleSet3")); System.out.println("============集合运算================="); System.out.println("eleSet1中的元素:" + jedis.smembers("eleSet1")); System.out.println("eleSet2中的元素:" + jedis.smembers("eleSet2")); System.out.println("eleSet1和eleSet2的交集:" + jedis.sinter("eleSet1", "eleSet2")); System.out.println("eleSet1和eleSet2的并集:" + jedis.sunion("eleSet1", "eleSet2")); System.out.println("eleSet1和eleSet2的差集:" + jedis.sdiff("eleSet1", "eleSet2"));//eleSet1中有,eleSet2中没有 jedis.sinterstore("eleSet4", "eleSet1", "eleSet2");//求交集并将交集保存到dstkey的集合 System.out.println("eleSet4中的元素:" + jedis.smembers("eleSet4")); } }

对Hash的操作命令

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.flushDB(); Map<String, String> map = new HashMap<>(); map.put("key1", "value1"); map.put("key2", "value2"); map.put("key3", "value3"); map.put("key4", "value4"); //添加名称为hash(key)的hash元素 jedis.hmset("hash", map); //向名称为hash的hash中添加key为key5,value为value5元素 jedis.hset("hash", "key5", "value5"); System.out.println("散列hash的所有键值对为:" + jedis.hgetAll("hash"));//return Map<String,String> System.out.println("散列hash的所有键为:" + jedis.hkeys("hash"));//returnSet<String> System.out.println("散列hash的所有值为:" + jedis.hvals("hash"));//returnList<String> System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:" + jedis.hincrBy("hash", "key6", 6)); System.out.println("散列hash的所有键值对为:" + jedis.hgetAll("hash")); System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:" + jedis.hincrBy("hash", "key6", 3)); System.out.println("散列hash的所有键值对为:" + jedis.hgetAll("hash")); System.out.println("删除一个或者多个键值对:" + jedis.hdel("hash", "key2")); System.out.println("散列hash的所有键值对为:" + jedis.hgetAll("hash")); System.out.println("散列hash中键值对的个数:" + jedis.hlen("hash")); System.out.println("判断hash中是否存在key2:" + jedis.hexists("hash", "key2")); System.out.println("判断hash中是否存在key3:" + jedis.hexists("hash", "key3")); System.out.println("获取hash中的值:" + jedis.hmget("hash", "key3")); System.out.println("获取hash中的值:" + jedis.hmget("hash", "key3", "key4")); } }

对zset操作命令

public class JedisDemo1 { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1","6379"); jedis.zadd("china",100d,"shanghai"); Set<String> zrange =jedis.zrange("china",0,-1); for(String e : zrange){ System.out.println(e); } } } 8、SpringBoot整合 1、引入所需依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 2、添加配置 spring: redis: host: 127.0.0.1 port: 6379 password: database: 0 timeout: 10s lettuce: pool: # 最大空闲连接 max-idle: 8 # 连接池最小空闲连接 min-idle: 0 # 连接池最大连接数(负值表示没有限制) max-active: 8 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1 3、封装关于redis的工具类 package com.lili.util; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @author YLi_Jing */ @EnableCaching @Configuration public class RedisConfig { @Bean @SuppressWarnings("all") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { //我们为了自己的开发方便,一般直接使用<String, Objecj>类型 RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(factory); //Json序列化配置 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); //String的序列化 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key采用String的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash的value序列化方式采用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } } package com.lili.util; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @Component public final class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; // =============================common============================ /** * 指定缓存失效时间 * * @param key 键 * @param time 时间(秒) */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key)); } } } // ============================String============================= /** * 普通缓存获取 * * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * * @param key 键 * @param delta 要增加几(大于0) */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * * @param key 键 * @param delta 要减少几(小于0) */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } // ================================Map================================= /** * HashGet * * @param key 键 不能为null * @param item 项 不能为null */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * * @param key 键 * @return 对应的多个键值 */ public Map<Object, Object> hmget(String key) { return redisTemplate.opsForHash().entries(key); } /** * HashSet * * @param key 键 * @param map 对应多个键值 */ public boolean hmset(String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 判断hash表中是否有该项的值 * * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * * @param key 键 * @param item 项 * @param by 要增加几(大于0) */ public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * * @param key 键 * @param item 项 * @param by 要减少记(小于0) */ public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); } // ============================set============================= /** * 根据key获取Set中的所有值 * * @param key 键 */ public Set<Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) { expire(key, time); } return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * * @param key 键 */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } // ===============================list================================= /** * 获取list缓存的内容 * * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 */ public List<Object> lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * * @param key 键 */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 */ public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 */ public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) */ public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } } 4、进行测试 @SpringBootTest class RedisSpringbootApplicationTests { @Autowired private RedisUtil redisUtil; @Test void contextLoads() { redisUtil.set("name","qijingjing"); System.out.println(redisUtil.get("name")); } } 9、Redis事务_锁机制 9.1、Redis的事务定义

Redis事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程中,会按照顺序串化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。

总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。

9.2、multi、exex、discard

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行、直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。这个组队过程中可以通过discard来放弃组队。

案例:

127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> exec 1) OK 2) OK

注意:

如果组队的时候命令失败,则所有命令都不执行 127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED # 组队的时候出错 127.0.0.1:6379> set k3 (error) ERR wrong number of arguments for 'set' command 127.0.0.1:6379> exec (error) EXECABORT Transaction discarded because of previous errors. 如果在执行的时候,组队命令错误,则错误的不执行,其他的都执行 127.0.0.1:6379> multi OK 127.0.0.1:6379> set k1 v1 QUEUED 127.0.0.1:6379> incr k1 QUEUED 127.0.0.1:6379> set k2 v2 QUEUED 127.0.0.1:6379> exec 1) OK 2) (error) ERR value is not an integer or out of range 3) OK 9.3、悲观锁与乐观锁

悲观锁:

悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿到这个数据就会block直到它拿到锁。传统的关系型数据库里面就用到了很多这种锁机制、比如行锁,表锁,读锁,写锁等,都是在操作之前先上锁。

乐观锁:

乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁。但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读的应用类型,这样可以提高吞吐量,乐观锁策略:提交版本必须大于记录当前版本才能执行更新。

9.4、Watch监控

在执行multi之前,先执行watch key1[key 2],可以监视一个或多个key,如果事务执行之前key被其他命令所改动,那么事务将被打断。

1、设置一个年龄信息

127.0.0.1:6379> set age 12 OK

2、使用watch检测age,事务期间age数据未改动,事务执行成功

127.0.0.1:6379> set age 12 OK 127.0.0.1:6379> watch age OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> incr age QUEUED 127.0.0.1:6379> exec 1) (integer) 13

3、使用watch检测age,事务期间age数据变动,事务执行失败

窗口一:

然后在窗口二进行变动age的值

再到窗口一执行事务:

注意:一旦执行exec开启事务的执行后,无论事务是否执行成功,watch对变量的监控都将被取消。所以事务执行失败后,需要重新执行watch命令对变量进行监控,并开启新的事务进行操作。

小结:

watch指令类似于乐观锁,在事务提交时,如果watch监控的多个key中任何key的值已经被其他客户端更改,则使用exec执行事务时,事务队列不会被执行,同时返回Nullmulti-bulk应答以通知调用者事务执行失败。

9.5、事务的三个特性

单独的隔离操作:

事务中的所有命令都会序列化,按顺序执行,事务在执行过程中,不会被其他客户端发送来的命令请求所打断。

没有隔离级别的概念:

队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。

不保证原子性:

事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚 10、Redis持久化之RDB(Redis DataBase) 10.1、什么是RBD

在指定时间间隔内将内存中的数据集写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。在整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那么RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化数据可能丢失。

10.2、什么是Fork

Fork的作用是复制一个与当前进程一样的进程。新进程的所有数据都可原进程一致,但是是一个全新的进程,并作为原进程的子进程。

10.3、dump.rdb

1、rdb保存的是dump.rdb文件

2、在redis.conf配置中关于rbd的配置

rbd是时间间隔内将数据集写入磁盘,可以在这里配置相关触发条件。

默认触发条件:

一分钟内改了1万次5分钟内改了10次15分钟内改了1次

如果想禁用RDB的持久化策略,只要不设置任何save指令,或者给save传入一个空字符串参数也可以。

其余命令解析

Stop-writes-on-bgsave-error:当Redis无法写入磁盘的话,直接关掉redis的写操作。推荐yes

rbdcompression:对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用 LZF算法进行压缩,如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。

rdbchecksum:在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约 10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能。默认为yes。

如何触发RDB快照

1、save命令或者bgsave命令

save只管保存,其他不管,全部阻塞bgsave,Redis会在后台异步进行快照操作,快照同时还可以相应客户端请求。可以通过lastsave命令获取左后一次成功执行快照的时间。

2、执行flushall命令的时候,也会触发dump.rdb文件,因为需要将里面的内容清空

3、shutdown退出的时候也会触发dump.rbd文件

rdb的备份

1、复制一份

2、将备份文件移动到redis安装目录并启动即可

127.0.0.1:6379> config get dir dir /usr/local/bin

优缺点

优点:

适合大规模的数据恢复对数据完整性和一致性要求不高

缺点:

在一定间隔内做一次备份,如果redis意外down掉的话,就会丢失最后一次快照后的所有修改Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑。

小结

11、Redis持久化之AOF(Append Only File) 11.1、是什么

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

aop保存的是appendonly.aof文件

11.2、配置部分 appendonly no # 是否以append only模式作为持久化方式,默认使用的是rdb方式持久化,这种方式在许多应用中已经足够用了 appendfilename "appendonly.aof" # appendfilename AOF 文件名称 appendfsync everysec # appendfsync aof持久化策略的配置 # no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快。 # always表示每次写入都执行fsync,以保证数据同步到磁盘。 # everysec表示每秒执行一次fsync,可能会导致丢失这1s数据。 No-appendfsync-on-rewrite #重写时是否可以运用Appendfsync,用默认no即可,保证数据安全性 Auto-aof-rewrite-min-size 64mb # 设置重写的基准值 Auto-aof-rewrite-percentage 100 #设置重写的基准值 # 以上两个,表示重写触发机制是大于64mb的100%也就是128mb触发 11.3、AOF数据恢复

正常恢复:

appendonly no 改为yes将有数据的aof进行复制一份保存到对应目录(config get dir)重启即可

异常恢复:

appendonly no 设置yes

redis-check-aof --fix appendonly.aof 进行修复

重启redis然后重新加载

11.4、Rewrite

是什么:

AOF 采用文件追加方式,文件会越来越大,为避免出现此种情况,新增了重写机制,当AOF文件的大小 超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集,可以 使用命令 bgrewriteaof !

重写原理:

AOF 文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再 rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧 的aof文件,这点和快照有点类似!

触发机制:

Redis会记录上次重写时的AOF大小,默认配置是当AOF文件是上次rewrite后大小的一倍且文件大 于64M的触发。

AOF持久化流程

1、客户端的请求写命令会被append追加到AOF缓冲区内;

2、AOF缓冲区根据AOF持久化策略【always,everysec,no】将操作sync同步到磁盘的AOF文件中;

3、AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;

11.5、优缺点

优点:

1、每修改同步:appendfsync always 同步持久化,每次发生数据变更会被立即记录到磁盘,性能较差 但数据完整性比较好

2、每秒同步: appendfsync everysec 异步操作,每秒记录 ,如果一秒内宕机,有数据丢失

3、不同步: appendfsync no 从不同步

缺点:

1、相同数据集的数据而言,aof 文件要远大于 rdb文件,恢复速度慢于 rdb。

2、Aof 运行效率要慢于 rdb,每秒同步策略效率较好,不同步效率和rdb相同。

小结

12、Redis主从复制 12.1、概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redsi服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。

默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。

主从复制的作用主要包括:

1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复。

3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务,分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大地提高redis服务器的并发量。

4、高可用:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是redis高可用的基础。

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下:

1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较 大;

2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有 内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。 电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。 对于这种场景,我们可以使如下这种架构:

12.2、主从复制配置

1、新建一个文件夹,把redis.conf文件copy一份放进去

[root@VM-4-12-centos /]# mkdir myredis [root@VM-4-12-centos /]# cd myredis [root@VM-4-12-centos myredis]# cp /usr/local/bin/redis.conf /myredis/redis.conf

2、配置一主两从,需要创建三个配置文件

redis6380.conf(主)redis6381.conf(从)redis6382.conf(从)

3、在三个配置中写入配置

# 第一个配置文件内容 include /myredis/redis.conf pidfile /var/run/redis_6380.pid port 6380 dbfilename dump6380.rdb # 第二个配置文件内容 include /myredis/redis.conf pidfile /var/run/redis_6381.pid port 6381 dbfilename dump6381.rdb # 第一个配置文件内容 include /myredis/redis.conf pidfile /var/run/redis_6382.pid port 6382 dbfilename dump6382.rdb

现在目录下面已经有刚才建立的三个配置文件了

4、启动三台服务器

5、查看三台主机运行状况

info replication

我们会发现三台主机的运行情况都是一样的,没有主机丛机的概念。这就需要我们有一些配置。

6、配从(从库)不配主(主库)

slaveof <ip><port> (成为某个实例的从服务器)

1、在6381和6382上执行:slaveof 127.0.0.1 6380,然后重新查看主机运行状况

7、测试效果(在主机上进行写操作,在从机上读取)

注意:从机中不能做写操作,只能做读操作。

12.3、常用三招 12.3.1、一主二仆

**问题一:**在一主二仆的情况下,如果其中一个从机down掉,在主机里面添加新数据,当down掉的主机重新启动的话,是否还是从机?down掉的过程中如果主机又添加新数据,重新启动后是否可以复制?

实验:

1、现在主机里面有一个key,然后我们将6382从机shutdown掉,然后查看主机运行信息。

2、在主机里面新添加几个key,然后重新启动6382

3、我们会发现从机重新启动后变成了主机,我们继续手动把它变回从机

slaveof ip port

总结:

从机down掉再重新连接会自动变回主机从机down掉再变回从机会从头开始复制主机的数据。

**问题二:**当主机down掉,从机该何去何从?

实验:

1、将主服务器给shutdown,然后查看从服务器的运行信息

**发现:**主机down掉后,从机知道主机down掉,不做任何事情,之前数据仍然保留

2、将主服务器重新启动,然后查看主服务器的信息

**发现:**主机down掉再重新启动仍然是主机,主仆关系没有发生改变

12.3.2、薪火相传

简单来说就是一个主机下面有一个从机,而这个从机也可以作为主机下面继续跟一个从机。这样的层层链路,可以减轻master的写压力

演示:将6381作为中间链路

如图:6380的从机是6381,而6381的从机是6382,

6380设置的值6381和6382都可以获取得到

12.3.3、反客为主

当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。

12.4、复制原理

Slave启动成功连接到master后会发送一个sync命令

Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步。

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行。

12.5、哨兵模式

概述

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库

原理

哨兵通过发送命令,等待Redis服务器的响应,从而监控运行的多个Redis实例。

配置测试

1、重新调整为6380为主,下面带着6381和6382

2、自定义的/myredis目录下新建sentinel.conf文件,名字千万不要错

3、配置哨兵,填写内容

sentinel monitor mymaster 127.0.0.1 6379 1其中mymaster为监控对象起的服务器名称,1 :至少有多少个哨兵同一迁移的数量

4、启动哨兵

redis-sentinel sentinel.conf

5、正常主从演示

6、down掉主机也就是6380

7、查看6381主机运行情况

8、重启之前的主机进行观察

9、可以设置从服务器的优先级

在redis.conf文件中:默认: slave-priority 100 ,值越小优先级越高

优缺点

优点:

1、主从可以切换,故障可以转移,系统可用性好

2、哨兵模式是主从模式的升级,更健壮、可用

缺点:

1、配置繁琐

2、复制延时,在master上写然后同步,当系统很繁忙的时候,这个问题更加严重。

13、Redis集群 13.1、问题 容量不够,redis如何进行扩容?并发写操作,redis如何分摊?主从模式,薪火相传,主机宕机,导致ip发生变化,需要修改很多配置,相当繁琐,但是redis3.0中提供了解决方案,就是无中心化集群配置。 13.2、什么是集群

Redis集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N

Redis集群通过分区(partiition)来提供一定程度的可用性(availablity):即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求。

13.3、准备开始搭建集群

1、创建/myredis-sluster文件夹来存放我们的相关文件

mkdir myredis-cluster

2、复制一份redis.conf放进去(跟主从那块一样)

[root@VM-4-12-centos /]# cd myredis-cluster/ [root@VM-4-12-centos myredis-cluster]# cp /usr/local/bin/redis.conf /myredis-cluster/redis.conf

3、创建6个配置文件,一个主一个从,也就是三组主从关系

redis6390.conf , redis6391.conf

redis6392.conf , redis6393.conf

redis6394.conf , redis6395.conf

先创建一个文件,其他的进行复制修改端口即可,执行命令:

vi redis6390.conf

然后在文件中加入以下内容

include /myredis-cluster/redis.conf pidfile "/var/run/redis_6390.pid" port 6390 dbfilename "dump6390.rdb" cluster-enabled yes cluster-config-file nodes-6390.conf cluster-node-timeout 15000

配置解释:

cluster-enabled yes:打开集群模式

cluster-config-file:设定节点配置文件名

cluster-node-timeout: 节点失联时间,超过后集群自动进行主从切换

然后复制5份不同端口的文件,只许改动端口号即可。完成后如下所示:

[root@VM-4-12-centos myredis-cluster]# ls redis6390.conf redis6392.conf redis6394.conf redis.conf redis6391.conf redis6393.conf redis6395.conf

4、启动这6个端口

redis-server redis6390.conf redis-server redis6391.conf redis-server redis6392.conf redis-server redis6393.conf redis-server redis6394.conf redis-server redis6395.conf

5、将6个节点合成一个集群

进入你的src目录下,也就是redis安装位置下面的src文件夹

cd redis-6.2.6/src

执行以下命令即可:

redis-cli --cluster create --cluster-replicas 1 10.0.4.12:6390 10.0.4.12:6392 10.0.4.12:6394 10.0.4.12:6391 10.0.4.12:6393 10.0.4.12:6395

**注意:**此处不要用127.0.0.1,请求真实ip地址。

replicas 1:代表每台主机下面从机的个数,一台主机一台从机,正好三组

执行命令后出现以下提示:

会给你分配好主机从机的对应关系,输入yes即可。

注意这里:[ok] All 16384 slots covered,稍后在slots进行分析

13.4、什么是slots

上面我们将6个节点合并为集群后,最后的结果有一句话

``[ok] All 16384 slots covered`

解释:

一个Redis集群包含16384个插槽(hash slot),数据中的每个键都属于这16384个插槽的其中一个。

集群使用公式CRC16(key)% 16384来计算键key属于哪个槽,其中CRC16(key)语句用于计算键key的CRC16校验和。

集群中的每一部分负责处理一部分插槽。

举例:

比如有A、B、C三个节点,A负责【0-5460】,B负责【5461-10922】,C负责【10923-16383】

13.5、-c采用集群策略连接,设置的数据会自动切换到相应的写主机

如果想设置多个值呢

不在一个slot下的键值,是不能使用mget,mset等多键操作

可以通过{}来定义组的概念,从而使key中{}内相同内容的键值放到一个slot中去

13.6、查询集群中的值

查询某个键的插槽值

10.0.4.12:6390> cluster keyslot k2 (integer) 449

查看某个插槽中有几个key(注意:只能在自己的范围插槽内进行查询)

10.0.4.12:6390> set city beijing -> Redirected to slot [11479] located at 10.0.4.12:6394 OK 10.0.4.12:6394> cluster countkeysinslot 11479 (integer) 1

在指定插槽里面返回指定的个数的key

10.0.4.12:6390> set city beijing -> Redirected to slot [11479] located at 10.0.4.12:6394 OK 10.0.4.12:6394> cluster getkeysinslot 11479 1 1) "city" 13.7、故障修复

shutdown一个主节点,从节点会自动升为主节点

1、将6394主节点shutdown掉,然后重新集群策略连接一个主节点

2、cluster nodes:查看集群主从机相关信息

结论:从节点自动升为主节点,原来的主节点fail掉

主节点恢复后,主从关系如何?主节点变回从机

1、新开一个窗口重新启动6394

redis-server redis6394.conf

2、集群信息变化

结论:主节点恢复后,主节点会变成从机

如果某个节点的主从节点全部宕掉,redis服务是否可以继续

结论:

与redis.conf中的参数 cluster-require-full-coverage有关

如果cluster-require-full-coverage为yes,那么所有集群都挂掉

反之,其他插槽依然可以使用,该插槽全部数据不能使用,也无法存储。

13.8、优缺点

优点:

实现扩容分摊压力无中心配置

缺点:

多键操作不被支持多键的Redis事务不被支持 14、Redis应用问题 14.1、缓存穿透

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败,当用户很多的时候,缓存都没有命中,全部请求了持久层数据库,这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

解决方案

缓存空值

如果一个查询返回的数据为空,我们仍然对这个空结果进行缓存,设置空结果的过期时间会很短,最长不超过5分钟

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。

14.2、缓存击穿

概念

缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这个点进行访问,当在这个key失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在屏障上凿开一个洞

解决方案

设置热点时间永不过期

加互斥锁

分布式锁:保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可

14.3、缓存雪崩

概念

缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。

解决方案

redis高可用

就是多搭建redis集群

限流

缓存失效后,通过加锁或者队列来控制数据库写缓存的线程数量,比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

15、分布式锁 15.1、概念

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同的机器上,这将使原单机部署的情况并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题

15.2、分布式锁的主流实现方案 基于数据库实现分布式锁基于缓存(Redis等)基于Zookeeper

优缺点

性能:redis最高可靠性:zookeeper最高 15.3、使用redis实现分布式锁 15.3.1、设置锁和过期时间

setnx key value`:设置key后,不可修改,除非删除后(或者过期后),也就是释放锁后才能再次操作

127.0.0.1:6380> setnx k1 v1 (integer) 1 # 不能修改 127.0.0.1:6380> setnx k1 v2 (integer) 0 127.0.0.1:6380> del k1 (integer) 1 127.0.0.1:6380> setnx k1 v1 (integer) 1

**问题一:**如果刚上完锁突然断点了,那过期时间无法设置了,那锁岂不是这个时间段一直没释放?

解决:我们可以在上锁的时候同时设置过期时间

set key value nx ex 12:即上锁又设置过期时间

127.0.0.1:6380> set k2 v2 nx ex 12 OK 15.3.2、UUID防止误删

问题:

在redis分布式锁的时候,会去设置一个key,如果可以设置成功(也就是这个key不存在)则说明拿到锁,反之没有拿到锁(这个key已经存在,代表已经被上锁),但是会不会有这么一种情况?一个没有拿到锁的线程,把这个key给删掉,也就是释放了别人的锁,这种情况是肯定不能出现的。

解决:

在拿锁的时候,key对应的value值可以用UUID生成,每次在释放锁的时候都要先进行判断,如果对应的value是你自己设置的value,就成功释放,防止误删操作。

15.3.3、LUA保证删除原子性

问题描述

a拿到锁—>执行具体操作—>比较uuid(一样)—>准备删除锁,但是还没有删除的时候锁到了过期时间自动释放—>这个时候b拿到锁—>a继续删除操作—>最终a释放了b的锁

解决方案

使用LUA脚本,类似于redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

15.3.4、分布式锁的几个必要条件 互斥性:任意时刻,只有一个客户端能持有锁不会发生死锁,即使一个客户端持有锁期间崩溃而没有主动解锁,也能保证后续客户端可以加锁加锁解锁必须是同一个客户端加锁和解锁必须具有原子性


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #redis入门到精通 #1NoSQL数据库11 #not #only #SQL