Skip to content

Kafka 消息队列

基础概念

名称介绍权限
Topic队列,生产者生产的消息存储在某个topic中,topic可以被多个消费者订阅,消费者从订阅的topic中获取消息
ProducerGroup生产组,针对同一个集群, 我们建议一个应用对应一个生产组,不同的集群需要申请不同的生产组, 生产组和生产者是一对多的关系,生产组可以根据不同的topic生成对应的生产者,kafka根据生产组做相应的权限控制1.只有申请生产组时指定的应用才有权限启动kafka
2.生产组只能往指定的topic发送消息
Producer生产者,发送消息的实体
ConsumerGroup消费组,不同的集群需要申请不同的消费组, 消费组和消费者是一对多的关系,消费组可以根据不同的topic生成对应的消费者, kafka根据消费组做相应的权限控制1.只有申请消费组时指定的应用才有权限启动kafka
2.消费组只能消费指定的topic的消息
Consumer消费者,消费消息的实体

Topic:表示一类相同类型的消息。可以理解为表或队列

Partition (分片):一个Topic包含多partition,消息存储在这些partition中。

Offset:topic+partition+offset唯一确定一条消息

消费组 (ConsumerGroup) 包含多个消费者 (Consumer),每个Consumer消费Topic中的一部分消息;每个partition只能被分配给一个消费者消费,当partition数小于消费者数量时,会有消费者分配不到partition,因此消费不到消息(流量不均)。

一个消费组中所有消费者消费topic里的所有消息,2个不同的消费组会各自消费一份topic里的所有消息,2个消费组的消费进度无关,topic里的消息删除和消费进度无关,只和保留时间有关。

生产组(ProducerGroup)包含多个生产者,生产者可以选择同步或者异步发送消息,其中异步发送可以提高攒批效果,提高吞吐,降低客户端和服务端的CPU开销。

最佳实践

一、发送重试

推荐使用发送重试机制,重试会换分片partition发送;降低消息丢失的风险。

二、消息不带key,提升可用性

仅在需要顺序消费消息时,才需要带key发送消息;发送时会根据key的哈希值,选择固定的partition;带key的消息,重试时不会换分片;

以下场景可以减少报错

  • kafka服务端单机短时间不可用,比如云厂商虚拟机冻结维护;
  • kafka服务端单机宕机;
  • kafka服务端运维:下个版本运维时也不会产生报错了;

三、通过异步发送降低成本

同步发送的优势是代码简单,异常处理简单,但在消息量大时,成本很高;

异步发送 -> 消息攒在内存里 -> 攒够一定时间或一定大小后发送,异步发送可以有效减少业务服务自身和kafka集群的CPU开销,同时增加了攒批,往往也能增加压缩率,减少存储成本;

异步发送是否会丢数据?

  • 如果服务端返回错误或发送超时,会回调用户代码处理异常。异步发送同样自带重试版本
  • 如果消息还没发出去,机器宕机了,那么会丢数据。合理设置 linger.ms

四、消息处理异常解决

怎么算失败?

消费函数返回 RECONSUME_LATER,消费函数抛异常,消费超时 (默认15分钟,可调长)

消费失败之后会发生什么?

消费组支持重试 => 进入重试队列。如果重试次数耗尽,则进入死信;消费组不支持重试 => 丢弃

消费重试和死信?

(1) 从业务topic里消费消息

(2) 消费失败时发送到重试Topic中

(3) 再次尝试消费 (从重试Topic中消费)

(4) 重试消费仍失败,依旧发送到重试Topic

(5) 重试次数耗尽仍失败,发送到死信Topic

(6) 用户在kafka管控转回死信 (到重试Topic),又可以再次被消费到

五、消息去重

kafka无法避免消息重复,需要业务做好去重

去重需要选择一个唯一键,优先选择:业务层的唯一键,如订单号等。这些信息要带在消息体里,也可以选择kafka生成的msgId。

msgId注意事项:

  • 相同事件产生多个msgId:如果服务自己重试发送、上游/app重复请求,可能导致同一个事件会发送多条msgId不同的消息;
  • 不同事件msgId冲突:msgId类似UUID,虽然已经包含IP、时间戳等信息,但仍有极小概率冲突。

