Kafka消息队列详解


一、基本结构

1. User

Kafka 中存在两类用户:生产者和消费者。消息的生产者叫 Producer ,消息的使用者和接收者是 Consumer,生产者将数据保存到 Kafka 集群中,消费者从中获取消息进行业务的处理。

2. Broker

Kafka 集群中可能存在多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个实例,也称为 Broker

当部署 Kafka 集群时则存在多个 Broker 节点,通常将主节点称之为 Leader 节点,其它 Broker 节点称为 Follwer 节点。  

3. Topic

TopicKafka 中的逻辑存储概念,一个 Topic 里保存的是同类消息,相当于对消息的分类。每个 Producer 将消息发送到 kafka 中都需要指明 Topic,即指明这个消息属于哪一类。

4. Partition

每个 Topic 都可以分成多个 Partition,每个 Partition 在存储层面是 append log 文件,任何发布到此 Partition 的消息都会被直接追加到 log 文件的尾部。

为什么要进行分区呢?最根本的原因就是 Kafka 基于文件存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此采用分区的办法,即一个分区对应一个文件,这样就可以将数据分别存储到不同的 Server 上去,同时也可以实现负载均衡从而容纳更多的消费者。

(1) Replica

分区 Partition 中的数据是以文件的形式存在,而为了实现故障高可用,Kafka 中引入了副本概念 (Replica)

当生产者发送数据时,主节点接收到消息后会先将数据写入自身节点的 append log 文件,并将消息复制给其它的所有的子节点。同时子节点也会定时从主节点同步数据,从而保证与主节点的消息一致性。即在集群模式下,每个 Partition 下的消息文件将按照一定规则均匀复制分布到各个节点,因此当存在某个故障节点仍可从其它节点读取从而保证数据的安全性。

  • KafkaReplica 包含主节点 (leader) 与子节点 (follower)
  • PartitionReplica 默认情况会均匀复制分布到所有 Broker 上。
  • Topicreplication-factorN(N>1) 时,则每个 Partition 都会有 N 个副本。
  • Replica 的个数小于或等于 Broker 的个数,对于每个 Partition 而言,每个 Broker 上最多只会有一个 Replica,因此可以使用 Broker id 指定 PartitionReplica

5. Offset

一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为偏移量 (Offset)Offset 为一个 long 型数字,它可以唯一标记条消息。

由于 Kafka 并没有提供基他额外的索引机制来存储 Offset ,文件只能顺序的读写,所以在 Kafka 中几乎不允许对消息进行”随机读写”。

二、传输过程

1. 生产者

对于生产者要写入的条记录,可以指定四个参数:分别是 Topic, Partition, Key, Value(需存入的数据),其中 TopicValue 是必须要指定的,而 KeyPartition 是可选的。

对于一条记录,先对其进行序列化,按照 TopicPartition 放进对应的发送队列中,若 Partition 未填,则分为两类情况:

  • Key 没填,通过 Round-Robin 算法确定 Partition
  • Key 有填,按照 Key 进行哈希,相同 Key 去一个 Partition

当完成 Patition 的确定并将数据存入分区后,每一条消息以 <Topic, Partition> 实现定位,并且每个分区都会为其的消息分配一个递增的偏移量,注意同一个 Topic 中不同分区的消息偏移量是相互独立的。

2. 消费者

消费者不是以单独的形式存在的,每一个消费者属于一个 Consumer Group ,一个 Group 可以包含多个 Consumer。当执行数据消费时, Consumer 会不断的拉取 Partition 中的数据,并记录一份当前 <Topic, Partition> 消费的数据偏移量 (offset),注意这份偏移量数据是记录在消费者中。

当一个 Topic 存在多个分区时,消费者在拉取消息会通过一定的算法轮询读取每个分区数据。注意消费者为不同的分区维护不同的偏移量,即为每个分区都单独记录一份偏移量标识当前读取的位置。因此,在 Topic 层面其数据是无序的,而在同一个 Topic 的同一个分区下数据是有序的并且可通过偏移量表示。

特别需要注意的是:订阅 Topic 是以一个消费组来订阅的,发送到 Topic 的消息只会被订阅此 Topic 的每个 Group 中的一个 Consumer 消费,即同一个消费组的两个消费者不会同时消费一个 Partition。如果所有的 Consumer 都具有相同的 Group,那么就像是一个点对点的消息系统;如果每个 Consumer 都具有不同的 Group,那么消息会广播给所有的消费者。

