OutputCommitCoordinator主要用于判断task是否有权限提交输出到HDFS,canCommit用于判断task是否有提交权限,使用endpointRef发送消息给endpoint
def canCommit( stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean = { val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) coordinatorRef match { case Some(endpointRef) => ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg), RpcUtils.askRpcTimeout(conf).duration) case None => logError( "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?") false } }
endpoint接收到消息后判断能否提交,如果task被标记为失败,则没有权限,提交过或者stage已经结束也会导致失败
private[scheduler] def handleAskPermissionToCommit(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int): Boolean = synchronized {
stageStates.get(stage) match {
case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
s"task attempt $attemptNumber already marked as failed.")
false
case Some(state) =>
val existing = state.authorizedCommitters(partition)
if (existing == null) {
logDebug(s"Commit allowed for stage=$stage.$stageAttempt, partition=$partition, " +
s"task attempt $attemptNumber")
state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
true
} else {
logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
s"already committed by $existing")
false
}
case None =>
logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
"stage already marked as completed.")
false
}
}
其他的就是stage开始和结束会调用该类中的stageStart/stageEnd方法,添加/移除相应的stageId及相关信息
当task完成时DAGScheduler会调用taskCompleted方法,如果状态是失败,stage会做记录
private[scheduler] def taskCompleted(
stage: Int,
stageAttempt: Int,
partition: Int,
attemptNumber: Int,
reason: TaskEndReason): Unit = synchronized {
val stageState = stageStates.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
return
})
reason match {
case Success =>
// The task output has been committed successfully
case _: TaskCommitDenied =>
logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " +
s"partition: $partition, attempt: $attemptNumber")
case _ =>
// Mark the attempt as failed to blacklist from future commit protocol
val taskId = TaskIdentifier(stageAttempt, attemptNumber)
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
if (stageState.authorizedCommitters(partition) == taskId) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
stageState.authorizedCommitters(partition) = null
}
}
}
这块的东西比较简单,没有涉及到其他的东西,主要就是判断task是否有提交输出权限到hdfs