Spark 的整体架构和关键概念
参考 Cluster Mode Overview, 当一个应用在 Spark 集群上运行时,总体来说包含两大部分: 基础设施集群和Spark Application。
Spark 目前支持的基础设施集群包括 Standalone(Spark 内置集群), Apache Mesos, Hadoop YARN 和 Kubernetes。 基础设施集群包括两部分:
-
Cluster manager: 运行在作为集群管理节点的主机上;
-
Worker node: 承担具体计算工作的主机节点,被cluster manager 管理;
基础设施集群作为计算基础设施是持久存在的,与 Spark Application 的生命周期无关。
Spark Application 包含用户定义的计算过程,例如一个 Java/Scala jar 或者 python/R script,
包括 driver program 和 executor 两部分,
它们都是 JVM process,生命周期与 Spark Application 的生命周期一致。
spark-shell
和 pyspark
也是 Spark Application。
其他主要概念包括(参考 "Glossary" in Cluster Mode Overview):
-
Driver program: 负责运行Spark Application 的
main
方法并创建 SparkContext, Driver 还内置了一个 web server (下面简称为 4040), 监听它所在主机的 4040 端口,下面将结合它分析 Spark Application 的逻辑运行过程。 -
SparkContext: 由 driver program 创建,负责调度管理整个 Spark Application 各部分通信;
-
SparkSession: SparkSession 是 Spark 2.x 中新出现的概念,由 Spark 1.x 中的 SparkContext 和 SQLContext 合并而来,为用户提供更一致的 API。 SparkSession 对象内部包含一个 SparkContext 对象,负责与 Spark cluster 通讯方面的工作;
-
Executor: 运行在基础设施集群的一台 worker node 上,完成本地计算任务。 它可以同时执行
spark.executor.cores
个 task,默认值是主机的逻辑CPU数量, 即lscpu
中CPU(s)
的值,等于 \(Sockets \times Cores_per_socket \times Thread_per_core\).
Spark Application 的物理运行过程
大体分为 client request, launch, execution, completion 几个阶段,详细参考 "The Life Cycle of a Spark Application (Outside Spark)" in [SDG15].
-
Client Request: 用户将 Spark app 从 client 发给 cluster manager, cluster manager 选择集群中的一个节点做 Spark driver;之后 client 执行完毕退出。
-
Launch: Spark driver 开始执行用户程序,首先初始化 SparkContext, 由 SparkContext 连接 cluster manager (通过用户指定的
--master
参数), 将用户指定的 executor 数量和其他配置信息发给它用来创建 executors process, cluster manager 按照要求在 worker node 上创建好 executors 后,将它们的位置信息 返回给 SparkContext,至此 Spark cluster 创建完毕。 -
Execution: 具体计算工作在这一阶段完成,由 Spark driver 协调各个节点, 分配计算任务,接收任务执行结果。此阶段的信息和数据传输发生在 Spark cluster 内部,与基础设施集群无关。
-
Completion: Spark driver 执行结束,返回 成功/失败 结果,cluster manager 负责关闭 与此 Spark driver 相关的 executors.
Spark Application 的逻辑运行过程
样例程序 (in Python):
$ pyspark
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 2500000000000
启动 pyspark
后,就可以在 http://localhost:4040 查看 app 运行情况了,
这时 Jobs 和 Stages 页面均为空,Executors 页面显示有一个 executor,
ID 为 driver,监听端口 33869,在系统命令行中输入 jps
可以看到一个名为
SparkSubmit
的进程,PID为16270,通过 netstat
命令可以看到正是这个进程监听
33869端口。
运行step4.collect()
前面的代码,可以看到4040页面上 Jobs 和 Stages 始终是空的,
表示 job 没有实际执行。
执行step4.collect()
,4040 Jobs 页面上出现一条ID为0的记录
(由于 collect()
方法是 action,前面的操作都是 transformation,
表明一个 job 对应一个 action)。
点击 job0 后出现 Details for Job 0 页面,包含6个 Completed Stages, ID 为 0 ~ 5。页面上的 DAG Visualization 图示了这6个 stage 之间的继承关系。
其中 stage0 和 stage1 对应 df1 = ...
和 df2 = ...
两行,
二者各包含8个task,stage0 的 Shuffle Write 为24MB,stage1为12MB,
这是因为 df1
的长度是 df2
的二倍。
由于代码是在单机 Spark 2.2 的 pyspark REPL 中运行,
机器的逻辑CPU数为8,所以一个 stage 默认的(并行)task 数为8。
点击 stage0 进入 Details for Stage 0 页面,其中的 Shuffle Write Size / Records
的值为 24.0 MB / 4999999,与 df1
的长度吻合,这些 records 被平均分配给了
8个 task,每个 task 的 Shuffle Write Size / Records 都是 1/8.
Stage1 对应的 df2
的长度是 df1
的一半,所以处理的 records 也只有 stage0
的一半。
从 DAG 图可知 stage2 从 stage1 变化而来,stage2 的 Shuffle Read 和 stage1 的
Shuffle Write 都是 12MB,也印证了这两个 stage 的继承关系,
所以 stage2 对应代码行是 step12 = df2.repartition(6)
,所以它包含6个 task。
Stage3 对应 step1 = df1.repartition(5)
和 step2 = step1.selectExpr(...)
,
包含5个 task。
Stage4 对应 step3 = step2.join(...)
,所以父节点是 stage3 和 stage2,
所以它的 shuffle read 值 (38.6MB) 是两个父节点各自 shuffle write 值
(13MB 和 25.6MB) 的和。
这个 stage 有200个 task,是因为 spark.sql.shuffle.partitions
的默认值是200。
关于这个参数(以及其他配置参数)可以参考
Spark SQL, DataFrames and Datasets Guide。
Stage5 对应 step4 = step3.selectExpr(...)
和 step4.collect()
,
前者是一个 transformation,后者是action,所以两个归入一个 stage.
逻辑架构中的关键概念
-
Job: 一个 job 包含一个或多个 stages,一个 job 对应一个 action ;
-
Stage: 一个 stage 包含一组可并行计算的 tasks,一个 stage 对应一次 shuffle;
-
Shuffle: 数据在 executor 间移动,叫做 shuffle,例如初始化 RDD,对 RDD 重新分区 (repartition), join, collect 都会引起 shuffle 动作;
task
Spark 的 task 包含如下特征:
-
一个 task 只运行在一个 executor 上;
-
一个 task 在一个 RDD 中的一个 partition 上执行一组 transformations;
-
一个 task 可以使用
spark.task.cpus
个 CPU; -
同一个 stage 中不同 task 是并行执行的;
Task 的执行过程:
Driver 序列化一个 function 对象,并发送给 executor,executor 收到后做反序列化, 并在某个 partition 上执行之。
Ref:
[SDG15]: Chapter 15 of "How Spark Runs on a Cluter, in Spark: the Definitive Guide" by Bill Chambers.