具体说来,这实际上是根据 Partition 来分的,一个 Partition 只被消费组里的一个消费者消费,但是可以同时被多个消费组消费,消费组里的每个消费者是关联到一个 Partition 的,因此对于一个 Topic 而言,同一个 Group 中不能有多于 Partition 个数的 Consumer 同时消费,否则将意味着某些 Consumer 将无法得到消息。

三、数据同步

1. ISR机制

Kafka 中通过 ISR(in-sync replica set) 机制管理集群节点,其机制如下:

  • Leader 会维护一个与其基本保持同步的 Replica 列表,该列表称为 ISR ,每个 Partition 都会有一个 ISR,而且是由 Leader 动态维护。
  • 如果一个 Flower 比一个 Leader 落后太多,或者超过一定时间未发起数据复制请求,则 Leader 将其重 ISR 中移除。
  • ISR 中所有 Replica 都向 Leader 发送 ACK 时,Leadercommit
  • 该时间阈值由 replica.lag.time.max.ms 参数设定,当 Leader 发生故障后会从 ISR 中选举出新的 Leader

2. ACK机制

Kafka 采用的是至少一次 At least once,消息不会丢但是可能会重复传输,通过 ACK 参数配置传输模式。

  • 同步复制:只有所有的 follower 把数据拿过去后才 commit,一致性好,可用性不高。
  • 异步复制:只要 leader 拿到数据立即 commit,等 follower 慢慢去复制,可用性高立即返回,但一致性差一些。

设置等待 acks 返回的机制,有下述三个值:

  • -1:默认,等待所有 Flower 同步完消息后再发送(绝对不会丢数据);
  • 0:不等待返回的 ack (可能会丢数据,因为发送消息没有了失败重试机制,但是这是最低延迟);
  • 1:消息发送给 Kafka 分区中的 Leader 后就返回(如果 Flower 没有同步完成 Leader 就宕机了,就会丢数据);

acks 的默认值即为 1,代表我们的消息被 Leader 副本接收之后就算被成功发送,可以配置 acks = all  代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。即保证消息不丢失的,设置消息持久化后再返回消息发送成功响应,对应 Spring 配置 spring.kafka.producer.acks=-1

3. 故障转移

(1) Leader故障

Leader 发生故障后,会从 ISR 中选出一个新的 Leader,同时为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW(High Watermark, 消费者的消费进度) 的部分截掉,然后从新的 Leader 同步数据。

这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

(2) Follower故障

Follower 发生故障后会被临时踢出 ISR 集合,待该 Follower 恢复后,将会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步数据操作,等该 FollowerLEO(Log End Offset,日志的末尾偏移量) 大于等于该 PartitionHW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了。

四、数据幂等

1. 重复消费

Kafka 中提供了消息幂等从而避免消息的重复消息,即 Exactly once

上述介绍 ack 时提到在设置为 -1 时,当生产者发送消息时会等待所有 broker 返回确认信息,若未生产者未接受到返回确认值则会重发,保证了至少消息会成功一次。

在理想的情况下,broker 在接收到生产者发送的数据之后会添加到分区文件末端,然后返回一个 ack 值标识我已经成功获取消息,过程如下图:

Broker 返回的 ack 值并非每次都能成功,假设由于网络波动等异常导致返回失败,由于生产者未收到确定值因此将尝试重发,但 Broker 已接收并保存过数据,此时将会导致同一消息保存了两次。

根据上述所描述,若 acks 设置为 -1 虽保证了数据不丢失但却存在重复消息的问题。在现代的多数系统设计中,都需要保证数据的幂等性,即在相同情景下的执行的操作结果一定也是相同的。

例如在查询用户信息时,当请求过滤 ID123 的记录时,在后台数据未发生变更的前提,不论执行了多少次查询,每次返回的结果都应该相同,即操作的幂等性。同理,当 Kafka 发送的消息发生重复保存时,显然也就会导致消费者的重复消费,如果消费者不执行额外的过滤流程,显然同一业务逻辑将会被执行多次。

2. 实现机制

