Kubernetes部署Zookeeper Kafka集群
系统环境
- Kubernetes版本:1.20.0
- kafka版本:2.4.1
- zookeeper版本:3.5.9
部署zookeeper
下面的清单包含Headless Service,Service,PodDisruptionBudget和StatefulSet
[root@k8s01 kafka]# vim zookeeper.yaml
---
# Headless Service (clusterIP: None) for the ZooKeeper ensemble.
# The zk StatefulSet references it through serviceName, and the generated
# zoo.cfg addresses the servers as zk-N.zk-hs.tools-env.svc.cluster.local.
apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  namespace: tools-env
  labels:
    app: zk
spec:
  ports:
    # Quorum (follower <-> leader) traffic.
    - port: 2888
      name: server
    # Leader-election traffic.
    - port: 3888
      name: leader-election
  clusterIP: None
  selector:
    app: zk
---
# Client-facing Service: exposes the ZooKeeper client port 2181 of all pods
# labelled app=zk. The Kafka brokers connect through
# zk-cs.tools-env.svc.cluster.local:2181.
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  namespace: tools-env
  labels:
    app: zk
spec:
  ports:
    - port: 2181
      name: client
  selector:
    app: zk
---
# PodDisruptionBudget: at most one app=zk pod may be unavailable during
# voluntary disruptions (e.g. node drains).
# policy/v1beta1 matches the Kubernetes 1.20 cluster used in this article;
# policy/v1 is available from 1.21 and v1beta1 is removed in 1.25.
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
  namespace: tools-env
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
# StatefulSet running a three-member ZooKeeper ensemble (pods zk-0..zk-2).
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: tools-env
spec:
  selector:
    matchLabels:
      app: zk
  # Headless Service that provides the per-pod DNS names.
  serviceName: zk-hs
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  # Start/terminate all pods at once instead of one by one in ordinal order.
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        # Hard rule: never place two app=zk pods on the same node.
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - zk
              topologyKey: kubernetes.io/hostname
      containers:
        - name: kubernetes-zookeeper
          imagePullPolicy: IfNotPresent
          image: 'kubebiz/zookeeper:3.5.9'
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          # The folded scalar (>-) joins the lines below into a single
          # start-zookeeper command line; --servers must match replicas.
          command:
            - sh
            - '-c'
            - >-
              start-zookeeper --servers=3
              --log_dir=/logs
              --data_dir=/data
              --data_log_dir=/datalog
              --conf_dir=/conf
              --client_port=2181
              --election_port=3888
              --server_port=2888
              --tick_time=2000
              --init_limit=10
              --sync_limit=5
              --heap=512M
              --max_client_cnxns=60
              --snap_retain_count=3
              --purge_interval=12
              --max_session_timeout=40000
              --min_session_timeout=4000
              --log_level=INFO
          # Readiness and liveness both run the image's zookeeper-ready
          # helper against the client port.
          readinessProbe:
            exec:
              command:
                - sh
                - '-c'
                - zookeeper-ready 2181
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - '-c'
                - zookeeper-ready 2181
            initialDelaySeconds: 10
            timeoutSeconds: 5
          # NOTE(review): the command above points --data_dir/--data_log_dir
          # at /data and /datalog, while this claim is mounted at
          # /var/lib/zookeeper. Confirm the image links these paths;
          # otherwise ZooKeeper data will not land on the persistent volume.
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
      # Pod-level security context (fsGroup is only valid at pod level).
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  # One 1Gi PersistentVolumeClaim per pod, provisioned by the
  # managed-nfs-storage StorageClass.
  volumeClaimTemplates:
    - metadata:
        name: datadir
      spec:
        accessModes:
          - ReadWriteOnce
        storageClassName: managed-nfs-storage
        resources:
          requests:
            storage: 1Gi
