Redis大杀器,深入剖析,从简入深

  行走江湖多年,关于Redis曾经在面试中饱受面试官拷打,本文将以简洁的图文来阐述Redis,建立起对redis的透彻印象,同时也会对redis在企业实战中的一些优化来做深度剖析,目的是拿捏redis不求人。




Redis是什么?(Remote Dictionary Server )-远程字典服务是一个高性能的键值对(key-value)存储系统。它通常被用作缓存层,以提升应用程序的性能。Redis是内存中的数据结构存储系统,这意味着所有的数据都存储在内存中,使得读取和写入操作都非常快速。




redis安装参考地址





  • 字符串(string):字符串是 Redis 最基本的数据类型,一个键最大的存储容量是 512MB。

  • 哈希表(hash):哈希表存储的是键值对,可以理解为一个字典。每个哈希表最多可以存储 2^32 – 1 个键值对(40多亿)。

  • 列表(list):列表是一个有序的字符串类型数据集合,可以理解为一个数组。一个列表最多可以包含 2^32 – 1 个元素(40多亿)。

  • 集合(set):集合是一个无序的字符串类型数据集合,不允许重复元素。

  • 有序集合(sorted set):有序集合是一个有序的字符串类型数据集合,每个元素都有一个分数值(score),可以理解为带权重的元素。

  • 位图(bitmap):由二进制位组成的数据类型,可以对其进行位运算。

  • 地理位置(geospatial):可以储存地理位置的数据类型,支持距离计算和地理范围查询。

  • Redis 5.0 版本引入了 Stream 数据结构,它提供了基于消息的处理方式。

  • HyperLogLog:用于进行基数统计,即统计不同元素的数量。




  虽然Redis主要存储在内存中,但为了防止数据丢失,你可以使用持久化功能。Redis支持两种持久化方法:RDB(Redis DataBase)和AOF(Append Only File)。RDB通过生成数据快照进行持久化,而AOF则通过记录每次写操作进行持久化。


RDB持久化 Redis某一时刻的数据持久化到磁盘中,是一种快照式的持久化方法。


Redis在进行数据持久化的过程中,会先将数据写入到一个临时文件中,待持久化过程都结束了,才会用这个临时文件替换上次持久化好的文件。正是这种特性,让我们可以随时来进行备份,因为快照文件总是完整可用的。


RDB优点:


  • RDB是一个非常紧凑的文件,它保存了某个时间点得数据集,非常适用于数据集的备份,比如你可以在每个小时报保存一下过去24小时内的数据,同时每天保存过去30天的数据,这样即使出了问题你也可以根据需求恢复到不同版本的数据集.

  • RDB是一个紧凑的单一文件,很方便传送到另一个远端数据中心或者亚马逊的S3(可能加密),非常适用于灾难恢复.

  • RDB在保存RDB文件时父进程唯一需要做的就是fork出一个子进程,接下来的工作全部由子进程来做,父进程不需要再做其他IO操作,所以RDB持久化方式可以最大化redis的性能.


RDB缺点:


  • 如果你希望在redis意外停止工作(例如电源中断)的情况下丢失的数据最少的话,那么RDB不适合你.虽然你可以配置不同的save时间点(例如每隔5分钟并且对数据集有100个写的操作),是Redis要完整的保存整个数据集是一个比较繁重的工作,你通常会每隔5分钟或者更久做一次完整的保存,万一在Redis意外宕机,你可能会丢失几分钟的数据.

  • RDB 需要经常fork子进程来保存数据集到硬盘上,当数据集比较大的时候,fork的过程是非常耗时的,可能会导致Redis在一些毫秒级内不能响应客户端的请求.如果数据集巨大并且CPU性能不是很好的情况下,这种情况会持续1秒,AOF也需要fork,但是你可以调节重写日志文件的频率来提高数据集的耐久度.


AOF持久化



AOF方式是将执行过的写指令记录下来,在数据恢复时按照从前到后的顺序再将指令都执行一遍。


