KAFKA原理介绍

KAFKA原理介绍

AMQP

  • Advanced Message Queueing Protocol (高级消息队列协议)
  • 是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。 AMQP 定义了通过网络发送的字节流的数据格式。因此兼容性非常好, 任何实现 AMQP 协议的程序都可以和与 AMQP 协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。

介绍:

  • apache kafka 是一个分布式流媒体平台。
    • 功能:
      1. 发布和订阅消息流,类似于消息队列和企业级消息系统。这也是 kafka 归结为消息队列框架的原因。
      1. 以容错的方式来记录消息流,kafka 以文件的方式来存储消息流。
      1. 可以在消息发布的时候进行处理。

使用场景:

  • 日志收集:
  • 消息队列:解耦和生产者,消费者,消息缓存。
  • 用户活动跟踪
  • 运营指标:
  • 流式处理:
  • 事件源:

提供有 4 个核心的 API:

  • The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
  • The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
  • The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
  • The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

基本概念:

概念名词

  • broker: kafka 以集成的方式运行,可以有一个或者多个服务组成,每个服务就叫一个 broker.
  • producer: 往 broker 的 topic 中生产消息。
  • consumer: 往 broker 的某个 topic 中读取消息。
  • topic:kafka 消息的分类方式,每一类的消息称为一个 topic。
  • partition:topic 物理上的分组,一个 topic 可以分为多个 partition。
  • replication: 副本 partition 的备份,保证 partition 的高可用。
  • segment: partition 物理上由多个 segment 组成。
  • message:也叫 record, 是有一个 key, value 和时间戳组成的。
  • controller: 负责管理分区和副本的状态并执行,以及这些分区的重新分配。
  • ISR: 同步副本组
  • leader:replication 的一个角色,producer 和 consumer 只跟 leader 进行交互。
  • follower:replication 的一个角色,从 leader 中复制数据。
  • controller:kafka 集群中的一个 broker,用来进行 leader 选举以及各种 failover
  • zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。

重要概念理解

Kafka 集群中的单台服务器被称为 Broker,Broker 中包含了多个 Partition。Partition 是一个有序的队列,消息最终将写入 Partition ,同时它也是 Topic 在物理上的分组。 每个 Topic 代表一类消息,一个 Topic 包含一个或多个 Partition,与数据库表类似,用户在发送或读取消息时需要指定具体的 Topic。Producer 意为生产者,代表着用户发送消息的一端,与此对应的是 Consumer,意为消费者,即接收消息并做出相应处理的一端。

image

  • Broker

    每台 Kafka 服务器被称为 Broker,多个 Broker 组成了 Kafka 集群。

  • Topic & Partition

    Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka 中的 Topics 总是多订阅者模式,一个 Topic 可以拥有一个或者多个消费者来订阅它的数据。

    对于每一个 topic, Kafka 集群都会维持一个分区日志,如下所示:

    image

    每个 Partition 都是有序且顺序不可变的记录集,并且不断地追加到结构化的 commit log 文件。Partition 中的每一个记录都会分配一个 id 号来表示顺序,我们称之为 offset,offset 用来作为 Partition 中每一条记录的唯一标识。

  • segment

    Partition 是一个有序的队列,每个 Partition 都会被映射到一个逻辑的日志文件之上。这个逻辑的日志文件由一个或多个被称为 segment 的文件组成,分成多个 segment 文件可以有效的控制单个物理文件的大小。同时因为 Partition 中的消息是有序的,所以每当一个消息被发送到一个 Partition 之上时,它都会被追加到该 Partition 的逻辑文件中的最后一个 segment 的末尾,如上图所示。

  • offset

    事实上,在每一个消费者中唯一保存的元数据是 offset(偏移量),即消费者当前消费的消息在 log 中的位置。offset 由消费者控制,通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从”现在”开始消费。

    image

  • Producer

    Producer 意为生产者,代表着用户发送消息的一方。用户在 Producer 一端通过指定具体的 Topic 来发送消息,默认的情况下消息会被随机发送到该 Topic 下的某个 Partition ,当然我们也可以通过指定具体 Partition 编号,来将消息发送到指定的 Partition 中,这也是保证消息顺序的一种方式。

  • Consuemr

    Consuemr 意为消费者,代表着用户接收消息并做出相应处理的一方。每个 Consumer 都会属于一个特定的 Consumer Group,而一个 Consumer Group 则可能包含一个或多个 Consumer,这样子区分有以下几个主要的特点:

  1. 正常情况下,同一条消息只会被发送到同一个 Consumer Group 中的其中一个 Consumer;
  2. Kafka 为每个 Consumer Group 维护了一个 offset,用于记录该 Consumer Group 的消费位置;
  3. 可以指定 Consumer Group 下的不同 Consumer 消费不同的 Partition 下的消息;

