1 Star 0 Fork 165

ElonChung / Java-Review

forked from flatfish / Java-Review 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
Redis详解.md 84.83 KB
一键复制 编辑 原始数据 按行查看 历史
icanci 提交于 2020-10-27 11:15 . 解决Redis图片拦截问题

Redis 详解

  • 上手就用
  • 基本的理论,然后将知识融汇贯通

NoSQL

为什么要用NoSQL

现在处于大数据时代,一般的数据库无法进行分析处理。

  • 单机MySQL的时代
    • App -> DAL -> MySQL
    • 1594599496167
  • 90年代,一个基本的网站访问量一般不会很大,单个数据库完全足够,哪个时候使用静态网页
  • 这种网站的瓶颈
    • 数据量如果太大,一个机器放不下了
    • 数据的索引 超过300w数据 数据的索引(B+ Tree)一个机器内存也放不下
    • 访问量(读写混合)一个服务器受不了
  • 缓存 MemcacMehed + MySQL + 垂直拆分 (读写分离)
    • 1594599653070
    • 网站 80% 的情况都是在读,每次都要去查数据库就很不友好,所以为了减轻数据压力,引入缓存
    • 1594599750628
    • 发展过程:优化数据结构和索引 -> 文件缓存(IO)-> MemcacMehed
  • 分库分表+水平拆分+MySQL集群
    • 1594600176802
    • 数据库的本质:读和写
    • 早些年使用MyISAM:表锁(100w数据,查一条,但是锁住100w数据)
    • 现在InnoDB:行锁 (只锁一行)
    • 慢慢的就开始使用分库分表来解决写的压力
  • 最近的年代:技术爆炸
    • MySQL 等关系型数据库就不够使用了
    • 图形数据库、文件数据库等
  • 目前的一个基本的互联网项目
    • 1594601054009
  • 为什么要用NoSQL
  • 用户的个人信息,社交网络,地理位置。用户产生的数据,用户日志的爆发增长
  • 这个时候就需要使用NoSQL数据库
什么是NoSQL

Not Only SQL = 不仅仅是SQL

泛指非关系型数据库的,随着web2.0互联网的诞生,传统的关系型数据库很难对付web2.0时代,尤其是超大规模的高并发的社区,这个问题暴露的就越加明显,NoSQL现在发展十分迅速。

很多数据类型用户的个人信息,社交网络,地理位置。这些数据类型的存储不需要一个固定的格式,不需要多余的操作就可以横向拓展的 Map<String,Object>

NoSQL特点:

解耦

  • 方便拓展 数据之间没有数据,很好拓展
  • 大数据高性能 (Redis 一秒写8万次,读取11万次,NoSQL的缓存记录级,是一种细粒度的缓存)
  • 数据类型是多样的 不需要事先设计数据库 随去随用
  • 传统的RDBMS和NoSQL
  • 传统的RDBMS
    • 结构化组织
    • SQL
    • 数据和关系都存在单独的表中
    • 操作语言,数据定义语言
    • 严格的一致性
    • 基础的事务
  • NoSQL
    • 不仅仅是数据库
    • 没有固定的查询语言
    • 键值对、列存储、文档存储、图形数据库
    • 最终一致性
    • CAP原理和BASE (异地多活)
    • 高性能、高可用、高可拓

了解:3V+3高

  • 3V:主要是描述问题的
    • 海量Velume
    • 多样Variety
    • 实时Velocity
  • 3高:主要是对程序的要求
    • 高性能 (保证用户体验)
    • 高可用 -> 高并发
    • 高可拓 (随时水平拆分)
  • 真正的公司中的实现,是NoSQL+RDBMS
阿里巴巴架构演进

敏捷开发、极限编程

开源才是技术的王道

如果未来要做架构师:没有什么事加一层是解决不了的,如果有,就加2层

# 商品的基本信息
	名称、价格、商家信息
	关系型数据库就可以解决了 MySQL/Oracle 
 	淘宝早年去 IOE:王坚:推荐文章- 阿里云的这群疯子 https://www.huxiu.com/article/267100.html
 	淘宝内部使用的MySQL是AliSQL
# 商品的描述、评论(文字比较多)
	文档型数据库:MongoDB
# 图片
	分布式文件系统:FastDFS
	- 淘宝自己的 TFS
	- Google的 GFS
	- Hadoop HDFS
	- 阿里云的 OSS
# 商品的关键字(搜索)
	- 搜索引擎 solr ElasticSearch
	- ISearch 多隆
	所有nb的人都有一段苦逼的岁月,但是要向SB一样去坚持
# 商品热门的波段信息
	- 内存数据库
	- Redis Tair Memache
# 商品的交易、外部的接口
	- 第三方应用

大型互联网的问题:

  • 数据类型太多了
  • 数据源繁多,经常重构
  • 数据要改造,大面积改造
NoSQL的四大分类
KV键值对
  • 新浪:Redis
  • 美团:Redis + Tair
  • 阿里、百度:Redis+Memecache
文档型数据库
  • MongoDB (一般必须要使用)
    • MongoDB 是一个基于分布式文件存储的数据库,C++编写,主要用来存储大量的文档
    • MongoDB 是一个介于关系型数据库和非关系型数据库中间产品,MongoDB是非关系型数据中功能最丰富,最像关系型数据库的数据库
    • ConthDB
列存储结构
  • HBase
  • 分布式文件系统
图关系数据库
  • 不是存图形,而是存放的是关系:如社交网络
关系型数据库对比

推荐文章:https://cloud.tencent.com/developer/article/1114538

推荐文章: https://blog.csdn.net/dopamy_busymonkey/article/details/90770799

Redis入门

概述

Redis 是什么 Remote Dictionary Server

1594605421126

是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。 也被称为结构化数据库

能干嘛?

  • 内存存储、持久化,内存中是断电就失去,持久化很重要 (rdf、aof)
  • 效率高,可以用于高速缓存
  • 发布订阅系统
  • 地图信息分析
  • 计时器、计数器(浏览量)
  • ......

特性

  • 多样的数据类型
  • 持久化
  • 集群
  • 事务
  • .....

学习中需要用到的东西

Windows 安装
  • 下载安装包 : https://github.com/dmajkic/redis/releases
  • 下载之后得到安装包
  • 解压到磁盘目录即可
  • 1594606419947
  • 开启Redis ,运行服务即可
  • 1594606463953
  • 使用Redis客户端连接 Redis ping 测试连接
  • 1594606544262
  • Windows使用简单,但是Redis使用推荐Linux
