DarkMatter in Cyberspace
  • Home
  • Categories
  • Tags
  • Archives

Apache Spark Architecture and Runtime


Spark 的整体架构和关键概念

参考 Cluster Mode Overview, 当一个应用在 Spark 集群上运行时,总体来说包含两大部分: 基础设施集群和Spark Application。

Spark 目前支持的基础设施集群包括 Standalone(Spark 内置集群), Apache Mesos, Hadoop YARN 和 Kubernetes。 基础设施集群包括两部分:

  • Cluster manager: 运行在作为集群管理节点的主机上;

  • Worker node: 承担具体计算工作的主机节点,被cluster manager 管理;

基础设施集群作为计算基础设施是持久存在的,与 Spark Application 的生命周期无关。

Spark Application 包含用户定义的计算过程,例如一个 Java/Scala jar 或者 python/R script, 包括 driver program 和 executor 两部分, 它们都是 JVM process,生命周期与 Spark Application 的生命周期一致。 spark-shell 和 pyspark 也是 Spark Application。

其他主要概念包括(参考 "Glossary" in Cluster Mode Overview):

  • Driver program: 负责运行Spark Application 的 main 方法并创建 SparkContext, Driver 还内置了一个 web server (下面简称为 4040), 监听它所在主机的 4040 端口,下面将结合它分析 Spark Application 的逻辑运行过程。

  • SparkContext: 由 driver program 创建,负责调度管理整个 Spark Application 各部分通信;

  • SparkSession: SparkSession 是 Spark 2.x 中新出现的概念,由 Spark 1.x 中的 SparkContext 和 SQLContext 合并而来,为用户提供更一致的 API。 SparkSession 对象内部包含一个 SparkContext 对象,负责与 Spark cluster 通讯方面的工作;

  • Executor: 运行在基础设施集群的一台 worker node 上,完成本地计算任务。 它可以同时执行 spark.executor.cores 个 task,默认值是主机的逻辑CPU数量, 即 lscpu 中 CPU(s) 的值,等于 \(Sockets \times Cores_per_socket \times Thread_per_core\).

Spark Application 的物理运行过程

大体分为 client request, launch, execution, completion 几个阶段,详细参考 "The Life Cycle of a Spark Application (Outside Spark)" in [SDG15].

  1. Client Request: 用户将 Spark app 从 client 发给 cluster manager, cluster manager 选择集群中的一个节点做 Spark driver;之后 client 执行完毕退出。

  2. Launch: Spark driver 开始执行用户程序,首先初始化 SparkContext, 由 SparkContext 连接 cluster manager (通过用户指定的 --master 参数), 将用户指定的 executor 数量和其他配置信息发给它用来创建 executors process, cluster manager 按照要求在 worker node 上创建好 executors 后,将它们的位置信息 返回给 SparkContext,至此 Spark cluster 创建完毕。

  3. Execution: 具体计算工作在这一阶段完成,由 Spark driver 协调各个节点, 分配计算任务,接收任务执行结果。此阶段的信息和数据传输发生在 Spark cluster 内部,与基础设施集群无关。

  4. Completion: Spark driver 执行结束,返回 成功/失败 结果,cluster manager 负责关闭 与此 Spark driver 相关的 executors.

Spark Application 的逻辑运行过程

样例程序 (in Python):

$ pyspark
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 2500000000000

启动 pyspark 后,就可以在 http://localhost:4040 查看 app 运行情况了, 这时 Jobs 和 Stages 页面均为空,Executors 页面显示有一个 executor, ID 为 driver,监听端口 33869,在系统命令行中输入 jps 可以看到一个名为 SparkSubmit 的进程,PID为16270,通过 netstat 命令可以看到正是这个进程监听 33869端口。

运行step4.collect()前面的代码,可以看到4040页面上 Jobs 和 Stages 始终是空的, 表示 job 没有实际执行。

执行step4.collect(),4040 Jobs 页面上出现一条ID为0的记录 (由于 collect() 方法是 action,前面的操作都是 transformation, 表明一个 job 对应一个 action)。

点击 job0 后出现 Details for Job 0 页面,包含6个 Completed Stages, ID 为 0 ~ 5。页面上的 DAG Visualization 图示了这6个 stage 之间的继承关系。

其中 stage0 和 stage1 对应 df1 = ... 和 df2 = ... 两行, 二者各包含8个task,stage0 的 Shuffle Write 为24MB,stage1为12MB, 这是因为 df1 的长度是 df2 的二倍。 由于代码是在单机 Spark 2.2 的 pyspark REPL 中运行, 机器的逻辑CPU数为8,所以一个 stage 默认的(并行)task 数为8。

点击 stage0 进入 Details for Stage 0 页面,其中的 Shuffle Write Size / Records 的值为 24.0 MB / 4999999,与 df1 的长度吻合,这些 records 被平均分配给了 8个 task,每个 task 的 Shuffle Write Size / Records 都是 1/8.

Stage1 对应的 df2 的长度是 df1 的一半,所以处理的 records 也只有 stage0 的一半。

从 DAG 图可知 stage2 从 stage1 变化而来,stage2 的 Shuffle Read 和 stage1 的 Shuffle Write 都是 12MB,也印证了这两个 stage 的继承关系, 所以 stage2 对应代码行是 step12 = df2.repartition(6),所以它包含6个 task。

Stage3 对应 step1 = df1.repartition(5) 和 step2 = step1.selectExpr(...), 包含5个 task。

Stage4 对应 step3 = step2.join(...),所以父节点是 stage3 和 stage2, 所以它的 shuffle read 值 (38.6MB) 是两个父节点各自 shuffle write 值 (13MB 和 25.6MB) 的和。 这个 stage 有200个 task,是因为 spark.sql.shuffle.partitions 的默认值是200。 关于这个参数(以及其他配置参数)可以参考 Spark SQL, DataFrames and Datasets Guide。

Stage5 对应 step4 = step3.selectExpr(...) 和 step4.collect(), 前者是一个 transformation,后者是action,所以两个归入一个 stage.

逻辑架构中的关键概念

  • Job: 一个 job 包含一个或多个 stages,一个 job 对应一个 action ;

  • Stage: 一个 stage 包含一组可并行计算的 tasks,一个 stage 对应一次 shuffle;

  • Shuffle: 数据在 executor 间移动,叫做 shuffle,例如初始化 RDD,对 RDD 重新分区 (repartition), join, collect 都会引起 shuffle 动作;

task

Spark 的 task 包含如下特征:

  • 一个 task 只运行在一个 executor 上;

  • 一个 task 在一个 RDD 中的一个 partition 上执行一组 transformations;

  • 一个 task 可以使用 spark.task.cpus 个 CPU;

  • 同一个 stage 中不同 task 是并行执行的;

Task 的执行过程:

Driver 序列化一个 function 对象,并发送给 executor,executor 收到后做反序列化, 并在某个 partition 上执行之。

Ref:

[SDG15]: Chapter 15 of "How Spark Runs on a Cluter, in Spark: the Definitive Guide" by Bill Chambers.



Published

Sep 11, 2018

Last Updated

Sep 12, 2018

Category

Tech

Tags

  • spark 21
  • structure 1

Contact

  • Powered by Pelican. Theme: Elegant by Talha Mansoor