This post covers:
- How to use Hive UDFs in spark-sql
- How spark-sql can solve the small-files problem
- Setting the spark-sql CLI log level to WARN
- Errors when Spark SQL reads Hive data
How to use Hive UDFs
- Start the CLI with spark-sql --jars /opt/hive/udf.jar to point spark-sql at the UDF jar.
- Alternatively, specify spark.jars /opt/hive/udf.jar in spark-default.conf.
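Once the jar is on the classpath, the UDF still has to be registered before it can be called. A minimal sketch (the class name com.example.udf.MyUpper and the table name are hypothetical placeholders):
-- register the UDF class shipped in /opt/hive/udf.jar
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper';
-- then call it like any built-in function
SELECT my_upper(business_name) FROM temp.some_table LIMIT 10;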
Truncated the string representation of a plan since it was too large
If this warning appears, set the following in spark-default.conf:
spark.sql.debug.maxToStringFields 2000
spark.debug.maxToStringFields 2000
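The same values can also be passed per session instead of editing spark-default.conf; a minimal sketch using the standard --conf flag:
spark-sql --conf spark.sql.debug.maxToStringFields=2000 \
          --conf spark.debug.maxToStringFields=2000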
Solving the small-files problem in spark-sql
By default Spark SQL generates a lot of small files; the following approaches can help.
- Set Spark parameters (for details see: How to avoid producing large numbers of small files when loading data with Spark SQL):
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728;
-- and add to the SQL: distribute by cast(rand() * 5 as int)
This approach is not very reliable: it worked fine at first, but after some unknown change to the cluster configuration the settings stopped taking effect.
- Use REPARTITION or COALESCE, i.e. apply the Hive-style Coalesce and Repartition hints in Spark SQL (see the examples below):
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
For example:
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728;
insert overwrite table app.app_prom_realtime_marketing_use_coupon_effect_da partition(dt = '${dt}')
select /*+ COALESCE(1) */
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as etl_date,
business_id,    -- merchant ID
business_name   -- merchant name
from temp.app_prom_realtime_marketing_use_coupon_effect_da_20210311_01
Note: with /*+ COALESCE(1) */ alone the output is not exactly one file, but you no longer get 200 empty files; combining it with the two set statements above keeps the output down to a single file.
This approach requires a reasonably recent Spark version, ideally 2.4.x or later.
You can also try set spark.sql.hive.mergeFiles=true;
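The REPARTITION hint is written the same way when a full shuffle before writing is acceptable; a sketch reusing the tables from the example above (the partition count 10 is only illustrative):
insert overwrite table app.app_prom_realtime_marketing_use_coupon_effect_da partition(dt = '${dt}')
select /*+ REPARTITION(10) */
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as etl_date,
business_id,    -- merchant ID
business_name   -- merchant name
from temp.app_prom_realtime_marketing_use_coupon_effect_da_20210311_01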
Setting the spark-sql CLI log level to WARN
# Use your own log4j.properties instead of the client's default: the company client
# logs at INFO level, which produces far too much noise.
# spark.ui.showConsoleProgress=true shows the execution progress bar in the console.
spark-sql --name 'gjc_spark_cli' \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:configure/log4j.properties" \
--conf spark.ui.showConsoleProgress=true \
--master yarn \
--num-executors 20 \
--executor-memory 6G \
--driver-cores 4 \
--driver-memory 6G
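A minimal configure/log4j.properties that logs at WARN could look like this (a sketch based on Spark's bundled log4j.properties.template; adjust the appender to taste):
# send WARN and above to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n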
Spark SQL errors when reading Hive data
- Problem:
Caused by: java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: Index: 0
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1016)
  ... 65 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 0
  at java.util.Collections$EmptyList.get(Collections.java:4454)
  at org.apache.hadoop.hive.ql.io.orc.OrcProto$Type.getSubtypes(OrcProto.java:12240)
  at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getColumnIndicesFromNames(ReaderImpl.java:651)
  at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getRawDataSizeOfColumns(ReaderImpl.java:634)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:927)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:836)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:702)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
java.lang.RuntimeException: serious problem
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
- Cause:
The Hive table written by Spark SQL contains empty files. Because the table is stored as ORC, Spark SQL fails while parsing those empty ORC files, whereas Hive can read the table just fine.
- Fix:
Run set spark.sql.hive.convertMetastoreOrc=true
Setting only that parameter still fails with:
java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Buffer.java:540)
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:139)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:374)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:187)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:75)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.TraversableOnce$class.collectFirst(TraversableOnce.scala:145)
at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1336)
at org.apache.spark.sql.hive.orc.OrcFileOperator$.getFileReader(OrcFileOperator.scala:86)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:95)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:95)
at scala.collection.immutable.Stream.flatMap(Stream.scala:489)
You also need to run set spark.sql.orc.impl=native
See SPARK-19809.
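Putting the two settings together in a spark-sql session (the table name below is a hypothetical placeholder):
set spark.sql.hive.convertMetastoreOrc=true;
set spark.sql.orc.impl=native;
-- the ORC table containing empty files can now be read
select count(*) from some_db.some_orc_table;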
OOM when joining two large tables
The main table has 60+ million rows and the secondary table about 100 million; the join runs into OOM:
Container killed by YARN for exceeding memory limits. 6.3 GB of 6 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
FetchFailed(null, shuffleId=4, mapId=-1, reduceId=121, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 4
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
Looking into the cause: both the main table and the secondary table were computed with spark-sql's default parallelism, so each side ended up as 200 files, and the final join output was also written as 200 files.
Setting spark-sql's default parallelism to 1000 splits the data of both tables into 1000 files, so each join task handles far less data:
set spark.sql.shuffle.partitions = 1000;
Problem solved.
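A sketch of the whole session (the table and column names are hypothetical placeholders):
set spark.sql.shuffle.partitions = 1000;   -- 1000 shuffle partitions instead of the default 200
insert overwrite table app.join_result_da
select a.*, b.ext_col
from main_table a      -- ~60 million rows
left join dim_table b  -- ~100 million rows
on a.id = b.id;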
Array out of bounds when spark-sql reads a Hive ORC table
The company recently moved to the cloud and switched the Spark version to 3.0.0. After a lot of data was uploaded to the Tencent Cloud environment, spark-sql started reporting array-out-of-bounds errors like the following:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 11
at org.apache.orc.mapred.OrcStruct.getFieldValue(OrcStruct.java:49)
at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer.deserialize(OrcDeserializer.scala:60)
The specific symptom: when reading the Hive table with spark-sql, select * works, but selecting individual columns by name throws the array-out-of-bounds error.
After trying many suggestions found online, the root cause appears to be that spark-sql and the Hive engine parse ORC differently.
It can be fixed by running set spark.sql.orc.impl = hive;
See also the article on configuring vectorized ORC reads (配置矢量化读取ORC数据).
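A quick way to verify the fix in a session (the table and column names are hypothetical placeholders):
set spark.sql.orc.impl = hive;
-- selecting a named column no longer throws the array-out-of-bounds error
select business_id from some_db.some_orc_table limit 10;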
Spark fails to query an ORC table that Hive can query normally
Cannot overwrite a path that is also being read from.
1. Set spark.sql.hive.convertMetastoreParquet=false or spark.sql.hive.convertMetastoreOrc=false
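For an ORC table, a sketch of applying the workaround before an INSERT OVERWRITE that reads from the table it writes to (the table name and partition value are hypothetical placeholders):
set spark.sql.hive.convertMetastoreOrc=false;
insert overwrite table some_db.some_orc_table
select * from some_db.some_orc_table where dt = '2021-03-11';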