当前位置:
首页
文章
前端
详情

Kafka配置消息保存时间的方法

配置参考

然后不废话,直接贴最终的关键配置:

# 想实现消息队列中保存2小时的消息,那么配置应该像这样:
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0

Kafka日志存储机制分析

首先先贴一下对我帮助很大的一篇文档: https://www.zybuluo.com/jewes/note/64450 。这篇文章详细阐述了kafka的日志存储机制,建议深入阅读。

然后我简述一下。我们将对照kafka的broker配置简单说明一下: http://kafka.apache.org/documentation.html#brokerconfigs

Kafka的持久化策略更像redis —— 数据都在内存中,定期flush到硬盘上持久化存储,以保证重启的时候数据不丢。flush策略由log.flush.*这些properties控制。

每个topic可以存储于多个partition,每个partition在kafka的log目录下表现为topicname-id这样的文件夹,如mytopic-0。kafka队列中的内容会按照日志的形式持久化到硬盘上。每个日志文件称为“段”(segment)。

Kafka清理队列中过期message的方式实际上就是删除过期的segment,这种表现形式十分类似于日志滚动。因此,控制kafka队列中消息的保存时间的方式实际上就是日志文件定期分段,然后定期清理掉过期的段文件

与控制分段策略相关的几个properties:

log.roll.{hours,ms} —— 日志滚动的周期时间(小时,毫秒,log.roll.ms优先级更高),到达指定周期时强制生成一个新的segment。
log.segment.bytes —— 每个segment的最大容量上限(默认1GB)。到达指定容量时会强制生成一个新的segment。

与过期segment处理策略相关的几个properties:

cleanup.policy={compact,delete} —— 过期segment处理算法,默认delete。
log.retention.{hours,minutes,ms} —— 日志保留时间(小时,分钟,毫秒。优先级依次升高),超出保留时间的日志执行cleanup.policy定义的操作
log.segment.delete.delay.ms —— 删除日志文件前的保留一段时间。默认60000。
log.retention.check.interval.ms —— log checker的检测是否需要删除文件的周期。默认300000。

现在解释一下开头的配置段的含义——每小时滚动一个日志文件,日志删除(cleanup.policy默认为delete)时间为2小时,

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。