yield-bytes

沉淀、分享与无限进步

深入理解Spark

  在前面博客文章里,已经把大数据实时分析项目在spark组件之前的各个组件原理、部署和测试都给出相关讨论,接下来是项目最核心的内容:实时计算部分,因为项目将使用spark streaming做微批计算(准实时计算),因此接下的文章内容将深入spark以及spark streaming架构原理,为后面实际计算编程做铺垫。

1、Spark 是什么?

  Spark是一种分布式的并行计算框架,什么是计算框架?所谓的计算(在数据层面理解)其实是用于数据处理和分析的一套解决方案,例如Python的Pandas,相信用过Pandas都很容易理解Pandas擅长做什么,加载数据、对数据进行各类加工、分析数据等,只不过Pandas只适合在单机上的、数据量百万到千万级的计算组件,而Spark则是分布式的、超大型多节点可并行处理数据的计算组件。

  Spark通常会跟MapReduce做对比,它与MapReduce 的最大不同之处在于Spark是基于内存的迭代式计算——Spark的Job处理的中间(排序和shuffling)输出结果可以保存在内存中,而不是在HDFS磁盘上反复IO浪费时间。除此之外,一个MapReduce 在计算过程中只有Map 和Reduce 两个阶段。而在Spark的计算模型中,它会根据rdd依赖关系预选设计出DAG计算图,把job分为n个计算阶段(Stage),因为它内存迭代式的,在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是两个阶段。
  Spark提供了超过80种不同的Transformation和Action算子,如map,reduce,filter,reduceByKey,groupByKey,sortByKey,foreach等,并且采用函数式编程风格,实现相同的功能需要的代码量极大缩小(尤其用Scala和Python写计算业务代码方面)。正是基于使用易用性,因此Spark能更好地用于基于分布式大数据的数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark生态如下:
在这里插入图片描述

2、Spark 运行模式

目前最为常用的Spark运行模式有:

  • Local:本地进程运行,例如启动一个pyspark交互式shell,一般用于开发调试Spark应用程序
  • Standalone:利用Spark自带的资源管理与调度器运行Spark集群,采用Master/Slave结构,可引入ZooKeeper实现spark集群HA
  • Hadoop YARN : 集群运行在YARN资源管理器上,资源管理交给YARN,Spark只负责进行任务调度和计算,参考本博客《基于YARN HA集群的Spark HA集群》
  • Apache Mesos :运行在著名的Mesos资源管理框架基础之上,该集群运行模式将资源管理交给Mesos,Spark只负责进行任务调度和计算
    ==Mesos和YARN两种资源有什么区别:==
    之前看一个视频,对其给出的解释印象深刻:
    Mesos:细腻度资源管控
    YARN:粗粒度资源管控
    例如有个老师要给45个学生上课,向教务处申请课室资源,若教务处以Mesos模式发放资源,那么它会发放只能容纳45个学生的课室,典型的按需分配;若教务处以YARN模式发放资源,那么它会发放能容200个学生的大教室,但实际上还有155个人位置资源空闲。这就是资源的细腻度和粗粒度的区别。

3、适合Spark的场景

  • Spark适用场景:
    Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合,基于大数据的机器学习再适合不过,例如梯度下降法,需要不断迭代找到全局或局部最优解。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小。
    准实时计算场合:实时接收用户行为原始数据,并通过Spark Streaming计算(转换+加工),例如在广告、报表、推荐系统等业务上,在广告业务方面需要大数据做应用分析、效果分析、定向优化等,在推荐系统方面则需要大数据优化相关排名、个性化推荐以及热点点击分析等,这些业务天生适合大型的互联网巨头。

  • Spark不适用场景:

    内存消耗极大,在内存不足的情况下,Spark会下放到磁盘,会降低应有的性能
    有高实时性要求的流式计算业务,例如实时性要求毫秒级,对每一条数据都触发实时计算的,这种场合已经被Flink称霸。
    流线长或文件流量非常大的数据集不适合,这是因为这种场合rdd消耗极大的内存集群压力大时,一旦一个task失败会导致它前面一条线所有的前置任务全部重跑(尤其对于RDD 血缘关系链长且有多个宽依赖的情况),JVM的GC不够及时,内存不能及时释放,将会出现恶性循环导致更多的task失败,导致整个Application效率极低。所以为什么说Spark是适合“微批”处理,意味着每隔一段时间(1秒或者几秒不等)来一批次数据,Spark适合“一小口一小口准实时地吃数据”。

