首次接触到Kafka的背景,实时同步mysql数据到doris,Mysql binlog + kafka + flink + doris

消息队列发展史
主流消息队列对比

简介

Kafka 是一种分布式的,基于发布 / 订阅的消息系统。
主要设计目标如下:

  • 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
  • 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

创建背景

Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。

Kafka基础概念

Kafka的特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  • 可扩展性:kafka集群支持热扩展。
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 高并发:支持数千个客户端同时读写。

Kafka为什么吞吐量大、速度快【重要】

  1. Partition并行 分区分段+索引
    Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。
    每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。
    这也非常符合分布式系统分区分桶的设计思想。

    通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。
    为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。
    这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

  2. 顺序读写磁盘
    Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。

    影响磁盘的关键因素是磁盘服务时间,即磁盘完成一个 I/O 请求所花费的时间,它由寻道时间、旋转延迟和数据传输时间三部分构成。
    机械硬盘的连续读写性能很好,但随机读写性能很差,这主要是因为磁头移动到正确的磁道上需要时间,随机读写时,磁头需要不停的移动,时间都浪费在了磁头寻址上,所以性能不高。衡量磁盘的重要主要指标是 IOPS 和吞吐量。
    在许多的开源框架如 Kafka、HBase 中,都通过追加写的方式来尽可能的将随机 I/O 转换为顺序 I/O,以此来降低寻址时间和旋转延时,从而最大限度的提高 IOPS。

    Kafka的每一个Partition其实都是一个文件,收到消息后Kafka会把数据插入到文件末尾。

    这种方法有一个缺陷:没有办法删除数据。所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据。
    如果不删除硬盘肯定会被撑满,所以Kakfa提供了两种策略来删除数据。一是基于时间,二是基于partition文件大小。具体配置可以参看它的配置文档。

  3. Page Cache

    操作系统层面引入 Cache 层的目的是为了提高 Linux 操作系统对磁盘访问的性能。Cache 层在内存中缓存了磁盘上的部分数据。当数据的请求到达时,如果在 Cache 中存在该数据且是最新的,则直接将数据传递给用户程序,免除了对底层磁盘的操作,提高了性能。
    在 Linux 的实现中,文件 Cache 分为两个层面,一是 Page Cache,另一个 Buffer Cache,每一个 Page Cache 包含若干 Buffer Cache。
    Page Cache 主要用来作为文件系统上的文件数据的缓存来用,尤其是针对当进程对文件有 read/write 操作的时候。
    Buffer Cache 则主要是设计用来在系统对块设备进行读写的时候,对块进行数据缓存的系统来使用。

    通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。

    在写磁盘文件的时候,就可以先直接写入 os cache 中,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入到磁盘中, 这样大大提高写入效率和性能。

  4. 零拷贝
    Kafka 中存在大量的网络数据持久化到磁盘(Producer 到 Broker)磁盘文件通过网络发送(Broker 到 Consumer)的过程。这一过程的性能直接影响 Kafka 的整体吞吐量。


    在实际应用中,把磁盘中的某个文件内容发送到远程服务器上,必须要经过几个拷贝的过程:

    1. 从磁盘中读取目标文件内容拷贝到内核缓冲区(OS Cache);
    2. CPU控制器再把内核缓冲区的数据赋值到用户空间的缓冲区中;
    3. 接着在应用程序中,调用write()方法,把用户空间缓冲区中的数据拷贝到内核下的Socket Buffer中;
    4. 最后,把在内核模式下的SocketBuffer中的数据赋值到网卡缓冲区(NIC Buffer),网卡缓冲区再把数据传输到目标服务器上。

    在这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历4次拷贝,而在这四次拷贝过程中,有两次拷贝是浪费的,分别是:

    1. 从内核空间赋值到用户空间
    2. 从用户空间再次复制到内核空间

    零拷贝,就是把这两次多余的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核中直接传输给Socket,而不需要再经过应用程序所在的用户空间。

    零拷贝通过DMA(Direct Memory Access,直接存储器访问)技术把文件内容复制到内核空间中的Read Buffer,接着把包含数据位置和长度信息的文件描述符加载到Socket Buffer中,DMA引擎直接可以把数据从内核空间中传递给网卡设备。
    在这个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了2次cpu的上下文切换,对于效率有非常大的提高。
    所以,所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是避免了在内核空间和用户空间之间的拷贝。

    • Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现磁盘的快速写入。
      mmap 文件映射(Memory Mapped Files):将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件
    • Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer 进行网络发送,减少 CPU 消耗。
  5. 批处理 批量读写
    Kafka数据读写也是批量的而不是单条的。

    除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。
    在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。
    假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

  6. 数据压缩
    在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络 IO。
    Producer 可将数据压缩后发送给 broker,从而减少网络传输代价,目前支持的压缩算法有:Snappy、Gzip、LZ4。数据压缩一般都是和批处理配套使用来作为优化手段的。

  7. 超高并发网络架构

