设置一个Kafka群集

最近需要改一个实时程序,数据源订阅的Kafka,虽然之前有看过一阵资料,但遗忘的差不多了,因此建个环境练习一下。本文会创建两个节点的Kafka群集,配置环境为MacOS。

Kafka介绍

这部分可以参考我以前翻译的几篇文章:

Kafka 官方文档翻译

Apache Kafka 初学者指南[译]

Spark Streaming + Kafka集成指南(Kafka broker version 0.8.2.1 or highe)

Cloudera Distribution of Apache Kafka[译]

下载安装Kafka

Kafka官方下载最新的版本,如果你Kafka要与Scala一起使用,那么请选择相应的Scala版本,否则随意。

解压

tar -xzf ~/Downloads/kafka_2.12-1.0.1.tgz

创建Kafka存储目录

mkdir kafka-log-1
mkdir kafka-log-2

配置kafka

设置第一个broker

编辑kafka_2.12-1.0.1/config/server.properties配置文件

修改log.dirs的目录

log.dirs=/Users/haseochen/app/kafka-log-1

设置第二个broker

复制一份server.properties

cp config/server.properties config/server2.properties

编辑kafka_2.12-1.0.1/config/server2.properties配置文件

修改log.dirs的目录

log.dirs=/Users/haseochen/app/kafka-log-2

增加port属性

port=9091

修改broker.id

broker.id=1

测试配置

启用服务

1.启动zookeeper,这里启动是kafka自带的zookeeper(请勿在生产环境使用)

kafka_2.12-1.0.1/bin/zookeeper-server-start.sh kafka_2.12-1.0.1/config/zookeeper.properties

2.启动第一个broker

kafka_2.12-1.0.1/bin/kafka-server-start.sh kafka_2.12-1.0.1/config/server.properties

3.启动第二个broker

kafka_2.12-1.0.1/bin/kafka-server-start.sh kafka_2.12-1.0.1/config/server2.properties

4.查看进程

使用ps -ef | grep kafka将看到两个kafka进程。

创建topic

通过bin/kafka-topics.sh可以创建Topic,zookeeper指向刚才启动的本地zookeeper,topic名字为first,分区数2,副本数2(因为我们才两台机器所以最多就是2个副本)

kafka_2.12-1.0.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic first --partitions 2 --replication-factor 2

查看topic

kafka_2.12-1.0.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first

验证置producer和consumer的配置

创建数据

kafka_2.12-1.0.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

消费数据

kafka_2.12-1.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first
kafka_2.12-1.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first --from-beginning  #--from-beginning表示从头开始读取该topic的数据。

此时在producer界面里面输入数据回车后再consumer中就会获得数据

程序

Java版

生产数据

SimpleCounter

cd /kafka-examples/SimpleCounter
mvn install
./run_params.sh localhost:9092 v1 new async 500 10

消费数据

SimpleMovingAvg

cd /kafka-examples/SimpleMovingAvg
mvn install
java -cp target/uber-SimpleMovingAvg-1.0-SNAPSHOT.jar com.shapira.examples.newconsumer.simplemovingavg.SimpleMovingAvgNewConsumer localhost:9092 g1 v1 10

Python版

pip install kafka-python

Python版代码

问题汇总

producer无法连接Kafka

Fromthe producer you will see:
kafka.common.KafkaException: fetching topic metadata for topics [Set (t1)] from broker [ArrayBuffer(id:0,host:localhost,port:9093)] failed
And below that:
Caused by: java.net.ConnectException: Connection refused

  • 检查broker的host/port
  • 使用telnet尝试连接
  • host错误,调整advertised.hosts配置

producer LeaderNotAvailable错误

From the producer you will see:
WARN Error while fetching metadata [{TopicMetadata for topic t1 -> No partition metadata for topic t1 due to kafka.common.LeaderNotAvailableException

  • 是否创建了topic?
  • 是否设置了auto.create.topics.enable = true?
  • default.replication.factor 是否小于存活的broker数?
  • 使用kafka-topics.sh --describe来确认leader是否存在,本文中为kafka_2.12-1.0.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe

无法删除topic

尝试删除但无任何结果
一直在’marked for deletion’状态
Topic删除以后又再现了

  • 在0.8.2.0之前根本无法删除topic
  • 在0.8.2.0之后发现’marked for deletion’状态,确保delete.topic.enable被设置为true
  • 如果设置了auto.create.topics.enable=true则客户端尝试访问这个topic的时候,topic会自动重新创建。
打赏支持:支付宝/微信。如果你觉得我的文章对你有所帮助,可以打赏我哟。