Linux 安装
  • 下载安装包

    • 去官网下载
  • 解压Redis安装包

    • 程序一般放在 /opt 目录下

    • [root@ali-icanci ic]# cd /opt/
      [root@ali-icanci opt]# ls
      containerd  redis-5.0.5.tar.gz
      [root@ali-icanci opt]# tar -zxvf redis-5.0.5.tar.gz
    • 1594791691398

    • 进入解压后的文件就是如上图

    • 执行基本的环境安装 [root@ali-icanci redis-5.0.5]# yum install gcc-c++

    • 执行 make命令 进行安装 会下载依赖 然后再执行 make install

    • [root@ali-icanci redis-5.0.5]# make
      cd src && make all
      make[1]: Entering directory `/opt/redis-5.0.5/src'
          CC Makefile.dep
      make[1]: Leaving directory `/opt/redis-5.0.5/src'
      make[1]: Entering directory `/opt/redis-5.0.5/src'
      
      Hint: It's a good idea to run 'make test' ;)
      
      make[1]: Leaving directory `/opt/redis-5.0.5/src'
      [root@ali-icanci redis-5.0.5]# make install
      cd src && make install
      make[1]: Entering directory `/opt/redis-5.0.5/src'
      
      Hint: It's a good idea to run 'make test' ;)
      
          INSTALL install
          INSTALL install
          INSTALL install
          INSTALL install
          INSTALL install
      make[1]: Leaving directory `/opt/redis-5.0.5/src'
      [root@ali-icanci redis-5.0.5]#
    • redis 默认的安装路径在 usr/local/bin 目录下 如下操作

    • [root@ali-icanci redis-5.0.5]# cd /usr/
      [root@ali-icanci usr]# ls
      apache-tomcat-8.5.57  apache-tomcat-8.5.57.tar.gz  bin  etc  games  include  java  lib  lib64  libexec  local  sbin  share  src  tmp
      [root@ali-icanci usr]# cd local/
      [root@ali-icanci local]# ls
      aegis  bin  etc  games  include  lib  lib64  libexec  sbin  share  src
      [root@ali-icanci local]# cd bin/
      [root@ali-icanci bin]# ls
      chardetect  cloud-init      easy_install      easy_install-3.8  jsonpatch    jsonschema       redis-check-aof  redis-cli       redis-server
      cloud-id    cloud-init-per  easy_install-3.6  jsondiff          jsonpointer  redis-benchmark  redis-check-rdb  redis-sentinel
      [root@ali-icanci bin]#
    • 将 redis 的配置文件,复制到我们的当前目录下

    • [root@ali-icanci bin]# ls
      chardetect  cloud-init      easy_install      easy_install-3.8  jsondiff   jsonpointer  redis-benchmark  redis-check-rdb  redis-sentinel
      cloud-id    cloud-init-per  easy_install-3.6  iconfig           jsonpatch  jsonschema   redis-check-aof  redis-cli        redis-server
      [root@ali-icanci bin]# cd iconfig/
      [root@ali-icanci iconfig]# ls
      redis.conf
      [root@ali-icanci iconfig]#
    • 以后就使用这个移动过来的配置文件进行启动

    • redis 默认不是后台启动的 需要进行修改配置文件 修改为yes之后就是默认后台启动了

    • 1594792345201

    • 启动 redis 服务 通过指定配置文件启动服务

    • [root@ali-icanci bin]# redis-server iconfig/redis.conf
      1899:C 15 Jul 2020 13:54:57.740 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
      1899:C 15 Jul 2020 13:54:57.740 # Redis version=5.0.5, bits=64, commit=00000000, modified=0, pid=1899, just started
      1899:C 15 Jul 2020 13:54:57.740 # Configuration loaded
      [root@ali-icanci bin]#
    • 查看是否连接

    • [root@ali-icanci bin]# ls
      chardetect  cloud-init      easy_install      easy_install-3.8  jsondiff   jsonpointer  redis-benchmark  redis-check-rdb  redis-sentinel
      cloud-id    cloud-init-per  easy_install-3.6  iconfig           jsonpatch  jsonschema   redis-check-aof  redis-cli        redis-server
      [root@ali-icanci bin]# redis-cli -p 6379
      127.0.0.1:6379> ping
      PONG
      127.0.0.1:6379> set name ic
      OK
      127.0.0.1:6379> get name
      "ic"
      127.0.0.1:6379>
    • keys * 查看所有的key

    • 查看 redis 的进程是否开启

    • SD[root@ali-icanci bin]# ps -ef | grep redis
      root      1900     1  0 13:54 ?        00:00:00 redis-server 127.0.0.1:6379
      root      2078 29200  0 13:58 pts/1    00:00:00 grep --color=auto redis
      [root@ali-icanci bin]#
    • 关闭服务 SHUTDOWN

    • 127.0.0.1:6379> SHUTDOWN
      not connected>
      not connected> exit
      [root@ali-icanci bin]#
    • 再次查看进程是否存在

    • 后来会使用单机多redis

测试性能
  • redis-benchmark 是一个压力测试工具
  • 官方自带的性能工具
  • redis-benchmark - 命令 如下
序号 选项 描述 默认值
1 -h 指定服务器主机名 127.0.0.1
2 -p 指定服务器端口 6379
3 -s 指定服务器 socket
4 -c 指定并发连接数 50
5 -n 指定请求数 10000
6 -d 以字节的形式指定 SET/GET 值的数据大小 2
7 -k 1=keep alive 0=reconnect 1
8 -r SET/GET/INCR 使用随机 key, SADD 使用随机值
9 -P 通过管道传输 请求 1
10 -q 强制退出 redis。仅显示 query/sec 值
11 --csv 以 CSV 格式输出
12 -l 生成循环,永久执行测试
13 -t 仅运行以逗号分隔的测试命令列表。
14 -I Idle 模式。仅打开 N 个 idle 连接并等待。
  • 简单测试一下

  • # 测试 100 个并发连接 100000请求
    redis-benchmark -h localhost -p 6379 -c 100 -n 100000
    
    # 具体输出如下
  • [root@ali-icanci bin]# redis-benchmark -h localhost -p 6379 -c 100 -n 100000
    ====== PING_INLINE ======
      100000 requests completed in 1.50 seconds
      100 parallel clients
      3 bytes payload
      keep alive: 1
    
    78.62% <= 1 milliseconds
    98.75% <= 2 milliseconds
    99.83% <= 3 milliseconds
    99.90% <= 4 milliseconds
    99.90% <= 5 milliseconds
    99.97% <= 6 milliseconds
    100.00% <= 6 milliseconds
    66622.25 requests per second
    
    ====== PING_BULK ======
      100000 requests completed in 1.53 seconds
      100 parallel clients
      3 bytes payload
      keep alive: 1
    
    80.14% <= 1 milliseconds
    98.64% <= 2 milliseconds
    99.97% <= 3 milliseconds
    100.00% <= 3 milliseconds
    65402.22 requests per second
    
    ====== SET ======
      100000 requests completed in 1.44 seconds
      100 parallel clients
      3 bytes payload
      keep alive: 1
    
    84.50% <= 1 milliseconds
    98.74% <= 2 milliseconds
    99.93% <= 3 milliseconds
    100.00% <= 4 milliseconds
    100.00% <= 4 milliseconds
    69396.25 requests per second
    
    ......
  • 如何查看分析

  • 1594793623988

基础的知识

redis 默认有16个数据库

1594793753603

默认使用的是第0个数据库

可以使用 select number进行切换数据库

127.0.0.1:6379> select 3 # 切换数据库
OK
127.0.0.1:6379[3]> DBSIZE # 查看大小
(integer) 0
127.0.0.1:6379[3]>
127.0.0.1:6379[3]> FLUSHDB # 清除当前数据库
OK
127.0.0.1:6379[3]> FLUSHALL # 清除所有的数据库
OK

思考:为什么是 redis 端口默认是 6379

Redis 是单线程的

Redis 是很快的,是基于内存操作,CPU不是Redis的性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽决定,既然可以 使用单线程来实现,那就使用单线程了。

Redis是C语言写的,官方提供的数据是 10w+的QPS,这个不必Memecache差

Redis为什么是单线程还这么快 ?

  • 误区1:高性能的服务器一定是多线程的
  • 误区2:多线程(CPU 会上下文切换)一定比单线程效率高

核心:Redis是将所有的数据放在内存中的,所以使用单线程操作效率是最高的,多线程(CPU上下文切换,上下文切换是昂贵的),对于内存系统来说,如果没有上下文切换效率就是最高的。多次读写都是在一个CPU上的,在内存情况下,这个就是最佳方案。

Redis五大基本数据类型

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings)散列(hashes)列表(lists)集合(sets)有序集合(sorted sets) 与范围查询, bitmapshyperloglogs地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication)LUA脚本(Lua scripting)LRU驱动事件(LRU eviction)事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

Redis - key
 # 移除命令 move 字段名 1  (1指的是当前数据库)
 
 127.0.0.1:6379> move name 1
 
 # 设置过期时间 EXPIRE 字段名 时间 (时间单位是秒)
 
127.0.0.1:6379> EXPIRE age 10

# 查看所有的信息
keys *

# 查看当前的key是否存在
EXISTS name 

# set k v 
# get v

# ttl name 查看时间

# 查看 key 类型 
type key

