一、MQ简介

MQ:MessageQueue,消息队列,一种中间件。就是用来将同步任务转成异步任务进行处理,主要特点就是**:异步、解耦、削峰**。市面上很多MQ产品比如RocketMQ、RabbitMQ、Kafka、ActiveMQ等。这些都是MQ的不同产品、适用的领域也不同、可以根据当前我们的业务进行选择。

MQ产品 优点 缺点 适合场景
Apache Kafka 吞吐量非常大,性能非常好,集群高可用。 会有丢数据的可能,功能比较单一。 日志分析、大数据采集
RabbitMQ 消息可靠性高,功能全面。 erlang语言不好定制,吞吐量比较低。 企业内部小规模服务调用
Apache Pulsar 基于Bookkeeper构建,消息可靠性非常高。 周边生态还有差距,目前使用的公司比较少。 企业内部大规模服务调用
Apache RocketMQ 高吞吐、高性能、高可用,功能全面,客户端协议丰富,使用Java语言开发,方便定制。 服务加载比较慢。 几乎全场景,特别适合金融场景

二、RocketMQ

RocketMQ官网:

1、RocketMQ运行架构

  • Producer:生产者,用于

  • Message:

    • Topic:粗粒度的业务线/系统边界

      • 定义:最顶层的逻辑分类,通常对应“一条完整的业务链路”。

      • 命名风格:{产品线}.{业务域}.{动作}{系统}.{对象}

      • 真实案例:

        公司 Topic 示例 语义
        电商 trade.order 交易域-订单相关所有消息
        物流 logistics.express 物流域-快递全流程
        内部系统 pay.callback 支付系统-第三方回调
      • 一条消息只能属于一个 Topic:同一个 Topic 可以被多个消费者组订阅

    • Tag:细粒度的业务场景/子动作

      • 定义:对 Topic 的二次切分,用来区分“同一业务线里的不同小场景”。

      • 命名风格:{动词}.{对象}{状态},全部小写、英文复数用单数。

      • 真实案例(挂在 trade.order Topic 下)

        Tag 值 场景
        create 订单创建
        paid 订单已支付
        cancel 订单取消
        refund 订单退款
      • 订阅粒度:Consumer 可以只订阅 trade.order:paid,不会收到 createcancel 的流量。

      • 数量控制:一个 Topic 下 Tag 不宜过多(< 20),否则运维难聚合。

    • Key:幂等与检索的唯一标识

      • 定义:消息体内置的“业务主键”,RocketMQ 用它做去重按 Key 查询

      • 命名风格:直接放可反解的业务主键,保持全局唯一即可。

      • 真实案例

        业务场景 Key 示例
        订单 2025061912300001(订单号)
        支付流水 pay_1234567890
        用户注册 user_987654
      • 长度限制:最大 128 字节;不要放 JSON

      • 幂等作用:消费端可用 key 做 Redis SETNX 或数据库唯一索引,实现“同 Key 只处理一次”。

  • Consumer:消息消费对应下游业务

  • Broker:消息存储、传递、查询等功能

  • NameServer:用于注册Broker

image-20250815202955374

2、RocketMQ的消息模型

2.1、RocketMQ的基本样例

消息的生产者会有以下三种方式

  • 同步发送:等待消息返回ACK之后再进行下面的操作
  • 异步发送:使用sendCallBack(),异步等待ACK
  • 单向发送:sendOneWay(),只负责发送,不管消息发送是否成功

消费者消费信息分两种:

  • 拉模式:消费者主动去Broker拉取消息
  • 推模式:消费者等待Broker把消息推过来

通常使用推模式

2.2、广播消息

应用场景:

广播模式和集群模式是 RocketMQ 的消费者端处理消息最基本的两种模式。集群模式:一个消息,只会被一个消费者组中的多个消费者实例共同处理一次。广播模式:一个消息,则会推送给所有消费者实例处理,不再关心消费者组。

  • MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
  • MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
1
consumer.setMessageModel(MessageModel.BROADCASTING);
  • 启动多个消费者,广播模式下,这些消费者都会消费一次消息。

  • 默认模式(集群模式)下,Broker 端会给每个 ConsumerGroup 维护一个统一的 Offset,这样,当 Consumer 来拉取消息时,就可以通过 Offset 保证一个消息,在同一个 ConsumerGroup 内只会被消费一次。

  • 而广播模式的本质,是将 Offset 移到 Consumer 端自行保管,包括 Offset 的记录以及更新,全部放到客户端。这样 Broker 推送消息时,就不再管 ConsumerGroup,只要 Consumer 来拉取消息,就返回对应的消息。