4、Spark相关术语

在这里插入图片描述一个完整的Spark应用程序,例如wordcount,在提交集群运行时:
./bin/spark-submit --name word_count_app --master yarn --deploy-mode cluster --py-files word_count.py
它涉及到上图流程的相关术语:

  • SparkContext:整个应用的上下文,控制应用的生命周期。
  • RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型(鉴于RDD是Spark的核心概念,后面的有一篇博客给出相关讨论)。无法快速理解RDD?这里有两种方式可助于理解:
    A、把它设想为分布式的Pandas dataframe,df也是一个数据集,也是被加载到内存上,然后利用pandas各个算子对df反复迭代最后得出计算结果。
    B、rdd = sc.parallelize(list(range(1000)), 10),这就创建了一个“轻量”的rdd数据集,设想下:这个数组有10亿项,分10个分区,有10个计算节点,每个节点负责1个分区的计算,也就是说从计算节点来看,每个节点负责1亿行的“小块rdd”;而从用户逻辑层面来看:就一个大rdd,包含10亿项数据

  • RDD的窄依赖和宽依赖

    A、窄依赖NarrowDependency(一对一):不会产生分区之间的shuffle,所有的父RDD的partition会一一映射到子RDD的partition中,例如Map、FlatMap、Filter算子等

    B、宽依赖ShuffleDependency(一对多):会引起多个分区之间的shuffle,父RDD中的partition会根据key的不同进行切分,划分到子RDD中对应的partition中,例如reduceByKey的任务。

  • DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系(关系链)。

  • Driver Program:Application中运行main函数并创建的SparkContext,创建SparkContext的目的是和集群的ClusterManager通信,进行计算资源的申请、任务的分配和监控等。因此也可认为SparkContext代表Driver控制程序。Driver负责对Application构建DAG图。

  • Cluster Manager:集群资源管理中心,例如YARN里面的ResourceManage,负责分配计算资源分配和回收。

  • Worker Node:启动Executor或Driver负责完成具体计算,在YARN模式中 Worker Node就是NodeManager节点。

  • Executor:是Application在Worker上面的一个进程,该进程会启动线程池方式去跑task,并负责把数据存在内存或者磁盘上。每个Application都有属于自己的一组Executors。在Spark on YARN模式下,Executor进程名为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor实例,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task。

  • Application:用户编写的Spark应用程序,例如下面启动一个名字为word_count_app的Application(简称app),该app的业务逻辑在word_count.py实现。一个Application包含多个Job。
    ./bin/spark-submit --name word_count_app --master yarn --deploy-mode cluster --py-files word_count.py

  • Job:包含多个Task组成的并行计算,由Spark Action算子(collect、groupByKey、ReduceByKey、count、takeOrdered等)触发产生。一个action产生一个job,如果一个Application里面的业务代码有多少个action算子,就产生多少个Job。一个Job包含多个RDD及作用于相应RDD上的各种操作。

  • Task:任务,运行在Executor上的工作单元,是Executor中的一个线程(Executor是JVM进程,在YARN模式下,就是一个跑在container上JVM进程),与Hadoop MapReduce中的MapTask和ReduceTask一样,是运行Application的基本单位。多个Task组成一个Stage,而Task的调度和管理由TaskScheduler负责。

  • Stage:DAGScheduler将一个Job划分为若干Stages,每个Stage打包成一组Tasks,又称TaskSet。Stage的调度和划分由DAGScheduler负责。Stage又分为Shuffle Map Stage和Result Stage两种。Stage的边界就在发生Shuffle(rdd宽依赖)的地方。

  • DAGScheduler:根据Job构建基于Stage的DAG(有向无环任务图),DAGScheduler会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,并提交Stage给TaskScheduler。

  • TaskScheduler:将任务(Task)分发给Executor,每个Executor负责运行什么Task由TaskScheduler分配。

  • Shared Variables共享变量:Application在整个运行过程中,可能需要一些变量在每个Task中都使用,用于节省计算时间和IO。Spark有两种共享变量:一种缓存到各个节点的广播变量:broadcast;一种只支持加法操作:accumulator,一般用于对rdd求和sum以及累加计数counter。 这个概念需结合实际用例说明,否则难以理解。

  一句话:一个Application(其实就是你设计的Spark业务程序)由多个Job组成,一个Job由多个Stage组成,一个Stage由多个Task组成一个TaskSet。Stage是作业调度的基本单位,Task是执行计算和操作rdd的最小工作单元,以下面的wordcount语句对于的关系图说明:

