环境准备

系统IP主机名
centos7.6192.168.51.195kafka01
centos7.6192.168.51.196kafka02
centos7.6192.168.51.197kafka03

配置hosts

[root@kafka01 ~]# cat /etc/hosts
192.168.51.195 kafka01
192.168.51.196 kafka02
192.168.51.197 kafka03

[root@kafka01 ~]# ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa): 
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
31:b4:04:7f:11:0f:ff:4f:f8:1a:ad:c5:4a:cf:4e:c8 root@rabbitmq01
The key's randomart image is:
+--[ RSA 2048]----+
|      ..o +.     |
|       + . =     |
|        = . o    |
|         +   . . |
|        S     o .|
|             . B |
|              E B|
|             . X |
|              +.+|
+-----------------+
[root@kafka01 ~]# for i in kafka0{1..3};do ssh-copy-id ${i};done
[root@kafka01 ~]# for i in kafka0{1..3};do scp /etc/hosts ${i}:/etc/;done

java配置

#上传jdk文件
[root@kafka01 ~]# mkdir -p /usr/local/java
[root@kafka01 ~]# mv jdk-8u251-linux-x64.tar.gz /usr/local/java/
[root@kafka01 ~]# cd /usr/local/java/
[root@kafka01 java]# tar xf jdk-8u251-linux-x64.tar.gz
[root@kafka01 jdk1.8.0_251]# vim /etc/profile
#添加如下配置
export JAVA_HOME=/usr/local/java/jdk1.8.0_251
export PATH=$PATH:$JAVA_HOME/bin
[root@kafka01 jdk1.8.0_251]# source /etc/profile
[root@kafka01 java]# java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

部署zookeeper集群

获取kafka软件包

[root@kafka01 ~]# cd /opt/
[root@kafka01 opt]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
[root@kafka01 opt]# tar xf kafka_2.13-2.5.0.tgz
[root@kafka01 opt]# mv kafka_2.13-2.5.0 kafka

配置zookeeper

[root@kafka01 opt]# rm -fr /opt/kafka/bin/windows
[root@kafka01 opt]# cat > /opt/kafka/config/zookeeper.properties << EOF
> dataDir=/data/zk
> clientPort=2181
> maxClientCnxns=0
> tickTime=2000
> initLimit=10
> syncLimit=5
> quorumListenOnAllIPs=true
> server.1=kafka01:2888:3888
> server.2=kafka02:2888:3888
> server.3=kafka03:2888:3888
> EOF

[root@kafka01 opt]# mkdir /data/zk -pv
mkdir: created directory ‘/data’
mkdir: created directory ‘/data/zk’
[root@kafka01 opt]# echo 1 > /data/zk/myid

dataDir:zookeeper数据存放目录
clientPort:client连接zookeeper需要指定的端口
maxClientCnxns:表示允许客户端最大连接数,如果设置为0,则表示不限制
tickTime:表示zookeeper集群之间的心跳检测时间间隔(单位是毫秒)
initLimit:表示zookeeper集群中的follower在启动时需要在多少个心跳检测时间内从leader同步数据(如果数据量过大,此值可适当调整)
syncLimit:表示超过多少个心跳时间收不到follower的响应,leader就认为follower已经下线
quorumListenOnAllIPs:该参数设置为true,配置为true可以避免入坑(尤其是多网卡主机),Zookeeper服务器将监听所有可用IP地址的连接。他会影响ZAB协议和快速Leader选举协议,默认是false
server.x=IP:Port1:Port2:指定zk节点列表
x:表示节点的编号,此编号需要写入到zk数据存放目录下,以myid命名的文件
IP:可以指定IP也可以是主机名,Port1表示该Zookeeper集群中的Follower节点与Leader节点通讯时使用的端口,作为Leader时监听该端口,Port2表示选举新的Leader时,Zookeeper节点之间互相通信的端口,比如当Leader挂掉时,其余服务器会互相通信,选出新的Leader,Leader和Follower都会监听该端口

配置kafka

[root@kafka01 opt]# cat > /opt/kafka/config/server.properties << EOF
> broker.id=0
> listeners=PLAINTEXT://:9092
> auto.create.topics.enable=true
> delete.topic.enable=true
> num.network.threads=15
> num.io.threads=30
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/data/kafka/log
> num.partitions=3
> num.recovery.threads.per.data.dir=3
> default.replication.factor=2
> num.replica.fetchers=2
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> zookeeper.connect=kafka01:2181:kafka02:2181,kafka03:2181
> zookeeper.connection.timeout.ms=10000
> group.initial.rebalance.delay.ms=0
> EOF