注意点:

  1. Broker 端不维护消费进度,而是客户端进行维护,意味着,如果消费者处理消息失败了,将无法进行消息重试。
  2. Consumer 维护 Offset 的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。如果 Offset 丢了,Consumer 依然可以拉取消息。
    • 比如生产者发送了 1~10 号消息。消费者当消费到第 6 个时宕机了。当他重启时,Broker 端已经把第 10 个消息都推送完成了。如果消费者端维护好了自己的 Offset,那么他就可以在服务重启时,重新向 Broker 申请 6 号到 10 号的消息。但是,如果消费者端的 Offset 丢失了,消费者服务依然可以正常运行,但是 6 到 10 号消息就无法再取到了。后续这个消费者就只能获取 10 号以后的消息。

2.3、过滤消息

应用场景:

在同一个Topic下需要进一步进行划分可以在Broker端按照Tag进行过滤然后推到Consumer

Tag属性的处理比较简单,就是直接匹配。而SQL语句的处理会比较麻烦一点。RocketMQ也是通过ANTLR引擎来解析SQL语句,然后再进行消息过滤的。

ANTLR是一个开源的SQL语句解析框架。很多开源产品都在使用ANTLR来解析SQL语句。比如ShardingSphere,Flink等。

注意点:

  1. 使用Tag过滤时,如果希望匹配多个Tag,可以使用两个竖线(|)连接多个Tag值。另外,也可以使用星号(*)匹配所有。
  2. 使用SQL过滤时,SQL语句是按照SQL92标准来执行的。SQL语句中支持一些常见的基本操作:
    • 数值比较,比如:>, >=, <, <=, BETWEEN, =;
    • 字符串比较,比如:=, <>, IN;
    • IS NULL 或者 IS NOT NULL;
    • 逻辑符号 AND, OR, NOT;
  3. 消息过滤,无论是Broker端和在Consumer端都可以做。Consumer端可以自行获取用户属性,不感兴趣的消息,直接返回不成功的状态,跳过该消息就行了。但是RocketMQ会在Broker端完成过滤条件的判断,只将Consumer感兴趣的消息推送给Consumer。这样的好处是减少了不少必要的网络IO,但是缺点是加大了服务端的压力。不过在RocketMQ的良好设计下,更建议使用消息过滤机制。
  4. Consumer不感兴趣的消息并不表示直接丢弃。通常是需要在同一个消费者组,定到另外的消费者实例,消费那些剩下的消息。但是,如果一直没有另外的Consumer,那么,Broker端还是会推进Offset。

2.4、顺序消息

应用场景:

每一个订单从下单、锁库存、支付、下物流等几个步骤中,每个业务步骤都是由一个消息生产者通知给下游服务的,如何保证每个订单的业务不乱序?

这里需要生产者和消费者同时进行处理

生产者:

通过MessageSelector,将orderid相同的消息,都转发到同一个MessageQueue中

消费者:

通过MessageListenerOrderly消费者每次读取消息都从一个queue中获取(通过加锁方式实现)

注意点:

  1. 理解局部有序与全局有序。大部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留一个MessageQueue。性能显然非常低。
  2. 生产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于集中导致数据热点竞争。
  3. 消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的
    消息,这会造成消息乱序。
  4. 消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。

2.5、延迟消息

延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。

  • message.setDelayTimeLevel(3):预定日常定时发送。1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
  • msg.setDelayTimeMs(10L):指定时间定时发送。默认支持最大延迟时间为3天,可以根据broker配置:timerMaxDelaySec修改。

2.6、批量消息

将多个消息合并成一个批量消息,一次发送出去。可以减少网络IO、提升吞吐量

1
ArrayList<Message>

2.7、事务消息

事务消息

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

4、面试

4.1 RocketMQ保证消息不丢失

通常RocketMQ消息丢失是由于网络波动、服务器过载等

在生产者端,通常是由于用户网络问题,导致发送消息不能及时到达Broker。生产者发送消息有三种一种是单向传递不用接收来自Broker的确认,同步传递需要等待Broker的确认,异步传递可以异步等待Broker的确认,或者通过事务传递进一步在业务上确认,如果为了保证数据的强安全可以设置位同步传递