# 后面遇到不会的命令就在官网查看
String(字符串类型)
127.0.0.1:6379> FLUSHALL # 清除所有的值
OK
127.0.0.1:6379> clear
127.0.0.1:6379> set key1 v1 # 设置
OK
127.0.0.1:6379> get key1 # 获取
"v1"
127.0.0.1:6379> EXISTS key1 # 判断是否存储
(integer) 1
127.0.0.1:6379> type key1 # 判断类型
string
127.0.0.1:6379> APPEND key1 "hello" # 追加字符串,如果当前的 key 不存在,相当于创建一个值
(integer) 7
127.0.0.1:6379> get key1 # 获取 key
"v1hello"
127.0.0.1:6379> STRLEN key1 # 获取长度
(integer) 7
################################################################################
127.0.0.1:6379> set views 0
OK
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incr views # 自增1 打印在页面上
(integer) 1
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> incr views
(integer) 3
127.0.0.1:6379> incr views
(integer) 4
127.0.0.1:6379> incr views
(integer) 5
127.0.0.1:6379> incr views 
(integer) 6
127.0.0.1:6379> decr views#  自减1 打印在页面上
(integer) 5

127.0.0.1:6379> INCRBY views 10 # 自增的步长 10
(integer) 15
127.0.0.1:6379> DECRBY views 10 # 自减的步长 10
(integer) 5
127.0.0.1:6379>
################################################################################
# 字符串的范围  range
127.0.0.1:6379> set k "hello,demo"
OK
127.0.0.1:6379> get k
"hello,demo"
127.0.0.1:6379> GETRANGE k 0 3 # 获取0到3下标,包含0和3 [0,3]
"hell"
127.0.0.1:6379> GETRANGE k 0 -1 #  获取全部的数据
"hello,demo"

127.0.0.1:6379> set k2 "asdasdasfre"
OK
127.0.0.1:6379> get k2
"asdasdasfre"
127.0.0.1:6379> SETRANGE k2 1 xx  # 替换指定位置的字符串
(integer) 11
127.0.0.1:6379> get k2
"axxasdasfre"
127.0.0.1:6379>
################################################################################
# setex (set with expire) # 设置过期时间

# setnx (set if not exist) # 不存在在设置 (在分布式锁中会使用)

127.0.0.1:6379> setex k3 30 "leoooo" #  设置k3的值,在30秒之后过期
OK
127.0.0.1:6379> ttl k3
(integer) 27
127.0.0.1:6379> setnx k4 "redis" # 如果不存在会创建,如果存在设置失败
(integer) 1
127.0.0.1:6379> keys *
1) "k4"
2) "k2"
3) "k"
127.0.0.1:6379> setnx k4 "hashmap"
(integer) 0
127.0.0.1:6379> get k4
"redis"
127.0.0.1:6379>
################################################################################

# mset 创建多个 k-v
# gset 获取多个 k 的值
# msetnx 如果有就设置失败 是一个原子性的操作

127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 k4 v4
OK
127.0.0.1:6379> mget k1 k2 k3 k4
1) "v1"
2) "v2"
3) "v3"
4) "v4"
127.0.0.1:6379> msetnx k1 v11 k2 v22
(integer) 0
127.0.0.1:6379> keys *
1) "k3"
2) "k1"
3) "k4"
4) "k2"
127.0.0.1:6379> mget k1 k2 k3 k4
1) "v1"
2) "v2"
3) "v3"
4) "v4"
127.0.0.1:6379>

# 设置对象
set user:1{name:zhangsan,age:3} # 设置一个user:1对象,值为json来保存一个对象

127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2
OK
127.0.0.1:6379> mget user:1:name
1) "zhangsan"
127.0.0.1:6379> mget user:1:age
1) "2"
127.0.0.1:6379>
################################################################################
getset #  先获取在赋值  如果不存在则返回 nil 如果存在值就设置值

127.0.0.1:6379> set k1 k2
OK
127.0.0.1:6379> get k1
"k2"
127.0.0.1:6379> getset k1 9999
"k2"
127.0.0.1:6379> get k1
"9999"

数据结构是相通的

String 类似的使用场景,其中value除了是字符串还可以是数字

  • 计数器
  • 统计多单位的数量
  • 粉丝数
  • 对象缓存存储
List

基本的数据类型,列表

在redis里面,我们可以把list玩成栈、队列、阻塞队列

所有的List命令都是使用l开头的

################################################################################
127.0.0.1:6379> keys * 
(empty list or set)
127.0.0.1:6379> lpush list one # 将一个值或者多个值插入到列表的头部 (左)
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3 
127.0.0.1:6379> lrange list 0 -1  # 获取 list 中的值,通过区间获得具体的值
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> lrange list 0 1
1) "three"
2) "two"
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> rpush list hello # 将一个值或者多个值插入到列表的尾部 (右)
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "hello"

127.0.0.1:6379> lrange list 0 0 # 获取某个单独的值
1) "three"
127.0.0.1:6379>

################################################################################
# Lpop 左边移除
# Rpop 右边移除

127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "hello"
127.0.0.1:6379> lpop list # 移除 list 的第一个元素
"three"
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
3) "hello"
127.0.0.1:6379> rpop list # 移除 list 的最后一个元素
"hello"
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379>
################################################################################
# Lindex 获取某一个值 通过下标获取一个值
127.0.0.1:6379> lindex list 2
(nil)
127.0.0.1:6379> lindex list 1
"one"
127.0.0.1:6379>
# Llen 获取 list 长度

127.0.0.1:6379> llen list
(integer) 2
127.0.0.1:6379>
################################################################################
# 移除指定的值
# 取关 就是取出指定的值 uid

127.0.0.1:6379> lrange list 0 -1
1) "six"
2) "two"
3) "one"
4) "two"
5) "one"
127.0.0.1:6379> lrem list 1 one   # 移除list集合中指定个数的 value 精确匹配
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "six"
2) "two"
3) "two"
4) "one"
127.0.0.1:6379> lrem list 2 three
(integer) 0
127.0.0.1:6379> lrange list 0 -1
1) "six"
2) "two"
3) "two"
4) "one"
127.0.0.1:6379> lrem list 2 two
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "six"
2) "one"
127.0.0.1:6379>
################################################################################
trim 修剪 :list 截断元素

127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello1"
2) "hello2"
3) "hello"
4) "hell2"
5) "hell3"
127.0.0.1:6379> ltrim mylist 1 2 # 通过下标截取指定的长度,这个list已经被改变了,截断了只剩下截取的元素
OK
127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello2"
2) "hello"
127.0.0.1:6379>
################################################################################
rpoplpush # 移除列表最后一个,并移除到新的列表中

127.0.0.1:6379> rpush myli hello
(integer) 1
127.0.0.1:6379> rpush myli hell1
(integer) 2
127.0.0.1:6379> rpush myli hell2
(integer) 3
127.0.0.1:6379> rpush myli hell3
(integer) 4
127.0.0.1:6379> rpush myli hell4
(integer) 5
127.0.0.1:6379> rpoplpush myli myotherlist
"hell4"
127.0.0.1:6379> lrange myli 0 -1 # 查看原来的列表
1) "hello"
2) "hell1"
3) "hell2"
4) "hell3"
127.0.0.1:6379> lrange myotherlist 0 -1 # 移除之后的列表
1) "hell4"
127.0.0.1:6379>
#################################################################################
lset # 相当于是一个更新操作 

127.0.0.1:6379> EXISTS list # 判断列表是不是存在
(integer) 0
127.0.0.1:6379> lset list 0 item # 如果不存在更新就会报错
(error) ERR no such key
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> lset list 0 item  # 如果存在,替换list下标为目标的值
OK
127.0.0.1:6379> LRANGE list 0 -1
1) "item"
127.0.0.1:6379>
#################################################################################
linsert  # 将某个元素插入到某个具体的位置前后

127.0.0.1:6379> rpush list 12
(integer) 1
127.0.0.1:6379> linsert list before 12 qwe
(integer) 2
127.0.0.1:6379> LRANGE list 0 -1
1) "qwe"
2) "12"
127.0.0.1:6379> linsert list after 12 qwe
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1
1) "qwe"
2) "12"
3) "qwe"
127.0.0.1:6379>

小结:

  • 其实是一个链表,before Node after left right 都可插入值
  • 如果key不存在,创建新的链表
  • 如果key存在,则新增内容
  • 如果移除了所有的值,代表空链表,也代表不存在
  • 在两边插入或者改动值,效率最高。中间元素,相对效率会低一些
  • 消息队列!Lpush Rpop Lpop等
Set

Set中的值是不能重复的

