Trino Source Code Study: Execution Plan Generation

The previous post analyzed the query submission part of the Trino source code; this post looks at the source code that builds the execution plan.

DDL execution

DDL statements are executed by DataDefinitionExecution, a subclass of QueryExecution.

public void start()
{
    try {
        // transition to running
        if (!stateMachine.transitionToRunning()) {
            // query already running or finished
            return;
        }
        // DataDefinitionExecution executes directly; it does not go through plan building (PLANNING) or query scheduling (STARTING)
        ListenableFuture<Void> future = task.execute(statement, stateMachine, parameters, warningCollector);
        Futures.addCallback(future, new FutureCallback<>()
        {
            @Override
            public void onSuccess(@Nullable Void result)
            {
                stateMachine.transitionToFinishing();
            }

            @Override
            public void onFailure(Throwable throwable)
            {
                fail(throwable);
            }
        }, directExecutor());
    }
    catch (Throwable e) {
        fail(e);
        throwIfInstanceOf(e, Error.class);
    }
}

DDL execution is relatively simple: each statement is carried out by an internal DataDefinitionTask, which usually operates through the Metadata interface. Metadata exposes the metadata manipulation API, and its implementation delegates to each connector's ConnectorMetadata. Some tasks (for example createTable) also invoke the Analyzer. The delegation chain is shown in the diagram below, followed by a toy sketch.

flowchart LR

Metadata --> MetadataManager
MetadataManager --> CatalogMetadata
CatalogMetadata --> ConnectorMetadata
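
The delegation chain above can be illustrated with a toy sketch. This is not Trino code; the classes below are simplified stand-ins for Metadata/MetadataManager and ConnectorMetadata: the facade resolves the target catalog and forwards the call to that catalog's connector metadata.

import java.util.Map;

public class MetadataDelegationSketch
{
    // Stand-in for ConnectorMetadata: each connector supplies the actual metadata operations.
    interface ConnectorMetadataLike
    {
        void createTable(String schemaName, String tableName);
    }

    // Stand-in for MetadataManager: resolves the catalog and forwards the call to its connector.
    static class MetadataManagerLike
    {
        private final Map<String, ConnectorMetadataLike> catalogs;

        MetadataManagerLike(Map<String, ConnectorMetadataLike> catalogs)
        {
            this.catalogs = catalogs;
        }

        void createTable(String catalog, String schema, String table)
        {
            ConnectorMetadataLike connector = catalogs.get(catalog);
            if (connector == null) {
                throw new IllegalArgumentException("Catalog does not exist: " + catalog);
            }
            connector.createTable(schema, table);
        }
    }

    public static void main(String[] args)
    {
        ConnectorMetadataLike memoryConnector = (schema, table) ->
                System.out.println("memory connector creates table " + schema + "." + table);
        MetadataManagerLike metadata = new MetadataManagerLike(Map.of("memory", memoryConnector));
        metadata.createTable("memory", "default", "t1");
    }
}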

SQL execution

SQL queries are executed by SqlQueryExecution, another subclass of QueryExecution.

Syntax tree analysis before SQL execution

The SqlQueryExecution constructor analyzes the syntax tree with an Analyzer.

// io.trino.execution.SqlQueryExecution
public SqlQueryExecution(){
    ... ...
    this.analysis = analyze(preparedQuery, stateMachine, warningCollector, analyzerFactory);
    ... ...
}

private static Analysis analyze(
        PreparedQuery preparedQuery,
        QueryStateMachine stateMachine,
        WarningCollector warningCollector,
        AnalyzerFactory analyzerFactory)
{
    stateMachine.beginAnalysis(); // start timing the analysis

    requireNonNull(preparedQuery, "preparedQuery is null");
    // create the analyzer
    Analyzer analyzer = analyzerFactory.createAnalyzer(
            stateMachine.getSession(),
            preparedQuery.getParameters(),
            bindParameters(preparedQuery.getStatement(), preparedQuery.getParameters()),
            warningCollector);
    Analysis analysis;
    try {
        // run the analysis
        analysis = analyzer.analyze(preparedQuery.getStatement());
    }
    catch (StackOverflowError e) {
        throw new TrinoException(STACK_OVERFLOW, "statement is too large (stack overflow during analysis)", e);
    }

    stateMachine.setUpdateType(analysis.getUpdateType());
    stateMachine.setReferencedTables(analysis.getReferencedTables());
    stateMachine.setRoutines(analysis.getRoutines());

    stateMachine.endAnalysis();
    // the Analysis object holds the analysis results
    return analysis;
}

Let's look at what the Analyzer does.

public Analysis analyze(Statement statement, QueryType queryType)
{
    // rewrite certain statements
    Statement rewrittenStatement = statementRewrite.rewrite(analyzerFactory, session, statement, parameters, parameterLookup, warningCollector);
    Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, queryType);
    StatementAnalyzer analyzer = statementAnalyzerFactory.createStatementAnalyzer(analysis, session, warningCollector, CorrelationSupport.ALLOWED);
    analyzer.analyze(rewrittenStatement, Optional.empty());

    // check column access permissions for each table
    analysis.getTableColumnReferences().forEach((accessControlInfo, tableColumnReferences) ->
            tableColumnReferences.forEach((tableName, columns) ->
                    accessControlInfo.getAccessControl().checkCanSelectFromColumns(
                            accessControlInfo.getSecurityContext(session.getRequiredTransactionId(), session.getQueryId()),
                            tableName,
                            columns)));
    return analysis;
}

The rewrite method of the StatementRewrite.Rewrite interface rewrites certain statements (for example, SHOW TABLES is turned into a query against the metadata table information_schema.tables). Every implementation of the Rewrite interface contains an AstVisitor subclass, so rewrite is effectively implemented by a visitor that walks the syntax tree. A toy sketch of this idea follows the class diagram below.

classDiagram
    class AstVisitor
    DescribeInputRewrite..>AstVisitor
    ShowQueriesRewrite ..>AstVisitor
    DescribeOutputRewrite ..>AstVisitor
    ExplainRewrite..>AstVisitor
    ShowStatsRewrite..>AstVisitor
    class Rewrite{
        <<Interface>>
        + rewrite(AnalyzerFactory analyzerFactory, Session session, Statement node, List[Expression] parameters, Map[NodeRef[Parameter], Expression] parameterLookup,WarningCollector warningCollector): Statement
    }
    DescribeInputRewrite --|>Rewrite
    ShowQueriesRewrite --|>Rewrite
    DescribeOutputRewrite --|>Rewrite
    ExplainRewrite--|>Rewrite
    ShowStatsRewrite--|>Rewrite
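
As a toy illustration of the rewrite idea (this is not Trino's StatementRewrite; the AST types below are invented stand-ins, and the real implementations walk the tree with an AstVisitor subclass), a Rewrite implementation can replace a SHOW TABLES-style node with an equivalent query over information_schema:

public class RewriteSketch
{
    // Minimal AST stand-ins (the real classes live in io.trino.sql.tree).
    interface Stmt {}
    record ShowTables(String schema) implements Stmt {}
    record RawQuery(String sql) implements Stmt {}

    // Stand-in for the Rewrite interface: returns the (possibly rewritten) statement.
    interface Rewrite
    {
        Stmt rewrite(Stmt node);
    }

    static final Rewrite SHOW_TABLES_REWRITE = node -> {
        if (node instanceof ShowTables show) {
            // SHOW TABLES becomes a plain query over the metadata table
            return new RawQuery("SELECT table_name FROM information_schema.tables WHERE table_schema = '" + show.schema() + "'");
        }
        return node;
    };

    public static void main(String[] args)
    {
        System.out.println(SHOW_TABLES_REWRITE.rewrite(new ShowTables("runtime")));
    }
}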

The rewritten Statement is then analyzed further by StatementAnalyzer, which relies on Metadata during analysis.

For each concrete Statement subclass, StatementAnalyzer produces a Scope.

public class Scope
{
    private final Optional<Scope> parent; // parent scope
    private final boolean queryBoundary; // true if there is no parent scope
    private final RelationId relationId; // relation ID
    private final RelationType relation; // relation type
    private final Map<String, WithQuery> namedQueries; // named WITH queries in scope
}

// RelationType describes the shape of a relation
public class RelationType
{
    private final List<Field> visibleFields; // visible fields
    private final List<Field> allFields; // all fields

    private final Map<Field, Integer> fieldIndexes; // index of each field
}

// Field describes a column
public class Field
{
    private final Optional<QualifiedObjectName> originTable;
    private final Optional<String> originColumnName;
    private final Optional<QualifiedName> relationAlias;
    private final Optional<String> name;
    private final Type type;
    private final boolean hidden;
    private final boolean aliased;
}

For SELECT and SHOW statements the scope describes the result view; for INSERT, DELETE and CREATE TABLE AS SELECT statements it contains a single field (the number of rows affected by the statement).

StatementAnalyzer also calls into AggregationAnalyzer and ExpressionAnalyzer:

  • AggregationAnalyzer checks the output expressions against the GROUP BY clause (a toy sketch of this check follows the list).
  • ExpressionAnalyzer resolves the result type of each expression.
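
A minimal sketch of the kind of check AggregationAnalyzer performs. This is not Trino code; the expression model is invented for illustration: every non-aggregate output expression must appear in the GROUP BY list.

import java.util.List;
import java.util.Set;

public class GroupingCheckSketch
{
    // Hypothetical expression model, for illustration only.
    record Expr(String text, boolean aggregate) {}

    static void checkOutputs(List<Expr> outputs, Set<String> groupByExpressions)
    {
        for (Expr expression : outputs) {
            if (!expression.aggregate() && !groupByExpressions.contains(expression.text())) {
                throw new IllegalStateException(
                        "'" + expression.text() + "' must be an aggregate expression or appear in GROUP BY clause");
            }
        }
    }

    public static void main(String[] args)
    {
        // SELECT state, count(*) FROM tasks GROUP BY state -> passes
        checkOutputs(List.of(new Expr("state", false), new Expr("count(*)", true)), Set.of("state"));
        // SELECT node_id, count(*) FROM tasks GROUP BY state -> fails with the error above
        checkOutputs(List.of(new Expr("node_id", false), new Expr("count(*)", true)), Set.of("state"));
    }
}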

SQL execution plan entry point

The entry point for a SQL query is the start method.

// io.trino.execution.SqlQueryExecution
@Override
public void start(){
    // QueryExecution runs on DispatchExecutor.getExecutor; the default thread name is dispatcher-query-%s
    try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
        try {
            if (!stateMachine.transitionToPlanning()) { // try to transition to PLANNING
                // query already started or finished
                return;
            }
            ... ...

            try {
                PlanRoot plan = planQuery();
                // DynamicFilterService needs plan for query to be registered.
                // Query should be registered before dynamic filter suppliers are requested in distribution planning.
                // register dynamic filtering, see https://trino.io/docs/current/admin/dynamic-filtering.html
                registerDynamicFilteringQuery(plan);
                planDistribution(plan);
            }
            finally {
                synchronized (this) {
                    planningThread.set(null);
                    // Clear the interrupted flag in case there was a race condition where
                    // the planning thread was interrupted right after planning completes above
                    Thread.interrupted();
                }
            }
            ... ...
        }
        catch (Throwable e) {
            fail(e);
            throwIfInstanceOf(e, Error.class);
        }
    }
}

Generating the SQL execution plan

SqlQueryExecution builds the query's execution plan in planQuery().

// io.trino.execution.SqlQueryExecution
private PlanRoot planQuery()
{
    try {
        return doPlanQuery();
    }
    catch (StackOverflowError e) {
        throw new TrinoException(STACK_OVERFLOW, "statement is too large (stack overflow during analysis)", e);
    }
}

private PlanRoot doPlanQuery()
{
    // allocates IDs for plan nodes, starting from 0
    PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
    // build the logical plan
    LogicalPlanner logicalPlanner = new LogicalPlanner(... ...);
    Plan plan = logicalPlanner.plan(analysis);
    queryPlan.set(plan);

    // fragment the plan
    SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, stateMachine.getWarningCollector());

    // extract the inputs
    List<Input> inputs = new InputExtractor(plannerContext.getMetadata(), stateMachine.getSession()).extractInputs(fragmentedPlan);
    stateMachine.setInputs(inputs);
    // set the output
    stateMachine.setOutput(analysis.getTarget());
    // for EXPLAIN ANALYZE statements, the plan keeps detailed task execution info
    boolean explainAnalyze = analysis.getStatement() instanceof ExplainAnalyze;
    return new PlanRoot(fragmentedPlan, !explainAnalyze);
}

Logical plan

The LogicalPlanner class turns the analyzed SQL statement into a logical execution plan (Plan). The logical plan is a directed graph whose nodes are PlanNode instances.

public abstract class PlanNode
{
    private final PlanNodeId id;

    protected PlanNode(PlanNodeId id)
    {
        requireNonNull(id, "id is null");
        this.id = id;
    }

    @JsonProperty("id")
    public PlanNodeId getId()
    {
        return id;
    }

    // the inputs of this plan node
    public abstract List<PlanNode> getSources();

    // the output symbols of this plan node
    public abstract List<Symbol> getOutputSymbols();

    // replace the children, producing a new node
    public abstract PlanNode replaceChildren(List<PlanNode> newChildren);

    // visitor pattern
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context)
    {
        return visitor.visitPlan(this, context);
    }
}

As you can see, every PlanNode has inputs and outputs; connecting each node's inputs to the nodes that produce them yields a directed graph. All PlanNode implementations live in the io.trino.sql.planner.plan package, so they are not listed here one by one.
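
As a small exercise (not part of Trino), the methods shown above are already enough to walk the graph; for example, a helper that prints a plan tree with indentation:

import io.trino.sql.planner.plan.PlanNode;

final class PlanPrinterSketch
{
    static void print(PlanNode node, int depth)
    {
        // print this node, then recurse into its inputs (getSources())
        System.out.println("  ".repeat(depth) + node.getClass().getSimpleName() + " [" + node.getId() + "]");
        for (PlanNode source : node.getSources()) {
            print(source, depth + 1);
        }
    }
}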

Now let's look at how the logical plan is generated.

public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
{
    // build the logical plan
    PlanNode root = planStatement(analysis, analysis.getStatement());
    ... ...
    // sanity-check the intermediate plan
    planSanityChecker.validateIntermediatePlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);

    TableStatsProvider tableStatsProvider = new CachingTableStatsProvider(metadata, session);

    if (stage.ordinal() >= OPTIMIZED.ordinal()) {
        for (PlanOptimizer optimizer : planOptimizers) {
            // optimize the plan
            root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator, warningCollector, tableStatsProvider);
            requireNonNull(root, format("%s returned a null plan", optimizer.getClass().getName()));
            ... ...
        }
    }

    if (stage.ordinal() >= OPTIMIZED_AND_VALIDATED.ordinal()) {
        // make sure we produce a valid plan after optimizations run. This is mainly to catch programming errors
        // validate the final plan
        planSanityChecker.validateFinalPlan(root, session, plannerContext, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
    }

    TypeProvider types = symbolAllocator.getTypes();

    StatsAndCosts statsAndCosts = StatsAndCosts.empty();
    if (collectPlanStatistics) {
        // collect statistics and costs
        StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types, tableStatsProvider);
        CostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, Optional.empty(), session, types);
        statsAndCosts = StatsAndCosts.create(root, statsProvider, costProvider);
    }
    return new Plan(root, types, statsAndCosts);
}

Generating the logical plan

// io.trino.sql.planner.LogicalPlanner
public PlanNode planStatement(Analysis analysis, Statement statement)
{
    if ((statement instanceof CreateTableAsSelect && analysis.getCreate().orElseThrow().isCreateTableAsSelectNoOp()) ||
            statement instanceof RefreshMaterializedView && analysis.isSkipMaterializedViewRefresh()) {
        // special handling for no-op CreateTableAsSelect and RefreshMaterializedView
        Symbol symbol = symbolAllocator.newSymbol("rows", BIGINT);
        // zero rows affected
        PlanNode source = new ValuesNode(idAllocator.getNextId(), ImmutableList.of(symbol), ImmutableList.of(new Row(ImmutableList.of(new GenericLiteral("BIGINT", "0")))));
        return new OutputNode(idAllocator.getNextId(), source, ImmutableList.of("rows"), ImmutableList.of(symbol));
    }
    return createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);
}

// dispatch to a different plan-creation branch depending on the statement type
private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement statement)
{
    if (statement instanceof CreateTableAsSelect) {
        if (analysis.getCreate().orElseThrow().isCreateTableAsSelectNoOp()) {
            throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE IF NOT EXISTS is not supported in this context " + statement.getClass().getSimpleName());
        }
        // an OutputNode is wrapped around the result by the caller
        return createTableCreationPlan(analysis, ((CreateTableAsSelect) statement).getQuery());
    }
    if (statement instanceof Analyze) {
        return createAnalyzePlan(analysis, (Analyze) statement);
    }
    if (statement instanceof Insert) {
        checkState(analysis.getInsert().isPresent(), "Insert handle is missing");
        return createInsertPlan(analysis, (Insert) statement);
    }
    if (statement instanceof RefreshMaterializedView) {
        return createRefreshMaterializedViewPlan(analysis);
    }
    if (statement instanceof Delete) {
        return createDeletePlan(analysis, (Delete) statement);
    }
    if (statement instanceof Update) {
        return createUpdatePlan(analysis, (Update) statement);
    }
    if (statement instanceof Merge) {
        return createMergePlan(analysis, (Merge) statement);
    }
    if (statement instanceof Query) {
        return createRelationPlan(analysis, (Query) statement);
    }
    if (statement instanceof ExplainAnalyze) {
        return createExplainAnalyzePlan(analysis, (ExplainAnalyze) statement);
    }
    if (statement instanceof TableExecute) {
        return createTableExecutePlan(analysis, (TableExecute) statement);
    }
    throw new TrinoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName());
}

Of the branches above, we will focus on the createRelationPlan branch that handles Query.

// io.trino.sql.planner.LogicalPlanner
private RelationPlan createRelationPlan(Analysis analysis, Query query)
{
    return getRelationPlanner(analysis).process(query, null);
}

private RelationPlanner getRelationPlanner(Analysis analysis)
{
    return new RelationPlanner(analysis, symbolAllocator, idAllocator, buildLambdaDeclarationToSymbolMap(analysis, symbolAllocator), plannerContext, Optional.empty(), session, ImmutableMap.of());
}

RelationPlanner extends AstVisitor and overrides the following methods:

  • visitTable
  • visitTableFunctionInvocation
  • visitAliasedRelation
  • visitPatternRecognitionRelation
  • visitSampledRelation
  • visitLateral
  • visitJoin
  • visitTableSubquery
  • visitQuery: analyzed with QueryPlanner
  • visitQuerySpecification: analyzed with QueryPlanner
  • visitValues
  • visitUnnest
  • visitUnion
  • visitIntersect
  • visitExcept
  • visitSubqueryExpression

Except for the last one, SubqueryExpression (which wraps a Query node), all of these types are subclasses of Relation.

Take the simple query SELECT * FROM system.runtime.nodes as an example:

This query is handled by RelationPlanner.visitTable, producing the following logical plan:

flowchart TD
    TableScanNode --> OutputNode

For a query with a join:

select 
t.*,n.*
from
system.runtime.tasks t
left join
system.runtime.nodes n
on
t.node_id = n.node_id
where
t.state = 'FINISHED'

the following logical plan is generated:

flowchart LR
    ProjectNode1[ProjectNode]-->OutputNode
    ProjectNode2[ProjectNode] --> ProjectNode1
    ProjectNode3[ProjectNode] --> ProjectNode2
    ProjectNode4[ProjectNode] --> ProjectNode3
    FilterNode --> ProjectNode4
    JoinNode --> FilterNode
   
    ProjectNodeL1[ProjectNode]-->|left|JoinNode
    ProjectNodeL2[ProjectNode] --> ProjectNodeL1
    TableScanNodeL[TableScanNode] -->ProjectNodeL2

       
    ProjectNodeR1[ProjectNode]-->|right|JoinNode
    ProjectNodeR2[ProjectNode] --> ProjectNodeR1
    TableScanNodeR[TableScanNode] -->ProjectNodeR2

Logical plan optimization

After the logical plan is generated, every PlanOptimizer is applied in turn to optimize it.

public interface PlanOptimizer
{
    PlanNode optimize(
            PlanNode plan,
            Session session,
            TypeProvider types,
            SymbolAllocator symbolAllocator,
            PlanNodeIdAllocator idAllocator,
            WarningCollector warningCollector,
            TableStatsProvider tableStatsProvider);
}

// PlanOptimizersFactory is bound in io.trino.server.CoordinatorModule
binder.bind(PlanOptimizersFactory.class).to(PlanOptimizers.class).in(Scopes.SINGLETON);

Trino's optimizers fall into two categories: rule-based and cost-based. Trino's cost-based optimizer support is currently not comprehensive. This post focuses on the architecture of the rule-based optimizers; the cost-based optimizer will be covered in a later post.

Rule-based optimizers come in two flavors:

  • io.trino.sql.planner.plan.SimplePlanRewriter: the PlanOptimizer embeds a SimplePlanRewriter, which extends PlanVisitor and rewrites the plan in (usually) a single traversal, e.g. PredicatePushDown (predicate pushdown).
  • io.trino.sql.planner.iterative.Rule: the PlanOptimizer is an IterativeOptimizer that takes a list of rules. Each Rule contains a Pattern plus the rewrite logic applied once the plan matches that pattern, e.g. PruneProjectColumns (drops unused projection columns).

The IterativeOptimizer drives the rules and holds the list of rules internally:

  • Rules are driven recursively, similar to a depth-first traversal.
  • The node itself is optimized first, then its children.
  • If a child changes, optimization of the node itself is attempted again.
  • When nothing changes anymore, the recursion returns.
  • A timeout check is built in.

The driving sequence of the IterativeOptimizer is shown below; a simplified code sketch follows the diagram.

sequenceDiagram
    IterativeOptimizer#exploreGroup-->>IterativeOptimizer#exploreNode: optimize self
    IterativeOptimizer#exploreNode->>checkTimeoutNotExhausted: check timeout
    checkTimeoutNotExhausted-->>IterativeOptimizer#exploreNode: not timed out
    loop each rule
        IterativeOptimizer#exploreNode-->Rule: transform plan
        Rule-->>IterativeOptimizer#exploreNode: optimized
    end
    IterativeOptimizer#exploreNode-->> IterativeOptimizer#exploreGroup: self optimized
    break children unchanged or self unchanged
        IterativeOptimizer#exploreGroup -->> IterativeOptimizer#exploreChildren: optimize children
        loop each child node
            IterativeOptimizer#exploreChildren-->>IterativeOptimizer#exploreGroup: optimize child node
            IterativeOptimizer#exploreGroup-->>IterativeOptimizer#exploreChildren: child node optimized
        end 
        IterativeOptimizer#exploreChildren -->> IterativeOptimizer#exploreGroup: all children optimized
        IterativeOptimizer#exploreGroup -->>IterativeOptimizer#exploreNode: optimize self again
    end
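
The same loop can be written down as a toy sketch. This is not Trino's IterativeOptimizer (the real one works on a Memo/Lookup structure and drives pattern-based rules); it only mirrors the control flow: apply rules to a node until none fires, recurse into the children, and re-optimize the node whenever a child changed. The sketch assumes the rules eventually converge; the real optimizer additionally enforces the timeout mentioned above.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class IterativeOptimizerSketch
{
    record Node(String name, List<Node> children) {}

    // A "rule": returns a replacement node if it matches, otherwise empty.
    interface Rule extends Function<Node, Optional<Node>> {}

    private final List<Rule> rules;

    IterativeOptimizerSketch(List<Rule> rules)
    {
        this.rules = rules;
    }

    Node optimize(Node node)
    {
        Node current = exploreNode(node);
        while (true) {
            // optimize the children; if none of them changed, we are done
            List<Node> children = new ArrayList<>();
            boolean childrenChanged = false;
            for (Node child : current.children()) {
                Node optimizedChild = optimize(child);
                childrenChanged |= optimizedChild != child;
                children.add(optimizedChild);
            }
            if (!childrenChanged) {
                return current;
            }
            // a child changed, so rules may apply to this node again
            current = exploreNode(new Node(current.name(), children));
        }
    }

    private Node exploreNode(Node node)
    {
        boolean done = false;
        while (!done) {
            done = true;
            for (Rule rule : rules) {
                Optional<Node> rewritten = rule.apply(node);
                if (rewritten.isPresent()) {
                    node = rewritten.get();
                    done = false;
                }
            }
        }
        return node;
    }
}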

Next, let's look at the Rule abstraction:

public interface Rule<T>
{
    Pattern<T> getPattern();

    default boolean isEnabled(Session session)
    {
        return true;
    }

    Result apply(T node, Captures captures, Context context);
}

public abstract class Pattern<T>
{
    private final Optional<Pattern<?>> previous;

    public abstract <C> Stream<Match> accept(Object object, Captures captures, C context);
    ... ...
}

public class Captures
{
    private static final Captures NIL = new Captures(null, null, null);

    private final Capture<?> capture;
    private final Object value;
    private final Captures tail;
}

  • Pattern is a linked-list structure; the previous pointer references the preceding pattern.
  • Matching is done through accept: the inputs are a node and a Captures, and the result is a stream of Match objects, each carrying the accumulated Captures.
    • The node here is a plan node.
  • Captures is also a linked list; it holds the value captured by each pattern node plus a tail pointer (a toy sketch of this matching chain follows the diagram below).
flowchart
    subgraph input
        nodeI(PlanNode)
        CapturesI(Captures)
    end
    input -->|accept|pattern
    subgraph pattern
    a(pattern A) --> null
    b(pattern B) -->|previous|a
    c(pattern C) -->|previous|b
    end 

    pattern -->Match
    subgraph Match
        CapturesO(Captures.NIL) -->|tail| CapturesOa(Captures A)
        CapturesOa -->|tail| CapturesOb(Captures B)
        CapturesOb -->|tail| CapturesOc(Captures C)
    end
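
A toy version of the matching chain (not Trino's actual Pattern/Captures classes, whose accept method returns a Stream<Match>): each pattern first delegates to its previous pattern, then applies its own test and prepends the captured node to the Captures list.

import java.util.Optional;
import java.util.function.Predicate;

public class PatternSketch
{
    // Immutable linked list of captured values, mirroring the Captures class above.
    record Captures(Object value, Captures tail)
    {
        static final Captures NIL = new Captures(null, null);
    }

    // Each pattern refines the pattern it was derived from ("previous") and may capture the node.
    record Pattern(Pattern previous, Predicate<Object> test)
    {
        Optional<Captures> match(Object node, Captures captures)
        {
            if (previous != null) {
                Optional<Captures> result = previous.match(node, captures);
                if (result.isEmpty()) {
                    return Optional.empty();
                }
                captures = result.get();
            }
            return test.test(node) ? Optional.of(new Captures(node, captures)) : Optional.empty();
        }
    }

    public static void main(String[] args)
    {
        Pattern isString = new Pattern(null, node -> node instanceof String);
        Pattern startsWithA = new Pattern(isString, node -> ((String) node).startsWith("a"));
        System.out.println(startsWithA.match("abc", Captures.NIL).isPresent()); // true
        System.out.println(startsWithA.match("xyz", Captures.NIL).isPresent()); // false
    }
}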

For the join query above:

select 
t.*,n.*
from
system.runtime.tasks t
left join
system.runtime.nodes n
on
t.node_id = n.node_id
where
t.state = 'FINISHED'

the optimized plan nodes look like this:

flowchart
    JoinNode[JoinNode,DistributionType=partitioned]-->OutputNode
    ExchangeNode1[ExchangeNode,scope=remote,type=repartition] -->|left|JoinNode
    ExchangeNode2[ExchangeNode,scope=local,type=repartition] -->|right|JoinNode
    
    ProjectNode1[ProjectNode]-->ExchangeNode1
    FilterNode1[FilterNode]-->ProjectNode1
    TableScanNode1[TableScanNode]-->FilterNode1

    ExchangeNode3[ExchangeNode,scope=remote,type=repartition]-->ExchangeNode2
    ProjectNode2[ProjectNode]-->ExchangeNode3
    TableScanNode2[TableScanNode]-->ProjectNode2

Note that the ExchangeNodes are added to the plan by optimizer rules such as AddExchanges; the plan will later be split into fragments at these exchange nodes.

Trino currently supports two join distribution types: partitioned (hash join) and replicated (broadcast join).

Logical plan fragmentation

Plan fragmentation is implemented in PlanFragmenter#createSubPlans, which turns the PlanNode tree into a tree of SubPlans.

// io.trino.sql.planner.SubPlan
public class SubPlan // a sub-plan
{
    private final PlanFragment fragment; // the plan fragment
    private final List<SubPlan> children; // child sub-plans
}

// a plan fragment
public class PlanFragment
{
    private final PlanFragmentId id; // fragment ID
    private final PlanNode root; // root PlanNode of this fragment
    private final Map<Symbol, Type> symbols;
    private final PartitioningHandle partitioning;
    private final List<PlanNodeId> partitionedSources;
    private final Set<PlanNodeId> partitionedSourcesSet;
    private final List<Type> types;
    private final Set<PlanNode> partitionedSourceNodes;
    private final List<RemoteSourceNode> remoteSourceNodes;
    private final PartitioningScheme partitioningScheme;
    private final StatsAndCosts statsAndCosts;
    private final List<CatalogProperties> activeCatalogs;
    private final Optional<String> jsonRepresentation;
}

PlanFragmenter contains an inner Fragmenter, an implementation of SimplePlanRewriter. The fragment-splitting logic hinges on ExchangeNode.

public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
    if (exchange.getScope() != REMOTE) { // local exchanges do not split fragments
        return context.defaultRewrite(exchange, context.get());
    }

    PartitioningScheme partitioningScheme = exchange.getPartitioningScheme();

    if (exchange.getType() == ExchangeNode.Type.GATHER) {
        // some operations produce a GATHER exchange, e.g. topN, union
        context.get().setSingleNodeDistribution();
    }
    else if (exchange.getType() == ExchangeNode.Type.REPARTITION) {
        // set the partitioning handle
        context.get().setDistribution(partitioningScheme.getPartitioning().getHandle(), metadata, session);
    }

    ImmutableList.Builder<FragmentProperties> childrenProperties = ImmutableList.builder();
    ImmutableList.Builder<SubPlan> childrenBuilder = ImmutableList.builder();
    for (int sourceIndex = 0; sourceIndex < exchange.getSources().size(); sourceIndex++) {
        FragmentProperties childProperties = new FragmentProperties(partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)));
        childrenProperties.add(childProperties);
        // recursively process each upstream source of the exchange and build a SubPlan for it
        childrenBuilder.add(buildSubPlan(exchange.getSources().get(sourceIndex), childProperties, context));
    }

    List<SubPlan> children = childrenBuilder.build();
    context.get().addChildren(children);

    List<PlanFragmentId> childrenIds = children.stream()
            .map(SubPlan::getFragment)
            .map(PlanFragment::getId)
            .collect(toImmutableList());

    // cut the ExchangeNode (and everything below it) out of this fragment and replace it with a RemoteSourceNode
    return new RemoteSourceNode(
            exchange.getId(),
            childrenIds,
            exchange.getOutputSymbols(),
            exchange.getOrderingScheme(),
            exchange.getType(),
            isWorkerCoordinatorBoundary(context.get(), childrenProperties.build()) ? getRetryPolicy(session) : RetryPolicy.NONE);
}

For the join query above:

select 
t.*,n.*
from
system.runtime.tasks t
left join
system.runtime.nodes n
on
t.node_id = n.node_id
where
t.state = 'FINISHED'

the resulting SubPlan structure is:

flowchart
    subgraph subPlan0
    subgraph planFragment0
        OutputNode0(OutputNode)-->|source|JoinNode0(JoinNode)
        JoinNode0-->|left source|RemoteSourceNode0(RemoteSourceNode)
        JoinNode0-->|right source|ExchangeNode0(ExchangeNode0)
        ExchangeNode0-->|source|RemoteSourceNode01(RemoteSourceNode)
    end
    end
    subgraph subPlan1
    subgraph planFragment1
        ProjectNode1(ProjectNode)-->|source|FilterNode1(FilterNode)
        FilterNode1-->|source|TableScanNode1(TableScanNode,system.runtime.tasks)
    end
    end
    subgraph subPlan2
    subgraph planFragment2
        ProjectNode2(ProjectNode)-->|source|TableScanNode2(TableScanNode,system.runtime.nodes)
    end
    end
    RemoteSourceNode0-.->ProjectNode1
    RemoteSourceNode01-.->ProjectNode2
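
A small sketch (not in Trino) that prints such a SubPlan tree: getFragment() and PlanFragment.getId() appear in the fragmenter code above, while getChildren() is assumed here to expose the children list.

import io.trino.sql.planner.SubPlan;

final class SubPlanPrinterSketch
{
    static void print(SubPlan subPlan, int depth)
    {
        // print this fragment's id, then recurse into the child sub-plans
        System.out.println("  ".repeat(depth) + "Fragment " + subPlan.getFragment().getId());
        for (SubPlan child : subPlan.getChildren()) {
            print(child, depth + 1);
        }
    }
}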

The detailed execution plan can be retrieved with the EXPLAIN keyword:

Fragment 0 [HASH]
Output layout: [node_id, task_id, stage_id, query_id, state, splits, queued_splits, running_splits, completed_splits, split_scheduled_time_ms, split_cpu_time_ms, split_blocked_time_ms, raw_input_bytes, raw_input_rows, processed_input_bytes, processed_input_rows, output_bytes, output_rows, physical_input_bytes, physical_written_bytes, created, start, last_heartbeat, end, node_id_0, http_uri, node_version, coordinator, state_1]
Output partitioning: SINGLE []
Output[columnNames = [node_id, task_id, stage_id, query_id, state, splits, queued_splits, running_splits, completed_splits, split_scheduled_time_ms, split_cpu_time_ms, split_blocked_time_ms, raw_input_bytes, raw_input_rows, processed_input_bytes, processed_input_rows, output_bytes, output_rows, physical_input_bytes, physical_written_bytes, created, start, last_heartbeat, end, node_id, http_uri, node_version, coordinator, state]]
│ Layout: [node_id:varchar, task_id:varchar, stage_id:varchar, query_id:varchar, state:varchar, splits:bigint, queued_splits:bigint, running_splits:bigint, completed_splits:bigint, split_scheduled_time_ms:bigint, split_cpu_time_ms:bigint, split_blocked_time_ms:bigint, raw_input_bytes:bigint, raw_input_rows:bigint, processed_input_bytes:bigint, processed_input_rows:bigint, output_bytes:bigint, output_rows:bigint, physical_input_bytes:bigint, physical_written_bytes:bigint, created:timestamp(3) with time zone, start:timestamp(3) with time zone, last_heartbeat:timestamp(3) with time zone, end:timestamp(3) with time zone, node_id_0:varchar, http_uri:varchar, node_version:varchar, coordinator:boolean, state_1:varchar]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
│ node_id := node_id_0
│ state := state_1
└─ LeftJoin[criteria = ("node_id" = "node_id_0"), hash = [$hashvalue, $hashvalue_3], distribution = PARTITIONED]
│ Layout: [node_id:varchar, task_id:varchar, stage_id:varchar, query_id:varchar, state:varchar, splits:bigint, queued_splits:bigint, running_splits:bigint, completed_splits:bigint, split_scheduled_time_ms:bigint, split_cpu_time_ms:bigint, split_blocked_time_ms:bigint, raw_input_bytes:bigint, raw_input_rows:bigint, processed_input_bytes:bigint, processed_input_rows:bigint, output_bytes:bigint, output_rows:bigint, physical_input_bytes:bigint, physical_written_bytes:bigint, created:timestamp(3) with time zone, start:timestamp(3) with time zone, last_heartbeat:timestamp(3) with time zone, end:timestamp(3) with time zone, node_id_0:varchar, http_uri:varchar, node_version:varchar, coordinator:boolean, state_1:varchar]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
│ Distribution: PARTITIONED
├─ RemoteSource[sourceFragmentIds = [1]]
│ Layout: [node_id:varchar, task_id:varchar, stage_id:varchar, query_id:varchar, state:varchar, splits:bigint, queued_splits:bigint, running_splits:bigint, completed_splits:bigint, split_scheduled_time_ms:bigint, split_cpu_time_ms:bigint, split_blocked_time_ms:bigint, raw_input_bytes:bigint, raw_input_rows:bigint, processed_input_bytes:bigint, processed_input_rows:bigint, output_bytes:bigint, output_rows:bigint, physical_input_bytes:bigint, physical_written_bytes:bigint, created:timestamp(3) with time zone, start:timestamp(3) with time zone, last_heartbeat:timestamp(3) with time zone, end:timestamp(3) with time zone, $hashvalue:bigint]
└─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_3], arguments = ["node_id_0"]]
│ Layout: [node_id_0:varchar, http_uri:varchar, node_version:varchar, coordinator:boolean, state_1:varchar, $hashvalue_3:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [node_id_0:varchar, http_uri:varchar, node_version:varchar, coordinator:boolean, state_1:varchar, $hashvalue_4:bigint]

Fragment 1 [SOURCE]
Output layout: [node_id, task_id, stage_id, query_id, state, splits, queued_splits, running_splits, completed_splits, split_scheduled_time_ms, split_cpu_time_ms, split_blocked_time_ms, raw_input_bytes, raw_input_rows, processed_input_bytes, processed_input_rows, output_bytes, output_rows, physical_input_bytes, physical_written_bytes, created, start, last_heartbeat, end, $hashvalue_2]
Output partitioning: HASH [node_id][$hashvalue_2]
ScanFilterProject[table = $system@system:runtime.tasks, filterPredicate = ("state" = VARCHAR 'FINISHED')]
Layout: [node_id:varchar, task_id:varchar, stage_id:varchar, query_id:varchar, state:varchar, splits:bigint, queued_splits:bigint, running_splits:bigint, completed_splits:bigint, split_scheduled_time_ms:bigint, split_cpu_time_ms:bigint, split_blocked_time_ms:bigint, raw_input_bytes:bigint, raw_input_rows:bigint, processed_input_bytes:bigint, processed_input_rows:bigint, output_bytes:bigint, output_rows:bigint, physical_input_bytes:bigint, physical_written_bytes:bigint, created:timestamp(3) with time zone, start:timestamp(3) with time zone, last_heartbeat:timestamp(3) with time zone, end:timestamp(3) with time zone, $hashvalue_2:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
$hashvalue_2 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("node_id"), 0))
splits := splits
query_id := query_id
raw_input_rows := raw_input_rows
stage_id := stage_id
created := created
processed_input_bytes := processed_input_bytes
processed_input_rows := processed_input_rows
start := start
raw_input_bytes := raw_input_bytes
task_id := task_id
physical_written_bytes := physical_written_bytes
output_rows := output_rows
last_heartbeat := last_heartbeat
completed_splits := completed_splits
split_blocked_time_ms := split_blocked_time_ms
running_splits := running_splits
split_scheduled_time_ms := split_scheduled_time_ms
queued_splits := queued_splits
split_cpu_time_ms := split_cpu_time_ms
output_bytes := output_bytes
end := end
state := state
physical_input_bytes := physical_input_bytes
node_id := node_id

Fragment 2 [SOURCE]
Output layout: [node_id_0, http_uri, node_version, coordinator, state_1, $hashvalue_5]
Output partitioning: HASH [node_id_0][$hashvalue_5]
ScanProject[table = $system@system:runtime.nodes]
Layout: [node_id_0:varchar, http_uri:varchar, node_version:varchar, coordinator:boolean, state_1:varchar, $hashvalue_5:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
$hashvalue_5 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("node_id_0"), 0))
http_uri := http_uri
coordinator := coordinator
node_id_0 := node_id
node_version := node_version
state_1 := state

PartitioningHandle

PartitioningHandle describes how data is partitioned in the execution plan. For example:

  1. The AddExchanges optimizer rule introduced above creates ExchangeNodes with different partitionings.
  2. When QueryPlanner visits a Merge, it sets a MergePartitioningHandle on the MergeWriterNode.
  3. The Fragmenter described above sets the PartitioningHandle on its context while visiting the various plan nodes and later, in buildFragment, stores it on the PlanFragment. For example, when handling a TableScan the handle comes from the connector's ConnectorPartitioningHandle implementation:
@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProperties> context)
{
    PartitioningHandle partitioning = metadata.getTableProperties(session, node.getTable())
            .getTablePartitioning()
            .filter(value -> node.isUseConnectorNodePartitioning())
            .map(TablePartitioning::getPartitioningHandle)
            .orElse(SOURCE_DISTRIBUTION);

    context.get().addSourceDistribution(node.getId(), partitioning, metadata, session);
    return context.defaultRewrite(node, context.get());
}

SystemPartitioningHandle

SystemPartitioningHandle is Trino's built-in partitioning. There are five partitioning types:

  • SINGLE: executes on a single node, typically to gather final results.
  • FIXED: spreads the data across a fixed number of nodes.
  • SOURCE: generally used for reading tables from the data source.
  • COORDINATOR_ONLY: executes only on the coordinator.
  • ARBITRARY: no fixed constraint; can scale out dynamically.

In the execution plan you will see output such as Fragment 0 [HASH], which describes the fragment's partitioning; for FIXED and ARBITRARY partitionings the partition function being used is printed.

public final class SystemPartitioningHandle
        implements ConnectorPartitioningHandle
{
    // the partitioning type
    private final SystemPartitioning partitioning;
    // the partition function
    private final SystemPartitionFunction function;

    enum SystemPartitioning
    {
        SINGLE,
        FIXED,
        SOURCE,
        COORDINATOR_ONLY,
        ARBITRARY
    }

    public enum SystemPartitionFunction
    {
        SINGLE {
            @Override
            public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, BlockTypeOperators blockTypeOperators)
            {
                checkArgument(bucketCount == 1, "Single partition can only have one bucket");
                return new SingleBucketFunction();
            }
        },
        HASH {
            @Override
            public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, BlockTypeOperators blockTypeOperators)
            {
                if (isHashPrecomputed) {
                    return new HashBucketFunction(new PrecomputedHashGenerator(0), bucketCount);
                }

                return new HashBucketFunction(InterpretedHashGenerator.createPositionalWithTypes(partitionChannelTypes, blockTypeOperators), bucketCount);
            }
        },
        ROUND_ROBIN {
            @Override
            public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, BlockTypeOperators blockTypeOperators)
            {
                return new RoundRobinBucketFunction(bucketCount);
            }
        },
        BROADCAST {
            @Override
            public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, BlockTypeOperators blockTypeOperators)
            {
                throw new UnsupportedOperationException();
            }
        },
        UNKNOWN {
            @Override
            public BucketFunction createBucketFunction(List<Type> partitionChannelTypes, boolean isHashPrecomputed, int bucketCount, BlockTypeOperators blockTypeOperators)
            {
                throw new UnsupportedOperationException();
            }
        };

        public abstract BucketFunction createBucketFunction(List<Type> partitionChannelTypes,
                boolean isHashPrecomputed,
                int bucketCount,
                BlockTypeOperators blockTypeOperators);

        private static class SingleBucketFunction
                implements BucketFunction
        {
            @Override
            public int getBucket(Page page, int position)
            {
                return 0;
            }
        }

        public static class RoundRobinBucketFunction
                implements BucketFunction
        {
            private final int bucketCount;
            private int counter;

            public RoundRobinBucketFunction(int bucketCount)
            {
                checkArgument(bucketCount > 0, "bucketCount must be at least 1");
                this.bucketCount = bucketCount;
            }

            @Override
            public int getBucket(Page page, int position)
            {
                int bucket = counter % bucketCount;
                counter = (counter + 1) & 0x7fff_ffff;
                return bucket;
            }

            @Override
            public String toString()
            {
                return toStringHelper(this)
                        .add("bucketCount", bucketCount)
                        .toString();
            }
        }
    }
}
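
A standalone demo (not Trino code) of the round-robin assignment above; the & 0x7fff_ffff mask simply keeps the counter non-negative when the int wraps around, so the modulo stays valid.

public class RoundRobinDemo
{
    public static void main(String[] args)
    {
        int bucketCount = 3;
        int counter = 0;
        for (int row = 0; row < 7; row++) {
            int bucket = counter % bucketCount;
            counter = (counter + 1) & 0x7fff_ffff;
            System.out.println("row " + row + " -> bucket " + bucket); // 0, 1, 2, 0, 1, 2, 0
        }
    }
}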

The built-in combinations are:

Partitioning       Function       Distribution
SINGLE             SINGLE         SINGLE_DISTRIBUTION
COORDINATOR_ONLY   SINGLE         COORDINATOR_DISTRIBUTION
FIXED              HASH           FIXED_HASH_DISTRIBUTION
FIXED              ROUND_ROBIN    FIXED_ARBITRARY_DISTRIBUTION
FIXED              BROADCAST      FIXED_BROADCAST_DISTRIBUTION
FIXED              UNKNOWN        FIXED_PASSTHROUGH_DISTRIBUTION
ARBITRARY          ROUND_ROBIN    SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION
ARBITRARY          UNKNOWN        ARBITRARY_DISTRIBUTION
SOURCE             UNKNOWN        SOURCE_DISTRIBUTION