概念1 生产者与消费者

对于 Kafka 来说,客户端有两种基本类型:生产者(Producer)和消费者(Consumer)。
除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,它们只不过是在上层做了封装。

  • Producer
    消息生产者,就是向 kafka broker 发消息的客户端。
  • Consumer
    消息消费者,向 kafka broker 取消息的客户端。
  • Consumer Group
    消费者组,由多个 consumer 组成,消费者组是逻辑上的一个订阅者。
    • 重平衡:Rebalance
      消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。
      Rebalance 是 Kafka 消费者端实现高可用的重要手段。

概念2 主题(Topic)与分区(Partition)

  • Topic
    Topic是一个逻辑上的消息队列,同一类型的消息可以放到一个Topic(消息队列)中。
    主要作用是用来屏蔽底层分区和副本的复杂逻辑。

  • Partition
    Partition(分区),是Kafka下数据存储的基本单元,这个是物理上的概念。

    一个 topic 可以分为多个 partition,每个 partition 都是一个有序的队列。
    主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序。

    同一个分区(partition)可以被不同的消费者组同时消费;
    但是在同一个消费者组内,一个分区只能被一个消费者消费。

当broker里面的topic数量过多时,kafka的性能不如rocketMq?

Kafka与RocketMQ在topic处理上的不同

  1. kafka中partition增多会存在随机写的可能性,partition之间刷盘的冲撞率会高,但是RocketMQ是把消息都写到一个CommitLog文件中,所以相当于一个文件的顺序写。
  2. RockertMQ的consumerQueue消息格式大小固定(20字节),写入pagecache之后被触发刷盘频率相对较低。
  • Replica
    Replica(副本),就是Partition的一个备份,副本的数量是可以配置的。
    Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
    一个分区(Partition)只能有一个leader,但是可以设置多个副本(follower),同一分区的副本不能在同一台机器上。

    leader partition
    1、写数据、读数据操作都是从leader partition去操作的。
    2、会维护一个ISR(in-sync-replica)列表,但是会根据一定的规则删除ISR列表里面的值。生产者发送来一个消息,消息首先要写入到leader partition中,写完了以后,还要把消息写入到ISR列表里面的其它分区,写完后才算这个消息提交。
    follower partition:从leader partition同步数据。
    当 leader 发生故障时,某个 follower 会成为新的 leader,以此来保证kafka的可用性。

    自 Kafka 2.4 之后,Kafka 提供了有限度的读写分离,也就是说,Follower 副本能够对外提供读服务。

概念3 Broker和集群(Cluster)

一台 kafka 服务器就是一个 broker。
一个kafka集群由多个 broker 组成,然后通过Zookeeper来进行集群的管理。

  • Zookeeper
    2.8.0版本之前,Kafka 将 Broker、Topic 和 Partition 的元数据信息存储在 Zookeeper 上。

    通过在 Zookeeper 上建立相应的数据节点,并监听节点的变化。

  • Controller
    Controller 是从 Broker 中选举出来的,负责整个集群中所有分区、副本的管理。
    当分区中Leader副本出现问题时及时选举新的Leader副本。

