The previous post covered spark-shell, so this one looks at spark-submit. Under the hood, the spark-shell script itself runs the spark-submit command, passing --class org.apache.spark.repl.Main.
Looking at the spark-submit script, it in turn runs the spark-class command, with SparkSubmit passed in as the main class. It uses exec, which replaces the current shell process with the command being run, so nothing after the exec line ever executes.
Open the SparkSubmit class and locate its main function:
override def main(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
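For reference, SparkSubmitAction is just a small enumeration; it has roughly this shape (a paraphrase, not copied verbatim from the source):

// Roughly the shape of SparkSubmitAction (paraphrase, not verbatim source).
private[deploy] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}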
The main function first creates a SparkSubmitArguments object. This class's job is to validate the arguments the user passed to spark-submit and wrap them up in an object. Inside it, parse(args.asJava) is called to parse everything that follows spark-submit on the command line. A regex first checks each argument: if it starts with -- and contains =, it is split into a key-value pair; if there is no =, the argument is matched against the known option names such as --driver-memory, all of which are enumerated in the code. Arguments the regex does not match are checked against that list in an inner loop, so in the worst case, when nothing matches the regex, the complexity is O(n²). This part is written in Java (probably the result of several contributors working together); personally I think a Scala match would read better. When the inner loop hits a known option name, the next argument is consumed as its value. If nothing matches at all, handleUnknown is called to check whether the current argument is the application jar being submitted; if it is, the parsing loop stops there, and everything that remains is handed to handleExtraArgs and eventually becomes the arguments of the user jar's main class.
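The parsing loop has roughly this shape (a simplified Scala sketch of the idea only; the real logic lives in the Java class SparkSubmitOptionParser, and handleOption / looksLikeAppJar / handleExtraArgs below are made-up stand-ins for its handle / handleUnknown / handleExtraArgs callbacks):

object ParseSketch {
  // Simplified sketch, not Spark source.
  val eqFormat = "(--[^=]+)=(.*)".r
  val knownOptions = Set("--class", "--master", "--driver-memory") // the real list is much longer

  var primaryResource: String = _
  def handleOption(key: String, value: String): Unit = println(s"option $key -> $value")
  def handleExtraArgs(rest: List[String]): Unit = println(s"args for the user's main(): $rest")
  def looksLikeAppJar(arg: String): Boolean = arg.endsWith(".jar")

  def parse(args: List[String]): Unit = args match {
    case Nil => ()
    case eqFormat(key, value) :: rest =>                        // --key=value form
      handleOption(key, value); parse(rest)
    case key :: value :: rest if knownOptions.contains(key) =>  // --key value form
      handleOption(key, value); parse(rest)
    case arg :: rest if looksLikeAppJar(arg) =>                 // the application jar ends the loop
      primaryResource = arg; handleExtraArgs(rest)
    case _ :: rest => parse(rest)
  }
}

// Example: ParseSketch.parse(List("--master=yarn", "--class", "com.demo.Main", "app.jar", "in", "out"))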
Once parsing is done, mergeDefaultSparkProperties merges the properties from the defaults file (spark-defaults.conf) into those already given on the command line, and then ignoreNonSparkProperties drops the invalid ones, which are essentially any keys that do not start with "spark.". Such keys are silently ignored, which is a pitfall worth knowing if your project passes custom settings through --conf. The parsed values are then loaded into the class's fields; action defaults to SUBMIT and can also be KILL or REQUEST_STATUS. Having used spark-submit for so long, I had always assumed it was only for submitting jobs; it turns out it can also query a job's status or kill it.
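The filtering rule is simple enough to sketch (assumed shape, not the exact source):

// Sketch of the "non-spark keys are dropped" rule: anything whose key does not start
// with "spark." never makes it into the submitted configuration.
def ignoreNonSparkPropertiesSketch(props: scala.collection.mutable.Map[String, String]): Unit = {
  props.keys.toList.foreach { k =>
    if (!k.startsWith("spark.")) {
      props.remove(k)
      println(s"Warning: Ignoring non-spark config property: $k")
    }
  }
}

In practice this means a setting like --conf my.job.param=x simply vanishes; prefix such keys with spark. (for example spark.my.job.param) if you want them to survive into the driver.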
if (mainClass == null && !isPython && !isR && primaryResource != null) {
  val uri = new URI(primaryResource)
  val uriScheme = uri.getScheme()

  uriScheme match {
    case "file" =>
      try {
        Utils.tryWithResource(new JarFile(uri.getPath)) { jar =>
          // Note that this might still return null if no main-class is set; we catch that later
          mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
        }
      } catch {
        case _: Exception =>
          SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
      }
    case _ =>
      SparkSubmit.printErrorAndExit(
        s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
        "Please specify a class through --class.")
  }
}
As the snippet above shows, if no main class was specified for the submitted jar, it is read from the jar's manifest (the Main-Class attribute that the build, for example the Maven packaging plugins configured in the pom, writes at package time). Finally, validateArguments runs one more pass over the arguments.
With the arguments parsed and validated, args.action is matched to decide whether this is a submit, kill, or status request:
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    }
  }

  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
Inside submit, prepareSubmitEnvironment runs first. Further down, this method works out which cluster manager is in use (YARN, Kubernetes, Mesos, and so on); each mode gets its own handling and its own validation, and all of the resulting configuration is written into the sparkConf object. Later it calls DependencyUtils.resolveMavenDependencies to resolve the Maven jar dependencies, generating exclusion rules to strip out certain jars. For example, the following jars are excluded:
val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", "launcher_", "mllib_",
  "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_",
  "tags_", "unsafe_")

/** Add exclusion rules for dependencies already included in the spark-assembly */
def addExclusionRules(
    ivySettings: IvySettings,
    ivyConfName: String,
    md: DefaultModuleDescriptor): Unit = {
  // Add scala exclusion rule
  md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))

  IVY_DEFAULT_EXCLUDES.foreach { comp =>
    md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
      ivyConfName))
  }
}
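Backing up a step, the cluster manager itself is picked earlier in prepareSubmitEnvironment by pattern matching on the --master URL; roughly like this (a simplified sketch of the idea, not the exact source, which uses private integer constants rather than strings):

// Simplified sketch of choosing the cluster manager from the --master URL.
def clusterManagerOf(master: String): String = master match {
  case "yarn"                        => "YARN"
  case m if m.startsWith("spark://") => "STANDALONE"
  case m if m.startsWith("mesos://") => "MESOS"
  case m if m.startsWith("k8s://")   => "KUBERNETES"
  case m if m.startsWith("local")    => "LOCAL"
  case other => sys.error(s"Master must be yarn, or start with spark, mesos, k8s, or local: $other")
}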
With the prepared environment in hand (the childArgs, childClasspath, sparkConf and childMainClass tuple), doRunMain leads into runMain, which gets ready to run the user's main function and creates a SparkApplication object:
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }
  new JavaMainApplication(mainClass)
}
SparkApplication is a trait. If mainClass is a subtype of SparkApplication, an instance is created directly from mainClass; otherwise a JavaMainApplication is created, with mainClass passed in as its constructor argument.
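As a quick refresher on what isAssignableFrom is testing here (a standalone illustration, not Spark code):

// classOf[Base].isAssignableFrom(c) is true when c is Base itself or a subtype of it,
// i.e. exactly the "mainClass already implements SparkApplication" branch above.
trait Base { def start(): Unit }
class Impl extends Base { def start(): Unit = println("started") }
class Plain { def main(args: Array[String]): Unit = () }

object IsAssignableDemo {
  def main(args: Array[String]): Unit = {
    println(classOf[Base].isAssignableFrom(classOf[Impl]))  // true  -> instantiate and call start()
    println(classOf[Base].isAssignableFrom(classOf[Plain])) // false -> wrap in JavaMainApplication
  }
}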
@tailrec
def findCause(t: Throwable): Throwable = t match {
  case e: UndeclaredThrowableException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: InvocationTargetException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: Throwable =>
    e
}

try {
  app.start(childArgs.toArray, sparkConf)
} catch {
  case t: Throwable =>
    findCause(t) match {
      case SparkUserAppException(exitCode) =>
        System.exit(exitCode)
      case t: Throwable =>
        throw t
    }
}
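A quick self-contained illustration (not Spark code) of why findCause exists: Method.invoke wraps whatever the target throws in an InvocationTargetException, so the interesting exception sits one getCause() below the one that gets caught.

import java.lang.reflect.InvocationTargetException

object ReflectionWrapDemo {
  def boom(): Unit = throw new RuntimeException("boom")

  def main(args: Array[String]): Unit = {
    val m = getClass.getMethod("boom")
    try m.invoke(this) catch {
      case e: InvocationTargetException => println(e.getCause) // java.lang.RuntimeException: boom
    }
  }
}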
Then start is called, and polymorphism kicks in. If it is JavaMainApplication's start that runs, shown below, it copies the settings from the sparkConf object into the system properties and then invokes the main method via reflection:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }
}
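So for an ordinary submission such as spark-submit --class example.WordCount app.jar, the call chain ends up in something like this (a hypothetical user class, shown only to make the hand-off concrete):

// Hypothetical user application (example.WordCount is made up). By the time main() runs,
// JavaMainApplication has already copied the SparkConf entries into system properties,
// so values passed with --conf are visible via sys.props.
package example

object WordCount {
  def main(args: Array[String]): Unit = {
    println(sys.props.get("spark.master"))  // e.g. Some("yarn") for a client-mode --master yarn submit
    println(args.mkString(", "))            // whatever followed the application jar on the command line
  }
}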
The source leans on a lot of object-oriented concepts, some of which I had half forgotten; after reading it, my head is full of inheritance, encapsulation, and polymorphism. If you are already comfortable with OO, reading this code should be considerably easier.