设置一个Kafka群集

最近需要改一个实时程序,数据源订阅的Kafka,虽然之前有看过一阵资料,但遗忘的差不多了,因此建个环境练习一下。本文会创建两个节点的Kafka群集,配置环境为MacOS。

Kafka介绍

这部分可以参考我以前翻译的几篇文章:

Kafka 官方文档翻译

Apache Kafka 初学者指南[译]

Spark Streaming + Kafka集成指南(Kafka broker version 0.8.2.1 or highe)

Cloudera Distribution of Apache Kafka[译]

下载安装Kafka

Kafka官方下载最新的版本,如果你Kafka要与Scala一起使用,那么请选择相应的Scala版本,否则随意。

解压

tar -xzf ~/Downloads/kafka_2.12-1.0.1.tgz

创建Kafka存储目录

mkdir kafka-log-1
mkdir kafka-log-2

配置kafka

设置第一个broker

编辑kafka_2.12-1.0.1/config/server.properties配置文件

修改log.dirs的目录

log.dirs=/Users/haseochen/app/kafka-log-1

设置第二个broker

复制一份server.properties

cp config/server.properties config/server2.properties

编辑kafka_2.12-1.0.1/config/server2.properties配置文件

修改log.dirs的目录

log.dirs=/Users/haseochen/app/kafka-log-2

增加port属性

port=9091

修改broker.id

broker.id=1

测试配置

启用服务

1.启动zookeeper,这里启动是kafka自带的zookeeper(请勿在生产环境使用)

kafka_2.12-1.0.1/bin/zookeeper-server-start.sh kafka_2.12-1.0.1/config/zookeeper.properties

2.启动第一个broker

kafka_2.12-1.0.1/bin/kafka-server-start.sh kafka_2.12-1.0.1/config/server.properties

3.启动第二个broker

kafka_2.12-1.0.1/bin/kafka-server-start.sh kafka_2.12-1.0.1/config/server2.properties

4.查看进程

使用ps -ef | grep kafka将看到两个kafka进程。

创建topic

通过bin/kafka-topics.sh可以创建Topic,zookeeper指向刚才启动的本地zookeeper,topic名字为first,分区数2,副本数2(因为我们才两台机器所以最多就是2个副本)

kafka_2.12-1.0.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic first --partitions 2 --replication-factor 2

查看topic

kafka_2.12-1.0.1/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first

验证置producer和consumer的配置

创建数据

kafka_2.12-1.0.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

消费数据

kafka_2.12-1.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first
kafka_2.12-1.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first --from-beginning  #--from-beginning表示从头开始读取该topic的数据。

此时在producer界面里面输入数据回车后再consumer中就会获得数据

程序

Java版

生产数据

SimpleCounter

cd /kafka-examples/SimpleCounter
mvn install
./run_params.sh localhost:9092 v1 new async 500 10

消费数据

SimpleMovingAvg

cd /kafka-examples/SimpleMovingAvg
mvn install
java -cp target/uber-SimpleMovingAvg-1.0-SNAPSHOT.jar com.shapira.examples.newconsumer.simplemovingavg.SimpleMovingAvgNewConsumer localhost:9092 g1 v1 10

Python版

pip install kafka-python

Python版代码

打赏支持:支付宝/微信。如果你觉得我的文章对你有所帮助,可以打赏我哟。