在Broker端,通常是由于服务器宕机丢失数据,可以配置刷盘策略来持久化保存到磁盘或者使用deleger配置高可用集群

在Consumer端,由于网络波动,或者下游服务异常,可以设置重试机制来保证整个业务正常重试,但是重试过多这里需要将其加入到死信队列保证不长期占用服务器资源

4.2 RocketMQ消息持久化机制

  • 消息数据存储区域:CommitLog

    包括:topic、queueid、message

  • 消费逻辑队列:ConsumeQueue

    包括:minOffset、maxOffset、consumerOffset

  • 索引

    包括:key索引、创建时间索引…

    image-20250815222154096

4.3、RocketMQ保证消息的顺序性

  • 生产者端需要使用MessageQueueSelector按照一定规则将消息发送到同一个Queue中

  • 消费者端需要使用MessageListenerOrderly进行顺序消费,这种方式是使用了加锁实现的

  • 消费者并发消费的时候需要设置消费线程为1

4.4、RocketMQ保证消息幂等性

幂等性就是生产者发送消息和消费者消费消息一一对应,即生产者不重复发送,消费者不重复消费

生产者端

比如由于网络波动,用户多次点击提交按钮,导致发送多条数据,我们需要防止多次投递

不过这里MQ已经帮我们在API中已经封装好该方法,MQ通过将消息在Broker中对消息添加一个唯一ID,可以直接但是在SpringBoot中使用ack返回机制使用MessageExt进行业务代码的判断,但是这里SpringBoot中使用该封装的代码时候多次进入该Message时候ID是不同的

但是业务要求幂等性要求极高,则需要设置Key进行,并且存储到Redis中,并且前端进行唯一判断或者后端


💡 场景还原:用户端网络波动导致重复提交订单

  1. 用户点击“提交订单”按钮,前端发送请求给后端。
  2. 网络卡了,用户以为没点成功,又点了几次
  3. 后端收到多个相同的请求,每个请求都要发一条“创建订单”的消息到 RocketMQ。
  4. 如果没有幂等机制,就会创建多条订单消息,导致重复下单!

✅ 如何防止这种情况?(生产者端幂等)

✅ 第一步:生成全局唯一的业务订单号(不是数据库自增ID)

  • 前端生成一个 UUID,比如 orderNo = "20250815123456_uuid",点击后 1 秒内禁用按钮,所有请求用同一个 orderNo
  • 或者后端在第一次点击时生成并返回给前端,后续重复点击都用这个 orderNo

✅ 第二步:后端用 Redis 做幂等校验(防止重复消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 伪代码
String orderNo = request.getOrderNo();
String key = "order:submit:" + orderNo;

// 使用 Redis 的 SETNX(set if not exist)
Boolean isFirst = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(10));

if (!isFirst) {
// 说明已经处理过,直接返回成功(或提示“订单已提交”)
return "订单已提交,请勿重复操作";
}

// 第一次提交,发消息到 RocketMQ
Message msg = new Message("OrderTopic", "createOrder", JSON.toJSONBytes(order));
msg.setKeys(orderNo); // 设置业务唯一键
producer.send(msg);

✅ 第三步:消费者端也要幂等

即使生产者端防住了,极端情况下(如 Redis 宕机)仍可能重复消息,所以消费者也要做幂等:

  • 消费者收到消息后,用 orderNo 查数据库:
    • 如果已存在该订单,直接跳过。
    • 如果不存在,则创建订单。
  • 数据库订单表的 orderNo 字段设置为唯一索引,确保不会重复插入。

✅ 总结:重复提交订单的幂等保障流程

步骤 作用 技术点
1. 前端生成唯一订单号 标识每次提交请求 UUID、时间戳
2. Redis SETNX 去重 防止后端重复发消息 Redis 原子操作
3. 消息设置业务key 便于消费者幂等 msg.setKeys(orderNo)
4. 消费者幂等校验 防止重复消费 数据库唯一索引

用前端生成的唯一订单号 + Redis SETNX 做幂等校验,就能防止用户因网络波动重复提交订单导致的重复消息问题。

消费者端

消费者如果消费过则消费者不再消费,防止多次消费

4.5、RocketMQ处理消息积压