实现方式:我们通过配置Redis.conf 中的appendonly yes 就可以打开AOF功能。如果有写操作(如SET 等),Redis 就会被追加到AOF 文件的末尾。AOF 持久化的方式:默认的AOF 持久化策略是每秒钟fsync 一次(fsync是指把缓存中的写指令记录到磁盘中),因为在这种情况下,Redis 仍然可以保持很好的处理性能,即使Redis 故障,也只会丢失最近1 秒钟的数据。   如果在追加日志时,恰好遇到磁盘空间满或断电等情况导致日志写入不完整,也没有关系,Redis 提供了Redis-check-aof工具,可以用来进行日志修复。因为采用了追加方式,如果不做任何处理的话,AOF 文件会变得越来越大,为此,Redis 提供了AOF 文件重写(rewrite)机制,即当AOF 文件的大小超过所设定的阈值时,Redis 就会启动AOF 文件的内容压缩,只保留可以恢复数据的最小指令集。举个例子或许更形象,假如我们调用了100 次INCR 指令,在AOF 文件中就要存储100 条指令,但这明显是很低效的,完全可以把这100条指令合并成一条SET 指令,这就是重写机制的原理。


AOF 优点:我们通过一个场景再现来说明。某同学在操作Redis 时,不小心执行了FLUSHALL,导致Redis 内存中的数据全部被清空了,这是很悲剧的事情。不过这也不是世界末日,只要Redis 配置了AOF 持久化方式,且AOF文件还没有被重写(rewrite),我们就可以用最快的速度暂停Redis 并编辑AOF文件,将最后一行的FLUSHALL 命令删除,然后重启Redis,就可以恢复Redis的所有数据到FLUSHALL 之前的状态了。是不是很神奇,这就是AOF 持久化方式的好处之一。但是如果AOF 文件已经被重写了,那就无法通过这种方法来恢复数据了。 AOF缺点:比如在同样数据规模的情况下,AOF文件要比RDB文件的体积大。而且,AOF 方式的恢复速度也要慢于RDB 方式。


企业级的持久化的配置策略


RDB 和 AOF 是 Redis 内部的两种数据持久化策略,这是两种不同的持久化策略,一种是基于内存快照,一种是基于操作日志。


在企业中,RDB的生成策略,用默认的也差不多


save 60 10000:如果你希望尽可能确保说,RDB最多丢1分钟的数据,那么尽量就是每隔1分钟都生成一个快照,低峰期,数据量很少,也没必要


具体是10000->生成RDB,1000->RDB,这个生成rdb操作的阈值大小,这个根据你自己的应用和业务的数据量,你自己去决定


AOF一定要打开,设置appendsync = everysec

auto-aof-rewrite-percentage 100: 就是当前AOF大小膨胀到超过上次100%,上次的两倍

auto-aof-rewrite-min-size 64mb: 根据你的数据量来定,16mb,32mb


redis数据备份恢复



    学习一门技术,关键是为我所用,能过通过技术驱动业务,有实际的业务产出,提高企业变现的能力或整个生产流程的效率,这是每个技术人必须落实的理念!


    作为史上最大的Redis集群,有必要透过新浪微博这家企业来看redis在企业中的使用借鉴!


新浪微博目前使用的98%都是持久化的应用,2%的是缓存,用到了600+服务器 Redis中持久化的应用和非持久化的方式不会差别很大: 非持久化的为8-9万tps,那么持久化在7-8万tps左右; 当使用持久化时,需要考虑到持久化和写性能的配比,也就是要考虑redis使用的内存大小和硬盘写的速率的比例计算;


🍉 5.1 使用历程


2009年, 使用memcache(用于非持久化内容), memcacheDB(用于持久化+计数), memcacheDB是新浪在memcache的基础上,使用BerkeleyDB作为数据持久化的存储实现;


面临的问题


