
Kafka Study Notes

This post covers:

  • Kafka overview (to be added)
  • What can Kafka do? (to be added)
  • Installing and starting Kafka
  • Common Kafka operations

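Kafka Overview

What Can Kafka Do?

Installing and Starting Kafka

Installing Kafka

Kafka runs on the JVM, so Java must be installed first; that step is omitted here.
Kafka can use either its bundled ZooKeeper or a separate ZooKeeper. Production environments generally use a separate ZooKeeper cluster.
For ZooKeeper installation, see ZooKeeper Study Notes (Part 1).
Download the Kafka package from the official website and unpack it: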

tar zxvf kafka-2.11.0.tar.gz -C .
cd kafka-2.11.0/config

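Edit server.properties:

host.name=golden-02
# Directory where Kafka stores its log (data) files
log.dirs=/opt/modules/kafka-2.11.0/kafka-logs
# ZooKeeper connection string; separate multiple addresses with commas
zookeeper.connect=golden-02:2181

log.dirs points to a kafka-logs directory here, which I created by hand.

Finally, set the Kafka environment variables.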
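As a minimal sketch, the environment variables could look like the lines below. This assumes the install path /opt/modules/kafka-2.11.0 used above. KAFKA_HOME is only a conventional variable name chosen for this example; the notes do not say which names were actually set. The lines would typically be appended to ~/.bashrc or /etc/profile.

```shell
# Assumed install location, taken from the paths used earlier in these notes.
export KAFKA_HOME=/opt/modules/kafka-2.11.0
# Put the Kafka scripts on PATH so kafka-topics.sh and friends run from any directory.
export PATH="$PATH:$KAFKA_HOME/bin"
```

After reloading the shell profile, the scripts can be called without the bin/ prefix.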

Starting and Stopping Kafka

# Start (runs as a background job of the current shell)
cd /opt/modules/kafka-2.11.0/ && bin/kafka-server-start.sh config/server.properties &
# Stop
cd /opt/modules/kafka-2.11.0/ && bin/kafka-server-stop.sh

Kafka Usage Examples

## List all topics
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --list
## Create a topic
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --create --replication-factor 3 --partitions 1 --topic first
## Delete a topic
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --delete --topic first
## Send messages
bin/kafka-console-producer.sh --broker-list golden-02:9092 --topic first
# or, equivalently:
bin/kafka-console-producer.sh --bootstrap-server golden-02:9092 --topic first
## Consume messages
bin/kafka-console-consumer.sh --bootstrap-server golden-02:9092 --from-beginning --topic first
## Show the details of a topic
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --describe --topic first
## Show a consumer group's consumption status and offsets
bin/kafka-consumer-groups.sh --bootstrap-server golden-02:9092 --describe --group fink-test

## Reset offsets
# to a specific offset
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-offset 210000 --execute
# to the latest offset
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-latest --execute
# to the offsets at a given timestamp
kafka-consumer-groups --group dwd4rule_group --bootstrap-server ddp3.hadoop:9092 --reset-offsets --all-topics --to-datetime 2022-10-25T12:30:00.000 --execute

# Equivalent commands on CDH
kafka-topics --zookeeper 172.16.2.205:2181 --list
kafka-topics --zookeeper 172.16.2.205:2181 --delete --topic users_source_mysql
kafka-topics --zookeeper 172.16.2.205:2181 --create --replication-factor 1 --partitions 1 --topic first
kafka-console-consumer --bootstrap-server 172.16.2.204:9092  --from-beginning --topic users_source_mysql
kafka-console-producer --broker-list 172.16.2.205:9092 --topic first

Option reference:

  • --topic: the topic name
  • --replication-factor: the number of replicas
  • --partitions: the number of partitions

If a topic cannot be deleted and the log shows:
This will have no impact if delete.topic.enable is not set to true
set delete.topic.enable=true in Kafka's server.properties, then restart the broker so the change takes effect.
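One way to apply this setting from a script is sketched below. set_property is a hypothetical helper written for this example, not part of Kafka. It assumes server.properties uses plain key=value lines with no spaces around the equals sign.

```shell
# set_property FILE KEY VALUE
# Drops any existing "KEY=..." line from FILE and appends "KEY=VALUE".
# Comments and all other settings are kept as they are.
set_property() {
  file=$1
  key=$2
  value=$3
  tmp=$(mktemp) || return 1
  # Keep every line whose text before the first "=" is not exactly KEY.
  awk -F= -v k="$key" '$1 != k' "$file" > "$tmp" || return 1
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back in place so the file keeps its owner and permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Example call (path assumed from the install steps above):
#   set_property /opt/modules/kafka-2.11.0/config/server.properties delete.topic.enable true
```

The helper rewrites the whole file through a temporary copy instead of using sed -i, whose syntax differs between GNU and BSD systems.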

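Kafka FAQ

  • Setting auto.offset.reset has no effect

    Cause: auto.offset.reset only applies when the consumer group has no initial offset in Kafka, or when its current offset no longer exists on the server.
    In other words, once the current group_id has consumed the topic (an offset can be looked up), the setting is ignored. To consume from the beginning again, switch to a new group_id or reset the group's offsets.
    Official explanation:
    See the auto.offset.reset entry in the official Kafka documentation.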
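The "switch to a new group_id" workaround can be sketched as follows. new_group_id and consume_from_start are hypothetical helpers invented for this example. The --group and --consumer-property flags are those of the Kafka 2.x console consumer, so check them against the version actually installed.

```shell
# new_group_id PREFIX
# Prints a group id that is very unlikely to have been used before,
# built from PREFIX, the current Unix time, and this shell's PID.
new_group_id() {
  printf '%s-%s-%s\n' "$1" "$(date +%s)" "$$"
}

# consume_from_start BOOTSTRAP TOPIC
# Reads TOPIC with a brand-new group. Because that group has no committed
# offset, auto.offset.reset=earliest is actually honoured.
consume_from_start() {
  bin/kafka-console-consumer.sh \
    --bootstrap-server "$1" \
    --topic "$2" \
    --group "$(new_group_id replay)" \
    --consumer-property auto.offset.reset=earliest
}

# Example call (run from the Kafka install directory):
#   consume_from_start golden-02:9092 first
```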

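Kafka Data Migration

The disk holding our company's Kafka data was too small. More storage was added later, but the new disk was never added to Kafka. The goal now is to move the data from the old disk to the new one.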

[root@ddp3 ~]# df -h
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         16G     0   16G   0% /dev
tmpfs            16G     0   16G   0% /dev/shm
tmpfs            16G  856K   16G   1% /run
tmpfs            16G     0   16G   0% /sys/fs/cgroup
/dev/vda1        99G   86G  8.2G  92% /
/dev/vdb1       296G   90G  193G  32% /data
cm_processes     16G   29M   16G   1% /run/cloudera-scm-agent/process
tmpfs           3.1G     0  3.1G   0% /run/user/0
overlay          99G   86G  8.2G  92% /var/lib/docker/overlay2/eab5e26d05206a9ae6591a28f66bebd4dc79529d2fcc462a135c855052fd49bd/merged
shm              64M     0   64M   0% /var/lib/docker/containers/2733068e2bd0a33d90df3ce3a011a651e8d6a9ab81b4782fe2f9896701e48f1e/shm

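Kafka's data lives on /dev/vda1 and should move to /dev/vdb1.
Two problems need to be solved:

  1. Point Kafka's data directory at a new directory on the new disk
  2. Move Kafka's historical data into that new directory

I was not sure whether simply running mv on the historical data would lose anything, so I tested it on my own machine first.
On that machine, server.properties has log.dirs=/opt/modules/kafka-2.8.0/kafka-logs
The goal is to migrate the data to /tmp/kafka/kafka-logs
Steps: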

# 1. Create a Kafka topic
bin/kafka-topics.sh --bootstrap-server golden-cloud:9092 --create --replication-factor 1 --partitions 3 --topic first
# 2. Send four messages to the topic: aaa/bbb/ccc/ddd
bin/kafka-console-producer.sh --broker-list golden-cloud:9092 --topic first
# 3. Without stopping Kafka, mv the data directory
cd /tmp/kafka/ && mv /opt/modules/kafka-2.8.0/kafka-logs/ .
# The data directory has now disappeared from under the broker, so Kafka logs errors and shuts down
[2022-03-31 16:57:50,559] ERROR Error while reading checkpoint file /opt/modules/kafka-2.8.0/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
java.nio.file.NoSuchFileException: /opt/modules/kafka-2.8.0/kafka-logs/cleaner-offset-checkpoint
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.Files.newByteChannel(Files.java:361)
	at java.nio.file.Files.newByteChannel(Files.java:407)
	at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
	at java.nio.file.Files.newInputStream(Files.java:152)
	at java.nio.file.Files.newBufferedReader(Files.java:2784)
...
[2022-03-31 16:57:50,584] WARN [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions __consumer_offsets-22,__consumer_offsets-30,__consumer_offsets-8,__consumer_offsets-21,__consumer_offsets-4,__consumer_offsets-27,__consumer_offsets-7,__consumer_offsets-9,first-1,__consumer_offsets-46,__consumer_offsets-25,__consumer_offsets-35,__consumer_offsets-41,__consumer_offsets-33,__consumer_offsets-23,__consumer_offsets-49,__consumer_offsets-47,__consumer_offsets-16,__consumer_offsets-28,__consumer_offsets-31,__consumer_offsets-36,__consumer_offsets-42,__consumer_offsets-3,__consumer_offsets-18,first-2,__consumer_offsets-37,first-0,__consumer_offsets-15,__consumer_offsets-24,__consumer_offsets-38,__consumer_offsets-17,__consumer_offsets-48,__consumer_offsets-19,__consumer_offsets-11,__consumer_offsets-13,__consumer_offsets-2,__consumer_offsets-43,__consumer_offsets-6,__consumer_offsets-14,__consumer_offsets-20,__consumer_offsets-0,__consumer_offsets-44,__consumer_offsets-39,__consumer_offsets-12,__consumer_offsets-45,__consumer_offsets-1,__consumer_offsets-5,__consumer_offsets-26,__consumer_offsets-29,__consumer_offsets-34,__consumer_offsets-10,__consumer_offsets-32,__consumer_offsets-40 and stopped moving logs for partitions  because they are in the failed log directory /opt/modules/kafka-2.8.0/kafka-logs. (kafka.server.ReplicaManager)
[2022-03-31 16:57:50,584] WARN Stopping serving logs in dir /opt/modules/kafka-2.8.0/kafka-logs (kafka.log.LogManager)
[2022-03-31 16:57:50,586] ERROR Shutdown broker because all log dirs in /opt/modules/kafka-2.8.0/kafka-logs have failed (kafka.log.LogManager)

# 4. Update server.properties
log.dirs=/tmp/kafka/kafka-logs
# 5. Restart Kafka
bin/kafka-server-start.sh config/server.properties &
# 6. Consume the data in the first topic
bin/kafka-console-consumer.sh --bootstrap-server golden-cloud:9092 --from-beginning --topic first
bbb
aaa
ddd
ccc

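The test shows that moving the log directory, updating log.dirs, and restarting Kafka does not lose data: all four messages come back. They arrive out of send order because the topic has three partitions, and Kafka only preserves order within a partition.

The production procedure should therefore be:

  1. Change log.dirs in server.properties, or change Kafka's Data Directories (log.dirs) setting on the CM page
  2. Stop the Kafka service. Skipping this step also worked in the test (the broker simply fails with the errors shown earlier), but stopping it first is the safer choice
  3. Copy the Kafka data to the new data directory
  4. Restart the Kafka service
  5. Verify that the data is complete; once everything checks out, delete the original data directory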
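The copy-and-verify part of this procedure (steps 3 and 5) can be sketched as a small shell function. migrate_log_dir is a hypothetical helper written for this example, and it assumes the broker is already stopped so that no files change during the copy. Comparing with diff -r is thorough but can be slow on large data directories.

```shell
# migrate_log_dir OLD_DIR NEW_DIR
# Copies OLD_DIR to NEW_DIR (which must not exist yet), preserving ownership,
# permissions and timestamps, then checks that both trees are identical.
# OLD_DIR is left untouched so it can be deleted by hand after verification.
migrate_log_dir() {
  old=$1
  new=$2
  if [ ! -d "$old" ]; then
    echo "source directory $old not found" >&2
    return 1
  fi
  if [ -e "$new" ]; then
    echo "target $new already exists, refusing to overwrite" >&2
    return 1
  fi
  mkdir -p "$(dirname "$new")" || return 1
  cp -a "$old" "$new" || return 1
  # diff -r exits non-zero if any file differs or is missing on either side.
  diff -r "$old" "$new" > /dev/null
}

# Example call, using the paths from the local test above:
#   migrate_log_dir /opt/modules/kafka-2.8.0/kafka-logs /tmp/kafka/kafka-logs
```

Copying instead of moving keeps the original directory as a fallback until the restarted broker has been checked.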

Configuring External Network Access for Kafka

For details, see the post Configuring External Network Access for Kafka.

Building Kafka

git clone https://github.com/apache/kafka.git -b 2.8.1
cd kafka
./gradlew -PscalaVersion=2.12.13 clean releaseTarGz -x signArchives

By default, the Kafka release tarball is generated in the core/build/distributions directory.