Kubernetes部署kafka集群
之前文章有用过 statefulset 部署 kafka 集群,但集群外部是无法消费的:外部客户端与 Pod 网络不通,也无法解析 kafka 的集群内地址。可以通过 NodePort 把每个 pod 单独暴露出去(zookeeper 同理),但做起来挺麻烦。本篇是在 helm 安装的基础上整理的,仅作个人学习测试记录;生产环境上 kafka 集群建议还是放在集群外部,通过 Endpoint 接入集群。
概述
在k8s里面部署kafka、zookeeper这种有状态的服务,不能使用deployment和RC,k8s提供了一种专门用来部署这种有状态的服务的API--statefulset,有状态简单来说就是需要持久化数据,比如日志、数据库数据、服务状态等
statefulset应用场景:
- 稳定的持久化存储,即Pod重新调度后还是能访问到相同的持久化数据,基于PVC来实现
- 稳定的网络标志,即Pod重新调度后其PodName和HostName不变,基于Headless Service(即没有Cluster IP的Service)来实现
- 有序部署,有序扩展,即Pod是有顺序的,在部署或者扩展的时候要依据定义的顺序依次进行(即从0到N-1,在下一个Pod运行之前所有之前的Pod必须都是Running和Ready状态),基于init containers来实现
- 有序收缩,有序删除(即从N-1到0)
statefulset组成: - 用于定义网络标志(DNS domain)的Headless Service
- 用于创建PersistentVolumes的volumeClaimTemplates
- 定义具体应用的StatefulSet
StatefulSet中每个Pod的DNS格式为statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local,其中 - 0..N-1为Pod所在的序号,从0开始到N-1
- serviceName为Headless Service的名字
- statefulSetName为StatefulSet的名字
- namespace为服务所在的namespace,Headless Service和StatefulSet必须在相同的namespace
- .cluster.local为Cluster Domain
部署zookeeper
zookeeper配置文件
[root@k8s01 k8s-kafka]# cat zk-kafka-cm.yaml
apiVersion: v1
data:
  # Liveness-probe helper: the "ruok" four-letter word must answer "imok".
  ok: |
    #!/bin/sh
    echo ruok | nc 127.0.0.1 ${1:-2181}
  # Readiness-probe helper (same check as the liveness probe).
  ready: |
    #!/bin/sh
    echo ruok | nc 127.0.0.1 ${1:-2181}
  # Container entrypoint: renders zoo.cfg, myid and log4j.properties from
  # environment variables, then execs the ZooKeeper JVM.
  run: |
    #!/bin/bash
    set -a
    ROOT=$(echo /apache-zookeeper-*)
    ZK_USER=${ZK_USER:-"zookeeper"}
    ZK_LOG_LEVEL=${ZK_LOG_LEVEL:-"INFO"}
    ZK_DATA_DIR=${ZK_DATA_DIR:-"/data"}
    ZK_DATA_LOG_DIR=${ZK_DATA_LOG_DIR:-"/data/log"}
    ZK_CONF_DIR=${ZK_CONF_DIR:-"/conf"}
    ZK_CLIENT_PORT=${ZK_CLIENT_PORT:-2181}
    ZK_SERVER_PORT=${ZK_SERVER_PORT:-2888}
    ZK_ELECTION_PORT=${ZK_ELECTION_PORT:-3888}
    ZK_TICK_TIME=${ZK_TICK_TIME:-2000}
    ZK_INIT_LIMIT=${ZK_INIT_LIMIT:-10}
    ZK_SYNC_LIMIT=${ZK_SYNC_LIMIT:-5}
    ZK_HEAP_SIZE=${ZK_HEAP_SIZE:-2G}
    ZK_MAX_CLIENT_CNXNS=${ZK_MAX_CLIENT_CNXNS:-60}
    ZK_MIN_SESSION_TIMEOUT=${ZK_MIN_SESSION_TIMEOUT:- $((ZK_TICK_TIME*2))}
    ZK_MAX_SESSION_TIMEOUT=${ZK_MAX_SESSION_TIMEOUT:- $((ZK_TICK_TIME*20))}
    ZK_SNAP_RETAIN_COUNT=${ZK_SNAP_RETAIN_COUNT:-3}
    ZK_PURGE_INTERVAL=${ZK_PURGE_INTERVAL:-0}
    ID_FILE="$ZK_DATA_DIR/myid"
    ZK_CONFIG_FILE="$ZK_CONF_DIR/zoo.cfg"
    LOG4J_PROPERTIES="$ZK_CONF_DIR/log4j.properties"
    HOST=$(hostname)
    DOMAIN=$(hostname -d)
    ZOOCFG=zoo.cfg
    ZOOCFGDIR=$ZK_CONF_DIR
    JVMFLAGS="-Xmx$ZK_HEAP_SIZE -Xms$ZK_HEAP_SIZE"
    APPJAR=$(echo $ROOT/*jar)
    CLASSPATH="${ROOT}/lib/*:${APPJAR}:${ZK_CONF_DIR}:"
    # Derive the ZooKeeper server id from the StatefulSet pod ordinal
    # (pod "zookeeper-0" -> myid 1, and so on).
    if [[ $HOST =~ (.*)-([0-9]+)$ ]]; then
      NAME=${BASH_REMATCH[1]}
      ORD=${BASH_REMATCH[2]}
      MY_ID=$((ORD+1))
    else
      echo "Failed to extract ordinal from hostname $HOST"
      exit 1
    fi
    mkdir -p $ZK_DATA_DIR
    mkdir -p $ZK_DATA_LOG_DIR
    # Fix: overwrite (">") instead of append (">>"). $ZK_DATA_DIR lives on a
    # persistent volume, so appending would add a line to myid on every
    # restart and corrupt the server id.
    echo $MY_ID > $ID_FILE
    # Fix: start from an empty config file so a container restart does not
    # append duplicate keys (mirrors the log4j handling below).
    rm -f $ZK_CONFIG_FILE
    echo "clientPort=$ZK_CLIENT_PORT" >> $ZK_CONFIG_FILE
    echo "dataDir=$ZK_DATA_DIR" >> $ZK_CONFIG_FILE
    echo "dataLogDir=$ZK_DATA_LOG_DIR" >> $ZK_CONFIG_FILE
    echo "tickTime=$ZK_TICK_TIME" >> $ZK_CONFIG_FILE
    echo "initLimit=$ZK_INIT_LIMIT" >> $ZK_CONFIG_FILE
    echo "syncLimit=$ZK_SYNC_LIMIT" >> $ZK_CONFIG_FILE
    echo "maxClientCnxns=$ZK_MAX_CLIENT_CNXNS" >> $ZK_CONFIG_FILE
    echo "minSessionTimeout=$ZK_MIN_SESSION_TIMEOUT" >> $ZK_CONFIG_FILE
    echo "maxSessionTimeout=$ZK_MAX_SESSION_TIMEOUT" >> $ZK_CONFIG_FILE
    echo "autopurge.snapRetainCount=$ZK_SNAP_RETAIN_COUNT" >> $ZK_CONFIG_FILE
    echo "autopurge.purgeInterval=$ZK_PURGE_INTERVAL" >> $ZK_CONFIG_FILE
    # Required since ZooKeeper 3.5 for the "ruok" probe scripts above to work.
    echo "4lw.commands.whitelist=*" >> $ZK_CONFIG_FILE
    # One "server.N" line per ensemble member, addressed through the
    # headless-service DNS domain.
    for (( i=1; i<=$ZK_REPLICAS; i++ ))
    do
      echo "server.$i=$NAME-$((i-1)).$DOMAIN:$ZK_SERVER_PORT:$ZK_ELECTION_PORT" >> $ZK_CONFIG_FILE
    done
    rm -f $LOG4J_PROPERTIES
    echo "zookeeper.root.logger=$ZK_LOG_LEVEL, CONSOLE" >> $LOG4J_PROPERTIES
    echo "zookeeper.console.threshold=$ZK_LOG_LEVEL" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.threshold=$ZK_LOG_LEVEL" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.dir=$ZK_DATA_LOG_DIR" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.file=zookeeper.log" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.maxfilesize=256MB" >> $LOG4J_PROPERTIES
    echo "zookeeper.log.maxbackupindex=10" >> $LOG4J_PROPERTIES
    echo "zookeeper.tracelog.dir=$ZK_DATA_LOG_DIR" >> $LOG4J_PROPERTIES
    echo "zookeeper.tracelog.file=zookeeper_trace.log" >> $LOG4J_PROPERTIES
    echo "log4j.rootLogger=\${zookeeper.root.logger}" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE.Threshold=\${zookeeper.console.threshold}" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout" >> $LOG4J_PROPERTIES
    echo "log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n" >> $LOG4J_PROPERTIES
    # NOTE: JMX is enabled only when JMXDISABLE is EMPTY/unset — any
    # non-empty value (including "false") skips the JMX flags.
    if [ -n "$JMXDISABLE" ]
    then
      MAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
    else
      MAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
    set -x
    exec java -cp "$CLASSPATH" $JVMFLAGS $MAIN $ZK_CONFIG_FILE
kind: ConfigMap
metadata:
  labels:
    app: zookeeper
  name: kafka-zookeeper
  namespace: test-env
部署zookeeper
[root@k8s01 k8s-kafka]# cat zookeeper.yaml
---
# 3-node ZooKeeper ensemble driven by the scripts in the
# "kafka-zookeeper" ConfigMap (mounted at /config-scripts).
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: test-env
spec:
  podManagementPolicy: OrderedReady
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: zookeeper
      component: server
      release: kafka
  serviceName: zookeeper-headless
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: zookeeper
        component: server
        release: kafka
    spec:
      containers:
      - command:
        - /bin/bash
        - -xec
        - /config-scripts/run
        env:
        # Must match spec.replicas: the run script writes one
        # "server.N" line per replica into zoo.cfg.
        - name: ZK_REPLICAS
          value: "3"
        - name: JMXAUTH
          value: "false"
        # Fix: the run script enables JMX only when JMXDISABLE is EMPTY
        # ([ -n "$JMXDISABLE" ] skips the JMX flags); the previous value
        # "false" was non-empty and therefore switched JMX OFF despite
        # JMXPORT being configured below.
        - name: JMXDISABLE
          value: ""
        - name: JMXPORT
          value: "1099"
        - name: JMXSSL
          value: "false"
        - name: ZK_HEAP_SIZE
          value: 1G
        - name: ZK_SYNC_LIMIT
          value: "10"
        - name: ZK_TICK_TIME
          value: "2000"
        - name: ZOO_AUTOPURGE_PURGEINTERVAL
          value: "0"
        - name: ZOO_AUTOPURGE_SNAPRETAINCOUNT
          value: "3"
        - name: ZOO_INIT_LIMIT
          value: "5"
        - name: ZOO_MAX_CLIENT_CNXNS
          value: "60"
        - name: ZOO_PORT
          value: "2181"
        - name: ZOO_STANDALONE_ENABLED
          value: "false"
        - name: ZOO_TICK_TIME
          value: "2000"
        image: zookeeper:3.5.5
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - /config-scripts/ok
          failureThreshold: 2
          initialDelaySeconds: 20
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 5
        name: zookeeper
        ports:
        - containerPort: 2181
          name: client
          protocol: TCP
        - containerPort: 3888
          name: election
          protocol: TCP
        - containerPort: 2888
          name: server
          protocol: TCP
        readinessProbe:
          exec:
            command:
            - sh
            - /config-scripts/ready
          failureThreshold: 2
          initialDelaySeconds: 20
          periodSeconds: 30
          successThreshold: 1
          timeoutSeconds: 5
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /data
          name: data
        - mountPath: /config-scripts
          name: config
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1000
        runAsUser: 1000
      terminationGracePeriodSeconds: 1800
      volumes:
      - configMap:
          # 365 decimal = 0555 octal: scripts are mounted read-only executable.
          defaultMode: 365
          name: kafka-zookeeper
        name: config
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - metadata:
      creationTimestamp: null
      name: data
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi
      storageClassName: managed-nfs-storage
---
# Client-facing ClusterIP Service: stable in-cluster address for
# ZooKeeper clients (port 2181 only).
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: test-env
  labels:
    app: zookeeper
    release: kafka
spec:
  type: ClusterIP
  selector:
    app: zookeeper
    release: kafka
  ports:
  - name: client
    port: 2181
    protocol: TCP
    targetPort: client
---
# Headless Service: gives each StatefulSet pod a stable DNS name and
# exposes the quorum (2888) and leader-election (3888) ports.
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-headless
  namespace: test-env
  labels:
    app: zookeeper
    release: kafka
spec:
  clusterIP: None
  selector:
    app: zookeeper
    release: kafka
  ports:
  - name: client
    port: 2181
    protocol: TCP
    targetPort: client
  - name: election
    port: 3888
    protocol: TCP
    targetPort: election
  - name: server
    port: 2888
    protocol: TCP
    targetPort: server
部署kafka
部署kafka
[root@k8s01 k8s-kafka]# cat kafka.yaml
# 3-broker Kafka StatefulSet. Each broker advertises an internal
# PLAINTEXT listener (pod IP:9092) and an EXTERNAL listener reachable
# through a per-pod NodePort service (31090 + broker id).
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
  name: kafka
  namespace: test-env
spec:
  podManagementPolicy: OrderedReady
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/component: kafka-broker
      app.kubernetes.io/instance: kafka
      app.kubernetes.io/name: kafka
  serviceName: kafka-headless
  template:
    metadata:
      creationTimestamp: null
      labels:
        app.kubernetes.io/component: kafka-broker
        app.kubernetes.io/instance: kafka
        app.kubernetes.io/name: kafka
    spec:
      containers:
      # The broker id is derived from the pod ordinal, and the advertised
      # listeners are computed in the shell before exec'ing the Confluent
      # entrypoint. NOTE(review): 192.168.50.205 is a hardcoded node IP —
      # adjust it for your cluster.
      - command:
        - sh
        - -exc
        - |
          unset KAFKA_PORT && \
          export KAFKA_BROKER_ID=${POD_NAME##*-} && \
          export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092,EXTERNAL://192.168.50.205:$((31090 + ${KAFKA_BROKER_ID})) && \
          exec /etc/confluent/docker/run
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1G -Xms1G
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        # Resolves through the "zookeeper" ClusterIP Service.
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper:2181
        - name: KAFKA_LOG_DIRS
          value: /opt/kafka/data/logs
        # Fix: removed the KAFKA_ADVERTISED_LISTENERS env entry that used
        # shell arithmetic ($((31090 + ${KAFKA_BROKER_ID}))) — Kubernetes
        # does not expand shell expressions in env values, so it held a
        # literal, broken string. The real value is exported by the
        # container command above.
        - name: KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE
          value: "false"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
        - name: KAFKA_JMX_PORT
          value: "5555"
        image: confluentinc/cp-kafka:5.0.1
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - -ec
            - /usr/bin/jps | /bin/grep -q SupportedKafka
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 5
        name: kafka-broker
        ports:
        - containerPort: 9092
          name: kafka
          protocol: TCP
        # One EXTERNAL listener port per broker ordinal (31090 + id).
        - containerPort: 31090
          name: external-0
          protocol: TCP
        - containerPort: 31091
          name: external-1
          protocol: TCP
        - containerPort: 31092
          name: external-2
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        volumeMounts:
        - mountPath: /opt/kafka/data
          name: datadir
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 60
  updateStrategy:
    type: OnDelete
  volumeClaimTemplates:
  - metadata:
      creationTimestamp: null
      name: datadir
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 5Gi
      storageClassName: managed-nfs-storage
---
# In-cluster ClusterIP Service: stable address for Kafka clients
# running inside Kubernetes (internal PLAINTEXT listener, 9092).
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: test-env
  labels:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
spec:
  type: ClusterIP
  selector:
    app.kubernetes.io/component: kafka-broker
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
  ports:
  - name: broker
    port: 9092
    protocol: TCP
    targetPort: kafka
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
name: kafka-headless
namespace: test-env
spec:
clusterIP: None
ports:
- name: broker
port: 9092
protocol: TCP
targetPort: 9092
selector:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
pod: kafka-0
name: kafka-0-external
namespace: test-env
spec:
externalTrafficPolicy: Cluster
ports:
- name: external-broker
nodePort: 31090
port: 19092
protocol: TCP
targetPort: 31090
selector:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
statefulset.kubernetes.io/pod-name: kafka-0
type: NodePort
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
pod: kafka-1
name: kafka-1-external
namespace: test-env
spec:
externalTrafficPolicy: Cluster
ports:
- name: external-broker
nodePort: 31091
port: 19092
protocol: TCP
targetPort: 31091
selector:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
statefulset.kubernetes.io/pod-name: kafka-1
type: NodePort
---
apiVersion: v1
kind: Service
metadata:
labels:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
pod: kafka-2
name: kafka-2-external
namespace: test-env
spec:
externalTrafficPolicy: Cluster
ports:
- name: external-broker
nodePort: 31092
port: 19092
protocol: TCP
targetPort: 31092
selector:
app.kubernetes.io/component: kafka-broker
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
statefulset.kubernetes.io/pod-name: kafka-2
type: NodePort
测试
[root@k8s01 k8s-kafka]# kubectl get -n test-env po
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 38h
kafka-1 1/1 Running 0 38h
kafka-2 1/1 Running 0 38h
zookeeper-0 1/1 Running 0 38h
zookeeper-1 1/1 Running 0 38h
zookeeper-2 1/1 Running 0 38h
[root@nfs kafka_2.12-2.3.0]# bin/kafka-topics.sh --list --bootstrap-server 192.168.50.205:31092
__consumer_offsets
axhome
版权声明:
本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
爱吃可爱多!
喜欢就支持一下吧
打赏
微信
支付宝