Kafka基础(一)- 环境搭建

目录

kafka集群搭建

1、环境介绍

环境:CentOS7-3.10.0-693.el7.x86_64(CentOS Linux release 7.4.1708 (Core) )

hdss-7-21(192.168.6.21) hdss-7-11(192.168.6.11) hdss-7-12(192.168.6.12)
zookeeper zookeeper zookeeper
kafka kafka kafka

官方地址:Apache Kafka

Apache Zookeeper

2、Zookeeper集群

更多知识可参考:Zookeeper基础

1. JDK

cat >> /etc/profile.d/my_env.sh << EOF
#jdk environment
export JAVA_HOME=/usr/local/jdk1.8.0_291
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
EOF

2. 解压配置

  1. 解压到相应目录,并在目录下新建dataDir文件夹

  2. 在conf目录在做配置:先拷贝一份 zoo_sample.cfg为zoo.cfg,然后编辑zoo.cfg:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/zookeeper/dataDir/
    clientPort=2181
    #######################cluster##########################
    server.1=0.0.0.0:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888

这里注意,node2、node3替换为真实服务器的ip地址或能被解析的主机名,其中在node1中,这个ip地址要设置为0.0.0.0,不然肯会出现Cannot open channel to 1 at election address /xx.xx.xx.xx:3888的错误,特别是在真实的云服务器上面很容易出现这个问题。

3. 修改另外两台配置

参照前面3步,分别在node2,node3机器上面进行配置,在node2中cong文件就修改为:

    server.1=node1:2888:3888
    server.2=0.0.0.0:2888:3888
    server.3=node3:2888:3888

在node3中cong文件就修改为:

    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=0.0.0.0:2888:3888

4. 分别在3台服务进行启动

bin/zkServer.sh  start

# 作态查看
bin/zkServer.sh  status

3、kafka集群

下载地址:http://kafka.apache.org/downloads

1. 环境部署

1、安装

  1. 解压到相应目录,并在安装目录下新建logs目录
  2. 编辑config中的server.properties文件,注意这里的broker.id,部署在不同机器上面的id需要不同,例如在node1中部署,这里就填写1,在node2机器上面就填写2,依次类推。
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/

mv kafka_2.11-0.11.0.0/ kafka

mkdir logs

cd config/
vi server.properties
# 删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的现成数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
###########最为常用的几个配置###########
# broker 的全局唯一编号,不能重复
broker.id=0
# kafka 运行日志存放的路径
log.dirs=/opt/kafka/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 配置连接 Zookeeper 集群地址
zookeeper.connect=hdss-7-21:2181,hdss-7-11:2181,hdss-7-12:2181

2、配置环境变量

vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

3、分发安装包

参考:文件同步

xsync kafka/

注意:分发之后记得配置其他机器的环境变量

4、broker.id唯一

分别在另外两个服务上修改配置文件/opt/kafka/config/server.properties 中的 broker.id=1broker.id=2

注:broker.id 不得重复

5、集群管理

1)启动集群

kafka-server-start.sh -daemon config/server.properties	# 相对路径
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties # 绝对路径

2)停止集群

kafka-server-stop.sh stop

3)kafka 群起脚本

for i in hdss-7-21 hdss-7-11 hdss-7-12
do
echo "========== $i =========="
ssh $i '/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties'
done

验证失败,可能没有做免密登录

2. 基本操作命令

1、查看当前服务器中的所有 topic

kafka-topics.sh --zookeeper hdss-7-12:2181 --list
kafka-topics.sh --zookeeper hdss-7-11:2181 --list
kafka-topics.sh --zookeeper hdss-7-21:2181 --list

2、创建topic

kafka-topics.sh --zookeeper hdss-7-12:2181 --create --replication-factor 3 --partitions 1 --topic first
# 创建之后进行查看
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-21:2181 --list
first
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-11:2181 --list 
first
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-12:2181 --list 
first

–topic 定义 topic 名 –replication-factor 定义副本数 –partitions 定义分区数

3、删除topic

kafka-topics.sh --zookeeper hdss-7-21:2181 --delete --topic first

实例:

[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-21:2181 --delete --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hdss-7-21 bin]# kafka-topics.sh --zookeeper hdss-7-12:2181 --list                
[root@hdss-7-21 bin]# 

Topic first is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.

主题优先被标记为删除。

注意:如果删除.topic.enable未设置为true

如果在server.properties中没有开启删除topic的功能,则无法删除(参数如下)

#删除 topic 功能使能 delete.topic.enable=true

4、发送消息