#################################################################################
127.0.0.1:6379> sadd myset "hello" # 集合中添加元素
(integer) 1 
127.0.0.1:6379> sadd myset "hello"
(integer) 0
127.0.0.1:6379> sadd myset "hello1"
(integer) 1
127.0.0.1:6379> sadd myset "hello2"
(integer) 1
127.0.0.1:6379> sadd myset "hello3"
(integer) 1
127.0.0.1:6379> sadd myset "hello4"
(integer) 1
127.0.0.1:6379>

127.0.0.1:6379> smembers myset # 查看所有的集合元素
1) "hello2"
2) "hello1"
3) "hello"
4) "hello3"
5) "hello4"
127.0.0.1:6379> sismember myset hello4 # 查看集合中是不是有此元素
(integer) 1
127.0.0.1:6379> sismember myset hello34
(integer) 0
127.0.0.1:6379>
#################################################################################
scard myset # 获取set集合中的元素和个数

127.0.0.1:6379> scard myset
(integer) 5
127.0.0.1:6379>
#################################################################################
srem # 移除set的元素

127.0.0.1:6379> srem myset hello
(integer) 1
127.0.0.1:6379> smembers myset
1) "hello1"
2) "hello3"
3) "hello4"
4) "hello2"
127.0.0.1:6379>

#################################################################################
set 无序不重复 #  RANDMEMBER myset 随机抽出一个元素
127.0.0.1:6379> SRANDMEMBER myset
"hello3"
127.0.0.1:6379> SRANDMEMBER myset
"hello1"
127.0.0.1:6379> SRANDMEMBER myset
"hello4"
127.0.0.1:6379> SRANDMEMBER myset
"hello1"
127.0.0.1:6379>
#################################################################################
随机移除一个元素

127.0.0.1:6379> smembers myset
1) "hello1"
2) "hello3"
3) "hello4"
4) "hello2"
127.0.0.1:6379> spop myset # 随机移除一个元素
"hello4"
127.0.0.1:6379> smembers myset
1) "hello1"
2) "hello3"
3) "hello2"
127.0.0.1:6379>
#################################################################################
将一个指定的值移到另外一个set集合中
初始化两个集合
smove # 移动

127.0.0.1:6379> smembers myset2
1) "c"
2) "d"
3) "a"
4) "b"
5) "e"
6) "f"
127.0.0.1:6379> smembers myset1
1) "12"
2) "13"
3) "14"
4) "15"
5) "16"
6) "17"
7) "18"
127.0.0.1:6379>


127.0.0.1:6379> smove myset1  myset2 13 # 移动元素 ,将一个指定的set集合的值移动到另一个集合
(integer) 1
127.0.0.1:6379> smembers myset1
1) "12"
2) "14"
3) "15"
4) "16"
5) "17"
6) "18"
127.0.0.1:6379> smembers myset2
1) "d"
2) "a"
3) "b"
4) "13"
5) "c"
6) "e"
7) "f"
127.0.0.1:6379>
#################################################################################
微博,B站,共同关注(并集)
数字集合类:
- 差集
- 并集 
- 交集

127.0.0.1:6379> sadd key1 a
(integer) 1
127.0.0.1:6379> sadd key1 b
(integer) 1
127.0.0.1:6379> sadd key1 c
(integer) 1
127.0.0.1:6379> sadd key1 d
(integer) 1
127.0.0.1:6379> sadd key2 c
(integer) 1
127.0.0.1:6379> sadd key2 d
(integer) 1
127.0.0.1:6379> sadd key2 e
(integer) 1
127.0.0.1:6379> sadd key2 f
(integer) 1
127.0.0.1:6379> sdiff key1 key2 # 差集
1) "b"
2) "a"
127.0.0.1:6379> sinter key1 key2 # 交集 共同好友
1) "c"
2) "d"
127.0.0.1:6379> sunion key1 key2 # 并集
1) "b"
2) "c"
3) "a"
4) "e"
5) "f"
6) "d"
127.0.0.1:6379>

微博,A用户将所有关注的人放在一个set集合中,将粉丝也放在集合中

Hash

map集合 key-<key,value> 本质和String的类型没有太大区别,还是一个简单的 key - value

127.0.0.1:6379> hset myhash f1 ic # 设置一个具体的值
(integer) 1
127.0.0.1:6379> hget myhash f1 # 打印一个具体的值
"ic"
127.0.0.1:6379> hmset myhash i1 v1 i2 v2 i3 v3 # 设置多个字段的值
OK
127.0.0.1:6379> hmget myhash i1 i2 i3 # 获取多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> hgetall myhash # 获取所有的值
1) "f1"
2) "ic"
3) "i1"
4) "v1"
5) "i2"
6) "v2"
7) "i3"
8) "v3"
127.0.0.1:6379> hdel myhash f1 # 删除一个
(integer) 1
127.0.0.1:6379>
#################################################################################
hlen # 获取hash表的长度

127.0.0.1:6379> hlen myhash
(integer) 3
127.0.0.1:6379>
#################################################################################
127.0.0.1:6379> hexists myhash i1 # 判断是否有某个元素
(integer) 1
127.0.0.1:6379> hexists myhash i7 # 判断是否有某个元素
(integer) 0
127.0.0.1:6379>
#################################################################################
只获得所有的字段 和 左右的值

127.0.0.1:6379> hkeys myhash # 获取所有的 key
1) "i1"
2) "i2"
3) "i3"
127.0.0.1:6379> hvals myhash # 获取所有的值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379>
#################################################################################
incr # 自增
decr # 自减

127.0.0.1:6379> hset myhash file 4 # 指定增量
(integer) 1
127.0.0.1:6379> hincrby myhash file 1 # 自增1
(integer) 5
127.0.0.1:6379> hincrby myhash file -2 # 自增 -2
(integer) 3
127.0.0.1:6379> hsetnx myhash file3 he #  如果没有就创建
(integer) 1
127.0.0.1:6379> hsetnx myhash file3 he # 如果有就创建失败
(integer) 0
127.0.0.1:6379>

hash可以存储一些变更的数据,尤其是用户信息的保存,和经常变动的信息,hash更加适合存储对象 ,而String更加适合字符串的存储

Zset(有序集合)

在set的基础上,增加了一个值,set k1 v1 ,zset k1

#################################################################################
127.0.0.1:6379> zadd myset 1 one
(integer) 1
127.0.0.1:6379> zadd myset 2 two 3 three
(integer) 2
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
3) "three"
127.0.0.1:6379>

#################################################################################
127.0.0.1:6379> zadd salary 2000 xiaohong
(integer) 1
127.0.0.1:6379> zadd salary 2400 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 2100 ic
(integer) 1
#################################################################################
127.0.0.1:6379> zrangebyscore salary -inf +inf
1) "xiaohong"
2) "ic"
3) "zhangsan"
#################################################################################
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores
1) "xiaohong"
2) "2000"
3) "ic"
4) "2100"
5) "zhangsan"
6) "2400"
#################################################################################

127.0.0.1:6379> zadd salary 2000 xiaohong # 添加三个用户
(integer) 1
127.0.0.1:6379> zadd salary 2400 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 2100 ic
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> zrangebyscore salary -inf +inf # 从小到大排列 
1) "xiaohong"
2) "ic"
3) "zhangsan"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores # 从小到大排列,并且附带成绩
1) "xiaohong"
2) "2000"
3) "ic"
4) "2100"
5) "zhangsan"
6) "2400"

127.0.0.1:6379> ZREVRANGE salary 0 -1 # 从大到小排序
1) "zhangsan"
2) "xiaohong"
127.0.0.1:6379>

#################################################################################
移除 rem 中的元素
zrem 

127.0.0.1:6379> zrange salary 0 -1
1) "xiaohong"
2) "ic"
3) "zhangsan"
127.0.0.1:6379> zrange salary 0 -1
1) "xiaohong"
2) "zhangsan"
127.0.0.1:6379>
#################################################################################
获取有序集合中的个数

127.0.0.1:6379> zcard salary # 获取有序集合中的个数
(integer) 2
127.0.0.1:6379>
#################################################################################