数据结构(Data Structure)需求越来越多, 但memcache中没有, 影响开发效率性能需求, 随着读操作的量的上升需要解决。 经历的过程有: 数据库读写分离(M/S)–>数据库使用多个Slave–>增加Cache (memcache)–>转到Redis解决写的问题:水平拆分,对表的拆分,将有的用户放在这个表,有的用户放在另外一个表;可靠性需求:Cache的”雪崩”问题让人纠结 Cache面临着快速恢复的挑战开发成本需求:Cache和DB的一致性维护成本越来越高(先清理DB, 再清理缓存, 不行啊, 太慢了!) 开发需要跟上不断涌入的产品需求: 硬件成本最贵的就是数据库层面的机器,基本上比前端的机器要贵几倍,主要是IO密集型,很耗硬件;维护性复杂:一致性维护成本越来越高; BerkeleyDB使用B树,会一直写新的,内部不会有文件重新组织;这样会导致文件越来越大;大的时候需要进行文件归档,归档的操作要定期做; 这样,就需要有一定的down time;


🥝 5.2 寻找开源软件的方式及评判标准


  • 对于开源软件,首先看其能做什么,但更多的需要关注它不能做什么,它会有什么问题?

  • 上升到一定规模后,可能会出现什么问题,是否能接受?

  • google code上, 国外论坛找材料(国内比国外技术水平滞后5年)

  • 观察作者个人的代码水平


🍓 5.3 业务使用方式


  • hash sets: 关注列表, 粉丝列表, 双向关注列表(key-value(field), 排序)

  • string(counter): 微博数, 粉丝数, …(避免了select count(*) from …)

  • sort sets(自动排序): TopN, 热门微博等, 自动排序

  • lists(queue): push/sub提醒,…


上述四种, 从精细化控制方面: hash sets和string(counter)推荐使用, sort sets和lists(queue)不推荐使用 还可通过二次开发,进行精简。比如: 存储字符改为存储整形, 16亿数据, 只需要16G内存 存储类型保存在3种以内,建议不要超过3种; 将memcache +myaql 替换为Redis: Redis作为存储并提供查询,后台不再使用mysql,解决数据多份之间的一致性问题;


🍎 5.3.1 List


微博和微信公众号消息流


LPUSH msg:{id} 10086


查看最新的微博消息


//LRANGE:返回列表key中指定区间内的元素,区间以偏移量start和stop指定
LRANGE msg:{id} 0 5


微信、微博在线用户抽奖

//思路1)点击参与抽奖加入集合SADD key {userID}2)查看参与抽奖所有用户SMEMBERS key3)抽count名中奖者如果希望中奖用户依然保留在抽奖池里面SRANDMENMBER key [count]SRANDMENMBER 从集合key中选出count个元素,元素不从key中删除如果不希望中奖用户依然保留在抽奖池里面SPOP key [count]SPOP从集合key中选出count个元素,元素从key中删除


微信、微博、抖音点赞、收藏标签


//思路1)抖音大V视频点赞量SADD like:{messageId}  {userId}2)取消点赞SREM like:{messageId} {userId}移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略3)检查用户是否点过赞SISMEMBER like:{messageId} {userId}4)获取点赞的用户列表SISMEMBER like:{messageId}5)获取点赞用户数返回集合 key 的基数(集合中元素的数量)SCARD like:{messageId}


微博、QQ共同关注的人,我关注的人也关注他,可能认识的人


//思路共同关注的人://两个集合的交集SINTER simaSet yanguoSet我关注的人也关注他//我关注的人在他的集合里面,可以运用SISMENBERSISMENBER simaSet yanguo我可能认识的人//他关注的人,不在我的集合里面,但是在我的朋友集合里面SDIFF simaSet yanguoSet


🍅 5.3.2 Zset


微博热搜、排行榜


//思路1)热搜//新闻点击量ZINCRBY HOTNEWS:20191115 1 李小璐离婚为有序集合key中元素member的分值加上increment2)展示当日排行版前十ZREVRANGE hotNews:20191115 0 10 WITHSCORESZREVRANGE倒序获取有序集合key中start下标到stop下标的元素3)七日搜索榜单计算ZUNIONSTORE hotNews:20190722 7并集计算4)展示七日排行前十ZREVRANGE hotNews:20190716-20190722 0 10 WITHSCORES交集计算


🥒  5.3.3 Geo


附近的人