执行 kubectl apply -f zookeeper.yaml 应用清单后,StatefulSet控制器会创建三个Pod,每个Pod中都运行着一个ZooKeeper服务器容器
Leader 选举
由于在匿名网络中选择leader没有终止算法,因此Zab需要显式成员资格配置来执行leader选举。集合中的每个服务器都需要具有唯一标识符,所有服务器都需要知道全局标识符集,并且每个标识符需要与网络地址相关联
使用kubectl exec获取zk StatefulSet中各Pod的主机名。StatefulSet控制器根据序号索引为每个Pod提供唯一的主机名,主机名采用“StatefulSet名称-序号索引”的形式(例如zk-0)
[root@k8s01 efk]# for i in 0 1 2; do kubectl exec -n tools-env zk-$i -- hostname; done
zk-0
zk-1
zk-2
检查每个服务器的myid文件的内容
[root@k8s01 kafka]# for i in 0 1 2; do echo "myid zk-$i";kubectl exec -n tools-env zk-$i -- cat /data/myid; done
myid zk-0
1
myid zk-1
2
myid zk-2
3
获取zk StatefulSet中每个Pod的完全限定域名(FQDN)
[root@k8s01 efk]# for i in 0 1 2; do kubectl exec -n tools-env zk-$i -- hostname -f; done
zk-0.zk-hs.tools-env.svc.cluster.local
zk-1.zk-hs.tools-env.svc.cluster.local
zk-2.zk-hs.tools-env.svc.cluster.local
Kubernetes DNS中的A记录将FQDN解析为Pod的IP地址。如果Kubernetes重新调度Pod,它将使用Pod的新IP地址更新A记录,但A记录名称不会更改
[root@k8s01 efk]# kubectl exec -n tools-env zk-0 -- cat /conf/zoo.cfg
#This file was autogenerated DO NOT EDIT
clientPort=2181
dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
autopurge.snapRetainCount=3
autopurge.purgeInteval=12
4lw.commands.whitelist=*
server.1=zk-0.zk-hs.tools-env.svc.cluster.local:2888:3888
server.2=zk-1.zk-hs.tools-env.svc.cluster.local:2888:3888
server.3=zk-2.zk-hs.tools-env.svc.cluster.local:2888:3888
部署kafka
[root@k8s01 kafka]# vim kafka.yaml
---
# Headless Service (clusterIP: None) for the Kafka brokers. It gives each
# broker pod a stable DNS name of the form
# kafka-N.kafka-hs.tools-env.svc.cluster.local, used as the bootstrap
# addresses by the console producer/consumer.
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: tools-env
  labels:
    app: kafka
spec:
  ports:
    - port: 9092
      name: server
  clusterIP: None
  selector:
    app: kafka
---
# PodDisruptionBudget: at most one app=kafka pod may be unavailable during
# voluntary disruptions (e.g. node drains).
# policy/v1beta1 matches the Kubernetes 1.20 cluster used in this article;
# policy/v1 is available from 1.21 and v1beta1 is removed in 1.25.
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: tools-env
spec:
  selector:
    matchLabels:
      app: kafka
  maxUnavailable: 1