核心API

  • Producer API
    允许应用程序向一个或多个 topics 上发送消息记录。
  • Consumer API
    允许应用程序订阅一个或多个 topics 并对发布给他们的流式数据进行处理。
  • Streams API
    它允许应用程序作为流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
  • Connector API
    它允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。
    比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

https://mp.weixin.qq.com/s?__biz=Mzg3MTcxMDgxNA==&mid=2247488849&idx=1&sn=febda095589f02553d9191528f271c07&chksm=cefb3c60f98cb576fd9c58d760b9a5e4ae32a0c001e2049b591297d904a0401646448999c78a&scene=178&cur_album_id=2147575846151290880#rd

Kafka Producer

在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。
它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。

那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢?
初始化和发送过程是怎么样的呢?

Producer初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
1)、设置分区器(partitioner), 分区器是支持自定义的
2)、设置重试时间(retryBackoffMs)默认100ms
3)、设置序列化器(Serializer)
4)、设置拦截器(interceptors)
5)、初始化集群元数据(metadata),刚开始空的
6)、设置最大的消息为多大(maxRequestSize), 默认最大1M, 生产环境可以提高到10M
7)、设置缓存大小(totalMemorySize) 默认是32M
8)、设置压缩格式(compressionType)
9)、初始化RecordAccumulator也就是缓冲区指定为32M
10)、定时更新(metadata.update)
11)、创建NetworkClient
12)、创建Sender线程
13)、KafkaThread将Sender设置为守护线程并启动

Producer发送过程

KafkaProducer.send(ProducerRecord, Callback)

  1. 序列化+计算目标分区
  2. 追加写入消息缓冲区(accumulator)
  3. Sender线程预处理及消息发送
  4. Sender线程处理response

Producer内存池设计

Ack应答确认机制

ack作用是确认收到消息,一是producer发送消息到leader收到消息之后发送ack,二是leader和follower之间同步完成数据会发送ack

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的,这个参数对于消息是否丢失起着重要作用。
该参数的配置具体如下:

  • acks=0
    producer不等待broker的ack,这一种操作提供了最低的延迟,broker一接受到还没有写入到磁盘就已经返回了,当broker故障的时候 丢失数据(相当于异步发送)
  • acks=1
    producer等待broker的ack,partition的leader落盘成功后返回ack。
    如果follower同步数据之前leader故障,此时会丢失数据。
    此时follower需要同步leader中的数据,但是leader宕机了,挂了之后kafka集群会重新选举leader,选举出leader之后,并没有同步到原有的数据,就会造成数据的丢失。
  • acks=-1
    producer等待broker的ackpartition的leader和follower全部落盘成功后,才会返回ack。
    但是如果follower同步完成之后,在broker发送ack之前,leader发生故障,生产者收不到ack确认,会重新发送消息,那么会出现数据的重复,但不会造成数据丢失。

Producer网络架构

Kafka Consumer

消费方式

Kafka Consumer 采用从 Broker 中主动拉取数据。

不采用 push(推)模式是因为:由 broker 决定消息发送速率,很难适应所有消费者的消费速率。

pull 模式不足之处是,如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。

Consumer初始化

消费组概念

为什么 Kafka 要设计 Consumer Group, 只有 Consumer 不可以吗?
我们知道 Kafka 是一款高吞吐量,低延迟,高并发, 高可扩展性的消息队列产品,那么如果某个 Topic 拥有数百万到数千万的数据量,仅仅依靠 Consumer 进程消费,消费速度可想而知,所以需要一个扩展性较好的机制来保障消费进度,这个时候 Consumer Group 应运而生。
Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

特点:

  • 每个 Consumer Group 有一个或者多个 Consumer;
  • 每个 Consumer Group 拥有一个公共且唯一的 Group ID;
  • Consumer Group 在消费 Topic 的时候,Topic 的每个 Partition 只能分配给组内的某个 Consumer,只要被任何 Consumer 消费一次, 那么这条数据就可以认为被当前 Consumer Group 消费成功。