//思路微博、微信、默默<附近的人>滴滴打车、哈啰单车、美团单车<附近的车>美团和饿了吗<附近的餐馆>
基于地理位置比如GEORADIUSredis> GEOADD Sicily 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"(integer) 2
redis> GEORADIUS Sicily 15 37 200 km WITHDIST1) 1) "Palermo" 2) "190.4424"2) 1) "Catania" 2) "56.4413"
redis> GEORADIUS Sicily 15 37 200 km WITHCOORD1) 1) "Palermo" 2) 1) "13.361389338970184" 2) "38.115556395496299"2) 1) "Catania" 2) 1) "15.087267458438873" 2) "37.50266842333162"
redis> GEORADIUS Sicily 15 37 200 km WITHDIST WITHCOORD1) 1) "Palermo" 2) "190.4424" 3) 1) "13.361389338970184" 2) "38.115556395496299"2) 1) "Catania" 2) "56.4413" 3) 1) "15.087267458438873" 2) "37.50266842333162"


🍒 5.3.4 String


微信公众号中文章阅读量、抖音中的城市访问量


Redis提供了基于原子的加减操作,
这里面我们可以用计数器功能,
运用INCR对feild进行加加操作统计数据,
INCR article:readcount:{articleId}.


🌽 5.4 其他高频通用场景


会话缓存(Session Cache)or 全页缓存(FPC) or 数据缓存(Data Cache)


  Redis数据库是一种基于缓存的数据库,通过缓存来提升访问速度。   当我们访问某个网站时,第一个访问者向服务器发起请求,服务器返回一些静态文件。   如果某些静态文件的内容不经常变化,就可以将它们存储到Redis中,以便下一次访问时直接从Redis中返回,节省查找时间。如果Redis中没有需要的数据,就在文件中查找并返回数据,同时将数据存储到Redis中,以便下一个人访问时可以快速返回。   由于Redis基于内存存储,存储空间有限,因此需要对数据进行区分和设置存储时间,避免存储满后无法继续存储。对于访问频率高的数据,其存储时间会被不断更新延长,而对于访问频率低的数据,则会因为长时间未被访问而自动删除。这种数据存储方式非常常见,但需要根据实际情况进行设置和调整。


bitMap(大数据处理)


  源于redis特殊的数据结构位图(bitmap),用于数据量上亿的场景下,例如几亿用户系统的签到,去重登录次数统计,某用户是否在线状态等等。腾讯10亿用户,要几个毫秒内查询到某个用户是否在线,能怎么做?千万别说给每个用户建立一个key,然后挨个记(你可以算一下需要的内存会很恐怖,而且这种类似的需求很多。这里要用到位操作——使用setbit、getbit、bitcount命令。原理是:   redis内构建一个足够长的数组,每个数组元素只能是0和1两个值,然后这个数组的下标index用来表示用户id(必须是数字哈),那么很显然,这个几亿长的大数组就能通过下标和元素值(0和1)来构建一个记忆系统。


分布式锁(重点)


基于Redis分布式锁实现



分布式锁对比


分类 方案 实现原理 优点 缺点
基于数据库 基于mysql 表唯一索引 1.表增加唯一索引 2.加锁:执行insert语句,若报错,则表明加锁失败 3.解锁:执行delete语句 完全利用DB现有能力,实现简单 1.锁无超时自动失效机制,有死锁风险 2.不支持锁重入,不支持阻塞等待 3.操作数据库开销大,性能不高
基于MongoDB findAndModify原子操作 1.加锁:执行findAndModify原子命令查找document,若不存在则新增
2.解锁:删除document
实现也很容易,较基于MySQL唯一索引的方案,性能要好很多 1.大部分公司数据库用MySQL,可能缺乏相应的MongoDB运维、开发人员 2.锁无超时自动失效机制
基于分布式协调系统 基于ZooKeeper 1.加锁:在/lock目录下创建临时有序节点,判断创建的节点序号是否最小。若是,则表示获取到锁;否,则则watch /lock目录下序号比自身小的前一个节点 2.解锁:删除节点 1.由zk保障系统高可用 2.Curator框架已原生支持系列分布式锁命令,使用简单 需单独维护一套zk集群,维保成本高
基于缓存 基于redis命令 1. 加锁:执行setnx,若成功再执行expire添加过期时间 2. 解锁:执行delete命令 实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好 1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁 2.delete命令存在误删除非当前线程持有的锁的可能 3.不支持阻塞等待、不可重入
基于redis Lua脚本能力 1. 加锁:执行SET lock_name random_value EX seconds NX 命令  
2. 解锁:执行Lua脚本,释放锁时验证random_value  — ARGV[1]为random_value, KEYS[1]为lock_nameif redis.call(“get”, KEYS[1]) == ARGV[1] then  return redis.call(“del”,KEYS[1])else  return 0end
同上;实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大。 不支持锁重入,不支持阻塞等待


分布式锁需满足四个条件


  • 互斥性。在任意时刻,只有一个客户端能持有锁。

  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

  • 解铃还须系铃人,加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。

  • 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。


Redisson分布式锁的实现



Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
RLock rLock = redissonClient.getLock(lockKey);try { * 4.尝试获取锁 * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完) */ boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS); if (res) { }} catch (Exception e) { throw new RuntimeException("aquire lock fail");}finally{ rLock.unlock();}