image

  • Zookeeper

    Zookeeper 作为 Kafka 集群管理的第三方中间件,其主要作用包括:

  1. Leader 选举;
  2. 在 Consumer Group 发生变化时进行 Rebalance;
  3. 持久化 Kafka 维护的 Consumer Group 对应的 offset 值;
  4. 其它;

设计思想:

broker controller

  • 在早期的版本中,对于分区和副本的状态管理依赖于 zookeeper 的 watcher 和队列。每一个 broker 都会在 zookeeper 注册 Watcher。 所以,就会出现大量的 watcher,如果宕机的 broker 上的 partition 比较多,会造成多个 watcher 的触发,造成集群内大规模的调整。 每一个 replica 都要去再次 zookeeper 上注册监视器,当集群规模很大的时候,zookeeper 负担很重。 这种设计很容易出现脑裂和羊群效应以及 zookeeper 集群过载。
  • 新版本该变了这种设计,使用 KafkaController。 Leader 会向 zookeeper 上注册 Watcher,其他 broker 几乎不用监听 zookeeper 的状态变化。
  • 当 broker 启动时,都会创建 broker controller, 但是集群中只有一个 broker controller 对外提供服务。 这些每个节点上的 KafkaController 会在指定的 zookeeper 路径下创建临时节点,

只有第一个成功创建的节点的 KafkaController 才可以成为 leader,其余的都是 follower。 当 leader 故障后,所有的 follower 会收到通知,再次竞争在该路径下创建节点从而选举新的 leader

  • Kafka 集群中多个 broker,有一个会被选举为 controller leader,负责管理整个集群中分区和副本的状态, 比如 partition 的 leader 副本故障,由 controller 负责为该 partition 重新选举新的 leader 副本; 当检测到 ISR 列表发生变化,有 controller 通知集群中所有 broker 更新其 MetadataCache 信息; 或者增加某个 topic 分区的时候也会由 controller 管理分区的重新分配工作

    producer 生产消息:

      1. producer 采用 push 模式讲消息发送到 broker, 每条消息都被 append 到 partition 中,属于顺序写磁盘。 (顺序写入的效率比随机写入的效率要高,保障 kafka 的吞吐率。)
      1. 消息发送到 broker 时,会根据分区算法选择将其存储到哪一个 partition。
    • 其算法为

      • 2.1 指定了 partition, 则直接使用
      • 2.2 没有指定 partition, 指定了 key, 则使用 key 的 value,进行 hash 取余选出一个 partition.
      • 2.3 key 和 partition 都没有指定,就采用轮询的方法选出一个 partition.
      1. kafka 接收到 proucer 发送过来的消息之后,将其持久化到硬盘,并设置保留消息的时长。不关注消息是否被消费者消费。
      1. consumer 从 kafka 集群中 pull 数据,并记录 offset。

消息写入的流程:

  • producer 从 zk 的”/brokers/…/state” 节点找到 partition 对应的 leader

  • producer 把消息发送给对应的 leader

  • leader 把消息写入本地 log

  • followers 从 leader 处 pull 消息,写入本地 log 后,向 leader 发送 ACK。

  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

    file

    producers 的参数 acks 设置

    request.required.acks:

    • 0:producer 不会等待 broker 发送 ack
    • 1: 在 leader 已经接收到数据后,producer 会得到一个 ack
    • -1: 所有的 ISR 都接收到数据后,producer 才得到一个 ack

topic 的注册流程

  • controller 在 ZK 的 /brokers/topics 节点上注册 watcher. 当 topic 被创建时,controller 会通过 watcher 得到该 topic 的 partition 和 replica 的分配情况。
  • controller 从 /brokers/ids 中读取当前可用的 broker 列表。对于 set_p 中的每一个 partition.
    • 从分配给该 partition 的所有 replica(称为 AR)中任选一个可用的 broker 作为新的 leader,并将 AR 设置为新的 ISR
    • 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
    • controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