如果短时间消息积压,是没有太多问题,但是注意,如果消息积压长时间得不到解决。RocketMQ、Kafka中如果日志文件过期了,则会直接删除过期日志,而这些日志文件上未消费的小修就会直接丢失

如何处理

  • 通常解决Consumer业务端效率问题,比如优化SQL慢查询、等优化Consumer消费消息的性能
  • 在上线前可以多添加一些Topic和broker

4.6、快速读写实现原理

Broker会在commitlog中申请硬盘,默认为1G并且在该硬盘空间下实现顺序读写

正常来说消息的传递需要经过如下过程:

image-20250815161719901

为了进一步优化消息的传递,采用了**“零拷贝”**技术:

  • 数据传输由传统四次复制简化成3次复制,减少1次复制过程
  • Java中采用MappedByteBuffer类实现了该技术
  • 要求预留存储空间,用于保存数据(1G存储空间起步)

image-20250815162016408

4.7、刷盘机制

分为同步刷盘异步刷盘

同步刷盘就是同步的执行保存到硬盘的逻辑,这样虽然安全但是性能不高

异步刷盘就是当数据达到一定量的时候进行刷盘,从而保证了性能,这里即保留1、3、7

配置方式:

1
2
3
4
#刷盘方式
#-ASYNC_FLUSH异步刷盘
#-SYNC_FLUSH同步刷盘
flushDiskType=SYNC_FLUSH

image-20250815162946340

三、RabbitMQ

RabbitMQ官网:https://www.rabbitmq.com/docs

Classic 经典队列

是一种普通的FIFO队列,当消费者发送NACK时候会重新入队,这种不适合会导致消息积压的方式。下面是Classic经典队列的官方版本解释,在4.0版本完全移除了Version 1

Classic Queue Implementation Version 1

Version 1 is the default and the original implementation of classic queues. The index in version 1 uses a combination of a journal and segment files. When reading messages from segment files it loads an entire segment file in memory, which can lead to memory issues but does reduce the amount of I/O performed. Version 1 embeds small messages in the index, further worsening memory issues.

Classic Queue Implementation Version 2

Version 2 takes advantage of the improved performance of modern storage devices and is the recommended version. The index in version 2 only uses segment files and only loads messages from disk when necessary. It will load more messages based on the current consumption rate. Version 2 does not embed messages in its index, instead a per-queue message store is used.

因此,经典队列主要用在数据量比较小,并且生产者消息和消费者消息的速度比较稳定的业务场景,尽量不出现消息积压的状态,比如内部系统之间的服务调用

Quorum仲裁队列

如果消费者发送NACK则Quorum不会像Classic中对于毒消息不做处理,而是在消息头部设置重复投递的次数,队列设置delivery-limit如果大于直接丢弃或加入死信队列

在数据安全性方面,Quorum对列主要针对网络分区、通信失败等复杂网络情况下,可以提升数据的安全性。通常建议配合PublisherConfirms机制使用。RabbitMQ能够保证经生产者确认过的消息,在集群内时安全的。但是,对于未经生产者确认的消息,RabbitMQ并不能保证消息安全。

**Quorum队列更适合于队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。**例如电商系统的订单,引I入MQ后,处理速度可以慢一点,但是订单不能丢失。

也对应以下一些不适合使用的场景:

  1. 一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
  2. 对消息低延迟要求高:一致性算法会影响消息的延迟。
  3. 对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。
  4. 队列消息积压严重:如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。这种情况下,stream流式对列是一种比较好的选择。

Sharding插件

能够提升消费者消费的速度

是交换机的一种类型,可以将消息均匀的划分给多个Queue,这里Queue命名会按照交换机+随机字符从而形成分片,在消费的时候就可以只设定交换机名称

对于这些伪队列,尽量不要单独对其进行消费

四、Kafka

1、Kafka运行架构

image-20250818170041146

2、Kafka运行过程

2.1、生产者拦截器机制

生产者拦截机制允许客户端在生产者在消息发送到Kafka集群之前,对消息进行拦截,甚至可以修改消息内容。

这涉及到Producer中指定的一个参数:INTERCEPTOR_CLASSES_CONFIG

2.2、消费者分组消费机制

在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。一个partition只能被一个消费者组中其中一个消费者消费

image-20250818172659870

3、面试

1、消息分区序列化机制

image-20250818153446367