[root@hdss-7-21 bin]# kafka-console-producer.sh --broker-list hdss-7-11:9092 --topic first
>hello world
[2021-03-11 10:46:31,941] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {first=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>

[2021-03-11 10:46:31,941] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {first=LEADER_NOT_AVAILABLE}

该错误是由于没有配置advertised.listeners

listeners就是主要用来定义Kafka Broker的Listener的配置项。

advertised.listeners参数的作用就是将Broker的Listener信息发布到Zookeeper中

server.properties中配置advertised.listeners并重启后

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.6.11:9092

发布消息

[root@hdss-7-21 ~]# kafka-console-producer.sh --broker-list hdss-7-11:9092 --topic first
>hello world
>1 你好啊   
>nihao
你好

5、消费消息

创建消费者消费消息

kafka-console-consumer.sh --zookeeper hdss-7-11:2181 --topic first
# 上面的命令会报错,consumer zookeeper is not a recognized option
kafka-console-consumer.sh --bootstrap-server hdss-7-11:9092 --topic first

kafka-console-consumer.sh –zookeeper hdss-7-11:2181 –topic first 这种启动方式在新版本中已经被删除了改用了新的启动方式

kafka-console-consumer.sh –bootstrap-server hdss-7-11:9092 –topic first

[root@hdss-7-11 ~]# kafka-console-consumer.sh --bootstrap-server hdss-7-11:9092 --topic first
nihao
你好

[root@hdss-7-11 ~]# kafka-console-consumer.sh --bootstrap-server hdss-7-12:9092 --from-beginning --topic first
hello world
hello world
1 你就�好啊
www
nihao
nihao
你好

只有生产者生产出消息才会被消费者消费掉

–from-beginning:会把主题中以往所有的数据都读取出来

6、查看某个 Topic 的详情

[root@hdss-7-11 ~]# kafka-topics.sh --zookeeper hdss-7-12:2181 --describe --topic first
Topic: first    PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: first    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

7、 修改分区数

[root@hdss-7-21 ~]# kafka-topics.sh --zookeeper hdss-7-21:2181 --alter --topic first --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

# 查看Topic详情
[root@hdss-7-11 ~]# kafka-topics.sh --zookeeper hdss-7-12:2181 --describe --topic first
Topic: first    PartitionCount: 6       ReplicationFactor: 1    Configs: 
        Topic: first    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: first    Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: first    Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: first    Partition: 3    Leader: 0       Replicas: 0     Isr: 0
        Topic: first    Partition: 4    Leader: 1       Replicas: 1     Isr: 1
        Topic: first    Partition: 5    Leader: 2       Replicas: 2     Isr: 2

4、zookeeper和kafka的安全认证机制SASL

zookeeper在生产环境中,如果不是只在内网开放的话,就需要设置安全认证,可以选择SASL的安全认证。以下是和kafka的联合配置,如果不需要kafka可以去掉kafka相关的权限即可,以下基于zk3.5.5和kafka2.12进行操作。

下面就是详细的部署步骤:

1. zk

zookeeper所有节点都是对等的,只是各个节点角色可能不相同。以下步骤所有的节点配置相同。

1、导入kafka的相关jar

从kafka/lib目录下复制以下几个jar包到zookeeper的lib目录下:

kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar

2、zoo.cfg文件配置

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

3、编写JAAS文件,zk_server_jaas.conf,放置在conf目录下

这个文件定义需要链接到Zookeeper服务器的用户名和密码。JAAS配置节默认为Server:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="admin" 
        password="admin-2019" 
        user_kafka="kafka-2019" 
        user_producer="prod-2019";
};

这个文件中定义了两个用户,一个是kafka,一个是producer,这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。

4、修改zkEnv.sh

在zkEnv.sh添加以下内容,路径按你直接的实际路径来填写:

export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/mnt/tools/zookeeper/apache-zookeeper-3.5.5/conf/zk_server_jaas.conf "

5、各节点启动服务

bin/zkServer.sh start

2. kafka

zookeeper启动之后,就配置kafka,下面步骤的配置在所有节点上都相同。

1、在kafka的config目录下,新建kafka_server_jaas.conf文件,内容如下:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-2019"
    user_admin="admin-2019"
    user_producer="prod-2019"
    user_consumer="cons-2019";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka-2019";
};

KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。

2、配置server.properties,同样的在config目录下

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://node1:9092
security.inter.broker.protocol=SASL_PLAINTEXT  
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN  
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

这里注意listeners配置项,将主机名部分(本例主机名是node1)替换成当前节点的主机名。其他在各个节点的配置一致。注意,allow.everyone.if.no.acl.found这个配置项默认是false,若不配置成true,后续生产者、消费者无法正常使用Kafka。

3、修改启动脚本,添加kafka_server_jaas.conf配置文件

在server启动脚本JVM参数,在bin目录下的kafka-server-start.sh中,配置如下

export KAFKA_HEAP_OPTS="-Xmx1g -Xms1g -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"

4、配置其他节点

配置剩余的kafka broker节点,注意server.properties的listeners配置项

5、启动各节点kafka服务端

./kafka-server-start.sh ../config/server.properties

# 后台启动
./kafka-server-start.sh -daemon ../config/server.properties

参考文档:https://developer.aliyun.com/article/708449#slide-5