Group Coordinator

所谓协调者,它专门为Consumer Group服务,负责为Group执行Rebalance以及提供位移管理和组成员管理等。
每个Consumer Group都会选择一个broker作为自己的Coordinator。

具体来讲,Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移,同样地,当Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。

所有Broker在启动时,都会创建和开启相应的Coordinator组件。也就是说,「所有Broker都有各自的Coordinator组件」。

Consumer Group如何确定为它服务的Coordinator在哪台Broker上?
通过Kafka内部主题__consumer_offsets

Kafka为某个Consumer Group确定Coordinator所在的Broker的算法有2个步骤:

  1. 确定由__consumer_offsets主题的哪个分区来保存该Group数据.
    partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
  2. 找出该分区Leader副本所在的Broker,该Broker即为对应的Coordinator。

Rebalance消费者组重分配机制

消费者组内所有消费者自动重新分配订阅主题分区的过程。
Rebalance是Kafka消费者端实现高可用的重要手段。
Consumer group靠Coordinator实现了Rebalance。

Rebalance 的触发条件

  1. 当 Consumer Group 组成员数量发生变化(主动加入或者主动离组,故障下线等)
  2. 当订阅主题数量发生变化
  3. 当订阅主题的分区数发生变化

Rebalance 如何通知其他 consumer 进程?
Rebalance 的通知机制是靠 Consumer 端的心跳线程,它会定期发送心跳请求到 Broker 端的 Coordinator,当协调者决定开启 Rebalance 后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中发送给 Consumer,当 Consumer 发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就知道 Rebalance开始了。

Partition分区分配策略

  • rage
    RangeAssignor 是 Kafka 默认的分区分配算法。
    它是按照 Topic 的维度进行分配的,对于每个 Topic,首先对 Partition 按照分区ID进行排序,然后对订阅这个 Topic 的 Consumer Group 的 Consumer 再进行排序,之后尽量均衡的按照范围区段将分区分配给 Consumer。此时可能会造成先分配分区的 Consumer 进程的任务过重(分区数无法被消费者数量整除)。
  • round-robin
    RoundRobinAssignor 的分区分配策略是将 Consumer Group 内订阅的所有 Topic 的 Partition 及所有 Consumer 进行排序后按照顺序尽量均衡的一个一个进行分配。
    如果 Consumer Group 内,每个 Consumer 订阅都订阅了相同的Topic,那么分配结果是均衡的。
    如果订阅 Topic 是不同的,那么分配结果是不保证“尽量均衡”的,因为某些 Consumer 可能不参与一些 Topic 的分配。
  • sticky
    StickyAssignor 分区分配算法是 Kafka Java 客户端提供的分配策略中最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整。
    其主要实现了以下2个目标:
    1. Topic Partition 的分配要尽量均衡。
    2. 当 Rebalance(重分配,后面会详细分析) 发生时,尽量与上一次分配结果保持一致。
      注意:当两个目标发生冲突的时候,优先保证第一个目标,这样可以使分配更加均匀,其中第一个目标是3种分配策略都尽量去尝试完成的,而第二个目标才是该算法的精髓所在。

偏移量管理 位移提交机制

对于Kafka Partition而言,每条消息都有一个offset,用来表示消息的位置。存储层面。
对于消费者而言,也有一个offset,用来表示消费到分区中某个消息所在的位置,称为消费位移。消费层面。

在消费者每次调用pull方法的时候,拉取到的是还没有消费过的消费集,要做到这一点,就需要记录上一次消费时候的消费位移,并且这个位移必须做持久化的保存,而不是单单保存在消费者内存中,否则消费者重启后就无法获取之前的消费位移,以及如果新增一个消费者,分区再均衡的时候,新的消费者无法获取消费位移。

在 Kafka 0.9 版本之前,Consumer 默认将 Offset 保存在 ZooKeeper 中。
由于Zookeeper并不适合大批量的频繁写入操作,从 0.9 版本开始,消费者去掉了对ZK的依赖,当启动一个消费者时不再向ZK进行注册,而是由消费者协调器(Group Coordinator)统一管理,消费者已消费消息的偏移量提交会保存在名为**__consumer_offsets**的Kafka内部主题中,以支持高并发的读写。