六、topic数据保留时间

每个Topic都有自己的保留时间,默认是24小时

  • 超过保留时间的消息会被删除。一旦删除就无法找回
  • 即使没来得及被消费,超时依旧会被删除。
  • 可在kafka管控上提单修改Topic保留时间 (topic owner审批)
  • 建议保留时间最少3小时,给故障处理预留时间

七、关注Lag告警,及时处理死信

消费组lag、死信默认都有告警,但都是普通告警

  • 为消费组设置合理的lag告警阈值与告警级别
  • 对重要消费组请设置电话告警。普通告警过多容易忽略。
  • 可以为电话告警、普通告警设置不同的阈值。
  • 死信Topic也有保留时间 (3天),请及时处理,否则也会过期被删除!

八、关注消费Outofrange

消费OutOfRange

  • 消费速度 < 删除速度 ==> timelag不断上升
  • timelag > TopicTTL ==> 未被消费的消息过期删除了!
  • 关注自己服务中的Event打点:kafkaConsumerOutOfRange
  • 可以自行在monitor上配置告警
  • kafka提供预警:『消费组消费慢有丢数据风险』(普通告警)

九、延时消息和延迟消费

希望发送的消息一个小时后/一天后/明天0点才消费到,如何实现?

  • 业务发送延时消息的TPS不能太高:处理延时消息的开销比普通消息高很多倍
  • 延时消息无法精确到达,会稍微延后一小段时间投递
  • 最小与最大延时时间:
  • 延时消息的最小延时时间为 500ms
  • 延时消息最大默认不超过3天 (可联系kafka管理员调大,最大不能超过15天)
  • 延迟消费最大不能超过topic的保留时间

十、事务消息

十一、顺序消息

顺序消息只能尽量保证有序,并不能保证完全不出现乱序的情况

  • 情况1: 增加分片数。同一个key的消息会发送到不同的分片中。
  • 情况2: 在消费者发版、重启、扩缩容时,由于同一个partition会被分配给不同的消费者消费,所以还是会出现乱序 (重复消费)。
  • 乱序的情况是这样的:
  • A1, A2, ..., Am, Am+1,..., An, Am, Am+1, ... An, An+1, An+2, ....
  • 中间 Am, Am+1, ..., An 这一段会重复消费

十二、消费过滤

kafka提供的消费过滤是在kafka服务端做过滤

  • 可以减少服务端的出带宽,但会增加服务端的CPU开销
  • 不推荐优先考虑该功能:
  • 业务可自行在消费者消费逻辑里做过滤
  • Producer改造:按消息类型拆分成多个topic

十三、批量消费

一般业务用不到。少数适合的场景:

  • 多条消息的处理流程中,可以批量读写数据库等存储系统,或批量调用下游RPC,减少存储系统、下游服务的开销。
  • 注意事项:
  • 会丢失链路追踪标记:比如预发标记、压测标记。
  • 会增加内存开销:可能出现OOM,或GC压力变大

十四、全链路标记透传

公司的 链路追踪组件 提供了标记透传能力,包含了预发、压测等标记

  • kafka-client会自动透传 链路追踪组件 里的所有标记:
  • 发送消息时,kafka-client会自动将所有 链路追踪标记 都加到消息的 header 里
  • 消费消息时,kafka-client会自动将消息 header 里的 链路追踪标记 都设置到当前线程的 TraceContext 中 (批量消费不会自动设置)

架构详解

Kafka队列的生产者和消费者如何跟Kafka通信?

核心架构概览

首先,要理解Kafka的通信模型是基于 "客户端-服务器" 模式的:

服务器端:Kafka集群,由多个Broker(服务器节点)组成。

客户端:生产者(Producer)和消费者(Consumer)。

通信协议:使用Kafka自定义的、基于TCP的二进制协议。这比HTTP等通用协议更高效、开销更小。

生产者与Kafka的通信流程

生产者的核心任务是:将消息发布(发送)到指定Topic的特定分区上。

1. 初始化与元数据获取

生产者启动时,会连接到 bootstrap.servers 配置中指定的一个或多个Broker。