auto.create.topics.enable:该配置项默认值是true,但在生产环境最好设置为false,这样可以控制创建Topic的人以及创建时间
background.threads:该配置项默认值是10,既整个Kafka在执行各种任务时会启动的线程数,如果你的CPU很强劲,那么可以将线程数设大一点
delete.topic.enable:该配置项默认值是false,可以根据实际需求改变,在生产环境还是建议保持默认值,这样至少不会出现Topic被误删的情况
log.flush.interval.messages:该配置项最好保持默认值,把这个任务交给操作系统的文件系统去处理
log.retention.hours:日志文件保留的时间默认是168小时,即7天,这个配置可以根据具体业务需求而定
message.max.bytes:每条Message或一批次Message的大小默认是1MB,这个配置也要根据具体需求而定,比如带宽的情况
min.insync.replicas:该配置项的默认值是1,既在acks=all时,最少得有一个Replica进行确认回执,建议在生产环境配置为2,保证数据的完整性
num.io.threads:处理I/O操作的线程数,默认是8个线程,如果觉得在这个环节达到了瓶颈,那么可以适当调整该参数
num.network.threads:处理网络请求和响应的线程数,默认是3个线程,如果觉得在这个环节达到了瓶颈,那么可以适当调整该参数
num.recovery.threads.per.data.dir:每个数据目录启用几个线程来处理,这里的线程数和数据目录数是乘积关系,并且只在Broker启动或关闭时使用,默认值是1,根据实际情况配置数据目录数,从而判断该配置项应该如何设置
num.replica.fetchers:该配置项影响Replicas同步数据的速度,默认值是1,如果发现Replicas同步延迟较大,可以提升该配置项
offsets.retention.minutes:Offset保留的时间,默认值是1440,既24小时,在生产环境建议将该配置项设大一点,比如设置为1个月,保证消费数据的完整性
unclean.leader.election.enable:该配置项的作用是,指定是否可以将非ISR的Replicas选举为Leader,默认值为false,在生产环境建议保持默认值,防止数据丢失
zookeeper.session.timeout.ms:Zookeeper会话超时时间,默认值为6000,按实际情况而定,通常情况下保持60秒即可
default.replication.factor:默认Replication Factor为1,建议设置为2或者3,以保证数据完整性和整个集群的健壮性
num.partitions:Topic默认的Partition数,默认是1,建议设置为3或者6,以保证数据完整性和整个集群的健壮性
group.initial.rebalance.delay.ms:当Consumer Group新增或减少Consumer时,重新分配Topic Partition的延迟时间

复制所需文件到其他节点

[root@kafka01 opt]# for i in kafka0{2..3};do scp -r /opt/kafka ${i}:/opt/;done
[root@kafka01 opt]# for i in kafka0{2..3};do scp -r /data ${i}:/;done

[root@kafka02 ~]# echo 2 > /data/zk/myid
[root@kafka02 ~]# sed -i 's#broker.id=0#broker.id=1#g' /opt/kafka/config/server.properties

[root@kafka03 ~]# echo 2 > /data/zk/myid
[root@kafka03 ~]# sed -i 's#broker.id=0#broker.id=2#g' /opt/kafka/config/server.properties

启动zookeeper

启动zookeeper

每个节点都需执行以下操作

[root@kafka01 ~]# cd /opt/kafka/bin
[root@kafka01 bin]# nohup ./zookeeper-server-start.sh ./config/zookeeper.properties >> zookeeper.file 2>&1 &
[root@kafka01 bin]# ss -lnpt | egrep '2181|3888|2888'   # 只有leader才会监听2888端口
LISTEN     0      50          :::2888                    :::*                   users:(("java",pid=423,fd=130))
LISTEN     0      50          :::3888                    :::*                   users:(("java",pid=423,fd=128))
LISTEN     0      50          :::2181                    :::*                   users:(("java",pid=423,fd=116))

验证

通过Zookeeper Client连接到集群来检验,选择任意一台服务器,首先连接kafka01主机

[root@kafka01 bin]# ./zookeeper-shell.sh kafka01:2181      # 连接到kafka01
create /my_zNode "some data"       # 连接成功后,创建一个zNode
ls /                   # 查看节点中所有znode
[my_zNode, zookeeper]


[root@kafka02 bin]# ./zookeeper-shell.sh kafka02:2181       # 再连接到kaka02
ls /          # 同样可以查看到my_zNode
[my_zNode, zookeeper]
get /my_zNode     # 查看my_zNode中的数据
some data


