flink源码-获取env

flink源码-获取env

flink 流处理的每个Job,都需要StreamExecutionEnvironment|ExecutionEnvironment作为程序执行上下文。下面是获取StreamExecutionEnvironment执行上下文的静态方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 创建程序执行时的上下文。
* 如果这个方法是在standalone模式下运行,会返回本地执行env。
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
/**
* 创建程序执行时的上下文。
* 如果这个方法是在standalone模式下运行,会返回本地执行env。
* 当从命令行执行的时候,给定的configuration会覆盖flink-conf.yaml中的重复配置项
*/
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils
// 获取上下文环境创建工厂,Optional<StreamExecutionEnvironmentFactory>
// 1. 优先选择 threadLocal 存储的工厂
// 2. threadLocal为空时返回 static 工厂
.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
// 通过工厂方法创建执行环境
// 如果工厂是空或者创建的环境是空,会触发orElseGet()
.map(factory -> factory.createExecutionEnvironment(configuration))
// 通过createLocalEnvironment方法创建本地执行环境
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}

方法的逻辑大致是:

  1. 创建StreamExecutionEnvironment时线检查是否存在contextEnvironmentFactory,如果有直接从contextEnvironmentFactory创建返回。没有则通过ExecutionEnvironment创建。
  2. 根据通过ExecutionEnvironment创建的返回 env 类型,返回不同的StreamExecutionEnvironment子类。

根据方法中的注释描述,以及contextEnvironmentFactory的初始化条件,可以知道DataStream在判断当前程序运行环境时,借用了DataSet 的判断逻辑,根据 DataSet中的结果返回对应的env。

StreamExecutionEnvironment

ExecutionEnvironment

下面开始聚焦ExecutionEnvironment的getExecutionEnvironment 方法:

1
2
3
4
5
public static ExecutionEnvironment getExecutionEnvironment() {
return contextEnvironmentFactory == null
? createLocalEnvironment()
: contextEnvironmentFactory.createExecutionEnvironment();
}

可以看到 getExecutionEnvironment() 方法的逻辑出现了2个分支,分支的判定条件是contextEnvironmentFactory == null

1
2
3
4
5
6
7
8
// contextEnvironmentFactory 是一个静态属性
private static ExecutionEnvironmentFactory contextEnvironmentFactory;

// ExecutionEnvironment class 里 ExecutionEnvironmentFactory 唯一的赋值方法
// 注意,在集群环境会有调用该函数的时候
protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx){
contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
}

local Environment

当 contextEnvironmentFactory 为 null 时,会调用createLocalEnvironment(),获得本地执行环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalParallelism);
}

public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
return createLocalEnvironment(parallelism, new Configuration());
}
private static LocalEnvironment createLocalEnvironment(Configuration configuration, int defaultParallelism) {
final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);

if (defaultParallelism > 0) {
localEnvironment.setParallelism(defaultParallelism);
}
return localEnvironment;
}

根据和 ExecutionEnvironment 同样的逻辑, StreamExecutionEnvironment会得到 LocalStreamEnvironment ( StreamExecutionEnvironment.createLocalEnvironment()方法的返回值 )。

local Environment可以很方便地用于本地调试。

集群环境

到这差不多就走完了集群模式下,env的设置以及获取。

-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%