向这些Broker发送元数据请求,获取整个集群的视图,包括:

  • 有哪些Topic,每个Topic有哪些分区。
  • 每个分区的Leader副本在哪个Broker上。
  • 所有Broker的地址和端口。

这些元数据会被缓存在生产者本地,并定期刷新(例如,当检测到新的Broker或分区Leader发生变化时)。

2. 序列化与分区

生产者将用户提供的消息Key和Value对象,通过配置的序列化器(如 StringSerializer, JsonSerializer)转换为字节数组。

然后,根据配置的分区器确定这条消息应该被发送到Topic的哪个分区。

如果指定了Key,默认分区器会对Key进行哈希,确保相同Key的消息总是进入同一个分区。

如果没有Key,则会使用轮询等策略。

3. 批量发送与网络通信

消息不会立即被发送。生产者有一个内存缓冲区(RecordAccumulator),它会将发往同一分区的多条消息组合成一个批次。

这样做极大地提高了吞吐量,减少了网络IO次数。

一个独立的Sender线程在后台运行,它不断地从缓冲区中取出已准备好的批次(满足 batch.size 或 linger.ms 条件)。

4. 发送生产请求

Sender线程根据它缓存的元数据,知道每个分区的Leader Broker在哪里。

它向目标分区的Leader Broker建立TCP连接,并发送 PRODUCE请求,其中包含一个或多个消息批次。

5. Broker处理与响应

Leader Broker收到请求后,会将消息写入其分区日志(追加到磁盘上的文件)。

写入成功后,Broker会向生产者发送一个响应。

生产者可以通过 acks 配置来控制等待何种程度的确认:

  • acks=0:"发后即忘"。不等待任何确认,吞吐量最高,但可能丢失消息。
  • acks=1:默认值。等待Leader写入成功即可。在吞吐量和可靠性之间折衷。
  • acks=all:等待Leader和所有ISR(同步副本)都写入成功。最可靠,但延迟最高。

6. 处理响应

生产者收到响应后,如果是成功,则会清空已发送的批次,并可能执行用户设置的回调函数。

如果失败(如网络问题、非Leader错误),并且配置了重试(retries),生产者会重新发送消息。

消费者与Kafka的通信流程

消费者的核心任务是:从指定Topic的分区上拉取并处理消息。

其通信流程主要围绕 "拉取消息" 和 "协调管理" 两大核心活动展开。

1. 加入消费者组与再平衡

消费者启动时,通过 group.id 标识自己属于哪个消费者组。

它会向Broker发送 FIND_COORDINATOR请求,找到负责它所在消费者组的 Group Coordinator(组协调者,是集群中的一个Broker)。

消费者然后向Coordinator发送 JOIN_GROUP请求。

Coordinator会触发再平衡,重新分配组内所有消费者需要消费的分区,并通过 SYNC_GROUP请求 将分配方案下发给每个消费者。

2. 拉取消息

消费者知道自己要消费哪些分区后,会直接向这些分区的Leader Broker建立TCP连接。

消费者主动向Broker发送 FETCH请求 来拉取消息。

这是一个长轮询的过程:如果分区末端没有新消息,Broker会保持这个请求一段时间(fetch.max.wait.ms),直到有新消息到达或超时,然后才返回响应。这避免了无效的频繁轮询。

消费者可以控制每次拉取的数据量(max.poll.records, fetch.max.bytes)。

3. 提交偏移量

消费者处理完消息后,需要告知Kafka它消费到了哪个位置,这个位置就是偏移量。

消费者会定期向Group Coordinator所在的Broker发送 OFFSET_COMMIT请求,将当前消费的偏移量提交到Kafka内部的 __consumer_offsets Topic中。

这样,当消费者重启或发生再平衡后,新的消费者可以从这个提交的偏移量位置继续消费。

4. 心跳与会话保持

消费者在后台会定期向Group Coordinator发送 HEARTBEAT请求,表明自己"存活"。

如果Coordinator在 session.timeout.ms 时间内没有收到某个消费者的心跳,就会认为它已经失效,并触发新一轮的再平衡,将它负责的分区重新分配给其他活着的消费者。

总结:通信模式对比