127.0.0.1:6379> zadd myset 1 2
(integer) 1
127.0.0.1:6379> zadd myset 1 4
(integer) 1
127.0.0.1:6379> zadd myset 3 world 4 hello
(integer) 2
127.0.0.1:6379> zcount myset 1 3 # 获取指定区间的的成员的数量
(integer) 3
127.0.0.1:6379>

其余的API,剩下的可以去官方文档查看

案例思路:set 排序 存储班级成绩表,工资表排序

带权重进行判断:排行榜应用实现

三种特殊的数据类型

geospatial 地理位置

朋友的定位,附近的人,打车距离计算

Redis的Geo在Redis3.2就推出了

可以查询一些测试数据: http://www.ximizi.com/jingweidu.php

只有6个命令

getadd 添加地理位置

127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 39.53 chongqing
(integer) 1
127.0.0.1:6379> geoadd china:city 114.05 22.52 shenzhen
(integer) 1
127.0.0.1:6379>

参数:key 维度 精度 地址

规则:地球的两极无法添加,一般会下载城市数据,直接使用Java程序写入

geopos 获取指定位置的地理坐标

127.0.0.1:6379> geopos china:city beijing
1) 1) "116.39999896287918091"
   2) "39.90000009167092543"
127.0.0.1:6379>

geodist 返回两个给定位置之间的距离

  • m标识米
  • km标识千米
  • mi标识英里
  • ft标识英尺
127.0.0.1:6379> geodist china:city beijing shanghai # 查看北京到上海的距离
"1067378.7564" # km
127.0.0.1:6379>

我附近的人:获得所有的附近的人的地址,定位,通过半径来查询

georedius 以给定的经纬度为中心,找出某一半径的元素

127.0.0.1:6379> geodist china:city beijing shanghai
"1067378.7564"
127.0.0.1:6379>
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km
1) "shenzhen"
# 获取 110 30 经纬度的 在5000 km范围内的城市
127.0.0.1:6379> GEORADIUS china:city 110 30 5000 km
1) "chongqing"
2) "shenzhen"
3) "shanghai"
4) "beijing"
127.0.0.1:6379>

# 获取 110 30 经纬度的 在5000 km范围内的城市 并列出经纬度

127.0.0.1:6379> GEORADIUS china:city 110 30 5000 km withcoord 
1) 1) "chongqing"
   2) 1) "106.49999767541885376"
      2) "39.52999923987874098"
2) 1) "shenzhen"
   2) 1) "114.04999762773513794"
      2) "22.5200000879503861"
3) 1) "shanghai"
   2) 1) "121.47000163793563843"
      2) "31.22999903975783553"
4) 1) "beijing"
   2) 1) "116.39999896287918091"
      2) "39.90000009167092543"
127.0.0.1:6379>

# 获取 110 30 经纬度的 在5000 km范围内的城市 并列出经纬度 限制数量为3个

127.0.0.1:6379> GEORADIUS china:city 110 30 5000 km withcoord count 3
1) 1) "shenzhen"
   2) 1) "114.04999762773513794"
      2) "22.5200000879503861"
2) 1) "shanghai"
   2) 1) "121.47000163793563843"
      2) "31.22999903975783553"
3) 1) "chongqing"
   2) 1) "106.49999767541885376"
      2) "39.52999923987874098"
127.0.0.1:6379>

GEORADIUSBYMEMBER 找出位于指定元素周围的其他元素

# 找出位于指定元素周围的其他元素

127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km
1) "beijing"
2) "chongqing"
127.0.0.1:6379>

geohash 返回11个字符的字符串

127.0.0.1:6379> geohash china:city beijing
1) "wx4fbxxfke0"
127.0.0.1:6379>

如果两个字符串越想象,就越近

geo 底层的实现原理就是Zset,可以使用Zset命令执行geo

127.0.0.1:6379> ZRANGE china:city 0 -1 # 查看地图中全部元素
1) "chongqing"
2) "shenzhen"
3) "shanghai"
4) "beijing"
127.0.0.1:6379>
hyperloglog

什么是基数

A{1,3,4,5,6}

B{1,3,5,7,8}

基数(不重复的元素) = 5 可以接受误差

简介

Redis 2.8.9 版本就更新了这个数据结构

Redis Hyperloglog 基数统计的算法

优点:占用的内存是固定的 2 ^ 64 次方。只要想使用12kb内存,如果使用内存角度,这个是首选

网页的UV(一个人访问一个网站多次,但是还算是一个人)

传统的方式,set保存用户的 id ,然后就可以统计set 中的数量作为标准判断

0.81% 的错误率。,统计UV业务,是可以忽略不计的

127.0.0.1:6379> pfadd mykey a j g f k j h l h l l a s n w q y # 创建第一组元素
(integer) 1
127.0.0.1:6379> PFCOUNT mykey # 统计不重复元素
(integer) 12
127.0.0.1:6379> pfadd mykey2 a j g f k j h l h l l a s n w q y n a d2 fd a fdas
(integer) 1
127.0.0.1:6379> PFCOUNT mykey2
(integer) 15
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2 # 合并两组 并集
OK
127.0.0.1:6379> PFCOUNT mykey3 # 并集
(integer) 15
127.0.0.1:6379>

如果不许容错,就不可以使用

bitmaps

位存储

统计疫情感染人数:0 1 0 1 0 1

统计用户信息,活跃,不活跃!登录,未登录

两个状态的都可以使用

Bitmaps 位图,数据结构,都是操作二进制来进行记录,就只有0和1两个概念

# 使用bitmap来记录周一到周日的打开
127.0.0.1:6379> setbit sign 0 0
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 1
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
127.0.0.1:6379>

查看某一天是否打卡

127.0.0.1:6379> getbit sign 4
(integer) 0
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 6
(integer) 0
127.0.0.1:6379> getbit sign 5
(integer) 0
127.0.0.1:6379> getbit sign 1
(integer) 1
127.0.0.1:6379>

统计打卡的天数(就是为1的天数)

127.0.0.1:6379> BITCOUNT sign # 统计打卡的天数
(integer) 3
127.0.0.1:6379>

事务

事务:要么同时成功,要么同时失败。

Redis单条命令是保存原子性的,但是事务不保持原子性

Redis事务本质:一组命令的集合,一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行

一次性、顺序性、排他性

----- 队列 set set set 执行 ------

Redis没有隔离级别的概念

所有的命令中没有直接被执行

redis的事务:

  • 开启事务(multi)
  • 命令入队(...)
  • 执行事务(exec)

锁:乐观锁

正常执行事务

127.0.0.1:6379> multi # 开启事务
OK
127.0.0.1:6379> set k1 v1 # 命令入队
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v2
QUEUED
127.0.0.1:6379> exec # 执行事务
1) OK
2) OK
3) OK
127.0.0.1:6379>

放弃执行事务 discard

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 k2
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> exec'
Invalid argument(s)
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI
127.0.0.1:6379>

每次执行完一次事务之后,就没了

编译型异常 代码又问题,命令有错,事务中所有的命令都不会执行

运行时异常 其他的命令是可以正常执行的 错误命令抛出异常

监控

悲观锁:

  • 认为什么时候都会出现问题,无论做什么都要加锁

乐观锁

  • 很乐观,认为什么时候都不会出现问题,所以不会上锁,更新数据的时候去判断一下,在此期间有没有人修改过这个数据
  • 获取version
  • 更新的时候比较version

redis 监测测试

正常执行成功

127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money # 监视 money对象
OK
127.0.0.1:6379> multi # 事务正常结束,数据期间没有发生变动,这个时候就正常执行成功
OK
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
127.0.0.1:6379>

执行之前,如何另外一个线程修改了值,那么就会执行失败

  • 如果发现事务执行失败,就先解锁unwatch
  • 再次获取锁 也就是获取最新的值
  • 执行的时候,比对监视的值有没有修改然后执行

Jedis

我们要使用Jedis来操作Redis

什么是Jedis 是Redis官方推荐的Java连接工具,使用Java操作Redis的中间件。如果你要使用Java操作Redis,那么一定要对Jedis十分熟悉

学习不能急躁,慢慢来!

测试

导入对应的依赖

<!-- 导入Jedis的包 -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.2.0</version>
</dependency>
<!-- 导入 阿里的 fastjson 包 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.70</version>
</dependency>