1
sc.textFile("hdfs://nn:9000/app").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false)

在这里插入图片描述  这里为何画了2个Job,因为该计算任务包含两个Action:reduceByKey和sortBy。Application里面有多少个Action算子,Driver就给Application分配多少个Job。

  Spark计算涉及的相关部件比较多而且相对抽象,需要在实际spark集群上跑几个测试application结合理解,本博客在前面的文章已经在测试项目中结合spark UI的截图就Job、Stage、Task等给出较为详细的说明,参考《基于hadoop3.1.2分布式平台上部署spark HA集群》第7章内容。

5、Spark程序执行流程

在这里插入图片描述  Spark 程序执行流程基于不同资源管理器其有不同的执行流程,以下以Standalone模式说明执行流程。其他资源管理模式的执行流程可以参考这篇文章:《Spark任务提交方式和执行流程》

  • 1)构建Spark Application的运行环境,启动SparkContext也即启动Driver,Driver向资源管理器注册并申请运行Executor资源;
  • 2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
  • 3)Driver根据算子链预先构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task。

  • 4)TaskScheduler将Task发送给Executor运行,同时Driver将应用程序代码传给Executor。

  • 5)Task在Executor上运行,运行完毕释放所有资源。

其实不管在哪种种资源模式下,Spark的程序执行流程一定离不开三条主线:

  • Driver 申请资源用于启动Executor
  • Driver 构建DAG图,切分Stage,最终生产出多组TaskSet
  • Executor 领取Task和业务代码并执行

6、理解Spark Stage的划分

  本部分内容比较重要,也是理解spark任务调度原理的关键,本章节内容参考《Spark Scheduler内部原理剖析》其中任务调度内容

6.1 Spark Stage的划分

  这里以wordcount的Scala语句分析Spark对于一个application是如何划分stage的

1
sc.textFile("hdfs://nn:9000/app").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://nn:9000/result")

  以上就是一个Job,由rdd和Action方法封装而成,SparkContext将Job交给DAGScheduler提交,它会根据rdd的血缘关系的DAG图(你可以理解为预先规划的计算流程)进行切分,将一个Job划分为若干Stages,具体划分策略是,由处于末端的RDD不断通过依赖回溯判断父依赖是否为宽依赖,即以Shuffle为界,划分Stage。
划分的Stages分两类,一类叫做ResultStage,由Action方法决定,是DAG计算流程图最下游的Stage,这个Stage会最先被划分出来(因为是从末端回溯,因此首先遇到宽依赖reduceByKey,因此ResultStage最先被划出)。另一类叫做ShuffleMapStage,为下游ResultStage准备数据。
以下面的DAG流程作为说明:
1
sc.textFile("hdfs://nn:9000/app").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://nn:9000/result")

在这里插入图片描述  Job由Action算子saveAsFile触发,该Job由rdd3和saveAsTextFile方法组成,根据rdd之间的依赖关系:

  • 首先从rdd3开始回溯搜索,在回溯搜索过程中,rdd3依赖rdd2,遇到reduceByKey需要宽依赖,所以在rdd3和rdd2之间划分Stage,该Stage为ResultStage
  • 继续回溯,rdd2依赖rdd1,遇到Map窄依赖,不划分stage
  • rdd1依赖rdd0,遇到flapMap窄依赖,不划分Stage
  • 最终回溯到源头rdd0,rdd0无父依赖,因此rdd2、rdd1和rdd0都划分到同一个Stage,即ShuffleMapStage。

小结:一个Spark应用程序包括Job、Stage以及Task三个概念:

  • Job是以Action方法(算子)为界,遇到一个Action方法则触发一个Job
  • Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次Stage划分
    • Task是Stage的子集,以并行度(分区数)来衡量,一个Stage有多少个partition就有多个task
6.2 Spark DAG的可视化

  为了更直观理解6.1 stage的划分,以及DAG可视化,这里用另外一个名为word-count的application语句跑一个测试,并在spark web UI查看相关划分情况以及执行计划:

1
sc.textFile("hdfs://nn:9000/app").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false)

