
spark-sql usage notes

This post covers:

  • How to use Hive UDFs in spark-sql
  • How to avoid the small-files problem in spark-sql
  • Setting the spark-sql CLI log level to WARN
  • Errors when Spark SQL reads Hive data

How to use Hive UDFs

  • Point spark-sql at the UDF jar directly: spark-sql --jars /opt/hive/udf.jar
  • Or set spark.jars /opt/hive/udf.jar in spark-defaults.conf
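
After the jar is on the classpath, the Hive UDF still has to be registered in the session before it can be called. A minimal sketch (the function name my_upper, the class com.example.udf.MyUpper and the table name are placeholders for your own UDF and data):

-- register the Hive UDF class shipped in /opt/hive/udf.jar (placeholder names)
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper';

-- then call it like any built-in function
SELECT my_upper(business_name) FROM temp.some_table LIMIT 10;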

Truncated the string representation of a plan since it was too large

Set in spark-defaults.conf:

spark.sql.debug.maxToStringFields   2000
spark.debug.maxToStringFields   2000

By default spark-sql tends to produce lots of small files; the following settings help:

  • Set Spark's adaptive execution parameters (a full sketch follows this list):
    set spark.sql.adaptive.enabled=true;
    set spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728;
    # and add to the SQL: distribute by cast(rand() * 5 as int)
    For details, see 《如何避免Spark SQL做数据导入时产生大量小文件》 (how to avoid generating lots of small files when importing data with Spark SQL).
    This approach turned out to be unreliable: it worked at first, but after some unknown change to the cluster configuration the settings stopped taking effect.
  • Use REPARTITION or COALESCE, i.e. apply the Hive-style Coalesce and Repartition hints in Spark SQL:
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
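
A minimal sketch of the first approach, spreading the output over a small fixed number of partitions with a random distribute by (the table and column names are placeholders):

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728;

insert overwrite table db.target_table partition(dt = '${dt}')
select business_id,
       business_name
from   db.source_table
where  dt = '${dt}'
distribute by cast(rand() * 5 as int);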

For example, combining the two set statements with the COALESCE(1) hint:

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728;
insert overwrite table app.app_prom_realtime_marketing_use_coupon_effect_da partition(dt = '${dt}')
select /*+ COALESCE(1) */
       from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as etl_date,
       business_id,      -- merchant ID
       business_name     -- merchant name
from temp.app_prom_realtime_marketing_use_coupon_effect_da_20210311_01

Note: with /*+ COALESCE(1) */ alone the output is not exactly one file, but you no longer end up with 200 empty files; adding the two set statements above brings the output down to a single file.
This approach needs a reasonably new Spark version, ideally 2.4.x or later.
You can also try: set spark.sql.hive.mergeFiles=true;

Setting the spark-sql CLI log level to WARN

# Use your own log4j.properties instead of the client default (the company default
# is INFO level, which floods the console with noise).
# spark.ui.showConsoleProgress=true shows a progress bar for running stages.
spark-sql --name 'gjc_spark_cli' \
          --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:configure/log4j.properties" \
          --conf spark.ui.showConsoleProgress=true \
          --master yarn \
          --num-executors 20 \
          --executor-memory 6G \
          --driver-cores 4 \
          --driver-memory 6G
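
A minimal sketch of what configure/log4j.properties can contain to get WARN-level console output, adapted from Spark's bundled log4j.properties.template:

# send everything at WARN and above to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n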

Errors when Spark SQL reads Hive data

  • Problem:
    Caused by: java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: Index: 0
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1016)
    	... 65 more
    Caused by: java.lang.IndexOutOfBoundsException: Index: 0
    	at java.util.Collections$EmptyList.get(Collections.java:4454)
    	at org.apache.hadoop.hive.ql.io.orc.OrcProto$Type.getSubtypes(OrcProto.java:12240)
    	at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getColumnIndicesFromNames(ReaderImpl.java:651)
    	at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getRawDataSizeOfColumns(ReaderImpl.java:634)
    	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:927)
    	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:836)
    	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:702)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:748)
    java.lang.RuntimeException: serious problem
    	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
    	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
    	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
    	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    	at scala.Option.getOrElse(Option.scala:121)
    	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
    	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    	at scala.Option.getOrElse(Option.scala:121)
  • Cause:
    The Hive table produced by Spark SQL contains empty files. Because the table is stored as ORC, Spark SQL fails while parsing these empty ORC files, whereas Hive reads the same table without any problem.

Solution:

Set spark.sql.hive.convertMetastoreOrc=true
Setting only this parameter still fails, now with:

java.lang.IndexOutOfBoundsException
	at java.nio.Buffer.checkIndex(Buffer.java:540)
	at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:139)
	at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:374)
	at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
	at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:187)
	at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:75)
	at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:73)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.TraversableOnce$class.collectFirst(TraversableOnce.scala:145)
	at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1336)
	at org.apache.spark.sql.hive.orc.OrcFileOperator$.getFileReader(OrcFileOperator.scala:86)
	at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:95)
	at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:95)
	at scala.collection.immutable.Stream.flatMap(Stream.scala:489)

You also need to set spark.sql.orc.impl=native
See SPARK-19809
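
So in the spark-sql session the two settings go together:

set spark.sql.hive.convertMetastoreOrc=true;
set spark.sql.orc.impl=native;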

OOM when joining two large tables

The main table has 60+ million rows and the other table about 100 million; the join runs out of memory:

Container killed by YARN for exceeding memory limits. 6.3 GB of 6 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
FetchFailed(null, shuffleId=4, mapId=-1, reduceId=121, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 4
	at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
	at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

After some analysis: both the main table and the other table are computed with spark-sql's default parallelism,
so each side is written out as 200 files, and the join result is again written as 200 files.

Raising the spark-sql parallelism to 1000 splits both tables into 1000 partitions, so each join task handles far less data:
set spark.sql.shuffle.partitions = 1000;
Problem solved.
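
A minimal sketch of the pattern (the table and column names below are placeholders, not the tables from the job above):

set spark.sql.shuffle.partitions = 1000;

insert overwrite table app.join_result
select a.*,
       b.extra_col
from   db.main_table a
left join db.other_table b
  on   a.id = b.id;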

ArrayIndexOutOfBoundsException when spark-sql reads a Hive ORC table

The company recently moved to the cloud and switched to Spark 3.0.0. After a lot of data was uploaded to the Tencent Cloud environment, spark-sql started throwing ArrayIndexOutOfBoundsException:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 11
	at org.apache.orc.mapred.OrcStruct.getFieldValue(OrcStruct.java:49)
	at org.apache.spark.sql.execution.datasources.orc.OrcDeserializer.deserialize(OrcDeserializer.scala:60)

Symptom: when reading the Hive table with spark-sql, select * works, but selecting columns by name throws the ArrayIndexOutOfBoundsException.
After a lot of searching, the cause appears to be that spark-sql and the Hive engine parse ORC files differently.

This can be fixed by setting set spark.sql.orc.impl=hive;
References: 《配置矢量化读取ORC数据》 (configuring vectorized ORC reads) and
《spark查询ORC表失败,在hive可以正常查询》 (Spark fails to query an ORC table that Hive reads fine).
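
A sketch of the symptom and the workaround in a spark-sql session (dw.some_orc_table and the column names are placeholders):

-- select * reads the ORC table fine
select * from dw.some_orc_table limit 10;

-- selecting columns by name throws ArrayIndexOutOfBoundsException
select business_id, business_name from dw.some_orc_table limit 10;

-- fall back to the Hive ORC reader
set spark.sql.orc.impl=hive;
select business_id, business_name from dw.some_orc_table limit 10;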

Cannot overwrite a path that is also being read from.

1. Set spark.sql.hive.convertMetastoreParquet=false or spark.sql.hive.convertMetastoreOrc=false
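
This error typically appears when a statement reads from and overwrites the same Hive table or partition while Spark has converted the table to its native Parquet/ORC reader. A minimal sketch of the workaround for an ORC table (db.some_table, the columns and the partition value are placeholders):

-- read through the Hive serde instead of Spark's native ORC reader,
-- so the overwrite-while-reading check is not triggered
set spark.sql.hive.convertMetastoreOrc=false;

insert overwrite table db.some_table partition(dt = '2021-03-11')
select business_id,       -- list the non-partition columns explicitly
       business_name
from   db.some_table
where  dt = '2021-03-11';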