编码测试

  • 连接数据库
  • 操作命令
  • 断开连接
public class TestRedis {
    public static void main(String[] args) {
        // 1. new 一个 Jedis 对象
        Jedis jedis = new Jedis("127.0.0.1",6379);
        // 2. redis 所有的命令都在 jedis 就是之前学习的命令
        System.out.println(jedis.ping());
    }
}
// 输出:PONG

常用的API

String、List、set、Hash、Zset、geospatial、hyperloglog、bitmaps

本质就是上述的命令

事务操作

public class TestTx {
    public static void main(String[] args) {
        // 1. new 一个 Jedis 对象
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        jedis.flushAll();
        Transaction multi = jedis.multi();
        String res = new JSONObject().toJSONString();
        jedis.watch(res);
        try {
            multi.set("user1", "passw1");
            int i = 1 / 0;
            multi.set("user2", "passw2");
            multi.set("user3", "passw3");

            // 执行事务
            multi.exec();
        } catch (Exception e) {
            // 放弃事务
            multi.discard();
            e.printStackTrace();
        } finally {
            // 关闭连接
            System.out.println(jedis.get("user1"));
            System.out.println(jedis.get("user2"));
            System.out.println(jedis.get("user3"));
            jedis.close();
        }
    }
}

SpringBoot整合

SpringBoot 操作数据层 :Spring-data、jpa、jdbc、mongdb、redis

SpringData也是和SpringBoot齐名的项目

整合测试

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

在SpringBoot2.x之后 Jedis被替换为了lettuce

jedis:采用直连的方法,多个线程操作的话会有线程不安全问题,如果要避免数据不安全的问题,使用 jedis pool 连接池 BIO模式

lettuce:底层使用netty,实例可以在多个线程中共享,不存在线程不安全的情况下,可以减少线程数据了。NIO模式。

具体配置在

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

测试一下

原理-源码分析

// 如果不存在这个bean才生效,说明我们可以自定义一个
@EnableConfigurationProperties(RedisProperties.class) 
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {
    @Bean
    // 默认的RedisTemple没有很多的设置,redis对象都是需要序列化
    // 两个泛型都是Object类型,使用之后需要强转
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
        throws UnknownHostException {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean
    // 由于String是redis最常用的数据结构,所以就直接加入
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory)
        throws UnknownHostException {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

@ConfigurationProperties(prefix = "spring.redis")
public class RedisProperties {
}

配置properties

# 配置 Redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=1

测试

@SpringBootTest
class RedisBootApplicationTests {

    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    void contextLoads() {
        // redisTemp 点出来,封装的 api 和原生操作是一样的
        // opsForValue 操作字符串 类似String
        // opsForSet 操作集合 类似set
        // opsForList 操作列表 类似 list
        // opsForHash 操作哈希 类似Hash
        System.out.println(redisTemplate);
        redisTemplate.opsForValue().set("ic",12);
        Object ic = redisTemplate.opsForValue().get("ic");
        System.out.println(ic);
        redisTemplate.opsForSet();
        redisTemplate.opsForList();
        redisTemplate.opsForHash();
        // 除了基本的操作 我们常用的方法都可以直接通过 redisTemplate 操作,
        // 比如事务和基本的CURD操作

        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        connection.flushDb();
        connection.flushAll();
        redisTemplate.multi();
        redisTemplate.exec();
    }
}
// RedisTemplate 序列化配置
@Nullable
private RedisSerializer keySerializer = null;
@Nullable
private RedisSerializer valueSerializer = null;
@Nullable
private RedisSerializer hashKeySerializer = null;
@Nullable
private RedisSerializer hashValueSerializer = null;

public void afterPropertiesSet() {
    super.afterPropertiesSet();
    boolean defaultUsed = false;
    if (this.defaultSerializer == null) {
        this.defaultSerializer = new JdkSerializationRedisSerializer(this.classLoader != null ? this.classLoader : this.getClass().getClassLoader());
    }
}
// 默认的序列化方法是通过 JDK 的方式序列化的
// 我们可能会使用JSON的方式来序列化

自定义RedisTemple

// Redis 配置类
@Configuration
public class RedisConfig {
    // 编写我们自己的配置 redisTemplate
    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
        throws UnknownHostException {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }
}

关于对象的保存

@Test
void test2() throws Exception{
    // 真实的开发一般都使用json来传递对象
    User user = new User("ic", "哈哈哈");
    // String res = new ObjectMapper().writeValueAsString(user);
    redisTemplate.opsForValue().set("user",user);
    System.out.println(redisTemplate.opsForValue().get("user"));
}
// 如果这个时候 User 没有实现 Serializable 接口,这个时候就会报错,如下
// Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using 

// 所以此处使用JSON传递数据就没问题
// 在企业中,我们的所有的 pojo 都会序列化
public interface RedisSerializer<T> {
	// Redis支持的序列化接口
}

1594863288675

核心配置类 RedisConfig 以后拿来即用

@Configuration
public class RedisConfig {

    /**
     * 编写我们自己的配置 redisTemplate
     * 这是写好的一个固定模板,拿到企业中就直接使用
     *
     * @param factory
     * @return
     * @throws UnknownHostException
     */
    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory)
        throws UnknownHostException {
        // 为了开发方便,一般直接使用 <String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        // 配置具体序列化规则
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        // 设置
        serializer.setObjectMapper(om);
        // String 的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key 采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash 的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value 序列化方式采用jackson
        template.setValueSerializer(serializer);
        // hash 的value序列化方法采用jackson
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }
}

封装 RedisUtil.java

package cn.icanci.redis.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @Author: icanci
 * @ProjectName: ic-redis
 * @PackageName: cn.icanci.redis.utils
 * @Date: Created in 2020/7/16 10:12
 * @ClassAction: 在我们真实的分发中,或者你们在公司,一般都可以看到一个公司自己封装RedisUtil
 */
@Component
public final class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // =============================common============================

    /**
     * 指定缓存失效时间
     *
     * @param key  键
     * @param time 时间(秒)
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }


    /**
     * 判断key是否存在
     *
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }


    // ============================String=============================

    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */

    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 普通缓存放入并设置时间
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */

    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 递增
     *
     * @param key   键
     * @param delta 要增加几(大于0)
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }


    /**
     * 递减
     *
     * @param key   键
     * @param delta 要减少几(小于0)
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }


    // ================================Map=================================

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     *
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     *
     * @param key 键
     * @param map 对应多个键值
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * HashSet 并设置时间
     *
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key   键
     * @param item  项
     * @param value 值
     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }


    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }


    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key  键
     * @param item 项
     * @param by   要增加几(大于0)
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }


    /**
     * hash递减
     *
     * @param key  键
     * @param item 项
     * @param by   要减少记(小于0)
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }


    // ============================set=============================

    /**
     * 根据key获取Set中的所有值
     *
     * @param key 键
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key   键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将数据放入set缓存
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 将set数据放入缓存
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0)
                expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 获取set缓存的长度
     *
     * @param key 键
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 移除值为value的
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 移除的个数
     */

    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    // ===============================list=================================

    /**
     * 获取list缓存的内容
     *
     * @param key   键
     * @param start 开始
     * @param end   结束 0 到 -1代表所有值
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 获取list缓存的长度
     *
     * @param key 键
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }


    /**
     * 通过索引 获取list中的值
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0){
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }


    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0){
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */

    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */

    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

测试

@SpringBootTest
class RedisBootApplicationTests {

    @Autowired
    @Qualifier("redisTemplate")
    private RedisTemplate redisTemplate;

    @Autowired
    private RedisUtil redisUtil;
    @Test
    void contextLoads() {

        // 在企业开发的过程中,我们80%的情况下,都不会使用原生的方式去编写代码
        // 一般我们都去自己封装方法 RedisUtils


        // redisTemp 点出来,封装的 api 和原生操作是一样的
        // opsForValue 操作字符串 类似String
        // opsForSet 操作集合 类似set
        // opsForList 操作列表 类似 list
        // opsForHash 操作哈希 类似Hash
        System.out.println(redisTemplate);
        redisTemplate.opsForValue().set("ic",12);
        Object ic = redisTemplate.opsForValue().get("ic");
        System.out.println(ic);
        redisTemplate.opsForSet();
        redisTemplate.opsForList();
        redisTemplate.opsForHash();
        // 除了基本的操作 我们常用的方法都可以直接通过 redisTemplate 操作,
        // 比如事务和基本的CURD操作

        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        connection.flushDb();
        connection.flushAll();
        redisTemplate.multi();
        redisTemplate.exec();
    }

