计算机系统应用教程网站

网站首页 > 技术文章 正文

kafka和zookeeper在k8s集群踩的一些坑

btikc 2024-10-25 11:02:02 技术文章 14 ℃ 0 评论

zookeeper配置istio sidecar后存在的网络不可用问题

如果zookeeper配置了istio sidecar ,在选举阶段就会报connection refused(Connection refused)错误,如下图:

这主要是因为 zookeeper 在server之间通信默认是监听 pod IP 地址,而istio要求监听0.0.0.0,因此需要设置quorumListenOnAllIPs=true

具体问题可以参考:https://istio.io/latest/faq/applications/

这个不止在 zookeeper 中会出现,包括 Apache NiFi 、 Cassandra、 Elasticsearch、Redis 中安装 sidecar 模式都会存在这个问题。

由于docker官方的zookeeper镜像没有提供 quorumListenOnAllIPs 的参数,我们需要直接手动添加,详细参考这个issue:
https://github.com/31z4/zookeeper-docker/issues/117

或者可以用 bitnami/zookeeper 这个镜像,这个镜像提供了 quorumListenOnAllIPs 支持,可以通过设置ZOO_LISTEN_ALLIPS_ENABLED环境变量来控制,下面是简单的deployment文件:

kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-1
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
    spec:
      containers:
      - name: zookeeper
        image: bitnami/zookeeper:3.6.2
        imagePullPolicy: Always
        ports:
        - containerPort: 2181
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_LISTEN_ALLIPS_ENABLED
          value: "true"
        - name: ZOO_SERVER_ID
          value: "1"
        - name: ZOO_SERVERS
          value: 0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
--- 
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-2
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
    spec:
      containers:
      - name: zookeeper
        image: bitnami/zookeeper:3.6.2
        imagePullPolicy: Always
        ports:
        - containerPort: 2181
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_LISTEN_ALLIPS_ENABLED
          value: "true"
        - name: ZOO_SERVER_ID
          value: "2"
        - name: ZOO_SERVERS
          value: zookeeper-1:2888:3888,0.0.0.0:2888:3888,zookeeper-3:2888:3888
--- 

kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-3
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
    spec:
      containers:
      - name: zookeeper
        image: bitnami/zookeeper:3.6.2
        imagePullPolicy: Always
        ports:
        - containerPort: 2181
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_LISTEN_ALLIPS_ENABLED
          value: "true"
        - name: ZOO_SERVER_ID
          value: "3"
        - name: ZOO_SERVERS
          value: zookeeper-1:2888:3888,zookeeper-2:2888:3888,0.0.0.0:2888:3888

再设置好 services 就可以 running 了。

kafka在k8s外网访问设置项

k8s 对外暴露一般都会走ingress,但kafka由于其自身特殊的connect机制,我们需要专门设置kafka让其客户端感知到其目标连接。

kafka 和客户端建立连接:

  • 客户端向 kafka server 发起 findCoordinator 请求,寻找可以建立连接的协调者,server 会返回broker连接地址
  • 客户端获得地址后,会创建该 Broker 的 Socket 连接,并保持心跳上报,连接建立起来之后初始和第一个borker的连接会被关闭

由于 kafka 会主要告诉客户端 broker 的连接地址,因为在对外网开放的时候我们需要把 broker 地址设置成外网可访问的地址,这里以wurstmeister/kafka的kafka为例,可以通过以下设置让外网访问:

kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-broker0
  namespace: databases
spec:
  replicas: 1
  selector:
    matchLabels:
        app: kafka
        id: "kafka-broker0"
  template:
    metadata:
      labels:
        app: kafka
        id: "kafka-broker0"
    spec:
      containers:
      - name: kafka
        image: "wurstmeister/kafka:2.12-2.5.0"
        imagePullPolicy: "IfNotPresent"
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "INSIDE://kafka-broker0:9092,OUTSIDE://kafka.db.tensorbytes.com:10000"
        - name: KAFKA_LISTENERS
          value: "INSIDE://:9092,OUTSIDE://:10000"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INSIDE"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-1:2181
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_CREATE_TOPICS
          value: mp_post_slog:1:1
        - name: LOG4J_LOGGER_KAFKA_AUTHORIZER_LOGGER
          value: "DEBUG"
        resources:
          limits:
            cpu: 200m
            memory: 512Mi

环境变量:

KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

istio virtualserver 配置文件:

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: kafka-router
  namespace: databases
spec:
  gateways:
  - db-external-gateway
  hosts:
  - kafka.db.tensorbytes.com
  tcp:
  - match:
    - port: 10000
    route:
    - destination:
        host: kafka-broker0.databases.svc.cluster.local
        port:
          number: 10000

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker0
  labels:
    name: kafka
  namespace: rcmd
spec:
  ports:
  - port: 9092
    name: internal-port
    protocol: TCP
    targetPort: 9092
  - port: 10000
    name: external-port
    protocol: TCP
    targetPort: 10000
  selector:
    app: kafka
    id: "kafka-broker0"
  type: ClusterIP

集群外通过网关访问的测试脚本

生产者:

#coding:utf-8
from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='kafka.db.tensorbytes.com:10000')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
producer.flush(timeout=10)

消费者:

#coding:utf-8
from kafka import KafkaConsumer

consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka.db.tensorbytes.com:10000', group_id='my_favorite_group')

for msg in consumer:
    print(msg)

集群内测试脚本

生产者:

#coding:utf-8
from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='kafka-broker0:9092')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
producer.flush(timeout=10)

消费者:

#coding:utf-8
from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='kafka-broker0:9092')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
producer.flush(timeout=10)

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表