consumer group

  • 是 kafka 提供的可扩展的具有容错性的消费机制。
  • 组内有多个消费者或者消费者实例,共享一个 group id.
  • 组内所有的消费者协调在一起来消费订阅的 topic 中的所有分区。
  • 每一个分区只能由一个组内的一个消费者消费。其他的组也可以订阅同一个 topic.

rebalance

  • 其本质是一种协议,规定了 consumer group 下所有的 consumer 如何达成一致来分配订阅 topic 的每个分区。

  • 什么时候触发:

      1. 当消费组中的 consumer 发生变化时,(新 consumer 加入,有 consumer 主动离开,consumer 崩溃)
      1. 订阅的主题数发生变更
      1. 主题的分区数发生变化
  • 两种策略:

    • range: 将消费者的线程总数除以分区个数,如果有余数。那么前面几个消费者将会多消费几个分区。

    在这里插入图片描述

    例如:一个 topic 有 10 个分区,有 3 个消费线程,那消费者分配的分区就是:

    • C1-0 将消费 0, 1, 2, 3 分区
    • C2-0 将消费 4, 5, 6 分区
    • C2-1 将消费 7, 8, 9 分区

    • roundrobin:RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询消费者方式逐个将分区分配给每个消费者。

    在这里插入图片描述

    StickyAssignor分配策略 我们再来看一下StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

    ① 分区的分配要尽可能的均匀; ② 分区的分配尽可能的与上次分配的保持相同。

    当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。

关于 partition

  • 一个 partition 只能被同组的一个 consumer 消费。同组的其他的 consumer 起到负载的作用。
  • 可以被多个消费组消费。即一个消息被消费多次。
  • 一个 topic 中 partition 的数量,是 userGroup 的最大并行数量。

负载均衡

  • kafka 集群中的任何一个 broker, 都可以向 producer 提供 metadata 信息, 这些 metadata 中包含” 集群中存活的 servers 列表”/“partitions leader 列表” 等信息。 当 producer 获取到 metadata 信息之后,producer 将会和 Topic 下所有 partition leader 保持 socket 连接; 消息由 producer 直接通过 socket 发送到 broker, 中间不会经过任何” 路由层”.
    • 异步发送,将多条消息暂且在客户端 buffer 起来,并将他们批量发送到 broker; 小数据 IO 太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率; 不过这也有一定的隐患,比如当 producer 失效时,那些尚未发送的消息将会丢失。

关于 leader election 算法

  • 本质上是一个分布式锁,有两种方式实现基于 ZK 的分布式锁
      1. 节点名称唯一性:多个客户端创建一个临时节点,创建成功的客户端获得锁。
      1. 临时顺序节点:所有客户端在某个目录下创建一个临时节点,序号最小的那个获得锁。

问题与应用

消息丢失的问题

在旧的版本中,Producer 的发送类型分为同步与异步两种,通过参数 producer.type=[sync | async]进行设置。而在新的版本中去掉了producer.type参数,改用以下方式可得到相同的效果。