    @Test
    void test2() throws Exception{
        // 真实的开发一般都使用json来传递对象
        User user = new User("ic", "哈哈哈");
//        String res = new ObjectMapper().writeValueAsString(user);
        redisTemplate.opsForValue().set("user",user);
        System.out.println(redisTemplate.opsForValue().get("user"));
    }

    @Test
    void test3() throws Exception{
        redisUtil.set("ic","iiii");
        System.out.println(redisUtil.get("ic"));
    }
}

所有的Redis操作,其实对于Java开发人员来说,十分简单,更重要的是去理解Redis思想和每一种数据结构的用处和作用场景

Redis.conf 详解

启动的时候,就是通过配置文件来启动的

1594869955552

  • 配置文件 unit 单位对大小写命令不敏感

1594870088411

就是好比我们学习Spring、Import,include把文件包含进来

网络

bind 127.0.0.1 # 绑定的 ip 地址

protected-mode yes # 保护模式

port 6379 # 默认端口

通用设置

daemonize yes # 后台守护进程 以守护进程运行,默认是 no

pidfile /var/run/redis_6379.pid # 配置文件的 pid 文件 如果以后台方式运行 ,就要指定一个 pid 文件

# 日志

# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生产环境
# warning (only very important / critical messages are logged)
loglevel notice

# 日志的输出文件
# Specify the log file name. Also the empty string can be used to force
# Redis to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile "" 

# 默认有16个数据库 数据库的数量
databases 16

# 是否显示 开启的 logo 
always-show-logo yes

SNAPSHOTTING 快照

持久化操作 在规定的时间内,执行了多少次操作,就会持久化到文件 .rdb文件和.aof文件

持久化规则 Redis 是内存数据库,如果没有持久化,则断电就失去数据
# 如何900秒之内,如果至少有 1 个key进行了修改,就会执行持久化操作
save 900 1
# 如果在300秒之内,如果如果至少有 10 个key进行了修改,就会执行持久化操作
save 300 10
# 如果在60秒之内,如果如果至少有 10000 个key进行了修改,就会执行持久化操作
save 60 10000
# 之后会自定义进行测试

stop-writes-on-bgsave-error yes  # 持久化如果出错,是否还需要继续工作

rdbcompression yes # 是否压缩 rdb 文件,需要消耗一些 cpu 资源

rdbchecksum yes # 保存 rdb 文件的时候,进行错误的校验检查

dir ./ # rdb 文件的保存目录

REPLICATION 主从复制 后面进行讲解

SECURITY 安全 可以设置密码

默认是没有密码的

设置密码:requirepass icanci

也可使用密令设置密码:config set requirepass "icanci"

登录:auth icanci

CLIENTS 限制

maxclients 10000 # 最大的客户端连接数

maxmemory <bytes> # Redis 设置的最大内存 单位是 bytes

maxmemory-policy noeviction # 内存到达上限的操作

volatile-lru:从已设置过期时间的内存数据集中挑选最近最少使用的数据 淘汰;
volatile-ttl: 从已设置过期时间的内存数据集中挑选即将过期的数据 淘汰;
volatile-random:从已设置过期时间的内存数据集中任意挑选数据 淘汰;
allkeys-lru:从内存数据集中挑选最近最少使用的数据 淘汰;
allkeys-random:从数据集中任意挑选数据 淘汰;
noeviction(驱逐):禁止驱逐数据。(默认淘汰策略。当redis内存数据达到maxmemory,在该策略下,直接返回OOM错误)

APPEND ONLY MODE 模式 AOF在这里进行配置

appendonly no # 默认是不开启 aof 模式的,默认双击哦是使用rdb方式持久化的,在大部分情况下,rdb完全够用
appendfilename "appendonly.aof" # 持久化文件的名字

# appendfsync always # 每次修改都 sync 消耗性能
appendfsync everysec # 每秒执行一次 可能会丢失一秒的数据
# appendfsync no #  不执行同步 这个时候操作系统速度最快

Redis持久化

Redis 是内存数据库, 如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能

RDB(Redis Database)

在主从复制中,rdb 就是备用了

什么是 RDB

1594875313652

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是 Snapshot 快照,它恢复时是将快照文件直接读取到内存里

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中去,待持久化过程都结束了,在用这个临时文件替换上次持久化好的文件,整个过程中,主进程是不执行任何IO操作的。这就确保了极高的性能。如果需要进程大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化的数据可能丢失。我们默认的就是RDB,一般情况下不需要修改。

rdb 文件就是 dump.rdb

有时候在生产环境我们会将这个文件备份

1594875898349

触发规则

  • save的规则满足的情况下,会自动触发rdb规则

  • 执行了 flushall 命令。也会触发我们的rdb文件

  • 退出redis也会成 rdb 文件

如何恢复rdb文件 只要将 dump.rdb 文件 放在 redis 的 redis-server 启动目录下即可,注意要和配置文件的文件名称是一样的

查看需要存放文件的位置

127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin" # 如果在这个目录下存在 dump.rdb 文件,那么在启动的时候就会恢复此文件的数据
127.0.0.1:6379>

几乎自己默认的配置就够用了

优点:

  • 适合大规模的数据恢复
  • 对数据的完整性要求不高

缺点:

  • 需要一定的时间间隔操作,如果 redis 意外挂机了,数据可能会丢失,最后一次修改数据就没了
  • fork进程的时候, 会占用一定的内存空间
AOP (Append Only File)

将所有的命令都记录下来,history,在恢复的时候,会把这个文件全部再执行一遍

是什么

1594878316658

以日志的形式记录每个写操作,将Redis执行过的所有命令记录下来(读操作不记录),只许追加文件但是不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就会根据日志文件的内容将指令从前到后执行一次以完成数据的恢复工作

AOF保存的是 appendonly.aof 文件

1594878911424

重写:rewrite 如果发现aof文件大于64m,太大了,fork 一个新的进程来将我们的文件进行重写

aof 默认就是无限制增加

1594881742293

默认是不开启的,我们需要手动开启进行配置

只要将 appendonly 改为 yes 就开启了

再重启的时候就开始了。

1594880795588

注意:如果 appendonly.aof 如果被破坏,那么就恢复不了

这个时候是aof文件有问题,redis启动不起来,我们要修复这个文件

redis给我门提供了修复的工具:redis-check-aof命令 redis-check-aof --fix 文件名

如果文件正常,就会重启成功

优点:

  • 每一次修改都同步,文件的完整性会更好
  • 默认是每秒同步一次
  • 从不同步,效率最高

缺点

  • 相对于数据文件来说,aof 远远大于rdb,修复的速度也比 rdb 慢
  • aof 运行效率也比 rdb 慢,所以rdb是默认的

Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息

Redis 客户端可以订阅任意数量的频道

订阅/发布消息图

1594882241831

下图展示了频道 channel1,以及订阅这个频道的三个客户端 - client2、client5和client1 之间的关系

1594882461420

当有新消息通过 PUBLISH 命令发送给频道 channel1 时候,这个消息就会被发送给订阅它的三个客户端

1594882573818

命令

这些命令被广泛用于构建即时通信应用,比图网络聊天室(chatroom)和实时广播、实时提醒

1594882813523

测试

# 订阅者
127.0.0.1:6379> SUBSCRIBE ic # 订阅一个频道 ic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "ic"
3) (integer) 1 # 等待读取信息
1) "message" # 消息
2) "ic" # 哪个频道
3) "hello" # 消息的具体内容
1) "message"
2) "ic"
3) "hello2"
1) "message"
2) "ic"
3) "hello3"

