Kafka 消息队列
基础概念
| 名称 | 介绍 | 权限 |
|---|---|---|
| Topic | 队列,生产者生产的消息存储在某个topic中,topic可以被多个消费者订阅,消费者从订阅的topic中获取消息 | |
| ProducerGroup | 生产组,针对同一个集群, 我们建议一个应用对应一个生产组,不同的集群需要申请不同的生产组, 生产组和生产者是一对多的关系,生产组可以根据不同的topic生成对应的生产者,kafka根据生产组做相应的权限控制 | 1.只有申请生产组时指定的应用才有权限启动kafka 2.生产组只能往指定的topic发送消息 |
| Producer | 生产者,发送消息的实体 | |
| ConsumerGroup | 消费组,不同的集群需要申请不同的消费组, 消费组和消费者是一对多的关系,消费组可以根据不同的topic生成对应的消费者, kafka根据消费组做相应的权限控制 | 1.只有申请消费组时指定的应用才有权限启动kafka 2.消费组只能消费指定的topic的消息 |
| Consumer | 消费者,消费消息的实体 |
Topic:表示一类相同类型的消息。可以理解为表或队列
Partition (分片):一个Topic包含多partition,消息存储在这些partition中。
Offset:topic+partition+offset唯一确定一条消息
消费组 (ConsumerGroup) 包含多个消费者 (Consumer),每个Consumer消费Topic中的一部分消息;每个partition只能被分配给一个消费者消费,当partition数小于消费者数量时,会有消费者分配不到partition,因此消费不到消息(流量不均)。
一个消费组中所有消费者消费topic里的所有消息,2个不同的消费组会各自消费一份topic里的所有消息,2个消费组的消费进度无关,topic里的消息删除和消费进度无关,只和保留时间有关。
生产组(ProducerGroup)包含多个生产者,生产者可以选择同步或者异步发送消息,其中异步发送可以提高攒批效果,提高吞吐,降低客户端和服务端的CPU开销。
最佳实践
一、发送重试
推荐使用发送重试机制,重试会换分片partition发送;降低消息丢失的风险。
二、消息不带key,提升可用性
仅在需要顺序消费消息时,才需要带key发送消息;发送时会根据key的哈希值,选择固定的partition;带key的消息,重试时不会换分片;
以下场景可以减少报错
- kafka服务端单机短时间不可用,比如云厂商虚拟机冻结维护;
- kafka服务端单机宕机;
- kafka服务端运维:下个版本运维时也不会产生报错了;
三、通过异步发送降低成本
同步发送的优势是代码简单,异常处理简单,但在消息量大时,成本很高;
异步发送 -> 消息攒在内存里 -> 攒够一定时间或一定大小后发送,异步发送可以有效减少业务服务自身和kafka集群的CPU开销,同时增加了攒批,往往也能增加压缩率,减少存储成本;
异步发送是否会丢数据?
- 如果服务端返回错误或发送超时,会回调用户代码处理异常。异步发送同样自带重试版本
- 如果消息还没发出去,机器宕机了,那么会丢数据。合理设置 linger.ms
四、消息处理异常解决
怎么算失败?
消费函数返回 RECONSUME_LATER,消费函数抛异常,消费超时 (默认15分钟,可调长)
消费失败之后会发生什么?
消费组支持重试 => 进入重试队列。如果重试次数耗尽,则进入死信;消费组不支持重试 => 丢弃
消费重试和死信?
(1) 从业务topic里消费消息
(2) 消费失败时发送到重试Topic中
(3) 再次尝试消费 (从重试Topic中消费)
(4) 重试消费仍失败,依旧发送到重试Topic
(5) 重试次数耗尽仍失败,发送到死信Topic
(6) 用户在kafka管控转回死信 (到重试Topic),又可以再次被消费到
五、消息去重
kafka无法避免消息重复,需要业务做好去重
去重需要选择一个唯一键,优先选择:业务层的唯一键,如订单号等。这些信息要带在消息体里,也可以选择kafka生成的msgId。
msgId注意事项:
- 相同事件产生多个msgId:如果服务自己重试发送、上游/app重复请求,可能导致同一个事件会发送多条msgId不同的消息;
- 不同事件msgId冲突:msgId类似UUID,虽然已经包含IP、时间戳等信息,但仍有极小概率冲突。
六、topic数据保留时间
每个Topic都有自己的保留时间,默认是24小时
- 超过保留时间的消息会被删除。一旦删除就无法找回
- 即使没来得及被消费,超时依旧会被删除。
- 可在kafka管控上提单修改Topic保留时间 (topic owner审批)
- 建议保留时间最少3小时,给故障处理预留时间
七、关注Lag告警,及时处理死信
消费组lag、死信默认都有告警,但都是普通告警
- 为消费组设置合理的lag告警阈值与告警级别
- 对重要消费组请设置电话告警。普通告警过多容易忽略。
- 可以为电话告警、普通告警设置不同的阈值。
- 死信Topic也有保留时间 (3天),请及时处理,否则也会过期被删除!
八、关注消费Outofrange
消费OutOfRange
- 消费速度 < 删除速度 ==> timelag不断上升
- timelag > TopicTTL ==> 未被消费的消息过期删除了!
- 关注自己服务中的Event打点:kafkaConsumerOutOfRange
- 可以自行在monitor上配置告警
- kafka提供预警:『消费组消费慢有丢数据风险』(普通告警)
九、延时消息和延迟消费
希望发送的消息一个小时后/一天后/明天0点才消费到,如何实现?
- 业务发送延时消息的TPS不能太高:处理延时消息的开销比普通消息高很多倍
- 延时消息无法精确到达,会稍微延后一小段时间投递
- 最小与最大延时时间:
- 延时消息的最小延时时间为 500ms
- 延时消息最大默认不超过3天 (可联系kafka管理员调大,最大不能超过15天)
- 延迟消费最大不能超过topic的保留时间
十、事务消息
十一、顺序消息
顺序消息只能尽量保证有序,并不能保证完全不出现乱序的情况
- 情况1: 增加分片数。同一个key的消息会发送到不同的分片中。
- 情况2: 在消费者发版、重启、扩缩容时,由于同一个partition会被分配给不同的消费者消费,所以还是会出现乱序 (重复消费)。
- 乱序的情况是这样的:
- A1, A2, ..., Am, Am+1,..., An, Am, Am+1, ... An, An+1, An+2, ....
- 中间 Am, Am+1, ..., An 这一段会重复消费
十二、消费过滤
kafka提供的消费过滤是在kafka服务端做过滤
- 可以减少服务端的出带宽,但会增加服务端的CPU开销
- 不推荐优先考虑该功能:
- 业务可自行在消费者消费逻辑里做过滤
- Producer改造:按消息类型拆分成多个topic
十三、批量消费
一般业务用不到。少数适合的场景:
- 多条消息的处理流程中,可以批量读写数据库等存储系统,或批量调用下游RPC,减少存储系统、下游服务的开销。
- 注意事项:
- 会丢失链路追踪标记:比如预发标记、压测标记。
- 会增加内存开销:可能出现OOM,或GC压力变大
十四、全链路标记透传
公司的 链路追踪组件 提供了标记透传能力,包含了预发、压测等标记
- kafka-client会自动透传 链路追踪组件 里的所有标记:
- 发送消息时,kafka-client会自动将所有 链路追踪标记 都加到消息的 header 里
- 消费消息时,kafka-client会自动将消息 header 里的 链路追踪标记 都设置到当前线程的 TraceContext 中 (批量消费不会自动设置)
架构详解
Kafka队列的生产者和消费者如何跟Kafka通信?
核心架构概览
首先,要理解Kafka的通信模型是基于 "客户端-服务器" 模式的:
服务器端:Kafka集群,由多个Broker(服务器节点)组成。
客户端:生产者(Producer)和消费者(Consumer)。
通信协议:使用Kafka自定义的、基于TCP的二进制协议。这比HTTP等通用协议更高效、开销更小。
生产者与Kafka的通信流程
生产者的核心任务是:将消息发布(发送)到指定Topic的特定分区上。
1. 初始化与元数据获取
生产者启动时,会连接到 bootstrap.servers 配置中指定的一个或多个Broker。
向这些Broker发送元数据请求,获取整个集群的视图,包括:
- 有哪些Topic,每个Topic有哪些分区。
- 每个分区的Leader副本在哪个Broker上。
- 所有Broker的地址和端口。
这些元数据会被缓存在生产者本地,并定期刷新(例如,当检测到新的Broker或分区Leader发生变化时)。
2. 序列化与分区
生产者将用户提供的消息Key和Value对象,通过配置的序列化器(如 StringSerializer, JsonSerializer)转换为字节数组。
然后,根据配置的分区器确定这条消息应该被发送到Topic的哪个分区。
如果指定了Key,默认分区器会对Key进行哈希,确保相同Key的消息总是进入同一个分区。
如果没有Key,则会使用轮询等策略。
3. 批量发送与网络通信
消息不会立即被发送。生产者有一个内存缓冲区(RecordAccumulator),它会将发往同一分区的多条消息组合成一个批次。
这样做极大地提高了吞吐量,减少了网络IO次数。
一个独立的Sender线程在后台运行,它不断地从缓冲区中取出已准备好的批次(满足 batch.size 或 linger.ms 条件)。
4. 发送生产请求
Sender线程根据它缓存的元数据,知道每个分区的Leader Broker在哪里。
它向目标分区的Leader Broker建立TCP连接,并发送 PRODUCE请求,其中包含一个或多个消息批次。
5. Broker处理与响应
Leader Broker收到请求后,会将消息写入其分区日志(追加到磁盘上的文件)。
写入成功后,Broker会向生产者发送一个响应。
生产者可以通过 acks 配置来控制等待何种程度的确认:
- acks=0:"发后即忘"。不等待任何确认,吞吐量最高,但可能丢失消息。
- acks=1:默认值。等待Leader写入成功即可。在吞吐量和可靠性之间折衷。
- acks=all:等待Leader和所有ISR(同步副本)都写入成功。最可靠,但延迟最高。
6. 处理响应
生产者收到响应后,如果是成功,则会清空已发送的批次,并可能执行用户设置的回调函数。
如果失败(如网络问题、非Leader错误),并且配置了重试(retries),生产者会重新发送消息。
消费者与Kafka的通信流程
消费者的核心任务是:从指定Topic的分区上拉取并处理消息。
其通信流程主要围绕 "拉取消息" 和 "协调管理" 两大核心活动展开。
1. 加入消费者组与再平衡
消费者启动时,通过 group.id 标识自己属于哪个消费者组。
它会向Broker发送 FIND_COORDINATOR请求,找到负责它所在消费者组的 Group Coordinator(组协调者,是集群中的一个Broker)。
消费者然后向Coordinator发送 JOIN_GROUP请求。
Coordinator会触发再平衡,重新分配组内所有消费者需要消费的分区,并通过 SYNC_GROUP请求 将分配方案下发给每个消费者。
2. 拉取消息
消费者知道自己要消费哪些分区后,会直接向这些分区的Leader Broker建立TCP连接。
消费者主动向Broker发送 FETCH请求 来拉取消息。
这是一个长轮询的过程:如果分区末端没有新消息,Broker会保持这个请求一段时间(fetch.max.wait.ms),直到有新消息到达或超时,然后才返回响应。这避免了无效的频繁轮询。
消费者可以控制每次拉取的数据量(max.poll.records, fetch.max.bytes)。
3. 提交偏移量
消费者处理完消息后,需要告知Kafka它消费到了哪个位置,这个位置就是偏移量。
消费者会定期向Group Coordinator所在的Broker发送 OFFSET_COMMIT请求,将当前消费的偏移量提交到Kafka内部的 __consumer_offsets Topic中。
这样,当消费者重启或发生再平衡后,新的消费者可以从这个提交的偏移量位置继续消费。
4. 心跳与会话保持
消费者在后台会定期向Group Coordinator发送 HEARTBEAT请求,表明自己"存活"。
如果Coordinator在 session.timeout.ms 时间内没有收到某个消费者的心跳,就会认为它已经失效,并触发新一轮的再平衡,将它负责的分区重新分配给其他活着的消费者。
总结:通信模式对比
| 方面 | 生产者 | 消费者 |
|---|---|---|
| 通信方向 | 推:主动将消息发送给Broker。 | 拉:主动从Broker拉取消息。 |
| 关键请求 | PRODUCE(生产), METADATA(元数据) | FETCH(拉取), JOIN_GROUP(入组), OFFSET_COMMIT(提交偏移量), HEARTBEAT(心跳) |
| 连接对象 | 主要连接分区的Leader Broker。 | 既连接分区的Leader Broker(拉消息),也连接Group Coordinator(管理组)。 |
| 核心机制 | 批处理、压缩、异步回调。 | 再平衡、偏移量管理、长轮询。 |
| 数据保证 | 通过 acks 配置控制可靠性。 | 通过提交偏移量来记录消费进度,默认"至少一次"。 |
总而言之,生产者和消费者都是作为智能客户端,通过高效的二进制协议与Kafka Broker进行直接通信,各自承担着消息传递链路中不同角色的职责。
重复消费和消费乱序问题产生的原因?
核心矛盾:分区顺序 vs. 消费者组再平衡
Kafka的顺序保证是分区级别的:
Kafka只能保证同一个分区(Partition)内的消息被顺序消费。
生产者将消息发送到Topic时,如果指定了相同的Key,这些消息会被路由到同一个分区。
在一个分区内部,消息严格按照偏移量(Offset)的顺序存储。
消费者组与分区分配:
一个消费者组(Consumer Group)共同消费一个Topic。
Topic的每个分区在同一时间只能被组内的一个消费者消费。
这种分配关系是由Kafka的"再平衡(Rebalance)"机制来动态管理的。
问题发生的具体场景分析
当消费者发生发版、重启、扩缩容时,会触发再平衡。这个过程如下:
触发再平衡:一个消费者离开组(如被关闭)或新消费者加入组(如扩容),协调者(Coordinator)会通知组内所有消费者:"情况有变,我们需要重新分配分区"。
暂停消费:所有消费者都会停止消费,等待新的分配方案。
重新分配分区:协调者根据分配策略(如Range或RoundRobin)计算出新的分区分配方案。例如:
- 之前:消费者A消费分区0,消费者B消费分区1。
- 消费者B重启后:协调者可能将分区1重新分配给消费者A。现在消费者A同时消费分区0和分区1。
恢复消费:消费者根据新的分配方案,从各自分配到的分区的最新提交偏移量(Committed Offset)开始消费。
为什么会导致"乱序"和"重复消费"?
现在,我们结合上面的场景,来看问题是如何产生的:
1. 乱序
这里的"乱序"不是指消息在分区内存储的顺序乱了,而是指业务层面感知到的处理顺序乱了。
假设场景:
Topic:order_events,分区0。
消息流(按顺序):订单创建 -> 订单付款 -> 订单发货。
正常流程:
消费者C1持续消费分区0,按顺序处理这三个消息。
发生再平衡时:
消费者C1正在处理订单付款消息,但还没来得及提交偏移量就崩溃或被重启了。
再平衡触发,分区0被分配给了新的消费者C2。
消费者C2从最后一次提交的偏移量开始消费。假设C1只提交了订单创建的偏移量。
结果:C2会重新消费订单付款和订单发货。
业务层看到的顺序:
订单创建 (C1处理) -> 订单付款 (C1处理) -> 订单付款 (C2重新处理) -> 订单发货 (C2处理)
你发现了吗?对于"订单付款"这个业务操作,它被处理了两次,并且第二次处理发生在"订单发货"之后,这在业务逻辑上就是严重的"乱序"。
2. 重复消费
从上面的例子可以清晰地看到,"乱序"的根本原因正是"重复消费"。
根本原因:消费者在处理完消息后、提交偏移量之前发生了失败或再平衡。
结果:新的消费者会从之前已提交的偏移量开始消费,导致一部分已经被处理过的消息被再次处理。
Kafka默认的提交方式是自动提交,它定期在后台提交偏移量。这更容易导致重复消费,因为可能在批量处理消息时,已经处理了10条,但只自动提交了第5条的偏移量,那么再平衡后,第6到10条会被重复消费。
总结
| 阶段 | 核心机制 | 导致的问题 |
|---|---|---|
| 顺序保证 | 分区内消息有序 | 这是实现顺序消费的基础。 |
| 弹性伸缩 | 消费者组再平衡 | 这是破坏顺序消费的触发条件。 |
| 偏移量管理 | 异步/定期提交 | 这是导致重复消费和乱序的根本原因。消费者在"处理消息"和"提交偏移量"这两个动作之间存在时间差。 |
| 最终结果 | - | 业务层面的乱序,本质是由于消息的重复消费和处理进程的切换共同造成的。 |
如何缓解或解决?
完全避免在再平衡时的乱序和重复是非常困难的,但可以通过一些策略来缓解:
将并发消费者数量降至1:
最简单粗暴的方法。一个Topic只设置一个分区,并且消费者组里只有一个消费者。这样永远不会有再平衡和分配问题。
缺点:完全丧失了并行处理和横向扩展的能力,性能瓶颈明显。
在业务层实现幂等性:
这是最常用、最有效的解决方案。承认消息可能会重复,但在处理逻辑上保证即使收到重复消息,也不会产生错误结果。
例如:处理"订单付款"消息时,先检查数据库该订单是否已支付过,如果已支付则直接忽略此消息。
精细控制偏移量提交:
关闭自动提交(enable.auto.commit=false)。
在处理完一条消息后,立即同步提交偏移量(commitSync())。
缺点:严重牺牲性能,因为每条消息都要与Broker进行同步通信。并且如果消息处理成功但提交失败,仍然会导致重复消费。
在再平衡监听器中处理:
实现ConsumerRebalanceListener接口,在分区被撤销时(onPartitionsRevoked)立即提交偏移量,减少偏移量提交的延迟。
但这并不能完全保证,因为消费者可能已经崩溃,无法执行这个回调。
结论:
Kafka在设计上为了极致的吞吐量和扩展性,在消费者端牺牲了"精确一次"和"严格顺序"的保证。理解和接受"至少一次"交付语义以及可能出现的乱序,并通过业务层的幂等性设计来弥补,是正确使用Kafka的最佳实践。
kafka的lag指标是什么?
Kafka 中的 Lag(全称 Consumer Lag,也叫消费滞后量或消费积压量)是衡量消费者处理进度的一个核心监控指标。它表示在某个时间点,消费者尚未消费的消息总数。简单来说,就是生产者已经"跑"了多远,而消费者还落后多少。
为了帮你更直观地理解,我们先来看看它的核心计算公式:
Lag = LOG-END-OFFSET(分区最新消息的位置) - CURRENT-OFFSET(消费者组的当前消费位置)
为什么Lag指标至关重要?
持续关注Lag指标非常重要,因为它直接反映了消费者群体的健康状况,Lag过大可能带来一系列问题:
- 处理延迟:业务逻辑无法获得实时数据,影响系统实时性。
- 资源压力:大量积压的消息会占用Broker的磁盘空间。
- 消息丢失风险:这是最需要警惕的一点。Kafka的消息有留存时间(例如默认7天),如果消费者慢到要消费的数据已被删除,当它重新连接时,可能会从最新的位移开始消费,导致积压期间的消息被跳过,造成事实上的消息丢失。
Lag过大怎么办?
如果你发现Lag持续增长,可以考虑从以下方面进行优化:
- 提升消费者处理能力:优化消费逻辑、引入多线程或异步处理。
- 增加消费者实例:通过增加消费者数量(但不要超过分区数)来提升整体消费能力。
- 调整生产者速率:如果可能,适当控制生产者的发送速率。
- 避免频繁重平衡:合理配置会话超时等参数,减少不必要的消费者组重平衡。
Kafka的延迟消息和延时消费功能原理?
由于Kafka未直接暴露延迟消息API,实践中主要通过以下几种架构模式来实现。下表对比了三种主流方案的特点:
| 方案 | 核心原理 | 适用场景 |
|---|---|---|
| 多级Topic + 时间轮询 | 创建多个不同延迟级别的Topic(如delay_5s, delay_1m)。生产者按需发送,独立消费者组轮询对应Topic,到期后转发至业务Topic。 | 最常用方案。适用于延迟级别固定、可控性要求高的场景,如订单超时处理(30分钟未支付)。 |
| 外部存储 + 定时扫描 | 将消息和到期时间存入外部存储(如Redis Sorted Set或DB)。定时任务扫描到期消息并投递至Kafka。 | 延迟时间灵活、精度要求高,且愿意引入外部组件。 |
| Kafka Streams 时间窗口 | 利用Kafka Streams的窗口操作(如suppress())来抑制消息,达到延迟效果。 | 主要用于流处理场景,适合基于事件时间的固定窗口延迟计算。 |