消费位移做持久化操作的动作称为提交,消费者在消费完消息之后需要执行消费位移的提交。

__consumer_offsets

__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

__consumer_offsets 的每条消息格式大致如图所示:

<K, V> = <group.id + topic + 分区号, offset>

考虑到一个 kafka 生成环境中可能有很多 consumer 和 consumer group,如果这些 consumer 同时提交位移,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 group.id 做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将负载分散到不同的 __consumer_offsets 分区上。

一般情况下,当集群中第一次有消费者消费消息时会自动创建 __consumer_offsets。
它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化);
分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50。

Kafka位移提交方式

  • 自动提交
    kafka消费者中默认的消费位移提交方式是自动提交,由enable.autto.commit配置,默认每5s提交一次。
  • 手动提交
    • 同步提交
    • 异步提交

位移提交当中涉及到了很多消息重复消费和消息丢失的问题!!!

https://mp.weixin.qq.com/mp/appmsgalbum?__biz=Mzg3MTcxMDgxNA==&action=getalbum&album_id=2147575846151290880&scene=173&from_msgid=2247488847&from_itemidx=1&count=3&nolastread=1#wechat_redirect

Kafka Broker

Controller 控制器机制

每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。
Controller 在 Zookeeper 的帮助下管理和协调整个 Kafka 集群。

Controller 选举

Kafka 2.8.0 以前用ZooKeeper管理元数据,Controller的选择是在Zookeeper上完成的。

Kafka 当前选举控制器的规则是:Kafka 集群中第一个启动的 broker 通过在 ZooKeeper 里创建一个临时节点 /controller 让自己成为 controller 控制器。其他 broker 在启动时也会尝试创建这个节点,但是由于这个节点已存在,所以后面想要创建 /controller 节点时就会收到一个节点已存在的异常。然后其他 broker 会在这个控制器上注册一个 ZooKeeper 的 watch 对象,/controller节点发生变化时,其他 broker 就会收到节点变更通知。这种方式可以确保只有一个控制器存在。那么只有单独的节点一定是有个问题的,那就是单点问题。

Controller Failover
如果控制器关闭或者与 ZooKeeper 断开链接,ZooKeeper 上的临时节点就会消失。集群中的其他节点收到 watch 对象发送控制器下线的消息后,其他 broker 节点都会尝试让自己去成为新的控制器。其他节点的创建规则和第一个节点的创建原则一致,都是第一个在 ZooKeeper 里成功创建控制器节点的 broker 会成为新的控制器,那么其他节点就会收到节点已存在的异常,然后在新的控制器节点上再次创建 watch 对象进行监听。

Controller 作用

  1. Topic管理
    控制器帮助完成对Kafka主题的创建、删除以及分区增加的操作
  2. 分区重分配
  3. Partition Leader选举
  4. 集群Broker管理
    新增Broker、Broker主动关闭、Broker故障

日志存储 分区分段保存

Kafka中的消息是以主题为基本单位进行归类的,每个主题在逻辑上相互独立。
每个主题又可以分为一个或多个分区,在不考虑副本的情况下,一个分区会对应一个日志。
但设计者考虑到随着时间推移,日志文件会不断扩大,因此为了防止Log过大,设计者引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,便于后续的消息维护和清理工作。
主题、分区、Log、LogSegment

LogSegment

在Kafka中,每个Log对象又可以划分为多个LogSegment文件,每个LogSegment文件包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。

其中,每个LogSegment中的日志数据文件大小均相等(该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的「log.segment.bytes」进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日志数据和索引文件)。

index, log, snapshot, timeindex 文件以当前 Segment 的第一条消息的 Offset 命名。
“.index” 文件存储大量的索引信息。
“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 Message 的物理偏移量。

