本系列将参考Spark源码设计,尝试解读Spark源码中的几个核心模块,并结合源码分析实现。本章内容是基于spark 2.1.0的SparkContext的启动过程。
SparkContext
尽管spark从1.6之后一直以SparkSession
作为用户编程的主要api,但是SparkSession
实际仅仅是对SparkContext
,SQLContext
等入口对象的进一步分装,而涉及到spark核心模块的还是SparkContext
。
启动流程
启动流程在SparkContext的初始化块中,在scala类中,可以直接在类作用域内执行一些代码块,这些代码块的作用就相当于java中类的实例初始化块,在实例初始化时被调用,因此一般会有一些初始化的逻辑在这里。
代码做了省略,突出精华
1 |
|
总结一下这段代码的主要逻辑:
- 处理配置参数
- 创建事件总线LiveListenerBus,用于发布事件,监听事件
- 创建程序状态存储器AppStatusStore,是一个KV存储的包装类。并且将与此存储器关联的监听器添加到事件总线的appStatus队列中,监听appStatus类型的事件
- 创建SparkEnv对象,这个对象是spark的执行环境,是spark中最重要的类之一,内部分装了块管理器,shuffle管理器,map输出跟踪器,广播管理器,内存管理器等重要的基础设施,是spark运行的基石。
- 创建状态跟踪器,用于跟踪job和stage的执行情况,
- 创建ui对象,用于提供web页面访问服务
- 将spark.jars和spark.files添加到NettyStreamManager中以提供文件下载服务,executor会通过rpc下载这些文件
- 设置executor的一些环境变量,
- 创建HearbeatReceiver,并创建一个端点引用
- 创建schedulerBackend和taskScheduler,以常用的yarn cluster模式为例,创建的是YarnClusterScheduler和YarnClusterSchedulerBackend
- 创建DAGScheduler,dag调度器运行任务是通过向任务调度器提交任务实现的
- 启动TaskScheduler,内部启动了调度后端
- 初始化BlockManager,启动度量系统,
- 创建ExecutorAllocationManager,可选
- 创建ContextCleaner,用于清理RDD,shuffle,广播变量等的状态
- 向度量系统注册三个度量源,分别是DAG调度器度量源,块管理器度量源,动态资源申请管理器度量源
- 最后添加一个程序退出的钩子函数,用于在程序退出时关闭SparkContext