方面生产者消费者
通信方向推:主动将消息发送给Broker。拉:主动从Broker拉取消息。
关键请求PRODUCE(生产), METADATA(元数据)FETCH(拉取), JOIN_GROUP(入组), OFFSET_COMMIT(提交偏移量), HEARTBEAT(心跳)
连接对象主要连接分区的Leader Broker。既连接分区的Leader Broker(拉消息),也连接Group Coordinator(管理组)。
核心机制批处理、压缩、异步回调。再平衡、偏移量管理、长轮询。
数据保证通过 acks 配置控制可靠性。通过提交偏移量来记录消费进度,默认"至少一次"。

总而言之,生产者和消费者都是作为智能客户端,通过高效的二进制协议与Kafka Broker进行直接通信,各自承担着消息传递链路中不同角色的职责。

重复消费和消费乱序问题产生的原因?

核心矛盾:分区顺序 vs. 消费者组再平衡

Kafka的顺序保证是分区级别的:

Kafka只能保证同一个分区(Partition)内的消息被顺序消费。

生产者将消息发送到Topic时,如果指定了相同的Key,这些消息会被路由到同一个分区。

在一个分区内部,消息严格按照偏移量(Offset)的顺序存储。

消费者组与分区分配:

一个消费者组(Consumer Group)共同消费一个Topic。

Topic的每个分区在同一时间只能被组内的一个消费者消费。

这种分配关系是由Kafka的"再平衡(Rebalance)"机制来动态管理的。

问题发生的具体场景分析

当消费者发生发版、重启、扩缩容时,会触发再平衡。这个过程如下:

触发再平衡:一个消费者离开组(如被关闭)或新消费者加入组(如扩容),协调者(Coordinator)会通知组内所有消费者:"情况有变,我们需要重新分配分区"。

暂停消费:所有消费者都会停止消费,等待新的分配方案。

重新分配分区:协调者根据分配策略(如Range或RoundRobin)计算出新的分区分配方案。例如:

  • 之前:消费者A消费分区0,消费者B消费分区1。
  • 消费者B重启后:协调者可能将分区1重新分配给消费者A。现在消费者A同时消费分区0和分区1。

恢复消费:消费者根据新的分配方案,从各自分配到的分区的最新提交偏移量(Committed Offset)开始消费。

为什么会导致"乱序"和"重复消费"?

现在,我们结合上面的场景,来看问题是如何产生的:

1. 乱序

这里的"乱序"不是指消息在分区内存储的顺序乱了,而是指业务层面感知到的处理顺序乱了。

假设场景:

Topic:order_events,分区0。

消息流(按顺序):订单创建 -> 订单付款 -> 订单发货。

正常流程:

消费者C1持续消费分区0,按顺序处理这三个消息。

发生再平衡时:

消费者C1正在处理订单付款消息,但还没来得及提交偏移量就崩溃或被重启了。

再平衡触发,分区0被分配给了新的消费者C2。

消费者C2从最后一次提交的偏移量开始消费。假设C1只提交了订单创建的偏移量。

结果:C2会重新消费订单付款和订单发货。

业务层看到的顺序:

订单创建 (C1处理) -> 订单付款 (C1处理) -> 订单付款 (C2重新处理) -> 订单发货 (C2处理)

你发现了吗?对于"订单付款"这个业务操作,它被处理了两次,并且第二次处理发生在"订单发货"之后,这在业务逻辑上就是严重的"乱序"。

2. 重复消费

从上面的例子可以清晰地看到,"乱序"的根本原因正是"重复消费"。

根本原因:消费者在处理完消息后、提交偏移量之前发生了失败或再平衡。

结果:新的消费者会从之前已提交的偏移量开始消费,导致一部分已经被处理过的消息被再次处理。

Kafka默认的提交方式是自动提交,它定期在后台提交偏移量。这更容易导致重复消费,因为可能在批量处理消息时,已经处理了10条,但只自动提交了第5条的偏移量,那么再平衡后,第6到10条会被重复消费。

总结

阶段核心机制导致的问题
顺序保证分区内消息有序这是实现顺序消费的基础。
弹性伸缩消费者组再平衡这是破坏顺序消费的触发条件。
偏移量管理异步/定期提交这是导致重复消费和乱序的根本原因。消费者在"处理消息"和"提交偏移量"这两个动作之间存在时间差。
最终结果-业务层面的乱序,本质是由于消息的重复消费和处理进程的切换共同造成的。

