<
Spark源码学习笔记(二十一)
>
上一篇

Spark源码学习笔记(二十二)
下一篇

Spark源码学习笔记(二十)
Spark SQL之物理计划

逻辑计划执行完,生成的Optimizer Logical Plan会被SparkPlanner调用plan方法执行,SparkPlaner会将各种物理计划策略strategies作用到Logical Plan上,生成Iterator[SparkPlan],SparkPlan是物理计划的父类,所以一个逻辑计划可能产生多个物理计划,然后选取最佳的SparkPlan,看如下代码,spark2.3.3版本中仍然是用next方法选取第一个。之后在执行计划前做一些准备工作,调用若干规则应用,toRDD函数正式执行物理计划,转换成RDD[InternalRow]类型

lazy val withCachedData: LogicalPlan = {
  assertAnalyzed()
  assertSupported()
  sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}

lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

/**
 * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
 * row format conversions as needed.
 */
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))

在生成物理计划时,需要对子节点进行分区和排序,每个子类会有自己的实现,UnspecifiedDistribution就是未指定分布,无需确定数据之间的位置关系

/**
   * Specifies the data distribution requirements of all the children for this operator. By default
   * it's [[UnspecifiedDistribution]] for each child, which means each child can have any
   * distribution.
   *
   * If an operator overwrites this method, and specifies distribution requirements(excluding
   * [[UnspecifiedDistribution]] and [[BroadcastDistribution]]) for more than one child, Spark
   * guarantees that the outputs of these children will have same number of partitions, so that the
   * operator can safely zip partitions of these children's result RDDs. Some operators can leverage
   * this guarantee to satisfy some interesting requirement, e.g., non-broadcast joins can specify
   * HashClusteredDistribution(a,b) for its left child, and specify HashClusteredDistribution(c,d)
   * for its right child, then it's guaranteed that left and right child are co-partitioned by
   * a,b/c,d, which means tuples of same value are in the partitions of same index, e.g.,
   * (a=1,b=2) and (c=1,d=2) are both in the second partition of left and right child.
   */
  def requiredChildDistribution: Seq[Distribution] =
    Seq.fill(children.size)(UnspecifiedDistribution)
    
/** Specifies sort order for each partition requirements on the input data for this operator. */
  def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

过滤算子FilterExec与列剪裁算子ProjectExec分区和排序的方式仍然沿用其子节点的方式,不对RDD的分区和排序进行任何的重新操作

转换逻辑算子到物理算子的plan方法实现在QueryPlanner类中,SparkStrategies类继承自QueryPlaner,但不提供任何方法,仅提供策略的实现,如下代码为逻辑计划转换成物理计划的实现,prunePlans方法用于过滤差的物理计划,但是当前版本2.3.3中还未实现

  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p == placeholder => childPlan
                }
              }
            }
        }
      }
    }

    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }
  
  override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = {
    // TODO: We will need to prune bad plans when we improve plan space exploration
    //       to prevent combinatorial explosion.
    plans
  }

到RDD的过程仍然需要使用如下规则,CollapseCodegenStages用于代码生成相关

/** A sequence of rules that will be applied in order to the physical plan before execution. */
  protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  	 // 提取Python中的UDF函数
    python.ExtractPythonUDFs,
    // 特殊子查询物理计划处理
    PlanSubqueries(sparkSession),
    // 确保分区排序正确性
    EnsureRequirements(sparkSession.sessionState.conf),
    // 代码生成
    CollapseCodegenStages(sparkSession.sessionState.conf),
    // Exchange节点重用
    ReuseExchange(sparkSession.sessionState.conf),
    // 子查询重用
    ReuseSubquery(sparkSession.sessionState.conf))
Top
Foot