[root@kafka03 bin]# ./zookeeper-shell.sh kafka03:2181      # 再连接到kafka03节点
ls /       # 查看
[my_zNode, zookeeper]
get /my_zNode             # 获取数据
some data
set /my_zNode "new data"          # 修改my_zNode中的数据
get /my_zNode      # 确认数据已修改
new data


[root@kafka01 bin]# ./zookeeper-shell.sh kafka01:2181      # 再连接到kafka1,确定数据修改

get /my_zNode
new data

过程虽然比较繁琐,但是充分说明了我们的Zookeeper集群是搭建成功的,无论从哪个Zookeeper节点创建的zNode,都可以同步到集群中的其他节点,无论从哪个Zookeeper节点修改的zNode中的数据,也可以同步到起群中的其他节点

启动kafka

启动kafka

三个节点都需启动kafka

[root@kafka01 ~]#  cd /opt/kafka/bin/
[root@kafka01 bin]#  ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka01 bin]#  ss -lnt | grep 9092           # 三台kafka都启动完成后,端口才会监听
LISTEN     0      50          :::9092                    :::*

kafka常用指令

当三个节点都启动后,并且确认9092端口在监听,那么就可以执行下面的指令,来测试kafka是否正常

[root@kafka01 ~]#  cd /opt/kafka/bin/
#显示消息列表
[root@kafka01 bin]# ./kafka-topics.sh --zookeeper kafka01:2181,kafka02:2181,kafka03:2181  --list
#创建一个topic,并指定topic属性(副本数、分区数等)
[root@kafka01 bin]# ./kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 3 --topic test
Created topic test.
--partitions(分区)应等于或大于消费者,--replication-factor(副本数)不能大于kafka集群内主机节点

#查看某个topic的状态
[root@kafka02 bin]# ./kafka-topics.sh --zookeeper kafka01:2181 --topic test --describe
Topic: test	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: test	Partition: 1	Leader: 3	Replicas: 3	Isr: 3
	Topic: test	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
 
 #生产消息
[root@kafka01 bin]# ./kafka-console-producer.sh --broker-list kafka01:9092 --topic test
>test
>test
>learn
 
 #消费消息
[root@kafka02 bin]# ./kafka-console-consumer.sh --bootstrap-server PLAINTEXT://kafka01:9092 --topic test
test
learn
 
#查看实时消息,如果从头看可在后面加   --from-beginning
[root@kafka03 bin]# ./kafka-console-consumer.sh --bootstrap-server PLAINTEXT://kafka01:9092 --topic test --from-beginning
test
test
learn

#删除topic
[root@kafka03 bin]# ./kafka-topics.sh --delete --zookeeper kafka01:2181  --topic   test
Topic test is marked for deletion.

安装kafka manager

kafka有一个管理工具叫kafka-manager,它支持管理多个集群、选择副本、副本重新分配以及创建Topic,同时这个管理工具也是一个非常好的可以快速浏览这个集群的工具
kafka manager有如下功能:

  • 管理多个kafka集群
  • 便捷的检查kafka集群状态(topics,brokers,备份分布情况,分区分布情况)
  • 选择你要运行的副本
  • 基于当前分区状况进行
  • 可以选择topic配置并创建topic(0.8.1.1和0.8.2的配置不同)
  • 删除topic(只支持0.8.2以上的版本并且要在broker配置中设置delete.topic.enable=true)
  • Topic list会指明哪些topic被删除(在0.8.2以上版本适用)
  • 为已存在的topic增加分区
  • 为已存在的topic更新配置
  • 在多个topic上批量重分区
  • 在多个topic上批量重分区(可选partition broker位置)
    建议使用docker来部署这个kafka-manager工具
$ docker run -itd --rm  -p 9000:9000 -e ZK_HOSTS="192.168.51.195:2181,192.168.51.196:2181,192.168.51.197:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager

docker启动后,访问9000端口,界面如下
kafkaos7.png

kafka开启jmx polling

在使用kafka-amnager时,有一个功能为Enable JMX Polling,该部分直接影响部分 kafka broker 和 topic 监控指标指标的获取,那么如何开启此功能呢?方法有两种:
方法1:
启动kafka时增加JMX_PORT=9988

$ JMX_PORT=9988 ./kafka-server-start.sh -daemon ../config/server.properties

方法2:
修改kafka-run-class.sh脚本,第一行增加JMX_PORT=9988即可
不管是哪种方法,都只是定义了一个变量而已,剩下的事情交给程序去做就好

文章作者: 鲜花的主人
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 爱吃可爱多
Kafka Kafka
喜欢就支持一下吧
打赏
微信 微信
支付宝 支付宝