kafka概要整理
初识Kafka
什么是 Kafka
Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统
Kafka 的基本术语
- 消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录
- 批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息
- 主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类,主题就像是数据库中的表
- 分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序
- 生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息
- 消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息
- 消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体
- 偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据
- broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存
- broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)
- 副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随
- 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段
Kafka 的特性(设计原则)
- 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒
- 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中
- 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储
- 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
- 高并发: 支持数千个客户端同时读写
Kafka 的使用场景
- 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等
- 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的
- 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
- 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等
- 流式处理:流式处理是有一个能够提供多种应用程序的领域
- 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃
Kafka 的消息队列
Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式
Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式
如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列
Kafka 系统架构
如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance,Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息
核心 API
Kafka 有四个核心API,它们分别是
Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流
Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者,例如,关系数据库的连接器可能会捕获对表的所有更改
Kafka 为何如此之快
Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据
批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费
总结一下其实就是四个要点
- 顺序读写
- 零拷贝
- 消息压缩
- 分批发送
Kafka 安装和重要配置
kafka安装见本博客,主要说一下kafka重要参数配置
broker 端配置
- broker.id
每个 kafka broker 都有一个唯一的标识来表示,这个唯一的标识符即是 broker.id,它的默认值是 0,这个值在 kafka 集群中必须是唯一的,这个值可以任意设定 - port
如果使用配置样本来启动 kafka,它会监听 9092 端口,修改 port 配置参数可以把它设置成任意的端口,要注意,如果使用 1024 以下的端口,需要使用 root 权限启动 kakfa - zookeeper.connect
用于保存 broker 元数据的 Zookeeper 地址是通过 zookeeper.connect 来指定的。比如我可以这么指定 localhost:2181 表示这个 Zookeeper 是运行在本地 2181 端口上的。我们也可以通过 比如我们可以通过 zk1:2181,zk2:2181,zk3:2181 来指定 zookeeper.connect 的多个参数值。该配置参数是用冒号分割的一组 hostname:port/path 列表,其含义如下
hostname 是 Zookeeper 服务器的机器名或者 ip 地址
port 是 Zookeeper 客户端的端口号
/path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径
如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2 - log.dirs
Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来制定的,它是用一组逗号来分割的本地系统路径,log.dirs 是没有默认值的,你必须手动指定他的默认值。其实还有一个参数是 log.dir,如你所知,这个配置是没有 s 的,默认情况下只用配置 log.dirs 就好了,比如你可以通过/home/kafka1,/home/kafka2,/home/kafka3 这样来配置这个参数的值 - num.recovery.threads.per.data.dir
对于如下3种情况,Kafka 会使用可配置的线程池来处理日志片段
服务器正常启动,用于打开每个分区的日志片段
服务器崩溃后重启,用于检查和截断每个分区的日志片段
服务器正常关闭,用于关闭日志片段
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程 - auto.create.topics.enable
默认情况下,kafka 会使用三种方式来自动创建主题,下面是三种情况:
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户端向主题发送元数据请求时
auto.create.topics.enable参数我建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故
主题默认配置
Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数
- num.partitions
num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数 - default.replication.factor
这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效 - log.retention.ms
Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 - - log.retention.ms - log.retention.bytes
另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加 - log.segment.bytes
上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率 - log.segment.ms
上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足 - message.max.bytes
broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值,这个值对性能有显著的影响,值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求,它还会增加磁盘写入块的大小,从而影响 IO 吞吐量 - retention.ms
规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值 - retention.bytes
retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间
JVM 参数配置
JDK 版本一般推荐直接使用 JDK1.8,这个版本也是现在中国大部分程序员的首选版本,
说到 JVM 端设置,就绕不开堆这个话题,业界最推崇的一种设置方式就是直接将 JVM 堆大小设置为 6GB,这样会避免很多 Bug 出现
JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置,如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器:
- 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC
- 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC
当然了,如果你已经在使用 Java 8 了,那么就用默认的 G1 收集器就好了。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了
一般 G1 的调整只需要这两个参数即可 - MaxGCPauseMillis
该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1可以根据需要使用更长的时间。它的默认值是 200ms,也就是说,每一轮垃圾回收大概需要200 ms 的时间 - InitiatingHeapOccupancyPercent
该参数指定了 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45,这就表明G1在堆使用率到达45之前不会启用垃圾回收。这个百分比包括新生代和老年代
Kafka Producer
在 Kafka 中,我们把产生消息的那一方称为生产者,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka 后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka 后台,然后淘宝会根据你的爱好做智能推荐,致使你的钱包从来都禁不住诱惑,那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢?发送过程是怎么样的呢?
尽管消息的产生非常简单,但是消息的发送过程还是比较复杂的,如图
从创建一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成,在发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输。然后消息到达了分区器
如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题和分区发送数据了
ProducerRecord 还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型
- 如果将主题配置为使用 CreateTime,则生产者记录中的时间戳将由 broker 使用。
- 如果将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由 broker 重写
然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上,由一个独立的线程负责把它们发到 Kafka Broker 上,Kafka Broker 在收到消息时会返回一个响应,如果写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户,如果写入失败,会返回一个错误,生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息
Kafka Consumer
应用程序使用 KafkaConsumer 从 Kafka 中订阅主题并接收来自这些主题的消息,然后再把他们保存起来。应用程序首先需要创建一个 KafkaConsumer 对象,订阅主题并开始接受消息,验证消息并保存结果。一段时间后,生产者往主题写入的速度超过了应用程序验证数据的速度,这时候该如何处理?如果只使用单个消费者的话,应用程序会跟不上消息生成的速度,就像多个生产者像相同的主题写入消息一样,这时候就需要多个消费者共同参与消费主题中的消息,对消息进行分流处理
Kafka 消费者从属于消费者群组,一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分区的消息。下面是一个 Kafka 分区消费示意图
上图中的主题 T1 有四个分区,分别是分区0、分区1、分区2、分区3,我们创建一个消费者群组1,消费者群组中只有一个消费者,它订阅主题T1,接收到 T1 中的全部消息。由于一个消费者处理四个生产者发送到分区的消息,压力有些大,需要帮手来帮忙分担任务,于是就演变为下图
这样一来,消费者的消费能力就大大提高了,但是在某些环境下比如用户产生消息特别多的时候,生产者产生的消息仍旧让消费者吃不消,那就继续增加消费者
如上图所示,每个分区所产生的消息能够被每个消费者群组中的消费者消费,如果向消费者群组中增加更多的消费者,那么多余的消费者将会闲置,如下图所示
向群组中增加消费者是横向伸缩消费能力的主要方式。总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么就演变为下图这样
在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用
总结起来就是如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者
消费者组是什么
消费者组(Consumer Group)是由一个或多个消费者实例(Consumer Instance)组成的群组,具有可扩展性和可容错性的一种机制,消费者组内的消费者共享一个消费者组ID,这个ID 也叫做 Group ID,组内的消费者共同对一个主题进行订阅和消费,同一个组中的消费者只能消费一个分区的消息,多余的消费者会闲置,派不上用场
我们在上面提到了两种消费方式
- 一个消费者群组消费一个主题中的消息,这种消费模式又称为点对点的消费方式,点对点的消费方式又被称为消息队列
- 一个主题中的消息被多个消费者群组共同消费,这种消费模式又称为发布-订阅模式
消费者重平衡
从上面的消费者演变图中可以知道这么一个过程:最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例分摊了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡,英文名也叫做 Rebalance ,如下图所示
重平衡非常重要,它为消费者群组带来了高可用性 和 伸缩性,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为,在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用,另外当分区被重新分配给另一个消费者时,消息当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序
消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区,对于不同不的消费群体来说,其组织协调者可以是不同的,只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息,当消费者检索记录或者提交它所消费的记录时就会发送心跳
如果过了一段时间 Kafka 停止发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡,如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息,在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿
重平衡是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点(bug),而这些 bug 到现在社区还无法修改
重平衡的过程对消费者组有极大的影响,因为每次重平衡过程中都会导致万物静止,参考 JVM 中的垃圾回收机制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虚拟机》中 p76 关于 Serial 收集器的描述):
更重要的是它在进行垃圾收集时,必须暂停其他所有的工作线程,直到它收集结束,Stop The World 这个名字听起来很帅,但这项工作实际上是由虚拟机在后台自动发起并完成的,在用户不可见的情况下把用户正常工作的线程全部停掉,这对很多应用来说都是难以接受的
也就是说,在重平衡期间,消费者组中的消费者实例都会停止消费,等待重平衡的完成,而且重平衡这个过程很慢......
本文参考链接:传送门