Trino源码学习-执行计划生成
Trino源码学习-执行计划生成
上篇分析了trino提交查询部分的源码,本篇来分析下,构建执行计划部分的源码。
DDL执行
DDL执行通过QueryExecution的子类DataDefinitionExecution实现。
1 | public void start() |
DDL执行较为简单,通过内部的DataDefinitionTask执行,一般都是通过Metadata接口进行操作。Metadata提供了对元数据的操作API,其实现基于Connector的ConnectorMetadata实现提供。对于部分Task(例如createTable),也会调用Analyzer进行分析。
flowchart LR Metadata --> MetadataManager MetadataManager --> CatalogMetadata CatalogMetadata --> ConnectorMetadata## sql执行 sql执行是通过QueryExecution的子类SqlQueryExecution实现的。 ### sql执行前的语法树分析 在SqlQueryExecution的构造器中会通过Analyzer分析语法树。
1 | // io.trino.execution.SqlQueryExecution |
1 | public Analysis analyze(Statement statement, QueryType queryType) |
classDiagram class AstVisitor DescribeInputRewrite..>AstVisitor ShowQueriesRewrite ..>AstVisitor DescribeOutputRewrite ..>AstVisitor ExplainRewrite..>AstVisitor ShowStatsRewrite..>AstVisitor class Rewrite{ <<Interface>> + rewrite(AnalyzerFactory analyzerFactory, Session session, Statement node, List[Expression] parameters, Map[NodeRef[Parameter], Expression] parameterLookup,WarningCollector warningCollector): Statement } DescribeInputRewrite --|>Rewrite ShowQueriesRewrite --|>Rewrite DescribeOutputRewrite --|>Rewrite ExplainRewrite--|>Rewrite ShowStatsRewrite--|>Rewrite
重写完的Statement将通过StatementAnalyzer进一步分析。在StatementAnalyzer分析中会用到Metadata。
StatementAnalyzer对每个Statement实现子类分析后会得到Scope.
1 | public class Scope |
对于Select和Show 语句,返回的是结果视图结构,对于insert,delete和create table as select语句返回的字段只有一列(语句操作的行数)。
此外在StatementAnalyzer中还会调用AggregationAnalyzer和ExpressionAnalyzer的方法。
- AggregationAnalyzer会分析表达式和group的关系
- ExpressionAnalyzer会返回表达式的返回值类型。
sql执行计划入口
Sql查询的入口是start方法
1 | // io.trino.execution.SqlQueryExecution |
生成sql执行计划
SqlQueryExecution通过planQuery(),生成Query的执行计划。
1 | // io.trino.execution.SqlQueryExecution |
逻辑计划
LogicalPlanner类会根据分析后的SQL语句,生成逻辑执行计划Plan。逻辑执行计划是一个有向图,图中的每个节点都是一个PlanNode。
1 | public abstract class PlanNode |
可以看到,每个planNode都有输入和输出,如果将输入和输出的planNode分别一一对应连接起来就构成了一个有向图。planNode的所有实现类都在io.trino.sql.planner.plan
包下。这里就不一一赘述了。
下面来看下逻辑计划是如何生成的。
1 | public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics) |
生成逻辑计划
1 | // io.trino.sql.planner.LogicalPlanner |
对于上面的分支,我们主要分析下Query对应的createRelationPlan分支。
1 | // io.trino.sql.planner.LogicalPlanner |
RelationPlanner继承自AstVisitor,覆写了下面几个方法:
- visitTable:
- visitTableFunctionInvocation
- visitAliasedRelation
- visitPatternRecognitionRelation
- visitSampledRelation
- visitLateral
- visitJoin
- visitTableSubquery
- visitQuery:使用QueryPlanner分析
- visitQuerySpecification:使用QueryPlanner分析
- visitValues
- visitUnnest
- visitUnion
- visitIntersect
- visitExcept
- visitSubqueryExpression
除了最后的SubqueryExpression(里面包含了Query节点),其他类型都是Relation的子类。
以简单SELECT * FROM system.runtime.nodes
为例:
该查询会通过RelationPlanner.visitTable方法处理,生成如下逻辑计划:
flowchart TD TableScanNode --> OutputNode
对于带join的查询:
1 | select |
会生成如下逻辑计划:
flowchart LR ProjectNode1[ProjectNode]-->OutputNode ProjectNode2[ProjectNode] --> ProjectNode1 ProjectNode3[ProjectNode] --> ProjectNode2 ProjectNode4[ProjectNode] --> ProjectNode3 FilterNode --> ProjectNode4 JoinNode --> FilterNode ProjectNodeL1[ProjectNode]-->|left|JoinNode ProjectNodeL2[ProjectNode] --> ProjectNodeL1 TableScanNodeL[TableScanNode] -->ProjectNodeL2 ProjectNodeR1[ProjectNode]-->|right|JoinNode ProjectNodeR2[ProjectNode] --> ProjectNodeR1 TableScanNodeR[TableScanNode] -->ProjectNodeR2
逻辑计划优化
在生成逻辑计划后,会遍历所有的PlanOptimizer来优化逻辑执行计划。
1 | public interface PlanOptimizer |
Trino支持的优化器从类型上来看有两种,Rule-Based和Cost-Based。当前Trino的Cost-Based优化器支持并不全面.本篇主要介绍Rule-Based优化器的架构,对于Cost-Based优化器将在后面的文章中介绍。
Rule-Based的优化器从实现上分为两种:
- io.trino.sql.planner.plan.SimplePlanRewriter: PlanOptimizer内置一个SimplePlanRewriter,SimplePlanRewriter继承自PlanVisitor,通过一次遍历(大多数情况下是一次遍历)重写Plan(例如PredicatePushDown 谓词下推)。
- io.trino.sql.planner.iterative.Rule: PlanOptimizer是IterativeOptimizer,支持传入多个Rule。Rule中包含Pattern和查询match上Pattern后的重写逻辑(例如PruneProjectColumns 删除无用投影字段 )。
IterativeOptimizer 驱动rule。IterativeOptimizer内部存储了Rule列表。
- 通过递归的方式(类似深度优先遍历)去驱动Rule
- 先优化自己,然后再优化孩子节点
- 如果孩子节点发生了变化,会再次尝试对自身进行优化。
- 如果节点不再发生变化则返回。
- 支持超时检测。
IterativeOptimizer的驱动时序图如下:
sequenceDiagram IterativeOptimizer#exploreGroup-->>IterativeOptimizer#exploreNode: 优化自己 IterativeOptimizer#exploreNode->>checkTimeoutNotExhausted:检测超时 checkTimeoutNotExhausted-->>IterativeOptimizer#exploreNode: 未超时 loop each rule IterativeOptimizer#exploreNode-->Rule: transform plan Rule-->>IterativeOptimizer#exploreNode: optimized end IterativeOptimizer#exploreNode-->> IterativeOptimizer#exploreGroup: 自己优化完成 break Children not change or self not change IterativeOptimizer#exploreGroup -->> IterativeOptimizer#exploreChildren: 优化孩子节点 loop each child Node IterativeOptimizer#exploreChildren-->>IterativeOptimizer#exploreGroup: 遍历优化child节点 IterativeOptimizer#exploreGroup-->>IterativeOptimizer#exploreChildren: child节点优化完成 end IterativeOptimizer#exploreChildren -->> IterativeOptimizer#exploreGroup: 所有孩子节点优化完成 IterativeOptimizer#exploreGroup -->>IterativeOptimizer#exploreNode: 优化自己 end
接下来我们看看Rule的实现:
1 | public interface Rule<T> |
- pattern是一个链表结构,previous指针指向 上一个Pattern。
- pattern使用accept进行匹配,匹配时的入参是 Node和Captures,返回的参数是Node和Captures
- Node 是指plan节点。
- Captures是一个链表,内部包含了每个pattern节点捕获的信息和一个尾指针。
flowchart subgraph input nodeI(PlanNode) CapturesI(Captures) end input -->|accpet|pattern subgraph pattern a(pattern A) --> null b(pattern B) -->|previous|a c(pattern C) -->|previous|b end pattern -->Match subgraph Match CapturesO(Captures.NIL) -->|tail| CapturesOa(Captures A) CapturesOa -->|tail| CapturesOb(Captures B) CapturesOb -->|tail| CapturesOc(Captures C) end
对于上面带join的查询:
1 | select |
优化后的语法树节点如下:
flowchart JoinNode[JoinNode,DistributionType=partitioned]-->OutputNode ExchangeNode1[ExchangeNode,scope=remote,type=repartition] -->|left|JoinNode ExchangeNode2[ExchangeNode,scope=local,type=repartition] -->|right|JoinNode ProjectNode1[ProjectNode]-->ExchangeNode1 FilterNode1[FilterNode]-->ProjectNode1 TableScanNode1[TableScanNode]-->FilterNode1 ExchangeNode3[ExchangeNode,scope=remote,type=repartition]-->ExchangeNode2 ProjectNode2[ProjectNode]-->ExchangeNode3 TableScanNode2[TableScanNode]-->ProjectNode2
值得注意的是Exchange节点是通过AddExchanges等优化规则加入语法树节点的,后续将通过Exchange节点拆分执行计划。
Trino目前支持的Join有两种,partitioned(Hash join)和replicated(broadcast join)
逻辑计划分段
执行计划分段的实现方法是PlanFragmenter#createSubPlans。PlanFragmenter会将PlanNode树构建为SubPlan树。
1 | //io.trino.sql.planner.SubPlan |
PlanFragmenter中内置了一个Fragmenter,Fragmenter是SimplePlanRewriter的实现类。主要的片段拆分逻辑依靠ExchangeNode。
1 | public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context) |
对于上面带join的查询:
1 | select |
生成的Subplan结构如下:
flowchart subgraph subPlan0 subgraph planFragment0 OutputNode0(OutputNode)-->|source|JoinNode0(JoinNode) JoinNode0-->|left source|RemoteSourceNode0(RemoteSourceNode) JoinNode0-->|right source|ExchangeNode0(ExchangeNode0) ExchangeNode0-->|source|RemoteSourceNode01(RemoteSourceNode) end end subgraph subPlan1 subgraph planFragment1 ProjectNode1(ProjectNode)-->|source|FilterNode1(FilterNode) FilterNode1-->|source|TableScanNode1(TableScanNode,system.runtime.tasks) end end subgraph subPlan2 subgraph planFragment2 ProjectNode2(ProjectNode)-->|source|TableScanNode2(TableScanNode,system.runtime.nodes) end end RemoteSourceNode0-.->ProjectNode1 RemoteSourceNode01-.->ProjectNode2
详细执行计划可以通过explain关键字返回:
1 | Fragment 0 [HASH] |
PartitioningHandle
PartitioningHandle定义了执行计划中的分区状况。例如:
- 在上文中介绍的AddExchanges优化规则中,会设置不同分区的ExchangeNode。
- 在QueryPlaner中visitMerge时,会在MergeWriterNode中设置MergePartitioningHandle。
- 在上文介绍的Fragmenter中,会在访问不同planNode时,设置上下文的PartitioningHandle,然后在buildFragment时,将PartitioningHandle设置到PlanFragment中。例如在处理TableScan时, TableScan会从Connector中获取ConnectorPartitioningHandle的实现类。
1 |
|
SystemPartitioningHandle
SystemPartitioningHandle是Trino系统默认的分区方式。有5种内置分区类型:
- SINGLE: 在单个节点上执行,通常是用于汇总结果。
- FIXED: 将数据分散到固定的多个节点上执行。
- SOURCE: 一般是用于从数据源读取表
- COORDINATOR_ONLY: 一般只在COORDINATOR上执行。
- ARBITRARY: 表示无限制,动态扩展的
在执行计划中会有类似的输出 Fragment 0 [HASH]
,描述Fragment的分区方式,FIXED和ARBITRARY方式会打印使用的函数。
1 | public final class SystemPartitioningHandle |
系统默认组合如下:
Function\Partitioning | SINGLE | COORDINATOR_ONLY |
---|---|---|
SINGLE | SINGLE_DISTRIBUTION | COORDINATOR_DISTRIBUTION |
Function\Partitioning | FIXED |
---|---|
HASH | FIXED_HASH_DISTRIBUTION |
ROUND_ROBIN | FIXED_ARBITRARY_DISTRIBUTION |
BROADCAST | FIXED_BROADCAST_DISTRIBUTION |
UNKNOWN | FIXED_PASSTHROUGH_DISTRIBUTION |
Function\Partitioning | ARBITRARY |
---|---|
ROUND_ROBIN | SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION |
UNKNOWN | ARBITRARY_DISTRIBUTION |
Function\Partitioning | SOURCE |
---|---|
UNKNOWN | SOURCE_DISTRIBUTION |