渔与鱼序列化机制是在高并发场景中非常重要的一个优化机制。高效的系列化实现能够极大的提升分布式系统的网络传输以及数据落盘的能力。例如对于一个User对象,即可以使用JSON字符串这种简单粗暴的序列化方式,也可以选择按照各个字段进行组合序列化的方式。但是显然后者的占用空间比较小,序列化速度也会比较快。而Kafka在文件落盘时,也设计了非常高效的数据序列化实现,这也是Kafka高效运行的一大支撑。

在很多其他业务场景中,也需要我们提供更高效的序列化实现。例如使用MapReduce框架时,就需要自行定义数据的序列化方式。使用Netty框架进行网络调用时,为了防止粘包,也需要定制数据的序列化机制。在这些场景下,进行序列化的基础思想,和我们这里介绍的也是一样的。当然,如果我们可以进一步设计出更简短高效的数据压缩算法,那也就能更进一步提高数据传输的效
率。比如对二进制数据进行压缩。而这就是算法最直接的作用。

2、消息分区路由机制

  • 默认是将producer中数据传到同一个partitioin中并且当数据量大于16k的时候就切换partition。
  • 但是当设置key时候会使用key的hash绑定相应的partition
  • 使用轮询
  • 自定义

image-20250818154755396

这里进行一个延申,消息的顺序消费,这里的思路就是将数据发送到同一个partition中

Kafka默认提供了三种消费者的分区分配策略

  • range策略:比如一个Topic有10个Partiton(partition 0-9)一个消费者组下有三个Consumer(consumer1-3)。Range策略就会将分区0-3分给一个Consumer,4-6给一个Consumer,7-9给一个Consumer。
  • round-robin策略:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如o,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer
  • sticky策略:粘性策略。这个策略有两个原则:
    • 在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。
    • 分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。

另外可以通过继承AbstractPartitionAssignor抽象类自定义消费者的订阅方式。

官方默认提供的生产者端的默认分区器以及消费者端的RangeAssignor+CooperativeStickyAssignor分配策略,在大部分场景下都是非常高效的算法。

3、生产者的消息缓存机制

Kafka生产者为了避免高并发请求对服务端造成过大压力,每次发消息时并不是一条一条发往服务端,而是增加了一个高速缓存,将消息集中到缓存后,批量进行发送。这种缓存机制也是高并发处理时非常常用的一种机制。

Kafka的消息缓存机制涉及到KafkaProducer中的两个关键组件:accumulator和sender

Dequeue:双端链表

实际上在producer先做好缓存,然后在本地针对每个partition做一个Dequeu,然后先放在Dequeu经过send后发送到Topic的Partition,其实这里的send是使用NIO进行发送的

image-20250818155833135

、使用NIO建立一个长连接image-20250818160227684

Sender也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然,如果消息比较少,ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话,Sender也不会一直等待。最多等待LINGER_MS_CONFIG时长。然后就会将ProducerBatch中的消息读取出来。LINGER_MS_CONFIG默认值是0。

4、发送应答机制

在Producer将消息发送到Broker后,要怎么确定消息是不是成功发到Broker上了呢?

这是在开发过程中比较重要的一个机制,也是面试过程中最喜欢问的一个机制,被无数教程指导吹得神乎其神。所以这里也简单介绍一下。

其实这里涉及到的,就是在Producer端一个不太起眼的属性ACKS_CONFIG

ack取值有:0(不发ack,安全性低,吞吐量大)、1(只要Broker中LeaderPartition响应成功后直接异步的发送ack响应,安全性中、吞吐量低)、all/-1(除了LeaderPartition写成功、其它FlowerPartition也要写成功)

但是现在有个问题当副本过多时候或partition过多那该如何呢

可以设置broker中min.insync.replicas当响应多个副本就可以成功

所以经过上面可以得到一个结论,不能说ACK是保证安全性的唯一标志,而需要客户端和服务端共同发力,毕竟ack只是发送一个响应给客户端,重要的处理还是在客户端,需要根据客户端的业务逻辑进一步处理

5、生产者消息幂等性

之前分析过,当Producer的acks设置成1或-1时,Producer每次发送消息都是需要获取Broker端返回的RecordMetadata的。这个过程中就需要两次跨网络请求。

image-20250818162337914

当ack进行消息重试的时候默认是无限次,所以针对业务需要进行处理,这里首先需要理解分布式数据传递过程中的三个数据语义:at-least-once:至少一次;at-most-once:最多一次;exactly-once:精确一次。