因此,Kafka0.11 版本之后加入了消息的幂等,解决重复数据的问题,即实现了数据的 Exactly Once

在开启幂等后,Producer 在初始化时会被分配一个 PID,同时发往同一 Partition 的消息会附带 Sequence Number,其为是一个递增的偏移量。当 Broker 在接收到数据进行保存时,会在分区中比对消息 PIDSN,接收消息的 SN 比分区数据中的 SN 值大于 1 时该消息将会被丢弃。

若需要开启消息幂等,生产者需要将 enable.idempotence 设置为 true,搭配 acks = -1 项从而实现 Producer 不论向 Broker 发送多少重复数据,Broker 端都只会持久化一条。

3. 失效场景

虽然通过 enable.idempotence 可以开启幂等,但并非所有情况下都能保证消息的幂等。

由于一个 Topic 可能存在多个 Partition,而幂等消息每个 <Partiton, PID, SN> 在不同 Partition 中是独立存在的,若重复消息发送至了不同 Partition 幂等也将失效。如下图示例中,若存在一个 Topic 两个分区,第一次发送数据存入 Partition-1 后若 ack 返回失败导致重发,而第二次发送数据存入了 Partition-2,而 Partition-2 中比对 PIDSN 后未读取到对应数据则仍会执行保存,此时数据仍然被重复保存。

因此,幂等性只能保证在同一个分区 (Partition) 下的消息去重,即无法保证跨分区会话的 Exactly Once

4. 注意事项

通过上述的描述可以看到 Kafka 提供了一定场景下的数据去重功能,但在真实的生产开发中,仍会出现某些异常场景导致重复消费。

即幂等乃至事务的确保证了数据的不重复保存,但若消费者在完成消费后提交 offset 失败,则在下次服务恢复时这条消息将会被再次重复消费。因此,在需要严格保证的幂等场景下,更为合适的方式即将业务系统设计为幂等。

同样以最常见消息发送消费为例,在 Producer 端发送数据中添加一个唯一标识符字段,消费端 Consumer 在接收到数据时基于该字段验证数据是否已经被消费过,此时不论是 Producer 端重复发送亦或是 Consumer 重复拉取消息,我们都能够保证业务的执行唯一性。针对唯一字段的验证方式,对于单体应用可利用各类本地缓存框架,而对于分布式应用而言,也可通过 Redis 等中间件实现命中判断。

五、消息事务

1. 事务介绍

在介绍幂等时提到了其无法实现了跨分区的原子性,而事务正是解决了这个问题。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

若需要开启事务,生产者通过配置 transactional.id 值开启事务,且必须同时开启幂等。生产者启动后同时还会获取一个单调递增的 producer epoch,如果使用同一个 transactional.id 开启两个生产者,Kafka 收到事务提交请求时检查当前事务提交者的 epoch 不是最新的,那么就会拒绝该 Producer 的请求。

2. 事务机制

从生产者的角度分析,通过事务 Kafka 可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。前者表示具有相同 transactional.id 的新生产者实例被创建且工作的时候,旧的且拥有相同 transactional.id 的生产者实例将不再工作。后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交 (Commit),要么被中止 (Abort),如此可以使新的生产者实例从一个正常的状态开始工作。

3. 事务方法

KafkaProducer 中提供了 5 个与事务相关的方法,详细如下:

// 初始化事务
void initTransactions();

// 开启事务
void beginTransaction() throws ProducerFencedException;

// 为 Consumer 提供的在事务内 Commit Offsets 的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事务
void commitTransaction() throws ProducerFencedException;

// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

4. 事务隔离

在消费端有一个参数 isolation.level 与事务有关,这个参数的默认值为 read_uncommitted,即表明说消费者可以读到未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为 read_committed,表示消费者不可以读取尚未提交的事务内的消息。另外,需设置 enable.auto.commit = false 关闭自动提交 Offset 功能。

六、优点特性

1. Broker

(1) IO多路复用

Kafka 采用 Reactor 网络通信模型实现复用以提高效率。

Acceptor线程负责监听新的连接,Processor线程都有自己的 selector,负责从 Socket 中读写数据。KafkaRequestHandler业务处理线程进行业务处理,然后生成 Response,再交由给 Processor 线程。

(2) 磁盘顺序写

