
Flume Usage Notes

This post covers:

  • Flume overview
  • Collecting Kafka data into Hive with Flume
  • Collecting Kafka data into Kafka with Flume

Flume overview

Skipped here; there are plenty of introductions online.

Collecting Kafka data into Hive with Flume

Using hive-sink

To load Kafka data into Hive, I first tried Flume's built-in hive-sink, which I had seen mentioned online. I followed the steps from a tutorial, but it ultimately failed with an error.
Steps:

  1. Link the Hive-related jars into the Flume lib directory
    cd /opt/cloudera/parcels/CDH/lib/flume-ng/lib/
    ln -s ../../../jars/hive-hcatalog-core-2.1.1-cdh6.2.0.jar hive-hcatalog-core-2.1.1-cdh6.2.0.jar
    ln -s ../../../jars/hive-hcatalog-pig-adapter-2.1.1-cdh6.2.0.jar hive-hcatalog-pig-adapter-2.1.1-cdh6.2.0.jar
    ln -s ../../../jars/hive-hcatalog-server-extensions-2.1.1-cdh6.2.0.jar hive-hcatalog-server-extensions-2.1.1-cdh6.2.0.jar
    ln -s ../../../jars/hive-hcatalog-streaming-2.1.1-cdh6.2.0.jar hive-hcatalog-streaming-2.1.1-cdh6.2.0.jar
  2. Write the Flume configuration
    a.sources=source_from_kafka
    a.channels=mem_channel
    a.sinks=hive_sink
    # Kafka source configuration
    a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
    a.sources.source_from_kafka.batchSize=10
    a.sources.source_from_kafka.kafka.bootstrap.servers=172.16.2.205:9092
    a.sources.source_from_kafka.topic=flume-mqtt
    a.sources.source_from_kafka.channels=mem_channel
    a.sources.source_from_kafka.consumer.timeout.ms=1000
    # Hive sink configuration
    a.sinks.hive_sink.type=hive
    a.sinks.hive_sink.hive.metastore=thrift://172.16.2.204:9083
    a.sinks.hive_sink.hive.database=test
    a.sinks.hive_sink.hive.table=flume-mqtt
    a.sinks.hive_sink.hive.txnsPerBatchAsk=2
    a.sinks.hive_sink.batchSize=10
    a.sinks.hive_sink.serializer=JSON
    a.sinks.hive_sink.serializer.fieldnames=id,name,age
    # Channel configuration
    a.channels.mem_channel.type=memory
    a.channels.mem_channel.capacity=1000
    a.channels.mem_channel.transactionCapacity=100
    # Wire source, channel, and sink together
    a.sources.source_from_kafka.channels=mem_channel
    a.sinks.hive_sink.channel=mem_channel
  3. Start the Flume agent
    flume-ng agent --name a --conf /etc/flume-ng/conf --conf-file hive.conf -Dflume.root.logger=INFO,console
    It fails with the following error:
    Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://172.16.2.204:9083', database='test', table='flume-mqtt', partitionVals=[] }
    	at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:383)
    	at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
    	... 6 more
    Caused by: org.apache.hive.hcatalog.streaming.StreamingException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/metastore/api/MetaException
    	at org.apache.flume.sink.hive.HiveWriter.timedCall(HiveWriter.java:456)
    	at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:376)
    	... 7 more
    No fix was found; adding all the Hive jars to the classpath was also tried, without success:
    flume-ng agent --name a --conf /etc/flume-ng/conf --conf-file hive.conf --classpath "/opt/cloudera/parcels/CDH/jars/*hive*" -Dflume.root.logger=INFO,console 
    Note:
    According to the official documentation, the hive-sink requires the target table to be bucketed and stored as ORC (it relies on Hive streaming/transactional ingest). Since Impala does not support ORC-backed Hive tables, this approach was abandoned. A sketch of the kind of table DDL it expects is shown below.
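
For reference, here is a minimal sketch of the target table the hive-sink (Hive streaming ingest) expects: bucketed, stored as ORC, and transactional, which also means Hive ACID support must be enabled on the cluster. The id/name/age columns follow the serializer.fieldnames above; the bucketing column, bucket count, and the table name flume_mqtt are only placeholders (Hive table names cannot contain hyphens, so the flume-mqtt from the config would need renaming anyway).

CREATE TABLE test.flume_mqtt (
  id   INT,
  name STRING,
  age  INT
)
CLUSTERED BY (id) INTO 4 BUCKETS            -- streaming ingest requires a bucketed table,
STORED AS ORC                               -- stored as ORC,
TBLPROPERTIES ('transactional' = 'true');   -- and marked transactional (ACID)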

Using hdfs-sink

This approach is simpler and is what most teams use in practice.

a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=k1
# Kafka source configuration
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=172.16.2.205:9092
a.sources.source_from_kafka.topic=flume-mqtt
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
# HDFS sink configuration
a.sinks.k1.type = hdfs
# The path supports time escapes such as %y-%m-%d/%H%M/%S
# Point this at the Hive table's directory: for an external table use its LOCATION, for a managed table use the table's warehouse directory
a.sinks.k1.hdfs.path = hdfs://nameservice1/user/hive/warehouse/test.db/flume_mqtt/%Y-%m-%d_%H
a.sinks.k1.hdfs.filePrefix = flume-%Y-%m-%d_%H
a.sinks.k1.hdfs.fileSuffix = .log
a.sinks.k1.hdfs.fileType = DataStream
# Do not roll files based on the number of events
a.sinks.k1.hdfs.rollCount = 0
# Roll a new HDFS file once it reaches this many bytes (2914560 ≈ 2.9 MB)
a.sinks.k1.hdfs.rollSize = 2914560
# Roll a new HDFS file every 60 seconds
a.sinks.k1.hdfs.rollInterval = 60
a.sinks.k1.hdfs.useLocalTimeStamp = true


# Channel configuration
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
# Wire source, channel, and sink together
a.sources.source_from_kafka.channels=mem_channel
a.sinks.k1.channel=mem_channel

Start the Flume job

flume-ng agent --name a --conf /etc/flume-ng/conf --conf-file hdfs.conf -Dflume.root.logger=INFO,console

Add the partition in Hive (or point the table at the file path)

ALTER TABLE test.flume_mqtt ADD IF NOT EXISTS PARTITION (dt='20221108',hour='14')
location 'hdfs://nameservice1/user/hive/warehouse/test.db/flume_mqtt/2022-11-08_14';
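
If the Hive table does not exist yet, a definition along the following lines could be used. This is only a sketch under assumptions: the id/name/age columns follow the fields mentioned earlier, and each Kafka event body is assumed to be a single JSON object per line, hence the HCatalog JsonSerDe; adjust the schema and SerDe to the actual data.

CREATE EXTERNAL TABLE IF NOT EXISTS test.flume_mqtt (
  id   INT,
  name STRING,
  age  INT
)
PARTITIONED BY (dt STRING, hour STRING)     -- matches the ADD PARTITION statement above
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'hdfs://nameservice1/user/hive/warehouse/test.db/flume_mqtt';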

Note:
With this approach the data latency is relatively high: you cannot realistically roll a new file every second, and in production you will roll at most about one file per minute. Also, while Flume is still writing an unfinished file, Hive cannot read the data inside it.

Collecting Kafka data into Kafka with Flume

# Flume Agent Name
agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = kafka-sink

# Source Configuration
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.kafka.bootstrap.servers = 47.100.1.105:9094,47.100.1.105:9095,47.100.1.105:9096
agent.sources.kafka-source.batchSize = 10
agent.sources.kafka-source.kafka.consumer.group.id = flume-consumer-group-device_ops_1
agent.sources.kafka-source.kafka.topics = device_ops
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = latest

# Channel Configuration
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 100000

# Sink Configuration
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = ddp3:9092,ddp4:9092,ddp5:9092
agent.sinks.kafka-sink.kafka.topic = device_ops
# flumeBatchSize controls how many events the Kafka sink packs into one batch before sending it to the Kafka broker; it has a big impact on sink throughput.
# The sink does not wait until that many events have accumulated: on every channel transaction it batches whatever events are currently available and sends them.
agent.sinks.kafka-sink.flumeBatchSize = 1000
agent.sinks.kafka-sink.kafka.producer.acks= 1
# When set to a positive number of milliseconds (e.g. 100), the producer waits that long for more messages to join a batch, sending less often and improving throughput
agent.sinks.kafka-sink.kafka.producer.linger.ms= 10
agent.sinks.kafka-sink.kafka.producer.max.request.size = 10485760

# When enabled, an individual Flume event can name its own destination Kafka topic (via the event's topic header) instead of every event going to the configured topic; it is disabled here
agent.sinks.kafka-sink.allowTopicOverride = false
# Bind Source and Sink to Channel
agent.sources.kafka-source.channels = memory-channel
agent.sinks.kafka-sink.channel = memory-channel

Note: at first agent.sinks.kafka-sink.allowTopicOverride = false was not set, which meant the data could not be written into Kafka and the following exception was thrown:

23/07/19 17:35:33 ERROR kafka.KafkaSink: Failed to publish events
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)

Another error encountered:

23/07/27 14:32:05 ERROR kafka.KafkaSink: Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1120815 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1239)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:918)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:227)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)

Cause:
RecordTooLargeException is a Kafka exception thrown when a message exceeds the producer's max.request.size limit. By default max.request.size is 1048576 bytes (1 MB), so a single serialized message may be at most 1 MB.

Solution:

  1. Increase the max.request.size value
    agent.sinks.kafka-sink.kafka.producer.max.request.size = 10485760

Reference: Apache flume with kafka source, kafka sink and memory channel