心跳是从executor发送到driver,driver根据心跳判断executor是否存活
HeartbeatReceiver类实现了SparkListener,也实现了ThreadSafeRpcEndpoint,所以既是一个监听器,也是一个endpoint,身兼多职 当该类创建后,就会被添加到监听队列中,接下来的活就需要监听器和endpoint一起处理了 onStart方法在Inbox中会被调用,RPC章节创建EndPointRef时,需要创建EndpointData,每个EndpointData对象都有自己单独的Inbox对象,Inbox对象初始化时会立马将OnStart方法添加到messages中,下述方法用线程池定时调度固定的线程,默认是60s,每60s询问一次executor是否存活,
override def onStart(): Unit = {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
}
}, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}
driver收到消息后会执行expireDeadHosts方法,该方法中会判断executor是否存活,如果长时间(大于心跳检测60s)没有响应,则会移除当前executor,并使用线程池执行sc.killAndReplaceExecutor方法,sc中会用ExecutorAllocationClient中的方法killExecutor
private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
val now = clock.getTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
s"timed out after ${now - lastSeenMs} ms"))
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
executorLastSeen.remove(executorId)
}
}
}
每次收到executor的Heartbeat消息时都会更新为最新通信的时间,除了这些,当ExecutorRegistered,ExecutorRemoved,endpoint都需要做相应的操作并给出答复,而发送这些消息的就是本身,因为它自身是个listener,监听executor相关事件,当executor注册/移除后,可以立马执行相应的操作,获取endpointRef发消息给endpoint(自己),自己就可以做相对应的操作
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Messages sent and received locally
case ExecutorRegistered(executorId) =>
executorLastSeen(executorId) = clock.getTimeMillis()
context.reply(true)
case ExecutorRemoved(executorId) =>
executorLastSeen.remove(executorId)
context.reply(true)
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
context.reply(true)
case ExpireDeadHosts =>
expireDeadHosts()
context.reply(true)
// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
})
} else {
// This may happen if we get an executor's in-flight heartbeat immediately
// after we just removed it. It's not really an error condition so we should
// not log warning here. Otherwise there may be a lot of noise especially if
// we explicitly remove executors (SPARK-4134).
logDebug(s"Received heartbeat from unknown executor $executorId")
context.reply(HeartbeatResponse(reregisterBlockManager = true))
}
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
// register itself again.
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
context.reply(HeartbeatResponse(reregisterBlockManager = true))
}
}
监听executor事件并发消息给endpointRef代码如下
/**
* Send ExecutorRegistered to the event loop to add a new executor. Only for test.
*
* @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
* indicate if this operation is successful.
*/
def addExecutor(executorId: String): Option[Future[Boolean]] = {
Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
}
/**
* If the heartbeat receiver is not stopped, notify it of executor registrations.
*/
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
addExecutor(executorAdded.executorId)
}
/**
* Send ExecutorRemoved to the event loop to remove an executor. Only for test.
*
* @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
* indicate if this operation is successful.
*/
def removeExecutor(executorId: String): Option[Future[Boolean]] = {
Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
}
/**
* If the heartbeat receiver is not stopped, notify it of executor removals so it doesn't
* log superfluous errors.
*
* Note that we must do this after the executor is actually removed to guard against the
* following race condition: if we remove an executor's metadata from our data structure
* prematurely, we may get an in-flight heartbeat from the executor before the executor is
* actually removed, in which case we will still mark the executor as a dead host later
* and expire it with loud error messages.
*/
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
removeExecutor(executorRemoved.executorId)
}
当taskScheduler创建完成后,endpointRef也会发TaskSchedulerIsSet消息给endpoint