redisson这个框架重度依赖了Lua脚本和Netty,代码很牛逼,各种Future及FutureListener的异步、同步操作转换。


如果要手写一个分布式锁组件,怎么做?肯定要定义2个接口:加锁、解锁;大道至简,redisson的作者就是在加锁和解锁的执行层面采用Lua脚本,逼格高,而且重要有原子性保证。当然,redisson的作者毕竟牛逼,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能。


加锁和解锁Lua脚本剖析:


1、加锁Lua脚本


脚本参数


参数 示例值 含义
KEY个数 1 KEY个数
KEYS[1] my_first_lock_name 锁名
ARGV[1] 60000 持有锁的有效时间:毫秒
ARGV[2] 58c62432-bb74-4d14-8a00-9908cc8b828f:1 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID


脚本内容


-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间if (redis.call('exists', KEYS[1]) == 0) then    redis.call('hset', KEYS[1], ARGV[2], 1);    redis.call('pexpire', KEYS[1], ARGV[1]);    return nil;end; -- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then    redis.call('hincrby', KEYS[1], ARGV[2], 1);    redis.call('pexpire', KEYS[1], ARGV[1]);    return nil;end; -- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间return redis.call('pttl', KEYS[1]);


脚本解读



当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果


2、解锁Lua脚本


脚本入参


参数 示例值 含义
KEY个数 2 KEY个数
KEYS[1] my_first_lock_name 锁名
KEYS[2] redisson_lock__channel:{my_first_lock_name} 解锁消息PubSub频道
ARGV[1] 0 redisson定义0表示解锁消息
ARGV[2] 30000 设置锁的过期时间;默认值30秒
ARGV[3] 58c62432-bb74-4d14-8a00-9908cc8b828f:1 唯一标识;同加锁流程


脚本内容


-- 若锁不存在:则直接广播解锁消息,并返回1if (redis.call('exists', KEYS[1]) == 0) then    redis.call('publish', KEYS[2], ARGV[1]);    return 1; end; -- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then    return nil;end;  -- 若锁存在,且唯一标识匹配:则先将锁重入计数减1local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then     -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期    redis.call('pexpire', KEYS[1], ARGV[2]);     return 0; else     -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程    redis.call('del', KEYS[1]);     redis.call('publish', KEYS[2], ARGV[1]);     return 1;end; return nil;


脚本解读广播解锁消息有什么用?是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。


返回值0、1、nil有什么不一样?当且仅当返回1,才表示当前请求真正触发了解锁Lua脚本;


加锁源码


加锁源码中tryAcquire(leaseTime, unit, threadId)方法执行了加锁Lua脚本。直接进入org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)源码


