This article covers:
What can Kafka do?
Installing and starting Kafka
Kafka installation
Kafka depends on Java, so install Java before installing Kafka (omitted here).
Kafka can run with its bundled ZooKeeper or with a standalone deployment; production environments generally use a standalone ZooKeeper cluster.
For installing ZooKeeper, see my earlier post Zookeeper学习笔记(一).
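For a quick local test with the bundled ZooKeeper instead, Kafka ships a start script for it (run from the Kafka install directory once it is unpacked below):
# Start the bundled single-node ZooKeeper (testing only)
bin/zookeeper-server-start.sh config/zookeeper.properties &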
Download the Kafka package from the official site and extract it:
tar zxvf kafka-2.11.0.tar.gz -C .
cd kafka-2.11.0/config
Edit server.properties:
host.name=golden-02
# Directory where Kafka stores its data (log) files
log.dirs=/opt/modules/kafka-2.11.0/kafka-logs
# ZooKeeper connection address; separate multiple addresses with commas
zookeeper.connect=golden-02:2181
The kafka-logs directory set above was created manually beforehand.
Finally, set the Kafka environment variables.
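A minimal sketch of those environment variables, assuming the install path used above (append to /etc/profile or ~/.bashrc, then source the file):
export KAFKA_HOME=/opt/modules/kafka-2.11.0
export PATH=$PATH:$KAFKA_HOME/bin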
Starting and stopping Kafka
# Start
cd /opt/modules/kafka-2.11.0/ && bin/kafka-server-start.sh config/server.properties &
# Stop
cd /opt/modules/kafka-2.11.0/ && bin/kafka-server-stop.sh
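The start script also accepts a -daemon flag that detaches the broker, so the trailing & becomes unnecessary:
# Start in the background as a daemon
bin/kafka-server-start.sh -daemon config/server.properties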
Kafka command examples
## List all topics
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --list
## Create a topic
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --create --replication-factor 3 --partitions 1 --topic first
## Delete a topic
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --delete --topic first
## Produce messages
bin/kafka-console-producer.sh --broker-list golden-02:9092 --topic first
# or, equivalently:
bin/kafka-console-producer.sh --bootstrap-server golden-02:9092 --topic first
## Consume messages
bin/kafka-console-consumer.sh --bootstrap-server golden-02:9092 --from-beginning --topic first
## Describe a topic
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --describe --topic first
## Show a consumer group's consumed offsets and lag
bin/kafka-consumer-groups.sh --bootstrap-server golden-02:9092 --describe --group fink-test
## Reset offsets
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-offset 210000 --execute
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-latest --execute
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-datetime 2022-10-25T12:30:00.000 --execute
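Note that the consumer group must be inactive while offsets are reset. To preview the change without applying it, replace --execute with --dry-run (group and broker as in the examples above):
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-latest --dry-run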
# CDH-packaged commands (no .sh suffix)
kafka-topics --zookeeper 172.16.2.205:2181 --list
kafka-topics --zookeeper 172.16.2.205:2181 --delete --topic users_source_mysql
kafka-topics --zookeeper 172.16.2.205:2181 --create --replication-factor 1 --partitions 1 --topic first
kafka-console-consumer --bootstrap-server 172.16.2.204:9092 --from-beginning --topic users_source_mysql
kafka-console-producer --broker-list 172.16.2.205:9092 --topic first
Option reference:
- --topic: the topic name
- --replication-factor: the number of replicas
- --partitions: the number of partitions
If a topic refuses to be deleted and the log says This will have no impact if delete.topic.enable is not set to true, set delete.topic.enable=true in Kafka's server.properties.
Common Kafka issues
- auto.offset.reset has no effect
Reason: auto.offset.reset only takes effect when the group has no initial offset in Kafka, or when its current offset no longer exists on the server. In other words, once a group_id has already consumed the topic (i.e. an offset is recorded for it), the setting is ignored; to consume from the beginning again, switch to a new group_id.
From the official docs: "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)."
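A quick way to observe this with the console consumer (the group name offset-test is just an example):
# First run with a brand-new group: auto.offset.reset=earliest takes effect,
# so consumption starts from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server golden-02:9092 --topic first --group offset-test --consumer-property auto.offset.reset=earliest
# Subsequent runs with the same group resume from the committed offset;
# auto.offset.reset is ignored because an offset already exists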
Kafka data migration
The disk holding our company's Kafka data was too small; after the machine was given more storage, the new disk was never added to Kafka. We now want to move the Kafka data from the old disk onto the newly added one.
[root@ddp3 ~]# df -h
Filesystem Size Used Avail Use% Mounted on
devtmpfs 16G 0 16G 0% /dev
tmpfs 16G 0 16G 0% /dev/shm
tmpfs 16G 856K 16G 1% /run
tmpfs 16G 0 16G 0% /sys/fs/cgroup
/dev/vda1 99G 86G 8.2G 92% /
/dev/vdb1 296G 90G 193G 32% /data
cm_processes 16G 29M 16G 1% /run/cloudera-scm-agent/process
tmpfs 3.1G 0 3.1G 0% /run/user/0
overlay 99G 86G 8.2G 92% /var/lib/docker/overlay2/eab5e26d05206a9ae6591a28f66bebd4dc79529d2fcc462a135c855052fd49bd/merged
shm 64M 0 64M 0% /var/lib/docker/containers/2733068e2bd0a33d90df3ce3a011a651e8d6a9ab81b4782fe2f9896701e48f1e/shm
The Kafka data currently sits on the /dev/vda1 disk; we want to move it to /dev/vdb1.
Two problems need solving:
- mount the new data disk at a new directory
- move Kafka's historical data into that directory
It was unclear whether simply mv-ing the historical data onto the new disk would lose anything, so I first tested it on my own machine.
On my machine, server.properties has log.dirs=/opt/modules/kafka-2.8.0/kafka-logs, and the goal is to migrate the data to /tmp/kafka/kafka-logs.
Steps:
# 1. Create a Kafka topic
bin/kafka-topics.sh --bootstrap-server golden-cloud:9092 --create --replication-factor 1 --partitions 3 --topic first
# 2. Produce 4 messages to the topic: aaa/bbb/ccc/ddd
bin/kafka-console-producer.sh --broker-list golden-cloud:9092 --topic first
# 3. Without stopping the Kafka service, mv the data directory directly
cd /tmp/kafka/ && mv /opt/modules/kafka-2.8.0/kafka-logs/ .
# The broker now errors out and shuts down because its data directory has moved
[2022-03-31 16:57:50,559] ERROR Error while reading checkpoint file /opt/modules/kafka-2.8.0/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
java.nio.file.NoSuchFileException: /opt/modules/kafka-2.8.0/kafka-logs/cleaner-offset-checkpoint
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at java.nio.file.Files.newBufferedReader(Files.java:2784)
...
[2022-03-31 16:57:50,584] WARN [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions __consumer_offsets-22,__consumer_offsets-30,__consumer_offsets-8,__consumer_offsets-21,__consumer_offsets-4,__consumer_offsets-27,__consumer_offsets-7,__consumer_offsets-9,first-1,__consumer_offsets-46,__consumer_offsets-25,__consumer_offsets-35,__consumer_offsets-41,__consumer_offsets-33,__consumer_offsets-23,__consumer_offsets-49,__consumer_offsets-47,__consumer_offsets-16,__consumer_offsets-28,__consumer_offsets-31,__consumer_offsets-36,__consumer_offsets-42,__consumer_offsets-3,__consumer_offsets-18,first-2,__consumer_offsets-37,first-0,__consumer_offsets-15,__consumer_offsets-24,__consumer_offsets-38,__consumer_offsets-17,__consumer_offsets-48,__consumer_offsets-19,__consumer_offsets-11,__consumer_offsets-13,__consumer_offsets-2,__consumer_offsets-43,__consumer_offsets-6,__consumer_offsets-14,__consumer_offsets-20,__consumer_offsets-0,__consumer_offsets-44,__consumer_offsets-39,__consumer_offsets-12,__consumer_offsets-45,__consumer_offsets-1,__consumer_offsets-5,__consumer_offsets-26,__consumer_offsets-29,__consumer_offsets-34,__consumer_offsets-10,__consumer_offsets-32,__consumer_offsets-40 and stopped moving logs for partitions because they are in the failed log directory /opt/modules/kafka-2.8.0/kafka-logs. (kafka.server.ReplicaManager)
[2022-03-31 16:57:50,584] WARN Stopping serving logs in dir /opt/modules/kafka-2.8.0/kafka-logs (kafka.log.LogManager)
[2022-03-31 16:57:50,586] ERROR Shutdown broker because all log dirs in /opt/modules/kafka-2.8.0/kafka-logs have failed (kafka.log.LogManager)
# 4. Update server.properties
log.dirs=/tmp/kafka/kafka-logs
# 5. Restart the Kafka service
bin/kafka-server-start.sh config/server.properties &
# 6. Consume the data from topic first
bin/kafka-console-consumer.sh --bootstrap-server golden-cloud:9092 --from-beginning --topic first
bbb
aaa
ddd
ccc
The result shows that changing the log directory and restarting the Kafka service loses no data (the out-of-order output is expected: ordering is only guaranteed within a partition, and the topic has 3).
The procedure for the production environment should therefore be:
- edit log.dirs in the server.properties configuration file, or change Kafka's Data Directories (log.dirs) on the CM page
- stop the Kafka service; this step can be skipped and the resulting errors are harmless, but it is safer to do it
- copy the Kafka data to the new data directory (see the sketch below)
- restart the Kafka service
- verify the data is complete; once everything looks good, delete the original data directory
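A minimal sketch of the copy-and-switch, assuming the new disk is mounted at /data and the paths from the local test above (all paths are illustrative):
# Stop the broker first (optional per the test above, but safer)
bin/kafka-server-stop.sh
# Copy the data, preserving permissions, ownership and timestamps
cp -a /opt/modules/kafka-2.8.0/kafka-logs /data/kafka-logs
# Point log.dirs at the new location, then restart
sed -i 's|^log.dirs=.*|log.dirs=/data/kafka-logs|' config/server.properties
bin/kafka-server-start.sh config/server.properties &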
Configuring external access to Kafka
For details, see the separate post kafka配置外网访问.
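In short, the settings involved are listeners and advertised.listeners in server.properties; a minimal sketch, with <public-ip> as a placeholder for the broker's externally reachable address:
# Bind on all interfaces inside the machine
listeners=PLAINTEXT://0.0.0.0:9092
# Address that external clients use to reach this broker
advertised.listeners=PLAINTEXT://<public-ip>:9092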
Building Kafka from source
git clone https://github.com/apache/kafka.git -b 2.8.1
cd kafka
./gradlew -PscalaVersion=2.12.13 clean releaseTarGz -x signArchives
By default, the Kafka release tarball is generated under core/build/distributions.