回到Producer发消息给Broker这个场景,如果要保证at-most-once语义,可以将ack级别设置为o即可,此时,是不存在幂等性问题的。如果要保证at-least-once语义,就需要将ack级别设置为1或者-1,这样就能保证LeaderPartition中的消息至少是写成功了一次的,但是不保证只写了一次。如果要支持Exactly-once语义怎么办呢?这就需要使用到idempotence幂等性属性了。
Kafka为了保证消息发送的Exactly-once语义,增加了几个概念:

  • PID:每个新的Producer在初始化的过程中就会被分配一个唯一的PID。这个PID对用户是不可见的。
  • Sequence Numer:对于每个PID,这个Producer针对Partition会维护一个sequenceNumber。这是一个从0开始单调递增的数字。当Producer要往同一个Partition发送消息时,这个SequenceNumber就会加1。然后会随着消息一起发往Broker。
  • Broker端则会针对每个<PID,Partition>维护一个序列号(SN),只有当对应的SequenceNumber=SN+1时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过小就认为消息已经写入了,不需要再重复写入。而如果SequenceNumber过大,就会认为中间可能有数据丢失了。对生产者就会抛出一个OutOfOrderSequenceException。

这样,Kafka在打开idempotence幂等性控制后,在Broker端就会保证每条消息在一次发送过程中,Broker端最多只会刚刚好持久化一条。这样就能保证at-most-once语义。再加上之前分析的将生产者的acks参数设置成1或-1,保证at-least-once语义,这样就整体上保证了Exactaly-once语义。

image-20250818163127198

给Producer打开幂等性后,不管Producer往同一个Partition发送多少条消息,都可以通过幂等机制保证消息的Exactly-only语义。但是是不是这样消息就安全了呢?

6、生产者数据压缩机制

Kafka的生产者支持四种压缩算法。这几种压缩算法中,zstd算法具有最高的数据压缩比,但是吞吐量不高。Iz4在吞吐量方面的优势比较明显。在实际使用时,可以根据业务情况选择合适的压缩算法。但是要注意下,压缩消息必然增加CPU的消耗,如果CPU资源紧张,就不要压缩了。

关于数据压缩机制,在Broker端的broker.conf文件中,也是可以配置压缩算法的。正常情况下,Broker从Producer端接收到消息后不会对其进行任何修改,但是如果Broker端和Producer端指定了不同的压缩算法,就会产生很多异常的表现。

如果开启了消息压缩,那么在消费者端自然是要进行解压缩的。在Kafka中,消息从Producer到Broker再到Consumer会一直携带消息的压缩方式,这样当Consumer读取到消息集合时,自然就知道了这些消息使用的是哪种压缩算法,也就可以自己进行解压了。但是这时要注意的是应用中使用的Kafka客户端版本和Kafka服务端版本是否匹配。

7、生产者消息事务

接下来,通过生产者消息幂等性问题,能够解决单生产者消息写入单分区的的幂等性问题。但是,如果是要写入多个分区呢(会导致一个分区成功一个分区失败这种不断重试)?比如生产者一次发送多条消息,然后给不同的消息指定不同的key。这批消息就有可能写入多个Partition,而这些Partition是分布在不同Broker上的。这意味着,Producer需要对多个Broker同时保证消息的幂等性、保证原子性

image-20250818164825261

保证这一批消息最好同时成功的保持幂等性。或者这一批消息同时失败,这样生产者就可以开始进行整体重试,消息不至于重复。

1
2
3
4
5
6
7
8
//1初始化事务
void initTransactionsC;
//2开启事务
void beginTransactionC)throws ProducerFencedException;
//3提交事务
voidcommitTransactionCthrowsProducerFencedException;
//4放弃事务(类似于回滚事务的操作)
voidabortTransactionCthrowsProducerFencedException;

Kafka的事务消息还会做两件事情:

  1. 一个Transactionld只会对应一个PID

    如果当前一个Producer的事务没有提交,而另一个新的Producer保持相同的Transactionld,这时旧的生产者会立即失效,无法继续发送消息。

  2. 跨会话事务对齐

    如果某个Producer实例异常岩机了,事务没有被正常提交。那么新的Transactionld相同的Producer实例会对旧的事务进行补齐。保证旧事务要么提交,要么终止。这样新的Producer实例就可以以一个正常的状态开始工作。