redis的数据是redis存在内存中里吗

使用Redis之前5个必须了解的事情
发表于 08:49|
来源Redis Blog|
作者Itamar Haber
摘要:对比传统关系型数据库,虽然基于Redis的应用程序开发有着很多相同之处,但是有一些关键区别在应用程序设计时却必须铭记在心,比如基于内存的单线程特性。
使用Redis开发应用程序是一个很愉快的过程,但是就像其他技术一样,基于Redis的应用程序设计你同样需要牢记几点。在之前,你可能已经对关系型数据库开发的那一整个套路了然如胸,而基于Redis的应用程序开发也有许多相似的地方,但是你必须牢记以下两点——Redis是个内存数据库,同时它是单线程的。因此,在使用Redis时,你需要注意以下几点:
1. 掌控储存在Redis中的所有键
数据库的主要功能是储存数据,但是对于开发者来说,因为应用程序需求或者数据使用方法的改变,忽略存储在数据库中的某些数据是非常正常的,在Redis中同样如此。你可能忽视期满某些键,也可能因为应用程序的某个模块弃用而忘掉这些数据。
无论哪种情况,Redis都存储了一些不再使用的数据,平白无故的占用了一些空间。Redis的弱结构数据模式让集中储存的内容很难被弄清,除非你为键使用一套非常成熟的命名法则。使用合适的命名方法会简化你的数据库管理,当你通过你的应用程序或者服务做键的命名空间时(通常情况下是使用冒号来划分键名),你就可以在数据迁移、转换或者删除时轻松的识别。
Redis另一个常见用例是作为热数据项作的第二数据存储,大部分的数据被保存在其他的数据库中,比如PostgreSQL或MongoDB。在这些用例中,当数据从主存储移除时,开发者经常会忘记删除Redis中对应的数据。这种存在跨数据存储的情况下,通常需要做级联删除,这种情况下,可以通过在Redis配置保存特定数据项的所有识别符来实现,从而保证数据在主数据库被删除后,系统会调用一个清理程序来删除所有相关副本和信息。
2. 控制所有键名的长度
在上文我们说过要使用合适的命名规则,并且添加前缀来识别数据走向,因此这一条看起来似乎与之违背。但是,请别忘记,Redis是个内存数据库,键越短你需要的空间就越少。理所当然,当数据库中拥有数百万或者数十亿键时,键名的长度将影响重大。
举个例子:在一个32位的Redis服务器上,如果储存一百万个键,每个值的长度是32-character,那么在使用6-character长度键名时,将会消耗大约96MB的空间,但是如果使用12-character长度的键名时,空间消耗则会提升至111MB左右。随着键的增多,15%的额外开销将产生重大的影响。
3. 使用合适的数据结构
不管是内存使用或者是性能,有的时候数据结构将产生很大的影响,下面是一些可以参考的最佳实践:
取代将数据存储为数千(或者数百万)独立的字符串,可以考虑使用哈希数据结构将相关数据进行分组。哈希表是非常有效率的,并且可以减少你的内存使用;同时,哈希还更有益于细节抽象和代码可读。
合适时候,使用list代替set。如果你不需要使用set特性,List在使用更少内存的情况下可以提供比set更快的速度。
Sorted sets是最昂贵的数据结构,不管是内存消耗还是基本操作的复杂性。如果你只是需要一个查询记录的途径,并不在意排序这样的属性,那么轻建议使用哈希表。
Redis中一个经常被忽视的功能就是bitmaps或者bitsets(V2.2之后)。Bitsets允许你在Redis值上执行多个bit-level操作,比如一些轻量级的分析。
4. 使用SCAN时别使用键
从Redis v2.8开始,SCAN命令已经可用,它允许使用游标从keyspace中检索键。对比KEYS命令,虽然SCAN无法一次性返回所有匹配结果,但是却规避了阻塞系统这个高风险,从而也让一些操作可以放在主节点上执行。
需要注意的是,SCAN 命令是一个基于游标的迭代器。SCAN 命令每次被调用之后, 都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为
SCAN 命令的游标参数, 以此来延续之前的迭代过程。同时,使用SCAN,用户还可以使用keyname模式和count选项对命令进行调整。
SCAN相关命令还包括SSCAN 命令、HSCAN 命令和 ZSCAN 命令,分别用于集合、哈希键及有续集等。
5. 使用服务器端Lua脚本
在Redis使用过程中,Lua脚本的支持无疑给开发者提供一个非常友好的开发环境,从而大幅度解放用户的创造力。如果使用得当,Lua脚本可以给性能和资源消耗带来非常大的改善。取代将数据传送给CPU,脚本允许你在最接近数据的地方执行逻辑,从而减少网络延时和数据的冗余传输。
在Redis中,Lua一个非常经典的用例就是数据过滤或者将数据聚合到应用程序。通过将处理工作流封装到一个脚本中,你只需要调用它就可以在更短的时间内使用很少的资源来获取一个更小的答案。
专家提示:Lua确实非常棒,但是同样也存在一些问题,比如很难进行错误报告和处理。一个明智的方法就是使用Redis的Pub/Sub功能,并且让脚本通过专用信道来推送日志消息。然后建立一个订阅者进程,并进行相应的处理。原文链接:(编译/仲浩 审校/魏伟)
免费订阅“CSDN云计算(左)和CSDN大数据(右)”微信公众号,实时掌握第一手云中消息,了解最新的大数据进展!
CSDN发布虚拟化、Docker、OpenStack、CloudStack、数据中心等相关云计算资讯, & & 分享Hadoop、Spark、NoSQL/NewSQL、HBase、Impala、内存计算、流计算、机器学习和智能算法等相关大数据观点,提供云计算和大数据技术、平台、实践和产业信息等服务。
& & & & & &
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章830被浏览201325分享邀请回答8018 条评论分享收藏感谢收起7511 条评论分享收藏感谢收起博客分类:
tpn(taobao push notification)在使用redis计算消息未读数的过程中,遇到了一系列的问题,下面把这个过程整理了一下,也让大家了解这个纠结的过程,供大家以后使用redis或者做类似的功能时进行参考
redis在tpn里面主要是用于计算移动千牛(Android、IOS)上的消息未读数。tpn的未读消息数是基于bizId维度的,即同一个bizId(每条消息的业务id,如果商品id、订单id等),即使有多条消息,未读数也只能算1。因此在接收消息,计算移动千牛未读数的过程中,就需要对bizId去重,这个去重的功能就是通过redis来实现的。随着消息量的不断上涨,这个基于redis的去重方案也不断变化。一、基于redis Set结构的未读数计算
前面说到的tpn未读数计算的最大特点就是基于bizId去重,在java里面,我们很容易想到利用HashMap或者HashSet来判重,因此最初tpn就是利用redis的Set结构来进行判重。主要利用了redis set结构的这两个命令:SADD和SCARDSADD key member
[member....]:将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。假如 key 不存在,则创建一个只包含 member 元素作成员的集合。 如果member元素不在集合里面,则返回1;如果member元素已经存在于集合当中,则返回0。SCARD key:返回集合 key 中元素的数量。
有了这两个命令,计算未读数的步骤就是这样的:
tpn会为用户保留7天内的消息,也就是说保存到redis set结构中的bizId失效时间是7天,同时用户在查看消息后,就会把其对应的redis set清空(即如果一个用户连续几天都不查看千牛的消息,那么其对应的redis set集合里面就会保存大量的bizid)。tpn总共有6台redis机器,每台机器上部署5个redis实例,每个实例的maxmemory设为1G,总共30G的内存用于存放消息bizId。在tpn的早期,由于用户量不多,消息量也不大,redis的内存完全可以存放7天内的所有消息bizId,因此这个方案work的很好。但随着全网大多数活跃卖家开始使用千牛,tpn的消息量也随之暴涨,越来越多的消息bizId给redis带来了极大的压力,在消息高峰期,tpn的日志里会有大量的redis timeout异常(tpn使用jedis,配置的timeout是300ms),经过分析,主要是由下面原因造成的:
缓存失效造成的超时:前面我们提到了,tpn的每个redis实例的maxmemory设置的是1G,因为bizId越来越多,因此很快每个redis 实例的内存就超过了maxmemory。而redis在处理客户端请求时,如果发现当前内存的使用量已经大于等于maxmemory,就会去失效部分过期的缓存,直到内存使用量小于maxmemory。很明显这个失效缓存释放内存的操作会影响redis的rt。在消息高峰期,redis实例的内存使用量一直再maxmemory附加徘徊,造成redis在应对大量请求的同时,还要不停地失效缓存释放内存,造成频繁超时。
因为bizId太多,而redis内存不够,所以造成redis请求大量超时,最简单地办法就是加机器,部署更多的redis实例来存储越来越多的消息bizId。初步估计了一下,要完全把7天内的所有消息bizId都保存到内存中,需要高达上百G的内存:交易消息和商品消息是tpn最主要的两类消息,因为目前全网大多数活跃卖家都使用了千牛,为了去重,tpn需要把全网7天内所有新增的交易id和商品id都保存到redis内存中,换句话来说,也就是要用内存来保存7天内tc和ic新增的所有id。tpn基本不可能申请到这么多的redis机器,就算有这么多的redis机器,部署维护成本也是巨大的。就算不用redis,使用tair的rdb,这个陈本仍然是不能接受的。
在移动千牛客户端,推送没有正常到达的情况下(比如长连接断开的时候),是依赖客户端在发现长连接断开以后调用messagecount.get接口来获取到消息未读数,然后促使用户手动获取最新的消息。当redis的内存使用量接近极限时,调用redis的sadd、scard命令很容易就timeout了,因此不能正确地计算出消息未读数,就会造成用户不能及时获取到最新的消息。
总的来说,redis的内存容量不足以容纳越来越多的业务消息bizId,造成大量redis请求超时,不能正确地计算消息未读数。因此需要对上述方案进行优化。二、redis用于消息去重判断,tair存放未读数消息数的方案
根据上面的分析,当redis内存使用量达到了上限时,很容易发送timeout,同时redis内存使用量会之所以会很快地达到上限,主要是因为不活跃用户的set结构里面保存了大量的bizId。在不能快速增加redis机器的前提下,最简单地方法就是在夜间重启redis。重启redis会带来一下影响:所有用户保存在set里面的消息bizId全部被清空了,就会造成误判:即对同一个bizId的消息重复提醒用户有新消息。但这个并不会对用户造成太大的影响:因为活跃用户会及时地来查看消息,所以活跃的set结构基本都是空的;而非活跃用户的redis set结构虽然有很多消息bizId,但是因为其是不活跃的,就算被清空,很快又会有新的bizId存放进去,但认为是不活跃用户,对这种情况基本无感知。因为set结构被清空,所以所有用户的消息未读数也被清空(通过scard命令来计算未读数)。根据前面的分析,在消息推送不能正常达到的情况下,正确的未读数会促使用户主动地来获取最新消息,所以基本不能接受重启redis的时候,清空用户的消息未读数
因为不能接受随意清空用户的消息未读数,所以我们不能定期重启redis来释放内存。但是如果我们把消息去重和计算未读数分开,即redis的set结构只用于判断一条消息是否是新消息,是否需要增加未读数,而把未读数保存在其他的地方,如果tair之类的,那我们是不是就可以定期重启redis了呢?因此我们得到了下面的方案:继续是用redis的set结构来判断一条消息是不是新消息,是不是需要增加消息未读数不再使用redis的scard命令计算消息未读数,而是采用基于tair的计数器来计算消息未读数,即如果通过redis的set结构判断出是新消息,则对保存在tair里面的未读数计数器执行incr unReadCountKey 1。
这样一来,redis就只用于对消息bizId去重,而不再用于计算消息未读数,消息未读数单独保存在基于tair的计数器当中。因此我们就大胆地定期在夜间重启redis了。这个方案成功work了一段时间,但过了一段时间后,应用在请求redis的时候又开始是不是抛出大量的timeout exception。分析了一下,问题还是处在redis内存上:虽然可以通过定期重启redis来释放内存,但是redis内存的增加的速度是不可预期的,我们并不能每次都能在内存使用达到极限前重启redis有时候虽然redis的整体内存使用量还没有达到极限,但是如果一个用户的set结构里面的bizId太多了,scard命令仍然会timeout
所以这个方案还不是一个最佳的方案,仍然需要通过更好的办法来降低redis的内存使用量三、基于redis的bloomfilter的消息去重方案
从方案一到方案二,我们一直想解决的就是如何用最小的内存来判断一个消息bizId是不是新的bizId,即一个消息bizId是不是已经存在了。以最小的内存来实现判断操作,很容易就联想到bloomfilter。但是在这个场景,我们不能简单地使用bloomfilter,先来计算一下“最直接”地使用bloomfilter需要多大的内存:bloomfilter的所占用的内存由bitSize决定,而根据公式:
(int) Math.ceil(maxKey * (Math.log(errorRate) / Math.log(0.6185)));
我们为每个用户的每个消息类型创建一个bloomfilter,以500万用户,每个用户订阅了10个消息类型,那么这个用于去重的bloomfilter所占用的内存总量是:
totalMemory(G) = *Math.ceil(maxKey * (Math.log(errorRate) / Math.log(0.6185)))
这个totalMemory的大小就取决于maxKey和errorRate,保证errorRate不变的前提下,bloomfilter 的maxKey越大,bloomfilter所需要的内存也就越大。那我们估算一下使用bloomfilter,需要多少内存。以商品消息和交易小为例,不同的卖家,7天内的消息数从几个到几万个不等。最小的是7天只有几条消息,最多的7天内有7万多条。就算取个1000的评价值,这5000w个bloomfilter的内存消耗也在上百G,这明显行不通。
但是,tpn的消息未读数还有一个业务特点就是,当一个用户的某个消息类型的未读数已经超99了,就不再显示具体的数字,而是显示成99+,同时一个用户的消息未读数超过了99,那么其实他自己对消息未读数的敏感性也不高了,即就算有一条消息不是新消息,但是仍然给未读数+1了,用户也察觉不出来。
因此,在上面的公式里,我们可以把每个bloomfilter的maxKey设为100,那这样一来,所占用的内存就是一个十分能够接受的数字了:设errorRate=0.0001,maxKey=100,那么上面的5000w个bloomfilter只需要11G的内存,很明显,这不是一个完全可以接受的内存消耗。
这样一来,我们就得出下面这个基于redis bloomfilter去重方案:
通过redis的setbit命令来实现一个远端的bloomfilter,具体可以参见这个例子:/olylakers/RedisBloomFilter/blob/master/src/main/java/org/olylakers/bloomfilter/BloomFilter.java
每次来一条新消息,通过redis的bloomfilter来判断这是不是一条新消息
如果是,则对tair中的未读数计数器+1
用户每次读取消息后,则清空对应的bloomfilter
这样一来,终于我们可以通过能接受的内存来实现未读数的计算,不再要每天担心redis是不是内存不够用了,应用又频繁抛timeout exception了四、诡异的connection broken pipe
在方案三上线以后,我认为这些redis应该会消停了,redis运行一段时间后,的确再也没用timeout exception了,但是在运行一段时间后,tpn在向redis执行请求时,往redis写入命令时会报这个异常:java.net.SocketException: Broken pipe。我们知道,如果一个socket连接已经被远端给close掉了,但是客户端没有察觉,仍然通过这个连接读写数据,那么就会产生Broken pipe异常。因为tpn使用jedis,通过common pool来实现jedis的connection pool,我第一反应就是tpn没用正确使用jedis的connection pool,没有销毁掉broken的redis connection,而是已经重新把归还给了connection pool,或者是jedis的connection
pool有bug,造成了connection泄露,导致ton在往一条已经往一条已经被close的连接写入数据。但是仔细检查了一遍tpn的代码和jedis connection pool的代码,发现没用什么问题,那就说明有些redis是真的被redis服务端给关闭了,但是jedis 的connection pool没有发现。
因为客户端的jedis pool没有问题,那么基本上可以确定的确是redis server端关闭了一些连接。首先怀疑的就是tpn的redis 配置出错了,错误地配置了redis.conf里的timeout 配置项:首先怀疑的是不是tpn的redis配置不多,造成因此就去查看redis的相关代码。redis的配置文件redis.config里面有timeou这个配置项:
# Close the connection after a client is idle for N seconds (0 to disable)
检查了下tpn 6台redis上的所有配置文件,发现都没有配置这个选择,但是tpn部署了两个版本的redis,redis-2.6.14和redis-2.4,结果在redis-2.4里面,如果没有配置这个值,redis就会使用默认的值,5*60(s),而redis-2.6.14的默认值是0,即disable timeout,同时又去查看了下jedis common pool的设置,发现minEvictableIdleTimeMillis=1000L * 60L * 60L * 5L(ms),即一个redis连接的空闲时间超过5个小时才会被connection pool给回收。很明显,就是因为客户端和服务端的connection idle time设置不一样,造成了connection被一端关闭了,但是另一端没有感知,所有造成了broken pipe。解决办法就是把redid-2.4升级到redid-2.6.14。五、总结
从方案一到方案三,我最大的感触就是,在解决问题,优化方案的时候,不能仅仅执拗于技术本身,而是要联系业务思考。这个redis的bloomfilter的想法我很早就有了,但是我之前一直没有想到tpn未读数消息数只显示99+这个业务逻辑,而是一直想如何通过降低消息bizId的长度来尽可能地去节省内存,结果越想越复杂,然后就没有然后了。。。。
浏览 33393
看了下官方文档,scard命令的时间复杂度应该是O(1)吧?求解释scard是O(1)的,这里应该是笔误,写错了,是因为我们有个场景还会去调用SMEMBERS获取里面所有的bizId,这个是O(n)的,所以就会超时,这里写着写着就写串了。。。多谢提醒
浏览: 189336 次
来自: 杭州
浏览量:16338
我也是这个原因,大赞!!!!
谢谢分享,很好的实例,赞一个
lz,我有一个问题想请教下您呀,red5怎么实现播放另一台存储 ...
&div class=&quote_title ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'Redis 数据淘汰机制
在 Redis 中,允许用户设置最大使用内存大小 server.maxmemory,在内存限定的情况下是很有用的。譬如,在一台 8G 机子上部署了 4 个 Redis 服务点,每一个服务点分配 1G 的内存大小,减少内存紧张的情况,由此获取更为稳健的服务。Redis 内存数据集大小上升到一定大小的时候,就会施行数据淘汰策略。Redis 提供 6 种数据淘汰策略:
volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用
的数据淘汰
volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数
volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据
allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰
allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰
no-enviction(驱逐):禁止驱逐数据
Redis 确定驱逐某个键值对后,会删除这个数据,并将这个数据变更消息发布到本地(AOF 持久化)和从机(主从连接)。
LRU 数据淘汰机制
在服务器配置中保存了 lru 计数器 server.lrulock,会定时(Redis 定时程序serverCorn())更新,server.lrulock 的值是根据 server.unixtime 计算出来的。
// redisServer 保存了lru 计数器
struct redisServer {
unsigned lruclock:22; /* Clock incrementing every minute, for LRU */
另外,从 struct redisObject 中可以发现,每一个 Redis 对象都会设置相应的 lru,即最近访问的时间。可以想象的是,每一次访问数据的时候,会更新 redisObject.lru。
LRU 数据淘汰机制是这样的:在数据集中随机挑选几个键值对,取出其中 lru 最大的键值对淘汰。所以,你会发现,Redis 并不是保证取得所有数据集中最近最少使用(LRU)的键值对,而只是随机挑选的几个键值对中的。
// 每一个redis 对象都保存了lru
#define REDIS_LRU_CLOCK_MAX ((1&&21)-1) /* Max value of obj-&lru */
#define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */
typedef struct redisObject {
// 刚刚好32 bits
// 对象的类型,字符串/列表/集合/哈希表
unsigned type:4;
// 未使用的两个位
unsigned notused:2; /* Not used */
// 编码的方式,redis 为了节省空间,提供多种方式来保存一个数据
// 譬如:“” 会被存储为整数
unsigned encoding:4;
unsigned lru:22; /* lru time (relative to server.lruclock) */
// 数据指针
// redis 定时执行程序。联想:linux cron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* We have just 22 bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
* 2^22 bits with 10 seconds resolution is more or less 1.5 years.
Note that even if this will wrap after 1.5 years it's not a problem,
* everything will still work but just some object will appear younger
* to Redis. But for this to happen a given object should never be touched
* for 1.5 years.
Note that you can change the resolution altering the
* REDIS_LRU_CLOCK_RESOLUTION define.
updateLRUClock();
// 更新服务器的lru 计数器
void updateLRUClock(void) {
server.lruclock = (server.unixtime/REDIS_LRU_CLOCK_RESOLUTION) &
REDIS_LRU_CLOCK_MAX;
TTL 数据淘汰机制
Redis 数据集数据结构中保存了键值对过期时间的表,即 redisDb.expires,在使用 SET 命令的时候,就有一个键值对超时时间的选项。和 LRU 数据淘汰机制类似,TTL 数据淘汰机制是这样的:从过期时间 redisDB.expires 表中随机挑选几个键值对,取出其中 ttl 最大的键值对淘汰。同样你会发现,Redis 并不是保证取得所有过期时间的表中最快过期的键值对,而只是随机挑选的几个键值对中的。
无论是什么机制,都是从所有的键值对中挑选合适的淘汰。
在哪里开始淘汰数据
Redis 每服务客户端执行一个命令的时候,会检测使用的内存是否超额。如果超额,即进行数据淘汰。
// 执行命令
int processCommand(redisClient *c) {
// 内存超额
/* Handle the maxmemory directive.
First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error. */
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if ((c-&cmd-&flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
这是我们之前讲述过的命令处理函数。在处理命令处理函数的过程,会涉及到内存使用量的检测,如果检测到内存使用超额,会触发数据淘汰机制。我们来看看淘汰机制触发的函数 freeMemoryIfNeeded() 里面发生了什么。
// 如果需要,是否一些内存
int freeMemoryIfNeeded(void) {
size_t mem_used, mem_tofree, mem_
int slaves = listLength(server.slaves);
// redis 从机回复空间和AOF 内存大小不计算入redis 内存大小
// 关于已使用内存大小是如何统计的,我们会其他章节讲解,这里先忽略这个细节
/* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */
mem_used = zmalloc_used_memory();
// 从机回复空间大小
if (slaves) {
listNode *
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = listNodeValue(ln);
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
if (obuf_bytes & mem_used)
mem_used = 0;
mem_used -= obuf_
// server.aof_buf && server.aof_rewrite_buf_blocks
if (server.aof_state != REDIS_AOF_OFF) {
mem_used -= sdslen(server.aof_buf);
mem_used -= aofRewriteBufferSize();
// 内存是否超过设置大小
/* Check if we are over the memory limit. */
if (mem_used &= server.maxmemory) return REDIS_OK;
// redis 中可以设置内存超额策略
if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
return REDIS_ERR; /* We need to free memory, but policy forbids. */
/* Compute how much memory we need to free. */
mem_tofree = mem_used - server.
mem_freed = 0;
while (mem_freed & mem_tofree) {
int j, k, keys_freed = 0;
// 遍历所有数据集
for (j = 0; j & server. j++) {
long bestval = 0; /* just to prevent warning */
sds bestkey = NULL;
struct dictEntry *
redisDb *db = server.db+j;
// 不同的策略,选择的数据集不一样
if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM)
dict = server.db[j].
dict = server.db[j].
// 数据集为空,继续下一个数据集
if (dictSize(dict) == 0)
// 随机淘汰随机策略:随机挑选
/* volatile-random and allkeys-random policy */
if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM)
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
// LRU 策略:挑选最近最少使用的数据
/* volatile-lru and allkeys-lru policy */
else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
// server.maxmemory_samples 为随机挑选键值对次数
// 随机挑选server.maxmemory_samples 个键值对,驱逐最近最少使用的数据
for (k = 0; k & server.maxmemory_ k++) {
// 随机挑选键值对
de = dictGetRandomKey(dict);
thiskey = dictGetKey(de);
/* When policy is volatile-lru we need an additional lookup
* to locate the real key, as dict is set to db-&expires. */
if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
de = dictFind(db-&dict, thiskey);
o = dictGetVal(de);
// 计算数据的空闲时间
thisval = estimateObjectIdleTime(o);
// 当前键值空闲时间更长,则记录
/* Higher idle time is better candidate for deletion */
if (bestkey == NULL || thisval & bestval) {
// TTL 策略:挑选将要过期的数据
/* volatile-ttl */
else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
// server.maxmemory_samples 为随机挑选键值对次数
// 随机挑选server.maxmemory_samples 个键值对,驱逐最快要过期的数据
for (k = 0; k & server.maxmemory_ k++) {
de = dictGetRandomKey(dict);
thiskey = dictGetKey(de);
thisval = (long) dictGetVal(de);
/* Expire sooner (minor expire unix timestamp) is better
* candidate for deletion */
if (bestkey == NULL || thisval & bestval) {
// 删除选定的键值对
/* Finally remove the selected key. */
if (bestkey) {
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
// 发布数据更新消息,主要是AOF 持久化和从机
propagateExpire(db,keyobj);
// 注意, propagateExpire() 可能会导致内存的分配,
// propagateExpire() 提前执行就是因为redis 只计算
// dbDelete() 释放的内存大小。倘若同时计算dbDelete()
// 释放的内存和propagateExpire() 分配空间的大小,与此
// 同时假设分配空间大于释放空间,就有可能永远退不出这个循环。
// 下面的代码会同时计算dbDelete() 释放的内存和propagateExpire() 分配空间的大小// propagateExpire(db,keyobj);
// delta = (long long) zmalloc_used_memory();
// dbDelete(db,keyobj);
// delta -= (long long) zmalloc_used_memory();
// mem_freed +=
/////////////////////////////////////////
/* We compute the amount of memory freed by dbDelete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
// 只计算dbDelete() 释放内存的大小
delta = (long long) zmalloc_used_memory();
dbDelete(db,keyobj);
delta -= (long long) zmalloc_used_memory();
mem_freed +=
server.stat_evictedkeys++;
// 将数据的删除通知所有的订阅客户端
notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
keyobj, db-&id);
decrRefCount(keyobj);
keys_freed++;
// 将从机回复空间中的数据及时发送给从机
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();
// 未能释放空间,且此时redis 使用的内存大小依旧超额,失败返回
if (!keys_freed) return REDIS_ERR; /* nothing to free... */
return REDIS_OK;

我要回帖

更多关于 内存数据库redis 的文章

 

随机推荐