如何缓解或解决?

完全避免在再平衡时的乱序和重复是非常困难的,但可以通过一些策略来缓解:

将并发消费者数量降至1:

最简单粗暴的方法。一个Topic只设置一个分区,并且消费者组里只有一个消费者。这样永远不会有再平衡和分配问题。

缺点:完全丧失了并行处理和横向扩展的能力,性能瓶颈明显。

在业务层实现幂等性:

这是最常用、最有效的解决方案。承认消息可能会重复,但在处理逻辑上保证即使收到重复消息,也不会产生错误结果。

例如:处理"订单付款"消息时,先检查数据库该订单是否已支付过,如果已支付则直接忽略此消息。

精细控制偏移量提交:

关闭自动提交(enable.auto.commit=false)。

在处理完一条消息后,立即同步提交偏移量(commitSync())。

缺点:严重牺牲性能,因为每条消息都要与Broker进行同步通信。并且如果消息处理成功但提交失败,仍然会导致重复消费。

在再平衡监听器中处理:

实现ConsumerRebalanceListener接口,在分区被撤销时(onPartitionsRevoked)立即提交偏移量,减少偏移量提交的延迟。

但这并不能完全保证,因为消费者可能已经崩溃,无法执行这个回调。

结论:

Kafka在设计上为了极致的吞吐量和扩展性,在消费者端牺牲了"精确一次"和"严格顺序"的保证。理解和接受"至少一次"交付语义以及可能出现的乱序,并通过业务层的幂等性设计来弥补,是正确使用Kafka的最佳实践。

kafka的lag指标是什么?

Kafka 中的 Lag(全称 Consumer Lag,也叫消费滞后量或消费积压量)是衡量消费者处理进度的一个核心监控指标。它表示在某个时间点,消费者尚未消费的消息总数。简单来说,就是生产者已经"跑"了多远,而消费者还落后多少。

为了帮你更直观地理解,我们先来看看它的核心计算公式:

Lag = LOG-END-OFFSET(分区最新消息的位置) - CURRENT-OFFSET(消费者组的当前消费位置)

为什么Lag指标至关重要?

持续关注Lag指标非常重要,因为它直接反映了消费者群体的健康状况,Lag过大可能带来一系列问题:

  • 处理延迟:业务逻辑无法获得实时数据,影响系统实时性。
  • 资源压力:大量积压的消息会占用Broker的磁盘空间。
  • 消息丢失风险:这是最需要警惕的一点。Kafka的消息有留存时间(例如默认7天),如果消费者慢到要消费的数据已被删除,当它重新连接时,可能会从最新的位移开始消费,导致积压期间的消息被跳过,造成事实上的消息丢失。

Lag过大怎么办?

如果你发现Lag持续增长,可以考虑从以下方面进行优化:

  • 提升消费者处理能力:优化消费逻辑、引入多线程或异步处理。
  • 增加消费者实例:通过增加消费者数量(但不要超过分区数)来提升整体消费能力。
  • 调整生产者速率:如果可能,适当控制生产者的发送速率。
  • 避免频繁重平衡:合理配置会话超时等参数,减少不必要的消费者组重平衡。

Kafka的延迟消息和延时消费功能原理?

由于Kafka未直接暴露延迟消息API,实践中主要通过以下几种架构模式来实现。下表对比了三种主流方案的特点:

方案核心原理适用场景
多级Topic + 时间轮询创建多个不同延迟级别的Topic(如delay_5s, delay_1m)。生产者按需发送,独立消费者组轮询对应Topic,到期后转发至业务Topic。最常用方案。适用于延迟级别固定、可控性要求高的场景,如订单超时处理(30分钟未支付)。
外部存储 + 定时扫描将消息和到期时间存入外部存储(如Redis Sorted Set或DB)。定时任务扫描到期消息并投递至Kafka。延迟时间灵活、精度要求高,且愿意引入外部组件。
Kafka Streams 时间窗口利用Kafka Streams的窗口操作(如suppress())来抑制消息,达到延迟效果。主要用于流处理场景,适合基于事件时间的固定窗口延迟计算。

Move fast and break things