Kafka 本质上就是一个队列,是先进先出的,而且消息一旦生产了就不可变,这种有序性和不可变性使得 Kafka 完全可以「顺序写」日志文件。

对于普通的机械磁盘,如果是随机写入性能确实极差,但如果是顺序写入,则大大节省磁盘寻道和盘片旋转的时间,因此性能提升了 3 个数量级。

(3) Page Cache

在读写磁盘日志文件时,利用了操作系统本身的缓存技术,其实操作的都是内存,然后由操作系统决定什么时候将 Page Cache 里的数据真正刷入磁盘,类似于 MySQL 中的 Change Buffer

(4) 分区分段

当面对海量消息时,单机的存储容量和读写性能有限,对数据进行分区存储,可以更好的利用不同机器的读写能力,应对海量数据的存储。

Kafka 通过水平拆分方案,对数据进行拆分,拆分后的数据子集叫做 Partition,各个分区的数据合集即全量数据。每个 Partition 又被分成了多个 Segment,引入 Segment 可以防止 Partition 过大m,同时做历史消息删除时,常见的操作时需要将文件前面的内容删除,这有悖顺序写的设计。而 Segment 的引入,只需将旧的 Segment 文件删除即可,保证了每个Segment 的顺序写。

2. 生产者

(1) 批量发送

Kafka 采用了批量发送消息的方式,通过将多条消息按照分区进行分组,然后每次发送一个消息集合,从而大大减少了网络传输的 overhead

(2) 消息压缩

消息压缩的目的是为了进一步减少网络传输带宽,但其不仅仅减少了网络 IO,它还大大降低了磁盘 IO。因为批量消息在持久化到 Broker 中的磁盘时保持的仍是压缩状态,最终是在 Consumer 端做了解压缩操作。

(3) 高效序列化

只需要提供相应的序列化和反序列化器,用户可以根据实际情况选用快速且紧凑的序列化方式来减少实际的网络传输量以及磁盘存储量,进一步提高吞吐量。

(4) 内存池复用

Producer 一上来就会占用一个固定大小的内存块,比如 64MB,然后将 64 MB 划分成 M 个小内存块(比如一个小内存块大小是 16KB)。

当需要创建一个新的 Batch 时,直接从内存池中取出一个 16 KB 的内存块即可,然后往里面不断写入消息,但最大写入量就是 16 KB,接着将 Batch 发送给 Broker ,此时该内存块就可以还回到缓冲池中继续复用了,根本不涉及垃圾回收。

3. 消费者

(1) 稀疏索引

Kafka 查询的场景主要是能按照 offset 或者 timestamp 查到消息即可。因此 Kafka 消息的 offset 设计成有序的,将消息划分成若干个 block,而稀疏索引记录每个 block 第一条消息的 offset,查找的时候便可以便捷的使用二分查找高效定位。

稀疏索引不会为 每个搜索关键字创建索引记录,此处的索引记录包含搜索键和指向磁盘上数据的实际指针,搜索记录时首先按索引记录进行操作,然后到达数据的实际位置再进行顺序搜索,直到找到所需的数据为止。B+ 树随着记录插入需要频繁的页分裂效率较低,而 Hash 索引的常驻内存,若高达几百万的消息写入,会将内存撑爆。

(2) 零拷贝

零拷贝是指数据直接从磁盘文件复制到网卡设备,而无需经过应用程序,减少了内核和用户模式之间的上下文切换,从而提高了消息消费效率。

(3) 批量拉取

和生产者批量发送消息类似,消息者也是批量拉取消息的,每次拉取一个消息集合以降低了频次,从而大大减少了网络传输的 overhead

(4) MMAP

kafka索引文件的读写 中用到了 MMAP(memory mapped files)

MMAP 是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回 写脏页面 到对应的文件磁盘上,即完成了对文件的操作而不必再调用read,write等系统调用函数。

Kafkalog 文件为什么不使用 MMAPMMAP 有多少字节可以映射到内存中与地址空间有关,32 位的体系结构只能处理 4GB 甚至更小的文件。Kafka 日志通常足够大,可能一次只能映射部分,因此读取它们将变得非常复杂。然而,索引文件是稀疏的,它们相对较小,将它们映射到内存中可以加快查找过程,这是内存映射文件提供的主要好处。


参考链接

  1. Kafka事务

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录