副本机制

  • AR(Assigned Replicas):分区中的所有副本统称为AR。
    所有消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。
    但是在同步期间,follower对于leader而言会有一定程度的滞后,这个时候follower和leader并非完全同步状态
  • OSR(Out Sync Replicas):follower副本与leader副本没有完全同步或滞后的副本集合
  • ISR(In Sync Replicas):AR中的一个子集,ISR中的副本都是与leader保持完全同步的副本
    如果某个在ISR中的follower副本落后于leader副本太多,则会被从ISR中移除,否则如果完全同步,会从OSR中移至ISR集合。
    在默认情况下,当leader副本发生故障时,只有在ISR集合中的follower副本才有资格被选举为新leader,而OSR中的副本没有机会(可以通过unclean.leader.election.enable进行配置)。
  • HW(High Watermark):高水位
    它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位 offset 之前的消息
  • LEO(Log End Offset):标识当前日志文件中下一条待写入的消息的offset
    HW 之前的消息数据对消费者是可见的, 属于 commited 状态;
    HW 之后的消息数据对消费者是不可见的,属于 uncommited 状态。

HW和LEO更新机制


每个 kafka 副本对象都有两个重要的属性:LEO 和 HW。注意是所有的副本(leader + Follower)。
但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。
Kafka之所以要在Leader副本上保存这些所有 Follower LEO, 就是为了帮助 Leader 副本确定其高水位,也就是分区高水位。

1. follower副本何时更新LEO?
2. follower副本何时更新HW?
3. leader副本何时更新LEO?
leader写log时就会自动地更新它自己的LEO值。
4. leader副本何时更新HW?

  1. producer向leader副本写入消息时:因为写入消息会更新leader的LEO,故有必要再查看下HW值是否也需要修改
  2. leader处理follower FETCH请求时:当leader处理follower的FETCH请求时首先会从底层的log读取数据,之后会尝试更新分区HW值

当尝试确定分区HW时,它会选出所有满足条件的副本,比较它们的LEO(当然也包括leader自己的LEO),并选择最小的LEO值作为HW值。
这里的满足条件主要是指副本只需满足以下两个条件之一即可:

  1. 处于ISR中
  2. 副本LEO落后于leader LEO的时长不大于 replica.lag.time.max.ms 参数值(默认是10s)

LeaderEpoch机制

Kafka 0.11 引入了leader epoch来取代HW值。

很多原因都可能造成 Leader 和 Follower 保存的消息序列不一致,比如程序 Bug、网络问题等。这是很严重的错误,必须要完全规避。
之前确保一致性的主要手段是高水位机制 High watermark,但高水位值无法保证 Leader 连续变更场景下的数据一致性。
因此,社区在 0.11 版本引入了 Leader Epoch 机制,来规避因 HW 更新错配导致的各种不一致问题。

Leader端多开辟一段内存区域专门保存leader的epoch信息。
所谓leader epoch实际上是一对值:<epoch, offset>

  • epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1。
  • offset则对应于该epoch版本的leader写入第一条消息的offset。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。
当 Leader Partition 写入消息到磁盘时,Broker 会尝试更新这部分缓存。
如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。
这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。

规避数据丢失

规避数据不一致

Kafka 网络通信设计

KAFKA 并没有使用现有的网络框架比如 Netty 去作为自己通信的底座,而是基于 NIO 自行研发了一套适合 KAFKA 自身的网络框架模型。

https://mp.weixin.qq.com/s?__biz=Mzg3MTcxMDgxNA==&mid=2247497246&idx=2&sn=4d700a976a3c531bd77aab58f00bcb83&chksm=cef8df2ff98f563949117c8055e7ff271346b84d12b698f7690181dca16ea9e679a9939a3f46&scene=178&cur_album_id=2147575846151290880#rd

版本演进

0.7.x

只提供最基础的消息队列功能。

0.8.x

  • Kafka 0.8.0增加了副本机制
    至此 Kafka 成为了一个真正意义上完备的分布式高可用消息队列解决方案。
  • Kafka 0.8.2.0引入了新版本Producer API

0.9.x

