kafka集群搭建
1、环境介绍
环境:CentOS7-3.10.0-693.el7.x86_64(CentOS Linux release 7.4.1708 (Core) )
hdss-7-21(192.168.6.21) | hdss-7-11(192.168.6.11) | hdss-7-12(192.168.6.12) |
---|---|---|
zookeeper | zookeeper | zookeeper |
kafka | kafka | kafka |
官方地址:Apache Kafka
2、Zookeeper集群
更多知识可参考:Zookeeper基础
1. JDK
cat >> /etc/profile.d/my_env.sh << EOF
#jdk environment
export JAVA_HOME=/usr/local/jdk1.8.0_291
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
EOF
2. 解压配置
解压到相应目录,并在目录下新建dataDir文件夹
在conf目录在做配置:先拷贝一份 zoo_sample.cfg为zoo.cfg,然后编辑zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/dataDir/
clientPort=2181
#######################cluster##########################
server.1=0.0.0.0:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
这里注意,node2、node3替换为真实服务器的ip地址或能被解析的主机名,其中在node1中,这个ip地址要设置为0.0.0.0,不然肯会出现
Cannot open channel to 1 at election address /xx.xx.xx.xx:3888
的错误,特别是在真实的云服务器上面很容易出现这个问题。
3. 修改另外两台配置
参照前面3步,分别在node2,node3机器上面进行配置,在node2中cong文件就修改为:
server.1=node1:2888:3888
server.2=0.0.0.0:2888:3888
server.3=node3:2888:3888
在node3中cong文件就修改为:
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=0.0.0.0:2888:3888
4. 分别在3台服务进行启动
bin/zkServer.sh start
# 作态查看
bin/zkServer.sh status
3、kafka集群
下载地址:http://kafka.apache.org/downloads
1. 环境部署
1、安装
- 解压到相应目录,并在安装目录下新建logs目录
- 编辑config中的
server.properties
文件,注意这里的broker.id
,部署在不同机器上面的id需要不同,例如在node1中部署,这里就填写1,在node2机器上面就填写2,依次类推。
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/
mv kafka_2.11-0.11.0.0/ kafka
mkdir logs
cd config/
vi server.properties
# 删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的现成数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
###########最为常用的几个配置###########
# broker 的全局唯一编号,不能重复
broker.id=0
# kafka 运行日志存放的路径
log.dirs=/opt/kafka/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 配置连接 Zookeeper 集群地址
zookeeper.connect=hdss-7-21:2181,hdss-7-11:2181,hdss-7-12:2181
2、配置环境变量
vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
3、分发安装包
参考:文件同步
xsync kafka/
注意:分发之后记得配置其他机器的环境变量
4、broker.id唯一
分别在另外两个服务上修改配置文件
/opt/kafka/config/server.properties
中的broker.id=1
、broker.id=2
注:broker.id 不得重复
5、集群管理
1)启动集群
kafka-server-start.sh -daemon config/server.properties # 相对路径
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties # 绝对路径
2)停止集群
kafka-server-stop.sh stop
3)kafka 群起脚本
for i in hdss-7-21 hdss-7-11 hdss-7-12
do
echo "========== $i =========="
ssh $i '/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties'
done
验证失败,可能没有做免密登录
2. 基本操作命令
1、查看当前服务器中的所有 topic
kafka-topics.sh --zookeeper hdss-7-12:2181 --list
kafka-topics.sh --zookeeper hdss-7-11:2181 --list
kafka-topics.sh --zookeeper hdss-7-21:2181 --list
2、创建topic
kafka-topics.sh --zookeeper hdss-7-12:2181 --create --replication-factor 3 --partitions 1 --topic first
# 创建之后进行查看
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-21:2181 --list
first
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-11:2181 --list
first
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-12:2181 --list
first
–topic 定义 topic 名 –replication-factor 定义副本数 –partitions 定义分区数
3、删除topic
kafka-topics.sh --zookeeper hdss-7-21:2181 --delete --topic first
实例:
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-21:2181 --delete --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-12:2181 --list
[root@hdss-7-21 bin]#
Topic first is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
主题优先被标记为删除。
注意:如果删除.topic.enable未设置为true
如果在server.properties中没有开启删除topic的功能,则无法删除(参数如下)
#删除 topic 功能使能 delete.topic.enable=true
4、发送消息
[root@hdss-7-21 bin]# kafka-console-producer.sh --broker-list hdss-7-11:9092 --topic first
>hello world
[2021-03-11 10:46:31,941] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {first=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>
[2021-03-11 10:46:31,941] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {first=LEADER_NOT_AVAILABLE}
该错误是由于没有配置advertised.listeners
listeners
就是主要用来定义Kafka Broker的Listener的配置项。
advertised.listeners
参数的作用就是将Broker的Listener信息发布到Zookeeper中
server.properties中配置advertised.listeners并重启后
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.6.11:9092
发布消息
[root@hdss-7-21 ~]# kafka-console-producer.sh --broker-list hdss-7-11:9092 --topic first
>hello world
>1 你好啊
>nihao
你好
5、消费消息
创建消费者消费消息
kafka-console-consumer.sh --zookeeper hdss-7-11:2181 --topic first
# 上面的命令会报错,consumer zookeeper is not a recognized option
kafka-console-consumer.sh --bootstrap-server hdss-7-11:9092 --topic first
kafka-console-consumer.sh –zookeeper hdss-7-11:2181 –topic first 这种启动方式在新版本中已经被删除了改用了新的启动方式
kafka-console-consumer.sh –bootstrap-server hdss-7-11:9092 –topic first
[root@hdss-7-11 ~]# kafka-console-consumer.sh --bootstrap-server hdss-7-11:9092 --topic first
nihao
你好
[root@hdss-7-11 ~]# kafka-console-consumer.sh --bootstrap-server hdss-7-12:9092 --from-beginning --topic first
hello world
hello world
1 你就�好啊
www
nihao
nihao
你好
只有生产者生产出消息才会被消费者消费掉
–from-beginning:会把主题中以往所有的数据都读取出来
6、查看某个 Topic 的详情
[root@hdss-7-11 ~]# kafka-topics.sh --zookeeper hdss-7-12:2181 --describe --topic first
Topic: first PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0
7、 修改分区数
[root@hdss-7-21 ~]# kafka-topics.sh --zookeeper hdss-7-21:2181 --alter --topic first --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# 查看Topic详情
[root@hdss-7-11 ~]# kafka-topics.sh --zookeeper hdss-7-12:2181 --describe --topic first
Topic: first PartitionCount: 6 ReplicationFactor: 1 Configs:
Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: first Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: first Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: first Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: first Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: first Partition: 5 Leader: 2 Replicas: 2 Isr: 2
4、zookeeper和kafka的安全认证机制SASL
zookeeper在生产环境中,如果不是只在内网开放的话,就需要设置安全认证,可以选择SASL的安全认证。以下是和kafka的联合配置,如果不需要kafka可以去掉kafka相关的权限即可,以下基于zk3.5.5和kafka2.12进行操作。
下面就是详细的部署步骤:
1. zk
zookeeper所有节点都是对等的,只是各个节点角色可能不相同。以下步骤所有的节点配置相同。
1、导入kafka的相关jar
从kafka/lib目录下复制以下几个jar包到zookeeper的lib目录下:
kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar
2、zoo.cfg文件配置
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
3、编写JAAS文件,zk_server_jaas.conf,放置在conf目录下
这个文件定义需要链接到Zookeeper服务器的用户名和密码。JAAS配置节默认为Server:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019"
user_kafka="kafka-2019"
user_producer="prod-2019";
};
这个文件中定义了两个用户,一个是kafka,一个是producer,这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。
4、修改zkEnv.sh
在zkEnv.sh添加以下内容,路径按你直接的实际路径来填写:
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/conf/zk_server_jaas.conf "
5、各节点启动服务
bin/zkServer.sh start
2. kafka
zookeeper启动之后,就配置kafka,下面步骤的配置在所有节点上都相同。
1、在kafka的config目录下,新建kafka_server_jaas.conf文件,内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019"
user_admin="admin-2019"
user_producer="prod-2019"
user_consumer="cons-2019";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-2019";
};
KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。
2、配置server.properties,同样的在config目录下
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://node1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
这里注意listeners配置项,将主机名部分(本例主机名是node1)替换成当前节点的主机名。其他在各个节点的配置一致。注意,allow.everyone.if.no.acl.found这个配置项默认是false,若不配置成true,后续生产者、消费者无法正常使用Kafka。
3、修改启动脚本,添加kafka_server_jaas.conf配置文件
在server启动脚本JVM参数,在bin目录下的
kafka-server-start.sh
中,配置如下
export KAFKA_HEAP_OPTS="-Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
4、配置其他节点
配置剩余的kafka broker节点,注意server.properties的listeners配置项
5、启动各节点kafka服务端
./kafka-server-start.sh ../config/server.properties
# 后台启动
./kafka-server-start.sh -daemon ../config/server.properties