11-15节 实践篇 —— 数据结构
11-“万金油”的String,为什么不好用了?
背景:要开发一个图片存储系统,要求这个系统能快速地记录图片ID和图片在存储系统中保存时的ID(可以直接叫作图片存储对象ID)。同时,还要能够根据图片ID快速查找到图片存储对象ID。
第一个方案是:把图片ID和图片存储对象ID分别作为键值对的key和value来保存,其中,图片存储对象ID用了String类型。
为什么String类型内存开销大?
关键词:【SDS结构体、RedisObject、内存分配库jemalloc】
我们来分析一下。图片ID和图片存储对象ID都是10位数,我们可以用两个8字节的Long类型表示这两个ID。因为8字节的Long类型最大可以表示2的64次方的数值,所以肯定可以表示10位数。但是,为什么String类型却用了64字节呢?
除了记录实际数据,String类型还需要额外的内存空间记录数据长度、空间使用等信息,这些信息也叫作元数据。当实际保存的数据较小时,元数据的空间开销就显得比较大了,有点“喧宾夺主”的意思。
当你保存64位有符号整数时,String类型会把它保存为一个8字节的Long类型整数,这种保存方式通常也叫作int编码方式。
但是,当你保存的数据中包含字符时,String类型就会用简单动态字符串(Simple Dynamic String,SDS)结构体来保存,如下图所示:
- buf:字节数组,保存实际数据。为了表示字节数组的结束,Redis会自动在数组最后加一个“\0”,这就会额外占用1个字节的开销。
- len:占4个字节,表示buf的已用长度。
- alloc:也占个4字节,表示buf的实际分配长度,一般大于len。
可以看到,在SDS中,buf保存实际数据,而len和alloc本身其实是SDS结构体的额外开销。
不同数据类型都有些相同的元数据要记录(比如最后一次访问的时间、被引用的次数等),所以,Redis会用一个RedisObject结构体来统一记录这些元数据,同时指向实际数据。
为了节省内存空间,Redis还对Long类型整数和SDS的内存布局做了专门的设计。
- 一方面,当保存的是Long类型整数时,RedisObject中的指针就直接赋值为整数数据了,这样就不用额外的指针再指向整数了,节省了指针的空间开销。
- 另一方面,当保存的是字符串数据,并且字符串小于等于44字节时,RedisObject中的元数据、指针和SDS是一块连续的内存区域,这样就可以避免内存碎片。这种布局方式也被称为embstr编码方式。
- 当然,当字符串大于44字节时,SDS的数据量就开始变多了,Redis就不再把SDS和RedisObject布局在一起了,而是会给SDS分配独立的空间,并用指针指向SDS结构。这种布局方式被称为raw编码模式。
为了帮助你理解int、embstr和raw这三种编码模式,我画了一张示意图,如下所示:
Redis会使用一个全局哈希表保存所有键值对,哈希表的每一项是一个dictEntry的结构体,用来指向一个键值对。dictEntry结构中有三个8字节的指针,分别指向key、value以及下一个dictEntry,三个指针共24字节,如下图所示:
这三个指针只有24字节,为什么会占用了32字节呢?这就要提到Redis使用的内存分配库jemalloc了。
jemalloc在分配内存时,会根据我们申请的字节数N,找一个比N大,但是最接近N的2的幂次数作为分配的空间,这样可以减少频繁分配的次数。
用什么数据结构可以节省内存?
我们先回顾下**压缩列表(ziplist)**的构成。表头有三个字段zlbytes、zltail和zllen,分别表示列表长度、列表尾的偏移量,以及列表中的entry个数。压缩列表尾还有一个zlend,表示列表结束。
压缩列表之所以能节省内存,就在于它是用一系列连续的entry保存数据。每个entry的元数据包括下面几部分。
- prev_len,表示前一个entry的长度。prev_len有两种取值情况:1字节或5字节。取值1字节时,表示上一个entry的长度小于254字节。虽然1字节的值能表示的数值范围是0到255,但是压缩列表中zlend的取值默认是255,因此,就默认用255表示整个压缩列表的结束,其他表示长度的地方就不能再用255这个值了。所以,当上一个entry长度小于254字节时,prev_len取值为1字节,否则,就取值为5字节。
- len:表示自身长度,4字节;
- encoding:表示编码方式,1字节;
- content:保存实际数据。
如何用集合类型保存单值的键值对?
在保存单值的键值对时,可以采用基于Hash类型的二级编码方法。这里说的二级编码,就是把一个单值的数据拆分成两部分,前一部分作为Hash集合的key,后一部分作为Hash集合的value,这样一来,我们就可以把单值数据保存到Hash集合中了。
以图片ID 1101000060和图片存储对象ID 3302000080为例,我们可以把图片ID的前7位(1101000)作为Hash类型的键,把图片ID的最后3位(060)和图片存储对象ID分别作为Hash类型值中的key和value。
1 | hset 1101000 060 3302000080 |
“二级编码一定要把图片ID的前7位作为Hash类型的键,把最后3位作为Hash类型值中的key吗?”其实,二级编码方法中采用的ID长度是有讲究的。
Redis Hash类型的两种底层实现结构,分别是压缩列表和哈希表。
那么,Hash类型底层结构什么时候使用压缩列表,什么时候使用哈希表呢?其实,Hash类型设置了用压缩列表保存数据时的两个阈值,一旦超过了阈值,Hash类型就会用哈希表来保存数据了。
这两个阈值分别对应以下两个配置项:
- hash-max-ziplist-entries:表示用压缩列表保存时哈希集合中的最大元素个数。
- hash-max-ziplist-value:表示用压缩列表保存时哈希集合中单个元素的最大长度。
如果我们往Hash集合中写入的元素个数超过了hash-max-ziplist-entries,或者写入的单个元素大小超过了hash-max-ziplist-value,Redis就会自动把Hash类型的实现结构由压缩列表转为哈希表。
一旦从压缩列表转为了哈希表,Hash类型就会一直用哈希表进行保存,而不会再转回压缩列表了。在节省内存空间方面,哈希表就没有压缩列表那么高效了。
为了能充分使用压缩列表的精简内存布局,我们一般要控制保存在Hash集合中的元素个数。 所以,在刚才的二级编码中,我们只用图片ID最后3位作为Hash集合的key,也就保证了Hash集合的元素个数不超过1000,同时,我们把hash-max-ziplist-entries设置为1000,这样一来,Hash集合就可以一直使用压缩列表来节省内存空间了。
所以在选用Hash和Sorted Set存储时,意味着把Redis当做数据库使用,这样就需要务必保证Redis的可靠性(做好备份、主从副本),防止实例宕机引发数据丢失的风险。而采用String存储时,可以把Redis当做缓存使用,每个key设置过期时间,同时设置maxmemory和淘汰策略,控制整个实例的内存上限,这种方案需要在数据库层(例如MySQL)也存储一份映射关系,当Redis中的缓存过期或被淘汰时,需要从数据库中重新查询重建缓存,同时需要保证数据库和缓存的一致性,这些逻辑也需要编写业务代码实现。
12-有一亿个keys要统计,应该用哪种集合?
要想选择合适的集合,我们就得了解常用的集合统计模式。 这节课,我就给你介绍集合类型常见的四种统计模式,包括聚合统计、排序统计、二值状态统计和基数统计。我会以刚刚提到的这四个场景为例,和你聊聊在这些统计模式下,什么集合类型能够更快速地完成统计,而且还节省内存空间。
聚合统计
我们先来看集合元素统计的第一个场景:聚合统计。
所谓的聚合统计,就是指统计多个集合元素的聚合结果。(交集、差集、并集)
Set的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致Redis实例阻塞。所以,我给你分享一个小建议:你可以从主从集群中选择一个从库,让它专门负责聚合计算,或者是把数据读取到客户端,在客户端来完成聚合统计,这样就可以规避阻塞主库实例和其他从库实例的风险了。
排序统计
List是按照元素进入List的顺序进行排序的,而Sorted Set可以根据元素的权重来排序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入Sorted Set的时间确定权重值,先插入的元素权重小,后插入的元素权重大。
我先说说用List的情况。每个商品对应一个List,这个List包含了对这个商品的所有评论,而且会按照评论时间保存这些评论,每来一个新评论,就用LPUSH命令把它插入List的队头。
如果在获取第二页的评论时,插入的新的评论,通过 LRANGE product1 3 5 指令获取的数据可能会不准确。
在面对需要展示最新列表、排行榜等场景时,如果数据更新频繁或者需要分页显示,建议你优先考虑使用Sorted Set。
二值状态统计
这里的二值状态就是指集合元素的取值就只有0和1两种。这个时候,我们就可以选择Bitmap。这是Redis提供的扩展数据类型。我来给你解释一下它的实现原理。
Bitmap本身是用String类型作为底层数据结构实现的一种统计二值状态的数据类型。String类型是会保存为二进制的字节数组,所以,Redis就把字节数组的每个bit位利用起来,用来表示一个元素的二值状态。你可以把Bitmap看作是一个bit数组。
Bitmap提供了GETBIT/SETBIT操作,使用一个偏移值offset对bit数组的某一个bit位进行读和写。不过,需要注意的是,Bitmap的偏移量是从0开始算的,也就是说offset的最小值是0。当使用SETBIT对一个bit位进行写操作时,这个bit位会被设置为1。Bitmap还提供了BITCOUNT操作,用来统计这个bit数组中所有“1”的个数。
在统计1亿个用户连续10天的签到情况时,你可以把每天的日期作为key,每个key对应一个1亿位的Bitmap,每一个bit对应一个用户当天的签到情况。
接下来,我们对10个Bitmap做 “与”操作,得到的结果也是一个Bitmap。在这个Bitmap中,只有10天都签到的用户对应的bit位上的值才会是1。最后,我们可以用BITCOUNT统计下Bitmap中的1的个数,这就是连续签到10天的用户总数了。
基数统计
基数统计就是指统计一个集合中不重复的元素个数。对应到我们刚才介绍的场景中,就是统计网页的UV。
使用Set:用SCARD命令,这个命令会返回一个集合中的元素个数。
使用Hash:用HLEN命令统计Hash集合中的所有元素个数。
节省内存 ——> 用到Redis提供的HyperLogLog了
HyperLogLog是一种用于统计基数的数据集合类型,它的最大优势就在于,当集合元素数量非常多时,它计算基数所需的空间总是固定的,而且还很小。
在Redis中,每个 HyperLogLog只需要花费 12 KB 内存,就可以计算接近 2^64 个元素的基数。你看,和元素越多就越耗费内存的Set和Hash类型相比,HyperLogLog就非常节省空间。
在统计UV时,你可以用PFADD命令(用于向HyperLogLog中添加新元素)把访问页面的每个用户都添加到HyperLogLog中。
1 | PFADD page1:uv user1 user2 user3 user4 user5 |
接下来,就可以用PFCOUNT命令直接获得page1的UV值了,这个命令的作用就是返回HyperLogLog的统计结果。【具体原理不需要重点掌握】
有一点需要你注意一下,HyperLogLog的统计规则是基于概率完成的,所以它给出的统计结果是有一定误差的,标准误算率是0.81%。如果你需要精确统计结果的话,最好还是继续用Set或Hash类型。
小结
我把Set、Sorted Set、Hash、List、Bitmap、HyperLogLog的支持情况和优缺点汇总在了下面的表格里,希望你把这张表格保存下来,时不时地复习一下。
13-GEO是什么?还可以定义新的数据类型吗?
面向LBS应用的GEO数据类型
LBS应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO就非常适合应用在LBS服务的场景中
GEO的底层结构
一般来说,在设计一个数据类型的底层结构时,我们首先需要知道,要处理的数据有什么访问特点。 所以,我们需要先搞清楚位置信息到底是怎么被存取的。
实际上,GEO类型的底层数据结构就是用Sorted Set来实现的
这时问题来了,Sorted Set元素的权重分数是一个浮点数(float类型),而一组经纬度包含的是经度和纬度两个值,是没法直接保存为一个浮点数的,那具体该怎么进行保存呢?
用Sorted Set来保存车辆的经纬度信息时,Sorted Set的元素是车辆ID,元素的权重分数是经纬度信息,如下图所示:
这就要用到GEO类型中的GeoHash编码了。
GeoHash的编码方法
为了能高效地对经纬度进行比较,Redis采用了业界广泛使用的GeoHash编码方法,这个方法的基本原理就是“二分区间,区间编码”。
做完5次分区后,我们把经度值116.37定位在[112.5, 123.75]这个区间,并且得到了经度值的5位编码值,即11010。这个编码过程如下表所示:
我们刚刚计算的经纬度(116.37,39.86)的各自编码值是11010和10111,组合之后,第0位是经度的第0位1,第1位是纬度的第0位1,第2位是经度的第1位1,第3位是纬度的第1位0,以此类推,就能得到最终编码值1110011101,如下图所示:
我要提醒你一句,有的编码值虽然在大小上接近,但实际对应的方格却距离比较远。例如,我们用4位来做GeoHash编码,把经度区间[-180,180]和纬度区间[-90,90]各分成了4个分区,一共16个分区,对应了16个方格。编码值为0111和1000的两个方格就离得比较远,如下图所示:
所以,为了避免查询不准确问题,我们可以同时查询给定经纬度所在的方格周围的4个或8个方格。
GEO类型是把经纬度所在的区间编码作为Sorted Set中元素的权重分数,把和经纬度相关的车辆ID作为Sorted Set中元素本身的值保存下来,这样相邻经纬度的查询就可以通过编码值的大小范围查询来实现了。
如何操作GEO类型?
- GEOADD命令:用于把一组经纬度信息和相对应的一个ID记录到GEO类型集合中;
- GEORADIUS命令:会根据输入的经纬度位置,查找以这个经纬度为中心的一定范围内的其他元素。当然,我们可以自己定义这个范围。
限定返回数量
1 | GEORADIUS cars:locations 116.054579 39.030452 5 km ASC COUNT 10 |
如何自定义数据类型?
Redis的基本对象结构
RedisObject的内部组成包括了type、encoding、lru和refcount 4个元数据,以及1个*ptr指针。
- type:表示值的类型,涵盖了我们前面学习的五大基本类型;
- encoding:是值的编码方式,用来表示Redis中实现各个基本类型的底层数据结构,例如SDS、压缩列表、哈希表、跳表等;
- lru:记录了这个对象最后一次被访问的时间,用于淘汰过期的键值对;
- refcount:记录了对象的引用计数;
- *ptr:是指向数据的指针。
我们在定义了新的数据类型后,也只要在RedisObject中设置好新类型的type和encoding,再用*ptr指向新类型的实现,就行了。
开发一个新的数据类型
接下来,我以开发一个名字叫作NewTypeObject的新数据类型为例,来解释下具体的4个操作步骤。
第一步:定义新数据类型的底层结构
1 | struct NewTypeObject { |
NewTypeObject类型的底层结构其实就是一个Long类型的单向链表。
第二步:在RedisObject的type属性中,增加这个新类型的定义
这个定义是在Redis的server.h文件中。比如,我们增加一个叫作OBJ_NEWTYPE的宏定义,用来在代码中指代NewTypeObject这个新类型。
1 | #define OBJ_STRING 0 /* String object. */ |
第三步:开发新类型的创建和释放函数
Redis把数据类型的创建和释放函数都定义在了object.c文件中。所以,我们可以在这个文件中增加NewTypeObject的创建函数createNewTypeObject,如下所示:
1 | robj *createNewTypeObject(void){ |
createNewTypeObject分别调用了newtypeNew和createObject两个函数,我分别来介绍下。
先说newtypeNew函数。它是用来为新数据类型初始化内存结构的。这个初始化过程主要是用zmalloc做底层结构分配空间,以便写入数据。
1 | NewTypeObject *newtypeNew(void){ |
newtypeNew函数涉及到新数据类型的具体创建,而Redis默认会为每个数据类型定义一个单独文件,实现这个类型的创建和命令操作,例如,t_string.c和t_list.c分别对应String和List类型。按照Redis的惯例,我们就把newtypeNew函数定义在名为t_newtype.c的文件中。
createObject是Redis本身提供的RedisObject创建函数,它的参数是数据类型的type和指向数据类型实现的指针*ptr。
我们给createObject函数中传入了两个参数,分别是新类型的type值OBJ_NEWTYPE,以及指向一个初始化过的NewTypeObjec的指针。这样一来,创建的RedisObject就能指向我们自定义的新数据类型了。
1 | robj *createObject(int type, void *ptr) { |
对于释放函数来说,它是创建函数的反过程,是用zfree命令把新结构的内存空间释放掉。
第四步:开发新类型的命令操作
简单来说,增加相应的命令操作的过程可以分成三小步:
1.在t_newtype.c文件中增加命令操作的实现。比如说,我们定义ntinsertCommand函数,由它实现对NewTypeObject单向链表的插入操作:
1 | void ntinsertCommand(client *c){ |
2.在server.h文件中,声明我们已经实现的命令,以便在server.c文件引用这个命令,例如:
1 | void ntinsertCommand(client *c) |
3.在server.c文件中的redisCommandTable里面,把新增命令和实现函数关联起来。例如,新增的ntinsert命令由ntinsertCommand函数实现,我们就可以用ntinsert命令给NewTypeObject数据类型插入元素了。
1 | struct redisCommand redisCommandTable[] = { |
此时,我们就完成了一个自定义的NewTypeObject数据类型,可以实现基本的命令操作了。当然,如果你还希望新的数据类型能被持久化保存,我们还需要在Redis的RDB和AOF模块中增加对新数据类型进行持久化保存的代码。
14-如何在Redis中保存时间序列数据?
这些与发生时间相关的一组数据,就是时间序列数据。这些数据的特点是没有严格的关系模型,记录的信息可以表示成键和值的关系(例如,一个设备ID对应一条记录),所以,并不需要专门用关系型数据库(例如MySQL)来保存。
时间序列数据的读写特点
在实际应用中,时间序列数据通常是持续高并发写入的,例如,需要连续记录数万个设备的实时状态值。同时,时间序列数据的写入主要就是插入新数据,而不是更新一个已存在的数据【状态值,通常不会变】
这种数据的写入特点很简单,就是插入数据快,这就要求我们选择的数据类型,在进行数据插入时,复杂度要低,尽量不要阻塞。
那我们再看看,时间序列数据的“读”操作有什么特点。【单点查询、聚合查询、范围查询】
我们在查询时间序列数据时,既有对单条记录的查询(例如查询某个设备在某一个时刻的运行状态信息,对应的就是这个设备的一条记录),也有对某个时间范围内的数据的查询(例如每天早上8点到10点的所有设备的状态信息)。
除此之外,还有一些更复杂的查询,比如对某个时间范围内的数据做聚合计算。这里的聚合计算,就是对符合查询条件的所有数据做计算,包括计算均值、最大/最小值、求和等。例如,我们要计算某个时间段内的设备压力的最大值,来判断是否有故障发生。
那用一个词概括时间序列数据的“读”,就是查询模式多。
Redis提供了保存时间序列数据的两种方案,分别可以基于Hash和Sorted Set实现,以及基于RedisTimeSeries模块实现。
基于Hash和Sorted Set保存时间序列数据
为什么保存时间序列数据,要同时使用这两种类型?这是我们要回答的第一个问题。
可以看下用Hash集合记录设备的温度值的示意图:
用Hash类型来实现单键的查询很简单。但是,Hash类型有个短板:它并不支持对数据进行范围查询。
使用Sorted Set保存数据后,我们就可以使用ZRANGEBYSCORE命令,按照输入的最大时间戳和最小时间戳来查询这个时间范围内的温度值了。如下所示,我们来查询一下在2020年8月3日9点7分到9点10分间的所有温度值:
1 | ZRANGEBYSCORE device:temperature 202008030907 202008030910 |
我们又会面临一个新的问题,也就是我们要解答的第二个问题:如何保证写入Hash和Sorted Set是一个原子性的操作呢?
那Redis是怎么保证原子性操作的呢?这里就涉及到了Redis用来实现简单的事务的MULTI和EXEC命令。当多个命令及其参数本身无误时,MULTI和EXEC命令可以保证执行这些命令时的原子性。这节课,我们只要了解一下MULTI和EXEC这两个命令的使用方法就行了。
- MULTI命令:表示一系列原子性操作的开始。收到这个命令后,Redis就知道,接下来再收到的命令需要放到一个内部队列中,后续一起执行,保证原子性。
- EXEC命令:表示一系列原子性操作的结束。一旦Redis收到了这个命令,就表示所有要保证原子性的命令操作都已经发送完成了。此时,Redis开始执行刚才放到内部队列中的所有命令操作。接下来,我们需要继续解决第三个问题:如何对时间序列数据进行聚合计算?
1
2
3
4
5
6
7
8
9
10
11
12127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> HSET device:temperature 202008030911 26.8
QUEUED
127.0.0.1:6379> ZADD device:temperature 202008030911 26.8
QUEUED
127.0.0.1:6379> EXEC
1) (integer) 1
2) (integer) 1
聚合计算一般被用来周期性地统计时间窗口内的数据汇总状态,在实时监控与预警等场景下会频繁执行。
因为Sorted Set只支持范围查询,无法直接进行聚合计算,所以,我们只能先把时间范围内的数据取回到客户端,然后在客户端自行完成聚合计算。这个方法虽然能完成聚合计算,但是会带来一定的潜在风险,也就是大量数据在Redis实例和客户端间频繁传输,这会和其他操作命令竞争网络资源,导致其他操作变慢。
为了避免客户端和Redis实例间频繁的大量数据传输,我们可以使用RedisTimeSeries来保存时间序列数据。
RedisTimeSeries支持直接在Redis实例上进行聚合计算。如果我们需要进行大量的聚合计算,同时网络带宽条件不是太好时,Hash和Sorted Set的组合就不太适合了。此时,使用RedisTimeSeries就更加合适一些。
基于RedisTimeSeries模块保存时间序列数据
RedisTimeSeries是Redis的一个扩展模块。它专门面向时间序列数据提供了数据类型和访问接口,并且支持在Redis实例上直接对数据进行按时间范围的聚合计算。
因为RedisTimeSeries不属于Redis的内建功能模块,在使用时,我们需要先把它的源码单独编译成动态链接库redistimeseries.so,再使用loadmodule命令进行加载,如下所示:
1 | loadmodule redistimeseries.so |
当用于时间序列数据存取时,RedisTimeSeries的操作主要有5个:
- 用TS.CREATE命令创建时间序列数据集合;
- 用TS.ADD命令插入数据;
- 用TS.GET命令读取最新数据;
- 用TS.MGET命令按标签过滤查询数据集合;
- 用TS.RANGE支持聚合计算的范围查询。
1.用TS.CREATE命令创建一个时间序列数据集合
创建一个key为device:temperature、数据有效期为600s的时间序列数据集合。也就是说,这个集合中的数据创建了600s后,就会被自动删除。最后,我们给这个集合设置了一个标签属性{device_id:1},表明这个数据集合中记录的是属于设备ID号为1的数据。
1 | TS.CREATE device:temperature RETENTION 600000 LABELS device_id 1 |
2.用TS.ADD命令插入数据,用TS.GET命令读取最新数据
1 | TS.ADD device:temperature 1596416700 25.1 |
3.用TS.MGET命令按标签过滤查询数据集合
在使用TS.CREATE创建数据集合时,我们可以给集合设置标签属性。当我们进行查询时,就可以在查询条件中对集合标签属性进行匹配,最后的查询结果里只返回匹配上的集合中的最新数据。
举个例子。假设我们一共用4个集合为4个设备保存时间序列数据,设备的ID号是1、2、3、4,我们在创建数据集合时,把device_id设置为每个集合的标签。此时,我们就可以使用下列TS.MGET命令,以及FILTER设置(这个配置项用来设置集合标签的过滤条件),查询device_id不等于2的所有其他设备的数据集合,并返回各自集合中的最新的一条数据。
1 | TS.MGET FILTER device_id!=2 |
4.用TS.RANGE支持需要聚合计算的范围查询
最后,在对时间序列数据进行聚合计算时,我们可以使用TS.RANGE命令指定要查询的数据的时间范围,同时用AGGREGATION参数指定要执行的聚合计算类型。RedisTimeSeries支持的聚合计算类型很丰富,包括求均值(avg)、求最大/最小值(max/min),求和(sum)等。
例如,在执行下列命令时,我们就可以按照每180s的时间窗口,对2020年8月3日9时5分和2020年8月3日9时12分这段时间内的数据进行均值计算了。
1 | TS.RANGE device:temperature 1596416700 1596417120 AGGREGATION avg 180000 |
与使用Hash和Sorted Set来保存时间序列数据相比,RedisTimeSeries是专门为时间序列数据访问设计的扩展模块,能支持在Redis实例上直接进行聚合计算,以及按标签属性过滤查询数据集合,当我们需要频繁进行聚合计算,以及从大量集合中筛选出特定设备或用户的数据集合时,RedisTimeSeries就可以发挥优势了。
课后问题
15-消息队列的考验:Redis有哪些解决方案?
“Redis适合做消息队列吗?” 其实,这个问题的背后,隐含着两方面的核心问题:
- 消息队列的消息存取需求是什么?
- Redis如何实现消息队列的需求?
消息队列的消息存取需求
消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
需求一:消息保序
需求二:重复消息处理
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。
需求三:消息可靠性保证
另外,消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
Redis的List和Streams两种数据类型,就可以满足消息队列的这三个需求。我们先来了解下基于List的消息队列实现方法。
基于List的消息队列解决方案
List本身就是按先进先出的顺序对数据进行存取的,所以,如果使用List作为消息队列保存消息的话,就已经能满足消息保序的需求了。
具体来说,生产者可以使用LPUSH命令把要发送的消息依次写入List,而消费者则可以使用RPOP命令,从List的另一端按照消息的写入顺序,依次读取消息并进行处理。
消费者想及时消费,需要不停地调用RPOP命令 ——> Redis提供了BRPOP命令。BRPOP命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。 和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销。
消息保序的问题解决了,接下来,我们还需要考虑解决重复消息处理的问题,这里其实有一个要求:消费者程序本身能对重复消息进行判断。
一方面,消息队列要能给每一个消息提供全局唯一的ID号;另一方面,消费者程序要把已经处理过的消息的ID号记录下来。
为了留存消息,List类型提供了BRPOPLPUSH命令,这个命令的作用是让消费者程序从一个List中读取消息,同时,Redis会把这个消息再插入到另一个List(可以叫作备份List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份List中重新读取消息并进行处理了。
我们还可能遇到过一个问题:生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致List中的消息越积越多,给Redis的内存带来很大压力。
这个时候,我们希望启动多个消费者程序组成一个消费组,一起分担处理List中的消息。但是,List类型并不支持消费组的实现。那么,还有没有更合适的解决方案呢?这就要说到Redis从5.0版本开始提供的Streams数据类型了。
基于Streams的消息队列解决方案
Streams是Redis专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
- XADD:插入消息,保证有序,可以自动生成全局唯一ID;
- XREAD:用于读取消息,可以按ID读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING和XACK:XPENDING命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而XACK命令用于向消息队列确认消息处理已完成。
首先,我们来学习下Streams类型存取消息的操作XADD。
XADD命令可以往消息队列中插入新消息,消息的格式是键-值对形式。对于插入的每一条消息,Streams可以自动为其生成一个全局唯一的ID。
比如说,我们执行下面的命令,就可以往名称为mqstream的消息队列中插入一条消息,消息的键是repo,值是5。其中,消息队列名称后面的,表示让Redis为插入的数据自动生成一个全局唯一的ID,例如“1599203861727-0”。当然,我们也可以不用,直接在消息队列名称后自行设定一个ID号,只要保证这个ID号是全局唯一的就行。不过,相比自行设定ID号,使用*会更加方便高效。
1 | XADD mqstream * repo 5 |
可以看到,消息的全局唯一ID由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从0开始编号的。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第1条消息。
当消费者需要读取消息时,可以直接使用XREAD命令从消息队列中读取。
XREAD在读取消息时,可以指定一个消息ID,并从这个消息ID的下一条消息开始进行读取。
例如,我们可以执行下面的命令,从ID号为1599203861727-0的消息开始,读取后续的所有消息(示例中一共3条)。
1 | XREAD BLOCK 100 STREAMS mqstream 1599203861727-0 |
另外,消费者也可以在调用XRAED时设定block配置项,实现类似于BRPOP的阻塞读取操作。当消息队列中没有消息时,一旦设置了block配置项,XREAD就会阻塞,阻塞的时长可以在block配置项进行设置。
举个例子,我们来看一下下面的命令,其中,命令最后的“$”符号表示读取最新的消息,同时,我们设置了block 10000的配置项,10000的单位是毫秒,表明XREAD在读取最新消息时,如果没有消息到来,XREAD将阻塞10000毫秒(即10秒),然后再返回。下面命令中的XREAD执行后,消息队列mqstream中一直没有消息,所以,XREAD在10秒后返回空值(nil)。
1 | XREAD block 10000 streams mqstream $ |
刚刚讲到的这些操作是List也支持的,接下来,我们再来学习下Streams特有的功能。
Streams本身可以使用XGROUP创建消费组,创建消费组之后,Streams可以使用XREADGROUP命令让消费组内的消费者读取消息,
例如,我们执行下面的命令,创建一个名为group1的消费组,这个消费组消费的消息队列是mqstream。
1 | XGROUP create mqstream group1 0 |
然后,我们再执行一段命令,让group1消费组里的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams会自动使用内部队列(也称为PENDING List) 留存消费组里每个消费者读取的消息,直到消费者使用XACK命令通知Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给Streams发送XACK命令,消息仍然会留存。此时,消费者可以在重启后,用XPENDING命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下group2中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING返回结果的第二、三行分别表示group2中所有消费者读取的消息最小ID和最大ID。
1 | XPENDING mqstream group2 |
如果我们还需要进一步查看某个消费者具体读取了哪些数据,可以执行下面的命令:
1 | XPENDING mqstream group2 - + 10 consumer2 |
可以看到,consumer2已读取的消息的ID是1599274912765-0。
一旦消息1599274912765-0被consumer2处理了,consumer2就可以使用XACK命令通知Streams,然后这条消息就会被删除。当我们再使用XPENDING命令查看时,就可以看到,consumer2已经没有已读取、但尚未确认处理的消息了。
1 | XACK mqstream group2 1599274912765-0 |
现在,我们就知道了用Streams实现消息队列的方法,我还想再强调下,Streams是Redis 5.0专门针对消息队列场景设计的数据类型,如果你的Redis是5.0及5.0以后的版本,就可以考虑把Streams用作消息队列了。
Redis是一个非常轻量级的键值数据库,部署一个Redis实例就是启动一个进程,部署Redis集群,也就是部署多个Redis实例。而Kafka、RabbitMQ部署时,涉及额外的组件,例如Kafka的运行就需要再部署ZooKeeper。相比Redis来说,Kafka和RabbitMQ一般被认为是重量级的消息队列。
所以,关于是否用Redis做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。