flink源码-获取env
flink 流处理的每个Job,都需要StreamExecutionEnvironment|ExecutionEnvironment作为程序执行上下文。下面是获取StreamExecutionEnvironment执行上下文的静态方法。
1 | /** |
方法的逻辑大致是:
- 创建StreamExecutionEnvironment时线检查是否存在contextEnvironmentFactory,如果有直接从contextEnvironmentFactory创建返回。没有则通过ExecutionEnvironment创建。
- 根据通过ExecutionEnvironment创建的返回 env 类型,返回不同的StreamExecutionEnvironment子类。
根据方法中的注释描述,以及contextEnvironmentFactory的初始化条件,可以知道DataStream在判断当前程序运行环境时,借用了DataSet 的判断逻辑,根据 DataSet中的结果返回对应的env。
下面开始聚焦ExecutionEnvironment的getExecutionEnvironment 方法:
1 | public static ExecutionEnvironment getExecutionEnvironment() { |
可以看到 getExecutionEnvironment() 方法的逻辑出现了2个分支,分支的判定条件是contextEnvironmentFactory == null
。
1 | // contextEnvironmentFactory 是一个静态属性 |
local Environment
当 contextEnvironmentFactory 为 null 时,会调用createLocalEnvironment(),获得本地执行环境。
1 | public static LocalStreamEnvironment createLocalEnvironment() { |
根据和 ExecutionEnvironment 同样的逻辑, StreamExecutionEnvironment会得到 LocalStreamEnvironment ( StreamExecutionEnvironment.createLocalEnvironment()方法的返回值 )。
local Environment可以很方便地用于本地调试。
集群环境
到这差不多就走完了集群模式下,env的设置以及获取。