在新版本的 Kafka 中,批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。通过设置linger.msbatch.size等参数可控制请求间隔时间以及批处理大小等(详情参见:http://kafka.apachecn.org/documentation.html#producerconfigs)。当设置`linger.ms=0`时将立即发送消息(默认为 0),或者设置batch.size=0以禁用批处理。

而在使用 Producer API 发送消息时,使用的是异步发送消息方法,它将在确认发送时调用回掉函数,示例如下:

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
producer.send(record,
                new Callback() {
                     public void onCompletion(RecordMetadata metadata, Exception e) {
                         // doing something 
                     }
                });

注:即使异步的情形下,发送到同一分区的记录的回调也会保证按顺序执行。

producer.send方法返回一个java.util.concurrent.Future<RecordMetadata>对象,通过调用Future#get()方法可模拟同步阻塞的效果。

producer.send(record).get()

针对以上情形,可得到以下两种主要的情形:

  1. 当使用同步阻塞以及禁用批处理发送消息时,Producer 将会等待回调函数执行后,再继续执行,且消息为单条发送;
  2. 当使用异步非阻塞且未启用批处理发送消息时,Producer 在调用完发送消息的方法后,将立即执行后续程序,且消息为批量发送;

同时为了保证发送消息的可靠性,在 Producer 端可通过参数acks配置在确认一个请求发送完成之前需要收到的反馈信息的数量,其中:

  • acks=0 如果设置为0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
  • acks=1 如果设置为1,leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收消息之后,并且在 follower 节点复制数据完成之前产生错误,则这条消息会丢失。
  • acks=all || -1 如果设置为all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1与acks=all是等效的。

基于以上配置情形,有如下情形可能发生消息丢失:

  1. 当异步发送消息时, Producer 不会等待服务器的反馈,如果网络发生异常或其它情况,则可能会丢失消息;
  2. 当异步批量发送消息时,如果 Producer down 掉,则缓冲区消息可能丢失;
  3. acks=0时,Producer 不会等待服务器的反馈,如果网络发生异常或其它情况,可能会丢失消息;
  4. acks=1时,如果 leader 节点在接收到消息之后,并且在 follower 节点复制数据完成之前产生错误,则这条消息会丢失;

所以如果需要保证消息不丢失,至少需要满足以下条件:

  1. 同步阻塞方式发送消息
  2. 设置 acks=-1或者acks=all

消息重复消费的问题

在 Consumer 中,offset 提交的方式有两种:

  1. 自动提交;
  2. 手动提交;

在新的版本中,通过 enable.auto.commit=[true || false](默认为 true)以及auto.commit.interval.ms(默认=5000)参数来控制是否由 Consumer 自动在后台提交 offset 以及自动提交 offset 的频率(以毫秒为单位),而在旧的版本中,则通过auto.commit.enable=[true || false](默认为 true)以及auto.commit.interval.ms(默认=60 * 1000)参数来达到同样的效果。

当 Consumer 在消费完数据并提交 offset 之后,offset 将被持久化在 zookeeper 之中。如果程序发生异常或重启,那么它将接着上一次的 offset 继续消费消息。所以如果当 Consumer 消费完消费之后,却在提交 offset 时发生异常,那么将可能导致消息被重复消费,根据消息重复消费的数量,可分为以下情形:

  1. 自动提交 offset 时,由于 offset 不会立即提交,所以可能会造成单次异常却重复消费多条连续的消息;
  2. 手动提交 offset 时,如果选中每消费一条消息,都手动提交一次 offset,那么针对每个分区来讲,单次异常只会至多重复消费一条消息;

通常而言,如果我们需要保证消息是全局或以键为单位的顺序消息时,选择手动提交 offset 会是更保险的做法。

顺序消息

保证消息的顺序入队与消费,通常分为两种情况:

  1. 全局有序,比如操作 【OP_1(id=1), OP_2(id=2), OP_3(id=1)】,它们的消费顺序也必须是 【OP_1, OP_2, OP_3】 ;
  2. 以 key 为单位的有序,比如以上操作,允许它们的操作为【OP_1, OP_3,OP_2】;

Kakfka 保证以 Partition 为单位的分区有序,所以如果选择全局有序,那么只能选择单个分区写入,以及如果消费者如果需要保证异常重启后也严格按照之前的顺序消费,那么也仅能使用单线程消费且手动提交 offset 的方式。但是好在实际的业务中,更多的是保证以 key 为单位的消息有序,所以我们可以通过将数据发送至多个 Partition,以提高程序的并发量,只要保证相同的 key 在同一个分区即可。

综合消息丢失与重复消费的问题,如果我们需要实现一个可靠的且保证以 key 为单位的有序消息,且消费者也严格按照顺序消费的程序,那么必须保证以下条件:

  1. Producer 端同步发送消息,且反馈的配置为acks=-1 || all
  2. 相同的 key 写入相同的 Partition;
  3. Consumer 使用手动提交 offset,每消费完一个消息后,手动提交 offset;
  4. 做好幂等消费操作,因为重复消费的问题理论上不可避免;

———————————————— 参考文章:

  1. kafka 笔记 | Go 技术论坛 (learnku.com)
  2. Kafka 学习与使用总结 – Kent's Tech Blog (kentt.top)