---
# StatefulSet running three Kafka brokers (pods kafka-0..kafka-2) that
# register with the ZooKeeper ensemble through the zk-cs Service.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: tools-env
spec:
  # Headless Service that provides the per-pod DNS names.
  serviceName: kafka-hs
  replicas: 3
  # Start/terminate all pods at once instead of one by one in ordinal order.
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        # Hard rule: never place two app=kafka pods on the same node.
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
              topologyKey: kubernetes.io/hostname
        # Soft preference: schedule brokers next to an app=zk pod.
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - zk
                topologyKey: kubernetes.io/hostname
      # Allow up to 5 minutes for the broker to shut down before it is killed.
      terminationGracePeriodSeconds: 300
      containers:
        - name: k8skafka
          imagePullPolicy: IfNotPresent
          image: 'kubebiz/kafka:2.13-2.4.1'
          resources:
            requests:
              memory: 1Gi
              cpu: '0.5'
          ports:
            - containerPort: 9092
              name: server
          # The folded scalar (>-) joins the lines below into one command
          # line. broker.id is taken from the pod's ordinal suffix
          # (${HOSTNAME##*-}) and the listener binds to the pod IP.
          # NOTE(review): offsets.topic.replication.factor=3 needs all three
          # brokers up before the offsets topic can be created — confirm
          # this is intended alongside default.replication.factor=1.
          command:
            - sh
            - '-c'
            - >-
              exec kafka-server-start.sh /opt/kafka/config/server.properties
              --override broker.id=${HOSTNAME##*-}
              --override listeners=PLAINTEXT://$(hostname -i):9092
              --override zookeeper.connect=zk-cs.tools-env.svc.cluster.local:2181
              --override log.dirs=/var/lib/kafka
              --override auto.create.topics.enable=true
              --override auto.leader.rebalance.enable=true
              --override background.threads=10
              --override compression.type=producer
              --override delete.topic.enable=false
              --override leader.imbalance.check.interval.seconds=300
              --override leader.imbalance.per.broker.percentage=10
              --override log.flush.interval.messages=9223372036854775807
              --override log.flush.offset.checkpoint.interval.ms=60000
              --override log.flush.scheduler.interval.ms=9223372036854775807
              --override log.retention.bytes=-1
              --override log.retention.hours=168
              --override log.roll.hours=168
              --override log.roll.jitter.hours=0
              --override log.segment.bytes=1073741824
              --override log.segment.delete.delay.ms=60000
              --override message.max.bytes=1000012
              --override min.insync.replicas=1
              --override num.io.threads=8
              --override num.network.threads=3
              --override num.recovery.threads.per.data.dir=1
              --override num.replica.fetchers=1
              --override offset.metadata.max.bytes=4096
              --override offsets.commit.required.acks=-1
              --override offsets.commit.timeout.ms=5000
              --override offsets.load.buffer.size=5242880
              --override offsets.retention.check.interval.ms=600000
              --override offsets.retention.minutes=1440
              --override offsets.topic.compression.codec=0
              --override offsets.topic.num.partitions=50
              --override offsets.topic.replication.factor=3
              --override offsets.topic.segment.bytes=104857600
              --override queued.max.requests=500
              --override quota.consumer.default=9223372036854775807
              --override quota.producer.default=9223372036854775807
              --override replica.fetch.min.bytes=1
              --override replica.fetch.wait.max.ms=500
              --override replica.high.watermark.checkpoint.interval.ms=5000
              --override replica.lag.time.max.ms=10000
              --override replica.socket.receive.buffer.bytes=65536
              --override replica.socket.timeout.ms=30000
              --override request.timeout.ms=30000
              --override socket.receive.buffer.bytes=102400
              --override socket.request.max.bytes=104857600
              --override socket.send.buffer.bytes=102400
              --override unclean.leader.election.enable=true
              --override zookeeper.session.timeout.ms=6000
              --override zookeeper.set.acl=false
              --override broker.id.generation.enable=true
              --override connections.max.idle.ms=600000
              --override controlled.shutdown.enable=true
              --override controlled.shutdown.max.retries=3
              --override controlled.shutdown.retry.backoff.ms=5000
              --override controller.socket.timeout.ms=30000
              --override default.replication.factor=1
              --override fetch.purgatory.purge.interval.requests=1000
              --override group.max.session.timeout.ms=300000
              --override group.min.session.timeout.ms=6000
              --override log.cleaner.backoff.ms=15000
              --override log.cleaner.dedupe.buffer.size=134217728
              --override log.cleaner.delete.retention.ms=86400000
              --override log.cleaner.enable=true
              --override log.cleaner.io.buffer.load.factor=0.9
              --override log.cleaner.io.buffer.size=524288
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
              --override log.cleaner.min.cleanable.ratio=0.5
              --override log.cleaner.min.compaction.lag.ms=0
              --override log.cleaner.threads=1
              --override log.cleanup.policy=delete
              --override log.index.interval.bytes=4096
              --override log.index.size.max.bytes=10485760
              --override log.message.timestamp.difference.max.ms=9223372036854775807
              --override log.message.timestamp.type=CreateTime
              --override log.preallocate=false
              --override log.retention.check.interval.ms=300000
              --override max.connections.per.ip=2147483647
              --override num.partitions=1
              --override producer.purgatory.purge.interval.requests=1000
              --override replica.fetch.backoff.ms=1000
              --override replica.fetch.max.bytes=1048576
              --override replica.fetch.response.max.bytes=10485760
              --override reserved.broker.max.id=1000
          env:
            # JVM heap for the broker; stays below the 1Gi memory request.
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx512M -Xms512M'
            - name: KAFKA_OPTS
              value: '-Dlogging.level=INFO'
          # Mounted at the path configured as log.dirs above.
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
          # Ready once the broker answers an API-versions request on its
          # own pod IP.
          readinessProbe:
            exec:
              command:
                - sh
                - '-c'
                - >-
                  /opt/kafka/bin/kafka-broker-api-versions.sh
                  --bootstrap-server=$(hostname -i):9092
            timeoutSeconds: 5
      # Pod-level security context (fsGroup is only valid at pod level).
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  # One 10Gi PersistentVolumeClaim per broker.
  # NOTE(review): no storageClassName is set here (the ZooKeeper claim names
  # managed-nfs-storage), so this relies on the cluster having a default
  # StorageClass — confirm.
  volumeClaimTemplates:
    - metadata:
        name: datadir
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
[root@k8s01 kafka]# kubectl apply -f kafka.yaml
service/kafka-hs created
poddisruptionbudget.policy/kafka-pdb created
statefulset.apps/kafka created
[root@k8s01 efk]# kubectl get -n tools-env po -l app=kafka
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 3m42s
kafka-1 1/1 Running 0 3m42s
kafka-2 1/1 Running 0 3m42s
[root@k8s01 efk]# kubectl apply -f kafka.yaml
service/kafka-hs created
poddisruptionbudget.policy/kafka-pdb created
statefulset.apps/kafka created
[root@k8s01 efk]# kubectl get -n tools-env po -l app=kafka
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 3m42s
kafka-1 1/1 Running 0 3m42s
kafka-2 1/1 Running 0 3m42s
创建topic
[root@k8s01 kafka]# kubectl exec -it -n tools-env kafka-0 -- bash
kafka@kafka-0:/$ kafka-topics.sh --zookeeper zk-cs.tools-env.svc.cluster.local:2181 --topic test --replication-factor 2 --partitions 5 --create
Created topic "test".
查看topic
[root@k8s01 efk]# kubectl exec -it -n tools-env kafka-1 -- bash
kafka@kafka-1:/$ kafka-topics.sh --zookeeper zk-cs.tools-env.svc.cluster.local:2181 --list
messages
消息发送及消费
[root@k8s01 kafka]# kubectl exec -it -n tools-env kafka-0 -- bash
kafka@kafka-0:/opt/kafka/bin$ kafka-console-producer.sh --broker-list kafka-0.kafka-hs.tools-env.svc.cluster.local:9092,kafka-1.kafka-hs.tools-env.svc.cluster.local:9092,kafka-2.kafka-hs.tools-env.svc.cluster.local:9092 --topic test
hello
#消费
[root@k8s01 ~]# kubectl exec -it -n tools-env kafka-2 -- bash
kafka@kafka-2:/$ kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-hs.tools-env.svc.cluster.local:9092,kafka-1.kafka-hs.tools-env.svc.cluster.local:9092,kafka-2.kafka-hs.tools-env.svc.cluster.local:9092 --topic test --from-beginning
[2023-05-05 02:46:38,503] WARN Removing server kafka-0.kafka-hs.tools-env.svc.cluster.local:9092 from bootstrap.servers as DNS resolution failed for kafka-0.kafka-hs.tools-env.svc.cluster.local (org.apache.kafka.clients.ClientUtils)
hello
#消费
[root@k8s01 ~]# kubectl exec -it -n tools-env kafka-1 bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.
kafka@kafka-1:/$ kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-hs.tools-env.svc.cluster.local:9092,kafka-1.kafka-hs.tools-env.svc.cluster.local:9092,kafka-2.kafka-hs.tools-env.svc.cluster.local:9092 --topic test --from-beginning
[2023-05-05 02:46:50,658] WARN Removing server kafka-0.kafka-hs.tools-env.svc.cluster.local:9092 from bootstrap.servers as DNS resolution failed for kafka-0.kafka-hs.tools-env.svc.cluster.local (org.apache.kafka.clients.ClientUtils)
hello
版权声明:
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
爱吃可爱多!
喜欢就支持一下吧
打赏
微信
支付宝