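This post covers:
- A brief introduction to Flume
- Collecting Kafka data into Hive with Flume
- Forwarding data from one Kafka cluster to another with Flume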
Introduction to Flume
Skipped; there is plenty of material about it online.
Collecting Kafka data into Hive with Flume
Using the Hive sink
Flume ships with a native Hive sink that can load Kafka data into Hive. I followed the steps described online, but the job ended with an error.
Steps:
- Symlink the Hive jars into Flume's lib directory
cd /opt/cloudera/parcels/CDH/lib/flume-ng/lib/
ln -s ../../../jars/hive-hcatalog-core-2.1.1-cdh6.2.0.jar hive-hcatalog-core-2.1.1-cdh6.2.0.jar
ln -s ../../../jars/hive-hcatalog-pig-adapter-2.1.1-cdh6.2.0.jar hive-hcatalog-pig-adapter-2.1.1-cdh6.2.0.jar
ln -s ../../../jars/hive-hcatalog-server-extensions-2.1.1-cdh6.2.0.jar hive-hcatalog-server-extensions-2.1.1-cdh6.2.0.jar
ln -s ../../../jars/hive-hcatalog-streaming-2.1.1-cdh6.2.0.jar hive-hcatalog-streaming-2.1.1-cdh6.2.0.jar
- Write the configuration
a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
#Kafka source configuration
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=172.16.2.205:9092
a.sources.source_from_kafka.topic=flume-mqtt
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
#Hive sink configuration
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://172.16.2.204:9083
a.sinks.hive_sink.hive.database=test
a.sinks.hive_sink.hive.table=flume-mqtt
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name,age
#Channel configuration
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
#Bind source, sink and channel
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel
- Start the Flume agent
flume-ng agent --name a --conf /etc/flume-ng/conf --conf-file hive.conf -Dflume.root.logger=INFO,console
It fails with the following error:
Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://172.16.2.204:9083', database='test', table='flume-mqtt', partitionVals=[] }
at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:383)
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
... 6 more
Caused by: org.apache.hive.hcatalog.streaming.StreamingException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/metastore/api/MetaException
at org.apache.flume.sink.hive.HiveWriter.timedCall(HiveWriter.java:456)
at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:376)
... 7 more
I did not find a solution to this error.
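Note: the agent can also be started with the CDH Hive jars passed explicitly through --classpath:
flume-ng agent --name a --conf /etc/flume-ng/conf --conf-file hive.conf --classpath "/opt/cloudera/parcels/CDH/jars/*hive*" -Dflume.root.logger=INFO,console
Java only treats a * that forms the whole file name of a classpath entry (dir/*) as "every jar in this directory", so a pattern such as *hive* is probably not expanded the way it looks.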
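A NoClassDefFoundError means that no jar on the agent's runtime classpath provides the class. Since jars are ZIP archives, one way to find out which jar does is to scan a directory of jars for the class's .class entry. The helper below, find_class_in_jars, is a hypothetical sketch, not part of Flume or CDH; the directory /opt/cloudera/parcels/CDH/jars in the usage is the parcel jar directory used in the commands above.

```python
import zipfile
from pathlib import Path
from typing import List


def class_entry(class_name: str) -> str:
    """Turn 'a.b.C' or 'a/b/C' into the entry name used inside a jar: 'a/b/C.class'."""
    return class_name.replace(".", "/") + ".class"


def find_class_in_jars(jar_dir: str, class_name: str) -> List[str]:
    """Return the names of all *.jar files directly in jar_dir that contain class_name.

    Hypothetical diagnostic helper: it does not recurse into subdirectories
    and silently skips files that are not valid ZIP archives.
    """
    entry = class_entry(class_name)
    hits = []
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # unreadable jar: ignore it
    return hits


if __name__ == "__main__":
    # The class reported missing in the NoClassDefFoundError above
    missing = "org/apache/hadoop/hive/metastore/api/MetaException"
    for name in find_class_in_jars("/opt/cloudera/parcels/CDH/jars", missing):
        print(name)
```

Both the dotted name and the slash-separated form printed in the stack trace are accepted. For MetaException the result would typically include the hive-metastore jar, which is not among the four hcatalog jars linked above; linking the jar(s) the script reports is a reasonable next thing to try, though I have not verified that it resolves the error.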
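According to the official documentation, the Hive sink requires the target table to be stored as ORC and bucketed (Hive Streaming, which the sink writes through, also requires it to be transactional). Impala, however, does not support ORC-format Hive tables, so I abandoned this approach.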
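For reference, a table that satisfies those requirements is bucketed (CLUSTERED BY ... INTO n BUCKETS), stored as ORC and marked transactional. The sketch below only builds such a DDL string; the function name, the table name test.flume_mqtt, the column types, the bucket column and the bucket count are all hypothetical, with the column names taken from serializer.fieldnames=id,name,age in the config above. Enabling transactions on the Hive side (for example the transaction manager settings) is a separate step this sketch does not cover.

```python
from typing import List, Tuple


def streaming_table_ddl(table: str, columns: List[Tuple[str, str]],
                        bucket_col: str, buckets: int) -> str:
    """Build CREATE TABLE DDL for a bucketed, ORC, transactional Hive table.

    Hypothetical helper: names, types and bucket settings are placeholders.
    """
    if bucket_col not in {name for name, _ in columns}:
        raise ValueError(f"bucket column {bucket_col!r} is not a table column")
    if buckets < 1:
        raise ValueError("buckets must be >= 1")
    cols = ",\n  ".join(f"{name} {typ}" for name, typ in columns)
    return (
        f"CREATE TABLE {table} (\n  {cols}\n)\n"
        f"CLUSTERED BY ({bucket_col}) INTO {buckets} BUCKETS\n"
        "STORED AS ORC\n"
        "TBLPROPERTIES ('transactional'='true')"
    )


print(streaming_table_ddl("test.flume_mqtt",
                          [("id", "INT"), ("name", "STRING"), ("age", "INT")],
                          bucket_col="id", buckets=2))
```

The bucketing and ORC clauses are exactly the parts that made this route a poor fit for querying from Impala.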
Using the HDFS sink
This approach is simpler, and it is the one most companies use.
a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=k1
#Kafka source configuration
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=172.16.2.205:9092
a.sources.source_from_kafka.topic=flume-mqtt
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
#HDFS sink configuration
a.sinks.k1.type = hdfs
#%y-%m-%d/%H%M/%S
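#Directory of the Hive table: for an external table use its LOCATION, for a managed table use the table's directory under the Hive warehouse
#The %Y%m%d/%H layout matches the partition locations added in Hive below
a.sinks.k1.hdfs.path = hdfs://nameservice1/user/hive/warehouse/test.db/flume_mqtt/%Y%m%d/%H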
a.sinks.k1.hdfs.filePrefix = flume-%Y-%m-%d_%H
a.sinks.k1.hdfs.fileSuffix = .log
a.sinks.k1.hdfs.fileType = DataStream
#Do not roll files based on the number of events
a.sinks.k1.hdfs.rollCount = 0
#Roll to a new file once the current one reaches 128 MB (128 * 1024 * 1024 bytes)
a.sinks.k1.hdfs.rollSize = 134217728
#Roll to a new file every 60 seconds
a.sinks.k1.hdfs.rollInterval = 60
a.sinks.k1.hdfs.useLocalTimeStamp = true
#Channel configuration
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
#Bind source, sink and channel
a.sources.source_from_kafka.channels=mem_channel
a.sinks.k1.channel=mem_channel
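The three roll settings act as independent triggers: the HDFS sink closes the current file and opens a new one as soon as any enabled trigger fires, and a value of 0 disables that trigger. The function below is a simplified model of that rule for illustration only; should_roll and its parameter names are hypothetical, and in Flume the interval trigger is driven by a timer rather than checked on each write.

```python
def should_roll(events: int, bytes_written: int, seconds_open: float,
                roll_count: int, roll_size: int, roll_interval: int) -> bool:
    """Simplified model of the HDFS sink's roll triggers.

    A threshold of 0 disables that trigger; otherwise the file rolls once the
    current file has reached the threshold. Any single trigger is enough.
    """
    by_count = roll_count > 0 and events >= roll_count
    by_size = roll_size > 0 and bytes_written >= roll_size
    by_time = roll_interval > 0 and seconds_open >= roll_interval
    return by_count or by_size or by_time


MB = 1024 * 1024
# rollCount = 0, a 128 MB size limit and rollInterval = 60, as in the config above
settings = dict(roll_count=0, roll_size=128 * MB, roll_interval=60)

print(should_roll(1_000_000, 50 * MB, 30, **settings))  # False: count trigger disabled
print(should_roll(10, 1024, 60, **settings))            # True: 60 s have passed
print(should_roll(500_000, 128 * MB, 20, **settings))   # True: size limit reached
```

With these settings a file is in practice rolled every minute, unless more than 128 MB arrives within that minute.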
Start the Flume agent
flume-ng agent --name a --conf /etc/flume-ng/conf --conf-file hdfs.conf -Dflume.root.logger=INFO,console
In Hive, add the partition and point it at the file path:
ALTER TABLE test.flume_mqtt ADD IF NOT EXISTS PARTITION (dt='20221108',hour='14')
location 'hdfs://nameservice1/user/hive/warehouse/test.db/flume_mqtt/20221108/14';
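Because a new partition appears every hour, this statement is usually generated by a scheduled script. A minimal sketch, assuming the yyyyMMdd/HH directory layout used in the LOCATION above; add_partition_sql is a hypothetical name, and the table name and base path are copied from the statement above.

```python
from datetime import datetime

BASE = "hdfs://nameservice1/user/hive/warehouse/test.db/flume_mqtt"


def add_partition_sql(ts: datetime, table: str = "test.flume_mqtt",
                      base: str = BASE) -> str:
    """Return the ALTER TABLE statement for the hourly partition containing ts.

    The yyyyMMdd/HH layout must match the directories the HDFS sink writes.
    """
    dt = ts.strftime("%Y%m%d")
    hour = ts.strftime("%H")  # zero-padded, e.g. '05'
    return (f"ALTER TABLE {table} ADD IF NOT EXISTS PARTITION (dt='{dt}',hour='{hour}')\n"
            f"location '{base}/{dt}/{hour}';")


print(add_partition_sql(datetime(2022, 11, 8, 14, 30)))
```

For 2022-11-08 14:30 this reproduces the statement above. The output could then be run with a client such as beeline -e; IF NOT EXISTS keeps a rerun for the same hour harmless.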
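Note:
The data latency of this approach is fairly high. Files cannot be rolled every second; in production a file is rolled at most about once a minute. While Flume is still writing a file, Hive cannot read the data in it.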
Forwarding data from one Kafka cluster to another with Flume
# Flume Agent Name
agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = kafka-sink
# Source Configuration
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.kafka.bootstrap.servers = 47.100.1.105:9094,47.100.1.105:9095,47.100.1.105:9096
agent.sources.kafka-source.batchSize = 10
agent.sources.kafka-source.kafka.consumer.group.id = flume-consumer-group-device_ops_1
agent.sources.kafka-source.kafka.topics = device_ops
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = latest
# Channel Configuration
agent.channels.memory-channel.type = memory
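agent.channels.memory-channel.capacity = 100000
# transactionCapacity defaults to 100 and must be at least the sink's flumeBatchSize, otherwise a full batch cannot be taken in one transaction
agent.channels.memory-channel.transactionCapacity = 1000
# Sink Configuration
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = ddp3:9092,ddp4:9092,ddp5:9092
agent.sinks.kafka-sink.kafka.topic = device_ops
# How many events the Kafka sink packs into one batch before sending them to the Kafka brokers; this setting matters a lot for the sink's performance and throughput.
# The sink does not wait until 1000 events have accumulated: on each channel transaction it takes whatever the channel currently holds (up to this limit) and sends that as one batch.
# The property is flumeBatchSize directly under the sink; a kafka.flumeBatchSize key is not read, which leaves the default of 100.
agent.sinks.kafka-sink.flumeBatchSize = 1000
agent.sinks.kafka-sink.kafka.producer.acks = 1
# With a positive value (e.g. 100), the producer waits up to that many milliseconds so that more messages can join a batch, sending less often and improving throughput
agent.sinks.kafka-sink.kafka.producer.linger.ms = 10
agent.sinks.kafka-sink.kafka.producer.max.request.size = 10485760
# When true (the default), the topic named in an event's topic header overrides kafka.topic; false sends every event to kafka.topic
agent.sinks.kafka-sink.allowTopicOverride = false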
# Bind Source and Sink to Channel
agent.sources.kafka-source.channels = memory-channel
agent.sinks.kafka-sink.channel = memory-channel
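The batching behaviour described in the comments can be modelled in a few lines: in each transaction the sink keeps taking events until it has flumeBatchSize events or the channel is empty, and a memory channel refuses to let one transaction take more events than its transactionCapacity (100 by default). The sketch below is a simplified model with a hypothetical function name, not Flume's code; where Flume would roll the transaction back, the model simply raises.

```python
from collections import deque


def drain_one_batch(channel: deque, flume_batch_size: int,
                    transaction_capacity: int) -> list:
    """Model one Kafka sink transaction against a memory channel.

    Takes events until flume_batch_size is reached or the channel is empty,
    so the sink never waits for a full batch. Taking more events than
    transaction_capacity in one transaction fails, as with a memory channel.
    """
    batch = []
    while len(batch) < flume_batch_size and channel:
        if len(batch) == transaction_capacity:
            # Flume would throw and roll back here; the model just raises
            raise RuntimeError("take list full: transactionCapacity reached")
        batch.append(channel.popleft())
    return batch


ch = deque(range(250))
print(len(drain_one_batch(ch, 1000, 1000)))  # 250: takes what is there, no waiting

ch = deque(range(250))
try:
    drain_one_batch(ch, 1000, 100)
except RuntimeError as exc:
    print("failed:", exc)
```

The second case shows why a batch size above transactionCapacity can run fine under light traffic and only start failing once more events than transactionCapacity are waiting in the channel.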
Note: at first I had not set agent.sinks.kafka-sink.allowTopicOverride = false, and no data was written to Kafka; the sink logged the following exception:
23/07/19 17:35:33 ERROR kafka.KafkaSink: Failed to publish events
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
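The reason allowTopicOverride matters here: the Kafka source stores the topic each event was read from in a topic header, and the Kafka sink, when allowTopicOverride is true (its default), produces the event to the topic named in that header instead of kafka.topic. The function below models only that routing decision; destination_topic is a hypothetical name, the header name topic is the default of the topicHeader setting, and device_ops_copy is an illustrative target topic.

```python
from typing import Dict


def destination_topic(headers: Dict[str, str], sink_topic: str,
                      allow_topic_override: bool = True,
                      topic_header: str = "topic") -> str:
    """Return the topic the Kafka sink would produce this event to (model only)."""
    if allow_topic_override and topic_header in headers:
        return headers[topic_header]
    return sink_topic


# An event read by the Kafka source from topic device_ops carries this header
hdrs = {"topic": "device_ops"}
print(destination_topic(hdrs, "device_ops_copy"))                              # device_ops
print(destination_topic(hdrs, "device_ops_copy", allow_topic_override=False))  # device_ops_copy
```

So in a Kafka-to-Kafka pipeline with the default setting, kafka.topic is effectively ignored for events coming from the Kafka source; setting allowTopicOverride = false makes the sink's own topic setting authoritative.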
Another error:
23/07/27 14:32:05 ERROR kafka.KafkaSink: Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1120815 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1239)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:918)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:227)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Cause:
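RecordTooLargeException is a Kafka exception thrown when a message is larger than the limit set by the producer's max.request.size parameter. The default value of max.request.size is 1048576 bytes (1 MB), so by default a single message can be at most 1 MB; the message in the log above is 1120815 bytes.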
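Fix:
- Increase the value of max.request.size
agent.sinks.kafka-sink.kafka.producer.max.request.size = 10485760
The brokers must also accept messages this large: the broker's message.max.bytes (or the topic-level max.message.bytes) also defaults to about 1 MB, so it may need to be raised as well.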
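The producer-side check is a plain size comparison against max.request.size, where the size is the record's estimated serialized size. The hypothetical helper below applies it to the number from the log, showing why 1120815 bytes fails under the 1048576-byte default and passes with 10485760.

```python
DEFAULT_MAX_REQUEST_SIZE = 1048576  # producer default, 1 MB
RAISED_MAX_REQUEST_SIZE = 10485760  # value set in the sink config, 10 MB


def fits_request_size(serialized_size: int, max_request_size: int) -> bool:
    """True if a record of this serialized size passes the max.request.size check."""
    return serialized_size <= max_request_size


size_from_log = 1120815
print(fits_request_size(size_from_log, DEFAULT_MAX_REQUEST_SIZE))  # False
print(fits_request_size(size_from_log, RAISED_MAX_REQUEST_SIZE))   # True
print(f"{size_from_log / 1024 / 1024:.2f} MB")                     # 1.07 MB
```

The message was only about 7% over the default limit.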
For reference: Apache flume with kafka source, kafka sink and memory channel