@Override    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {        // 获取锁能容忍的最大等待时长        long time = unit.toMillis(waitTime);        long current = System.currentTimeMillis();        final long threadId = Thread.currentThread().getId();         // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁        Long ttl = tryAcquire(leaseTime, unit, threadId);        // lock acquired        if (ttl == null) {            return true;        }         // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间        time -= (System.currentTimeMillis() - current);        if (time <= 0) {            acquireFailed(threadId);            return false;        }         current = System.currentTimeMillis();        // 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage        /**     * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:     * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争     * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败     * 当 this.await返回true,进入循环尝试获取锁     */        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);        //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {            if (!subscribeFuture.cancel(false)) {                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {                    @Override                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {                        if (subscribeFuture.isSuccess()) {                            unsubscribe(subscribeFuture, threadId);                        }                    }                });            }            acquireFailed(threadId);            return false;        }         // 订阅成功        try {            // 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间            time -= (System.currentTimeMillis() - current);            if (time <= 0) {                // 超出可容忍的等待时长,直接返回获取锁失败                acquireFailed(threadId);                return false;            }             while (true) {                long currentTime = System.currentTimeMillis();                // 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】                ttl = tryAcquire(leaseTime, unit, threadId);                // lock acquired                if (ttl == null) {                    return true;                }                 time -= (System.currentTimeMillis() - currentTime);                if (time <= 0) {                    acquireFailed(threadId);                    return false;                }                 // waiting for message                currentTime = System.currentTimeMillis();                 // 【核心点3】根据锁TTL,调整阻塞等待时长;                // 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;               //2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。                if (ttl >= 0 && ttl < time) {                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                } else {                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);                }                 time -= (System.currentTimeMillis() - currentTime);                if (time <= 0) {                    acquireFailed(threadId);                    return false;                }            }        } finally {            // 取消解锁消息的订阅            unsubscribe(subscribeFuture, threadId);        }    }


tryAcquire的实现,真的就是执行Lua脚本!

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { // tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果 return get(tryAcquireAsync(leaseTime, unit, threadId));} // 见org.redisson.RedissonLock#tryAcquireAsyncprivate <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { // 实质是异步执行加锁Lua脚本 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { //先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired //如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture;} // 见org.redisson.RedissonLock#tryLockInnerAsync<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}


加锁总结


尝试获取锁,这一步是通过执行加锁Lua脚本来做; 若第一步未获取到锁,则去订阅解锁消息,当获取锁到剩余过期时间后,调用信号量方法阻塞住,直到被唤醒或等待超时 一旦持有锁的线程释放了锁,就会广播解锁消息。于是,第二步中的解锁消息的监听器会释放信号量,获取锁被阻塞的那些线程就会被唤醒,并重新尝试获取锁。


RedissonLock中的变量internalLockLeaseTime,默认值是30000毫秒调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),默认值也是30000毫秒这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描述这一机制


看门狗到底做了什么?


先思考一个问题:


假设在一个分布式环境下,多个服务实例请求获取锁,其中服务实例1成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了或者hang住了,那么这个锁会不会释放,什么时候释放?回答这个问题,自然想起来之前我们分析的lua脚本,其中第一次加锁的时候使用pexpire给锁key设置了过期时间,默认30000毫秒,由此来看如果服务实例宕机了,锁最终也会释放,其他服务实例也是可以继续获取到锁执行业务。但是要是30000毫秒之后呢,要是服务实例1没有宕机但是业务执行还没有结束,所释放掉了就会导致线程问题,这个redisson是怎么解决的呢?


自动延长锁有效期的机制


异步执行完lua脚本执行完成之后,设置了一个监听器,来处理异步执行结束之后的一些工作。在操作完成之后会去执行operationComplete方法,先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果,如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal,回想一下之前的lua脚本,当加锁逻辑处理结束,返回了一个nil;如此说来 就一定会走定时任务了。


定时调度scheduleExpirationRenewal代码


private void scheduleExpirationRenewal(final long threadId) {        if (expirationRenewalMap.containsKey(getEntryName())) {            return;        }         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {            @Override            public void run(Timeout timeout) throws Exception {                                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +                            "return 1; " +                        "end; " +                        "return 0;",                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));                                future.addListener(new FutureListener<Boolean>() {                    @Override                    public void operationComplete(Future<Boolean> future) throws Exception {                        expirationRenewalMap.remove(getEntryName());                        if (!future.isSuccess()) {                            log.error("Can't update lock " + getName() + " expiration", future.cause());                            return;                        }                                                if (future.getNow()) {                            // reschedule itself                            scheduleExpirationRenewal(threadId);                        }                    }                });            }        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);         if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {            task.cancel();        }    }


首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,如果已经存在了,就直接返回;第一次加锁,肯定是不存在的,接下来就是搞了一个TimeTask,延迟internalLockLeaseTime/3之后执行,这里就用到了文章一开始就提到奇妙的变量,算下来就是大约10秒钟执行一次,调用了一个异步执行的方法:



RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

任务调度异步执行并设置监听器,更新锁过期时间失败则返回;获取异步执行结果为true则每隔10秒执行一次,续约锁的过期时间到30000毫秒,保证锁一直在手中。


加锁成功后,在执行业务期间,其他服务实例或者当前客户端的其他线程若尝试加锁,都会被阻塞。加锁lua代码中,若锁key存在且当前客户端唯一key也存在,则自增重入次数,设置过期时间,并返回nil,实现锁的可重入性。定时调度任务会继续执行,完成锁key过期时间的续约。


如果没有出现锁续约的情况,直接返回当前锁的剩余有效期,不执行续约逻辑。若加锁成功,则返回;若加锁失败,则死循环尝试加锁,等待一段时间后重试,直到第一个服务实例释放锁。这样实现了锁的互斥。


锁释放的逻辑


锁释放的逻辑,调用lock.unlock()会异步执行lua脚本,判断锁key是否存在,不存在则返回;存在则判断唯一key的值是否存在,不存在返回nil;存在则自减一并判断是否大于零,大于零返回0,否则删除锁key并返回1。如果返回1,取消之前的续约任务,失败则设置状态。


源码解读


org.redisson.RedissonLock#unlock代码

@Override public void unlock() { // 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁 Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } } // 见org.redisson.RedissonLock#unlockInnerAsyncprotected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }

加锁&解锁流程串起来


总结


  • 线程A抢到锁,线程B订阅解锁消息再尝试获取锁并阻塞等待。

  • 线程A释放锁并广播解锁消息。

  • LockPubSub收到消息后释放信号量,线程B被唤醒后再次抢到锁并干完活解锁。





Redis事务


Redis(Remote Dictionary Server)是一个开源的高性能键值对(key-value)存储系统,通常用作数据库、缓存和消息代理。它支持多种数据结构,如字符串、哈希表、列表、集合和有序集合。Redis具有原子性操作,能够在单个命令中实现数据的读取、写入和删除,这使得它在处理高并发请求时表现出色。


  • 高速缓存:Redis使用内存存储数据,因此它可以在大多数操作中提供非常快的响应时间。

  • 原子性操作:Redis的所有操作都是原子的,这意味着它们要么完全执行,要么完全不执行。

  • 数据结构多样性:Redis支持多种数据结构,使得它可以用作多种用途,如数据库、缓存和消息代理。

  • 持久化:虽然主要使用内存存储,但Redis也支持将数据持久化到磁盘。

  • 发布/订阅:Redis支持发布/订阅模型,使其成为实现实时应用的理想选择。

  • 事务处理:Redis支持事务处理,即一系列按顺序执行的命令,要么全部执行,要么全部不执行。

  • Lua脚本处理:Redis支持Lua脚本处理,可以在服务器端执行一段Lua脚本。

  • 分布式:通过Redis的分片,可以很容易地实现数据的分布式存储和处理。


Redis是一个功能丰富、性能卓越的键值对存储系统,适用于各种应用场景,无论是作为数据库、缓存还是消息代理,都能提供出色的性能和可靠性。

链接:https://juejin.cn/post/7309549295726854181

(版权归原作者所有,侵删)



标签

发表评论

苏ICP备2023047577号-1