Kafka 0.9 是一个重大的版本迭代,增加了非常多的新特性,主要体现在三个方面:

  • 安全方面
    在0.9.0之前,Kafka安全方面的考虑几乎为0。
    Kafka 0.9.0 在安全认证、授权管理、数据加密等方面都得到了支持,包括支持Kerberos等。
  • 新版本Consumer APi
    Kafka 0.9.0 重写并提供了新版消费端API,使用方式也是从连接Zookeeper切到了连接Broker,但是此时新版Consumer API也不太稳定、存在不少Bug,生产使用可能会比较痛苦;
    而0.9.0版本的Producer API已经比较稳定了,生产使用问题不大。
  • Kafka Connect
    Kafka 0.9.0 引入了新的组件 Kafka Connect ,用于实现Kafka与其他外部系统之间的数据抽取。

0.10.x

Kafka 0.10 是一个重要的大版本,因为Kafka 0.10.0.0 引入了 Kafka Streams,使得Kafka不再仅是一个消息引擎,而是往一个分布式流处理平台方向发展。
0.10 大版本包含两个小版本:0.10.1 和 0.10.2,它们的主要功能变更都是在 Kafka Streams 组件上。

值得一提的是,自 0.10.2.2 版本起,新版本 Consumer API 已经比较稳定了,而且 Producer API 的性能也得到了提升,因此对于使用 0.10.x 大版本的用户,建议使用或升级到 Kafka 0.10.2.2 版本。

0.11.x

Kafka 0.11 是一个里程碑式的大版本,主要有两个大的变更。

  1. Kafka从这个版本开始支持Exactly-Once 语义即精准一次语义
    主要是实现了Producer端的消息幂等性,以及事务特性,这对于Kafka流式处理具有非常大的意义。
  2. Kafka消息格式的重构
    Kafka 0.11主要为了实现Producer幂等性与事务特性,重构了投递消息的数据结构。
    这一点非常值得关注,因为Kafka 0.11之后的消息格式发生了变化,所以我们要特别注意Kafka不同版本间消息格式不兼容的问题。

1.x

Kafka 1.x 更多的是Kafka Streams方面的改进,以及Kafka Connect的改进与功能完善等。
但仍有两个重要特性:

  1. Kafka 1.0.0实现了磁盘的故障转移
    当Broker的某一块磁盘损坏时数据会自动转移到其他正常的磁盘上,Broker还会正常工作,这在之前版本中则会直接导致Broker宕机,因此Kafka的可用性与可靠性得到了提升
  2. Kafka 1.1.0开始支持副本跨路径迁移
    分区副本可以在同一Broker不同磁盘目录间进行移动,这对于磁盘的负载均衡非常有意义。

2.x

Kafka 2.x 更多的也是Kafka Streams、Connect方面的性能提升与功能完善,以及安全方面的增强等。

  • Kafka 2.1.0开始支持ZStandard的压缩方式,提升了消息的压缩比,显著减少了磁盘空间与网络io消耗。
  • Kafka 2.8.0 用自管理的Quorum代替ZooKeeper管理元数据

为什么Kafka在2.8版本中会“抛弃”Zookeeper

3.x

问题

Kafka在多Partition多实例情况下,消息的消费和生产情况验证

Kafka在多Partition多实例情况下,消息的消费和生产情况验证

写Kafka,包括以下场景:

  1. 不带key写入 - 随机写入
    • 1个topic,2个partition的情况
    • 1个topic,3个partition的情况
  2. 带key写入 - 通过key的hash选出partition
    • 1个topic,2个partition,1个key的情况
    • 1个topic,3个partition,2个key的情况
    • 1个topic,3个partition,3个key的情况

读Kafka,包括以下场景:

  1. 1个topic,1个partition,2个消费实例(同一消费组)的情况
  2. 1个topic,2个partition,3个消费实例(同一消费组)的情况
  3. 1个topic,2个partition,3个消费实例,两个消费组的情况(其中2个实例一个消费组,第3个实例属于另一个消费组)