从提交语句的逻辑即可直接看出分为两个Stage。在web端可直观看到Application有2个executor分别位于两个不同spark节点上。
==对于Stage 0:==
Stage 0 的Taskset有3个task,其中1个task在节点5上的executor0进程跑,另外2个task在节点6的executor1进程上跑。executor进程会使用多线程方式运行自己管辖的tasks
在这里插入图片描述==对于Stage 1:==
Stage 1 的Taskset有3个task,其中1个task在节点5上的executor0进程跑,另外2个task在节点6的executor1进程上跑。executor进程会使用多线程方式运行自己管辖的tasks
在这里插入图片描述
这从两张图也可以看到Application的运行机制:引用官网文档Scheduling Across Applications的一句话:

When running on a cluster, each Spark application gets an independent set of executor JVMs that only run tasks and store data for that application
当提交的application是在集群上跑时,每个application包含多个executor JVMs运行进程,这些executors(JVM进程)只负责为当前的application运行它的tasks任务和存储数据

7、Spark调度过程

  有了前面内容铺垫后,本章节内容才能比较好理解,本章节内容参考《Spark Scheduler内部原理剖析》其中的”Spark任务调度总览“,这篇文章质量不错。

7.1 Spark的两级调度模型

  Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。Stage级别的调度前面第6章节已经给出详细说明,至于Task级的调度相对复杂,原文作者给出了非常专业的、从源代码执行流程的说明,但这里不再重复累赘,毕竟重心还是以大数据项目在应用层的开发为主。
总体调度流程如下图所示。
在这里插入图片描述

  • 1)Spark RDD通过其Transactions和Action,形成了RDD的计划执行流程图,即DAG,最后通过Action的调用,触发Job并调度执行。
  • 2)DAGScheduler负责Stage级的调度,主要是将DAG按照RDD的宽依赖切分成若干Stages,并将每个Stage里面的多个task打包成一个TaskSet。多个Stage就有多个TaskSets,这些TaskSets由DAGScheduler交给TaskScheduler调度。
  • 3)TaskScheduler负责Task级的调度,TaskSets按照指定的调度策略分发到不同的Executor上执行。
  • 4)调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多个实现API,分别对接不同的资源管理器YARN/Mesos/Standalone。你可以看成SchedulerBackend就是资源的代理,这个代理不断询问TaskScheduler是否需要资源去运行task。

==这里会有一个疑问:每个Stage里面的task的数量怎么确定?==
每个Stage里面的task的数量是由该Stage中最后一个RDD的Partition数量所决定!

7.2 以Spark On Yarn说明调度过程

  Spark-On-Yarn模式下在任务调度期间,ApplicationMaster、Driver、DAGScheduler、TaskScheduler、Executor等内部模块的交互过程,以进一步巩固理解7.1章节的内容。
(从下图中有无发现一个有趣的现象:大数据多个组件理解难,其实不是那种像操作系统底层原理的难,而是在于:大数据的每个组件内部有很多个角色模块,每个角色负责的”工作“不一样,整个hadoop生态圈,每个组件和组件之间联系,这就会涉及到几十个实现模块,你要记忆和理解这几十个不同名字模块及其具体能做什么,以及他们之间的逻辑联系关系。
举个栗子:

  • 如果你是部门经理,部门只有9个人,3个岗位,如果现在接到一个开发项目,你当然很清楚和也容易安排每个岗位的人负责项目哪部分开发工作,并指定谁跟谁如何协调某部分内容。
  • 如果你是市长,假设现在你接到一个上级大型项目,需要你亲自统筹和设计出如何让20个局完成该大型项目的方案。首先需要设计出10个局单位之间怎么配合,10个局单位共有100个岗位,每个局的每个岗位具体负责哪部分工作,并且你要设计出项目某部分内容是由哪个岗位和哪个岗位直接如何调度完成,够崩溃的。这就是大数据生态圈,概念多、杂,概念之间有一定逻辑关系,这些都需要你去理解和记忆,这就是难点所在。

    在这里插入图片描述
  • Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver(这四部分是程序里面类或者模块,不是线程),并启动SchedulerBackend线程以及HeartbeatReceiver线程。
  • 当Driver启动后,ApplicationMaster会通过本地的RPC连接Driver,并开始向ResourceManager申请Container资源运行Executor进程(一个Executor对应与一个Container),当ResourceManager返回Container资源,则在对应的Container上启动Executor。
  • SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。
  • DAGScheduler负责的Stage调度
  • TaskScheduler负责的Task调度。
  • work node上Executor进程负责运行Task和把数据存在内存或者磁盘上

小结

  本章内容对于博客接下来有关Spark以及Spark Streaming项目的文章理解起着关键的基础作用,下一篇文章,重点细讲RDDs。