本文主要包括:
- Spark面试题总结
1. 对于 Spark 中的数据倾斜问题你有什么好的方案?
数据倾斜是因为数据中的key分布的不均匀导致的。
- 可以把小表广播出去或者使用
map join
- 采样倾斜key并分拆join操作,对倾斜key做添加随机数处理。
- 如果其中一个表存在多个key倾斜,把主表的所有key都添加1-n随机数,把从表每条数据都复制n次,并添加1-n的随机数,然后再join
- 自定义Partitioner:原始的spark使用的是hashPartition,可以自定义一个paritioner,把key拆分到多个task里去
- 提高shuffle操作的并行度,使用reduceByKey(1000),sparkSQL里设置
spark.sql.shuffle.partitions=1000
2. 描述一下RDD,DataFrame,DataSet,DataStream的区别和联系?
RDD:弹性分布式数据集,RDD是SparkCore的基本数据结构
DataFrame: 从源码上看,DataFrame其实就是指定row的DataSet,type DataFrame = Dataset[Row]
DataSet:Dataset是一个由特定领域的对象组成强类型集合,可以使用函数(DSL)或关系运算(SQL)进行并行的转换操作。 每个Dataset 还有一个称为“DataFrame”的无类型(untypedrel)视图,它是[[Row]]的数据集。
Dataset和DataFrame的区别与联系:
1、Dataset是强类型,会在编译的时候进行类型检测;而DataFrame是弱类型的,在执行的时候进行类型检测;
2、序列化方式不通,Dataset是通过Encoder进行序列化,支持动态的生成代码,直接在bytes的层面进行排序,过滤等的操作;而DataFrame是采用可选的java的标准序列化或是kyro进行序列化
3、DataFrame和Dataset实质上都是一个逻辑计划,并且是懒加载的,都包含着scahema信息,只有到数据要读取的时候,才会将逻辑计划进行分析和优化,并最终转化为RDD
4、二者由于api是统一的,所以都可以采用DSL和SQL方式进行开发,都可以通过sparksession对象进行创建或者是通过transform转化操作得到
Dataset和RDD的区别与联系:
首先,Dataset的底层不是RDD,但是它可以和RDD互相转化,DS的一些算子例如filter等都是自己实现的,并没有调用RDD的算子
DStream:离散化流(DStream),SparkStreaming中的基本抽象,DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据,一个DStream 对应了时间维度上的多个RDD。
3. 简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数?
窄依赖:每一个父RDD的partition最多被子RDD的一个partition使用
宽依赖:多个子RDD的partition会依赖同一个父RDD的partition
spark根据RDD的依赖关系划分stage,遇到一个宽依赖,就划分一个stage
stage中task数目由stage末端的RDD分区个数来决定
4. 列举Spark常用的transformation和action算子,有哪些算子会导致Shuffle?
transformation算子:map、flatMap、reduceByKey、groupByKey、filter等
action算子:take、ccollect、count、reduce等
reduceByKey、groupByKey等会产生Shuffle
注意:
reduceByKey和groupByKey只是会产生shuffle,但是不是action算子
5. 简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的区别和联系
cache缓存到内存中
persist可以缓存到磁盘和内存里
checkpoint只能缓存到hdfs上(磁盘上)
Persist 和 Cache,不会丢掉RDD间的依赖链/依赖关系,CheckPoint会斩断依赖链
6. Spark开发调优总结
- 避免创建重复的RDD
- 尽可能复用同一个RDD
- 对多次使用的RDD进行持久化
- 尽量避免使用shuffle类算子
- 使用map-side预聚合的shuffle操作
- 使用高性能的算子
6.1 使用reduceByKey/aggregateByKey替代groupByKey
6.2 使用mapPartitions替代普通map
6.3 使用foreachPartitions替代foreach
6.4 使用filter之后进行coalesce操作
6.4 使用repartitionAndSortWithinPartitions替代repartition与sort类操作 - 广播大变量,减少大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收)
- 使用Kryo优化序列化性能
- 优化数据结构
7. Spark为什么快,Spark SQL 一定比 Hive 快吗
1、Spark 计算比 MapReduce 快的根本原因在于 DAG 计算模型,消除了冗余的 HDFS 读写,Spark不需要将计算的中间结果写入磁盘
如果计算不涉及与其他节点进行数据交换,Spark 可以在内存中一次性完成这些操作,也就是中间结果无须落盘,减少了磁盘 IO 的操作
2、Spark 是基于内存的计算并不是spark快的根本原因
3、MapReduce每启动一个Task便会启动一次JVM,基于进程的操作。而Spark每次MapReduce操作是基于线程的,只在启动Executor时启动一次JVM,内存的Task操作是在线程复用的
总结:Spark比Mapreduce运行更快,主要得益于其对mapreduce操作的优化以及对JVM使用的优化
Spark SQL 不一定比Hive快,例如没有shuffle的或者只有一次shuffle的,有可能hive的速度不会低于spark-sql
8. Spark Streaming小文件问题
1、增加 batch 大小
2、在输出到hdfs的时候,Coalesce一下
3、单独起一个定时任务来合并小文件
9. Checkpoint和持久化有什么区别?
1、持久化只是将数据保存在BlockManager中,而RDD的lineage是不变的,但是checkpoint执行完后,RDD已经没有之前所谓的依赖RDD了,而只有一个强行为其设置的checkpointRDD,RDD的lineage(血缘关系,依赖关系)改变了
2、持久化的数据丢失可能性更大,磁盘、内存都可能会存在数据丢失的情况。但是checkpoint的数据通常是存储在如HDFS等容错、高可用的文件系统,数据丢失可能性较小。
注:默认情况下,如果某个RDD没有持久化,但是设置了checkpoint,会存在问题,本来这个job都执行结束了,但是由于中间RDD没有持久化,checkpoint job想要将RDD的数据写入外部文件系统的话,需要全部重新计算一次,
再将计算出来的RDD数据checkpoint到外部文件系统。所以,建议对checkpoint()的RDD使用persist(StorageLevel.DISK_ONLY),该RDD计算之后,就直接持久化到磁盘上。后面进行checkpoint操作时就可以直接从磁盘上读取RDD的数据,并checkpoint到外部文件系统。
10. SparkStreaming读取Kafka数据的两种方式,有什么区别?
SparkStreaming读取Kafka数据有两种方式,分别为基于Receiver方式和基于Direct(No Receiver)方式
1、基于Receiver方式:
1.1 需要使用单独的Receiver线程来异步获取Kafka数据。
1.2 Receiver底层实现中使用了Kafka高级消费者API,因此,不需要自己管理Offset,只需指定Zookeeper和消费者组GroupID,系统便会自行管理。
1.3 执行过程: Spark Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据,获取的数据先存储在Receiver中(存储方式由StorageLevel决定),后续,当Batch Job触发后,这些数据会被转移到剩下的Executor中被处理。处理完毕后,Receiver会自动更新Zookeeper中的Offset。
1.4 默认情况下,程序失败或Executor宕掉后可能会丢失数据,为避免数据丢失,可启用预写日志(Write Ahead Log,WAL)。将Receiver收到的数据再备份一份到更可靠的系统如HDFS分布式文件中,以冗余的数据来换取数据不丢失。
1.5 生产下,为保证数据完全不丢失,一般需要启用WAL。启用WAL,在数据量较大,网络不好情况下,会严重降低性能
2、基于Direct(No Receiver)方式
2.1 不需要使用单独的Receiver线程从Kafka获取数据。
2.2 使用Kafka简单消费者API,不需要ZooKeeper参与,直接从Kafka Broker获取数据。
2.3 执行过程:Spark Streaming Batch Job触发时,Driver端确定要读取的Topic-Partition的OffsetRange,然后由Executor并行从Kafka各Partition读取数据并计算。
2.4 为保证整个应用EOS, Offset管理一般需要借助外部存储实现。如Mysql、HBase等。
2.5 由于不需要WAL,且Spark Streaming会创建和Kafka Topic Partition一样多的RDD Partition,且一一对应,这样,就可以并行读取,大大提高了性能。
2.6 Spark Streaming应用启动后,自己通过内部currentOffsets变量跟踪Offset,避免了基于Receiver的方式中Spark Streaming和Zookeeper中的Offset不一致问题。
11. 如何使用Spark实现TopN的获取(描述思路或使用伪代码)?
最简单的就是,直接reduceByKey计算key的个数,然后按照value排序
但是,这样会有一个问题,就是当数据量很大的时候,有可能会导致OOM,或者数据倾斜
可以使用map算子,把每个key添加一个随机数,然后再reduceByKey,最后再把key拆出来,再做一次reduceByKey
12. Spark在什么情况下会OOM
- map过程产生大量对象导致内存溢出
- 数据不平衡导致内存溢出
- coalesce调用导致内存溢出
- shuffle后内存溢出
- 广播了大变量
指定spark的垃圾回收算法G1GC:
-conf spark.driver.extraJavaOptions="-XX:+UseG1GC -Dlog4j.configuration=log4j.properties" \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -Dlog4j.configuration=log4j.properties" \
13. RDD的弹性表现在哪几点?
- 自动的进行内存和磁盘的存储切换
- 基于Linage的高效容错
- task如果失败会自动进行特定次数的重试
- stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片
- checkpoint和persist,数据计算之后持久化缓存
- 数据调度弹性,DAG TASK调度和资源无关
- 数据分片的高度弹性
14. RDD通过Linage(记录数据更新)的方式为何很高效?
14. Spark与MapReduce以及Tez的区别?
- spark与tez都是以dag方式处理数据
- MR相比tez和spark,每次计算结果都会落盘,增加了磁盘IO
tez和spark的区别:
- spark更像是一个通用的计算引擎,提供内存计算,实时流处理,机器学习等多种计算方式,适合迭代计算
- tez作为一个框架工具,特定为hive和pig提供批量计算
- spark属于内存计算,支持多种运行模式,可以跑在standalone,yarn上;而tez只能跑在yarn上;虽然spark与yarn兼容,但是spark不适合和其他yarn应用跑在一起
- tez能够及时的释放资源,重用container,节省调度时间,对内存的资源要求率不高; 而spark如果存在迭代计算时,container一直占用资源;
可以参考Spark面试题(一)