# 发送者
[root@ali-icanci bin]# redis-cli -p 6379 # 发送消息到频道
127.0.0.1:6379> PUBLISH ic hello
(integer) 1
127.0.0.1:6379> PUBLISH ic hello2
(integer) 1
127.0.0.1:6379> PUBLISH ic hello3
(integer) 1
127.0.0.1:6379>

原理

Redis 是使用C实现的,通过分析Redis源码里面的 pubsub.c 文件,了解发布和订阅机制的底层实现,来加深对Redis的理解

Redis通过PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令实现发布 订阅功能

Redis 通过 PUBLISH 命令订阅某频道后,redis-server 里维护了一个字典,字典的关键字就是一个个 channel ,而字典的值则是以一个链表,链表中保存了所有订阅了这个channel的客户端,SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中

通过PUBLISH命令向订阅者发送消息。redis-server 会使用给定的频道作为键 ,在它维护的channel字典中查找订阅了channel所有的客户端链表,遍历这个链表,将消息发送给所有订阅者

Pub/Sub 从字面上理解就是发布(PUBLISH)和订阅(SubScribe),在Redis中,你可以设定某一个key值进行消息发布以及消息订阅,当一个key值上进行了消息发布的时候,所有订阅他的客户端都会收到相应的消息,这一功能最明显的用法就是作为实时消息系统。比如普通的即时聊天,群聊等功能。

使用场景

  • 实时消息系统
  • 实时聊天(频道当作聊天室,将信息回显给所有人即可)
  • 订阅,关注系统都是可以的

稍微复杂的场景

  • 消息中间件
  • kafka、等

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者成为从节点(slave/follower);数据的复制时单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。

主从复制的作用包括:

  • 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种冗余方式
  • 故障恢复:当主节点出现问题的时候,可以由从节点提供服务,实现快速的故障恢复,实际上是一种服务的冗余
  • 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载,可以大大提高 Redis 服务器的并发量
  • 高可用基础(集群):除了上述作用之外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是高可用的基础

一般来说,要将Redis运用于工程项目中,只是用一台 redis 是万万不能的,原因如下

  • 从结构上,单个 Redis 服务器回发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大
  • 从容量上,单个 Redis 服务器内存容量有限,就算一台Redis服务器内容容量为 256G,也不能将所有的内存用作 Redis内存存储,一般来说,单台Redis最大的内存不应该超过20G

电商网站上的产品,一般都是一次上传,无数次浏览的,说专业点就是”多读少写“

对于这样的场景,我们可以使用如下架构:

1594886479410

80% 的情况下都是在读操作,减缓服务器的压力,架构中经常使用

最低配置 一主二从

环境配置

只配置从库,不用配置主库

查看一些信息

127.0.0.1:6379> info replication  # 获取 当前库的信息
# Replication
role:master # 角色
connected_slaves:0 # 没有从机
master_replid:07094272497183fdc2392884bb2111fe70654996
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6379>

复制三个配置文件,修改对应的信息

  • 端口号
  • pid 名字
  • log名字
  • rdb名字
  • 分别修改的端口为 6389 6380 6381 启动3个Redis服务如下:
1594888623175
一主二从

默认情况下,每台redis服务器都是主节点,我们一般情况下只用配置从机即可

认老大!一主(6379)二从(6380、6381)

SLAVEOF host port # 设置主机和端口

下图是主机信息:

1594889433351

# 主机的信息

127.0.0.1:6379> info replication
# Replication
role:master # 当前角色
connected_slaves:2 # 从机个数
slave0:ip=127.0.0.1,port=6380,state=online,offset=126,lag=0 # 从机1的信息
slave1:ip=127.0.0.1,port=6381,state=online,offset=126,lag=0 # 从机2的信息
master_replid:2aff04d8d964c198fcceb423f6168658353cea13
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:126
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:126
127.0.0.1:6379>

#从机的信息

127.0.0.1:6380> info replication
# Replication 
role:slave # 机器信息
master_host:127.0.0.1 # 主机地址
master_port:6379 # 主机 端口
master_link_status:up
master_last_io_seconds_ago:0
master_sync_in_progress:0
slave_repl_offset:14
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:2aff04d8d964c198fcceb423f6168658353cea13
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:14
127.0.0.1:6380>

真实的主从配置应该在配置文件中配置,这样的永久的,这里使用的是命令,是暂时的,下图是配置文件永久配置,在Redis开启的时候就配置完毕

1594889971302

细节

主机可以设置值,从机不能设置值,从机只能读,

主机中的所有的数据信息都会同步到从机

测试

主机断开连接,从机依旧连接到主机,但是没有写操作,这个时候,主机如果回来了,从机依旧可以连接到主机,获取主机

如果是使用命令行配置主从,如果这个时候Reids重启了,就会变回主机。只要变为了从机,就立刻从主机获得值

复制原理

Slave 启动成功连接到 master 后会发送一个sync同步指令

Master 街道命令,启动后台的存盘进程,同时收集到所有接收到的修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到Slave,并完成一次同步

  • 全量复制 :Slave服务在接收到数据库文件之后,将其存盘加载在内存中
  • 增量复制:Master 继续将新的所有收集到的修改命令一次传给 slave ,完成同步

但是只要重新连接master ,一次完全同步(全量复制)将被自动执行,我们的数据一定可以在从机中看到

1594896489223

层层链路,如上图

工作中上面的都不会使用

如果没有老大了, 这个时候能不能选择出来一个老大出来呢?手动操作

谋权篡位

如果主机断了,可以使用这个命令完成切换自己为主机。其他节点就可以通过手动连接到这个主机(手动)

SLAVEOF no one 

如果这个时候老大重新配置了,那就只能重新配置

哨兵模式

自动选举老大模式

概述

主从切换技术的方法是:当主服务器宕机,需要手动把一台服务器切换为主服务器,这就需要人工干预,这很费力

还会造成一段时间之内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式,Redis2.8 开始就提供了Sebtinel 模式 (哨兵模式)来解决这个问题

谋权篡位的自动版本,能够后台监控主机是否故障,如果故障了可以根据投票数自动切换为主库

哨兵模式是一种特殊的模式,首先Redis提供了哨兵模式,哨兵是一个独立的进程,作为进程,会独立运行,其原理是哨兵通过发送命令,等待redis服务器响应,从而监控运行的多个Redis实例

1594897665531

Redis缓存穿透和雪崩

Redis 缓存的作用,极大的提升了在应用程序的性能和效率,特别是数据查询方面,但同时,也带了一些问题。其中最要害的问题,就是数据的一致性问题,严格意义上讲,这个问题无解,如何对数据一致性要求很高,就不能使用缓存

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案

缓存穿透

概念

缓存穿透的概念很简单,用户想要查询一个数据,发现Redis数据库没有,也就是缓存没有命中,玉树向持久层数据库查询,发现也没有,于是本次查询失败,当用户很多的时候,缓存没有命中,于是都请求可数据库,这就会给持久层带来很大的压力,这个时候就出现可缓存击穿

解决方案

布隆过滤器

对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;

img

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;

但是这种方法会存在两个问题:

  • 如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;

  • 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿

量太大,缓存过期

解决

  • 设置热点数据永不过期

  • 加互斥锁

缓存雪崩

缓存雪崩是指,由于缓存层承载着大量请求,有效的保护了存储层,但是如果缓存层由于某些原因整体不能提供服务,于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

img

解决方案

保证缓存层服务高可用性

即使个别节点、个别机器、甚至是机房宕掉,依然可以提供服务,比如 Redis Sentinel 和 Redis Cluster 都实现了高可用。

依赖隔离组件为后端限流并降级

在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。

数据预热

可以通过缓存reload机制,预先去更新缓存,再即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

缓存并发

缓存并发是指,高并发场景下同时大量查询过期的key值、最后查询数据库将缓存结果回写到缓存、造成数据库压力过大

分布式锁

在缓存更新或者过期的情况下,先获取锁,在进行更新或者从数据库中获取数据后,再释放锁,需要一定的时间等待,就可以从缓存中继续获取数据。

一些解决方案
Redis高可用
  • redis 集群
限流降级
数据预热
1
https://gitee.com/elonchung/Java-Review.git
git@gitee.com:elonchung/Java-Review.git
elonchung
Java-Review
Java-Review
master

搜索帮助