The previous article analyzed how the logical execution plan for a SQL query is generated. This article continues with how that logical plan is scheduled.

## Query scheduling

First, recall the entry point of scheduling:
```java
public void start()
{
    try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
        try {
            if (!stateMachine.transitionToPlanning()) {
                return;
            }
            ... ...
            try {
                PlanRoot plan = planQuery();
                registerDynamicFilteringQuery(plan);
                planDistribution(plan);
            }
            finally {
                ... ...
            }
            tableExecuteContextManager.registerTableExecuteContextForQuery(getQueryId());
            if (!stateMachine.transitionToStarting()) {
                return;
            }
            QueryScheduler scheduler = queryScheduler.get();
            if (!stateMachine.isDone()) {
                scheduler.start();
            }
        }
        catch (Throwable e) {
            fail(e);
            throwIfInstanceOf(e, Error.class);
        }
    }
}
```
## Building the query scheduler

```java
private void planDistribution(PlanRoot plan)
{
    if (stateMachine.isDone()) {
        return;
    }

    PlanFragment rootFragment = plan.getRoot().getFragment();
    stateMachine.setColumns(
            ((OutputNode) rootFragment.getRoot()).getColumnNames(),
            rootFragment.getTypes());

    RetryPolicy retryPolicy = getRetryPolicy(getSession());
    QueryScheduler scheduler;
    switch (retryPolicy) {
        case QUERY:
        case NONE:
            scheduler = new PipelinedQueryScheduler(
                    stateMachine,
                    plan.getRoot(),
                    nodePartitioningManager,
                    nodeScheduler,
                    remoteTaskFactory,
                    plan.isSummarizeTaskInfos(),
                    scheduleSplitBatchSize,
                    queryExecutor,
                    schedulerExecutor,
                    failureDetector,
                    nodeTaskMap,
                    executionPolicy,
                    schedulerStats,
                    dynamicFilterService,
                    tableExecuteContextManager,
                    plannerContext.getMetadata(),
                    splitSourceFactory,
                    coordinatorTaskManager);
            break;
        case TASK:
            if (isFaultTolerantExecutionEventDriverSchedulerEnabled(stateMachine.getSession())) {
                scheduler = new EventDrivenFaultTolerantQueryScheduler(... ...);
            }
            else {
                scheduler = new FaultTolerantQueryScheduler(... ...);
            }
            break;
        default:
            throw new IllegalArgumentException("Unexpected retry policy: " + retryPolicy);
    }

    queryScheduler.set(scheduler);
}
```
## QueryScheduler

QueryScheduler is Trino's query scheduler. It is defined as follows:
```java
public interface QueryScheduler
{
    void start();

    void cancelStage(StageId stageId);

    void failTask(TaskId taskId, Throwable failureCause);

    BasicStageStats getBasicStageStats();

    StageInfo getStageInfo();

    long getUserMemoryReservation();

    long getTotalMemoryReservation();

    Duration getTotalCpuTime();
}
```
The scheduler turns the fragmented logical execution plan into distributed physical stages, which are then executed.
```mermaid
flowchart LR
SubPlan-->|QueryScheduler|SqlStage
SqlStage-->|QueryScheduler|StageExecution
```

## NodeScheduler

NodeScheduler is the node scheduler: it assigns execution nodes to the tasks of each stage. As introduced in the earlier architecture overview article, a Task is the smallest unit of an execution plan, and a stage corresponds to one or more Tasks.
NodeScheduler holds a NodeSelectorFactory internally and exposes a createNodeSelector method:
```java
public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
{
    return nodeSelectorFactory.createNodeSelector(requireNonNull(session, "session is null"), requireNonNull(catalogHandle, "catalogHandle is null"));
}
```
## NodeSelector

NodeSelector implements the node assignment algorithm. It has two implementations:
- TopologyAwareNodeSelector: a selector based on network topology distance. If a split cannot be scheduled on arbitrary nodes, the node is chosen from the nodes the split itself reports. For splits that can be scheduled anywhere, the candidate list is derived from the split's nodes and the NetworkPath. A node is then chosen by balancing either the total weight of all splits on each node or the weight of the splits queued on each node. If no node can be found, a future that waits for split queue space is returned.
- UniformNodeSelector: the default, load-balancing selector. If local scheduling optimization is enabled, splits are placed on local nodes where possible; otherwise (splits that cannot be scheduled arbitrarily can only choose among the nodes they report) a number of candidate nodes is picked at random (count >= minCandidates). Each round the node with the minimum weight is chosen, balancing either the total split weight per node or the queued split weight per node (the default strategy). If no node can be found, a future that waits for split queue space is returned.

```java
public interface NodeSelector
{
    void lockDownNodes();

    List<InternalNode> allNodes();

    InternalNode selectCurrentNode();

    default List<InternalNode> selectRandomNodes(int limit)
    {
        return selectRandomNodes(limit, ImmutableSet.of());
    }

    List<InternalNode> selectRandomNodes(int limit, Set<InternalNode> excludedNodes);

    SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks);

    SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap);
}
```
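The weight-balancing idea behind both selectors can be pictured with a small, self-contained sketch. The types below are simplified stand-ins for illustration only, not Trino's actual classes: pick the candidate node whose assigned split weight is lowest, and fall back to "wait for queue space" when every candidate is full.

```java
import java.util.*;

// Simplified stand-ins; Trino's real NodeSelector tracks far more state.
record Candidate(String nodeId, long assignedSplitWeight, long maxSplitWeight) {}

class MinWeightSelection
{
    // Returns the least-loaded candidate that still has capacity, or empty if all are full
    // (the real selectors would then return a future that completes when queue space frees up).
    static Optional<Candidate> pickNode(List<Candidate> candidates)
    {
        return candidates.stream()
                .filter(c -> c.assignedSplitWeight() < c.maxSplitWeight())
                .min(Comparator.comparingLong(Candidate::assignedSplitWeight));
    }

    public static void main(String[] args)
    {
        List<Candidate> candidates = List.of(
                new Candidate("worker-1", 800, 1000),
                new Candidate("worker-2", 300, 1000),
                new Candidate("worker-3", 950, 1000));
        // worker-2 wins because it carries the smallest split weight.
        System.out.println(pickNode(candidates));
    }
}
```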
## NodePartitioningManager

NodePartitioningManager#getNodePartitioningMap provides the mapping between partitions and nodes.
```java
public class NodePartitionMap
{
    private final List<InternalNode> partitionToNode;
    private final int[] bucketToPartition;
    private final ToIntFunction<Split> splitToBucket;

    ... ...

    public InternalNode getNode(Split split)
    {
        int bucket = splitToBucket.applyAsInt(split);
        int partition = bucketToPartition[bucket];
        return requireNonNull(partitionToNode.get(partition));
    }
}

public final class ConnectorBucketNodeMap
{
    private final int bucketCount;
    private final Optional<List<Node>> bucketToNode;
}
```
The logic of getNodePartitioningMap is as follows:
```java
private NodePartitionMap getNodePartitioningMap(
        Session session,
        PartitioningHandle partitioningHandle,
        Map<Integer, List<InternalNode>> bucketToNodeCache,
        AtomicReference<List<InternalNode>> systemPartitioningCache)
{
    requireNonNull(session, "session is null");
    requireNonNull(partitioningHandle, "partitioningHandle is null");

    if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
        return systemNodePartitionMap(session, partitioningHandle, systemPartitioningCache);
    }

    if (partitioningHandle.getConnectorHandle() instanceof MergePartitioningHandle mergeHandle) {
        return mergeHandle.getNodePartitioningMap(handle -> getNodePartitioningMap(session, handle, bucketToNodeCache, systemPartitioningCache));
    }

    Optional<ConnectorBucketNodeMap> optionalMap = getConnectorBucketNodeMap(session, partitioningHandle);
    if (optionalMap.isEmpty()) {
        return systemNodePartitionMap(session, FIXED_HASH_DISTRIBUTION, systemPartitioningCache);
    }
    ConnectorBucketNodeMap connectorBucketNodeMap = optionalMap.get();

    checkArgument(connectorBucketNodeMap.getBucketCount() < 1_000_000, "Too many buckets in partitioning: %s", connectorBucketNodeMap.getBucketCount());

    List<InternalNode> bucketToNode;
    if (connectorBucketNodeMap.hasFixedMapping()) {
        bucketToNode = getFixedMapping(connectorBucketNodeMap);
    }
    else {
        CatalogHandle catalogHandle = requiredCatalogHandle(partitioningHandle);
        bucketToNode = bucketToNodeCache.computeIfAbsent(
                connectorBucketNodeMap.getBucketCount(),
                bucketCount -> createArbitraryBucketToNode(getAllNodes(session, catalogHandle), bucketCount));
    }

    int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
    BiMap<InternalNode, Integer> nodeToPartition = HashBiMap.create();
    int nextPartitionId = 0;
    for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
        InternalNode node = bucketToNode.get(bucket);
        Integer partitionId = nodeToPartition.get(node);
        if (partitionId == null) {
            partitionId = nextPartitionId;
            nextPartitionId++;
            nodeToPartition.put(node, partitionId);
        }
        bucketToPartition[bucket] = partitionId;
    }

    List<InternalNode> partitionToNode = IntStream.range(0, nodeToPartition.size())
            .mapToObj(partitionId -> nodeToPartition.inverse().get(partitionId))
            .collect(toImmutableList());

    return new NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle));
}

private NodePartitionMap systemNodePartitionMap(Session session, PartitioningHandle partitioningHandle, AtomicReference<List<InternalNode>> nodesCache)
{
    SystemPartitioning partitioning = ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getPartitioning();

    NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, Optional.empty());

    List<InternalNode> nodes = switch (partitioning) {
        case COORDINATOR_ONLY -> ImmutableList.of(nodeSelector.selectCurrentNode());
        case SINGLE -> nodeSelector.selectRandomNodes(1);
        case FIXED -> {
            List<InternalNode> value = nodesCache.get();
            if (value == null) {
                value = nodeSelector.selectRandomNodes(getHashPartitionCount(session));
                nodesCache.set(value);
            }
            yield value;
        }
        default -> throw new IllegalArgumentException("Unsupported plan distribution " + partitioning);
    };

    checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

    return new NodePartitionMap(nodes, split -> {
        throw new UnsupportedOperationException("System distribution does not support source splits");
    });
}
```
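To make the bucketToPartition construction above concrete, here is a small standalone example with hypothetical worker names. Buckets that map to the same node collapse into the same partition id, exactly as in the loop above.

```java
import java.util.*;

public class BucketToPartitionExample
{
    public static void main(String[] args)
    {
        // Hypothetical bucket-to-node assignment for 6 buckets spread over 3 workers.
        List<String> bucketToNode = List.of("worker-1", "worker-2", "worker-3", "worker-1", "worker-2", "worker-3");

        int[] bucketToPartition = new int[bucketToNode.size()];
        Map<String, Integer> nodeToPartition = new LinkedHashMap<>();
        for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
            // Same idea as getNodePartitioningMap: a node gets a new partition id the first time it is seen.
            int partitionId = nodeToPartition.computeIfAbsent(bucketToNode.get(bucket), node -> nodeToPartition.size());
            bucketToPartition[bucket] = partitionId;
        }

        System.out.println(Arrays.toString(bucketToPartition));        // [0, 1, 2, 0, 1, 2]
        System.out.println(new ArrayList<>(nodeToPartition.keySet())); // partitionToNode: [worker-1, worker-2, worker-3]
    }
}
```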
Note that SOURCE_DISTRIBUTION and SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION are not handled by NodePartitioningManager; they are handled by implementations of StageScheduler instead.
## PipelinedQueryScheduler

Since the default RetryPolicy is NONE, the query-scheduler construction logic above builds a PipelinedQueryScheduler.
During initialization, PipelinedQueryScheduler uses StageManager to turn the SubPlans into SqlStages. The SqlStages fall into two categories:
- coordinatorStage
- distributedStage

These two categories are scheduled by CoordinatorStagesScheduler and DistributedStagesScheduler respectively.
The scheduling flow is as follows:
```mermaid
sequenceDiagram
PipelinedQueryScheduler->>PipelinedQueryScheduler: object initialization
PipelinedQueryScheduler->>+StageManager:create
loop every Subplan
StageManager->>StageManager:createSqlStage
StageManager->>StageManager: split into coordinatorStage and distributedStage
end
StageManager->>-PipelinedQueryScheduler: return StageManager
PipelinedQueryScheduler->>+CoordinatorStagesScheduler: create
loop every coordinatorStage
CoordinatorStagesScheduler ->> CoordinatorStagesScheduler: createPipelinedStageExecution
CoordinatorStagesScheduler->>PipelinedStageExecution: initialize
PipelinedStageExecution ->> CoordinatorStagesScheduler: return PipelinedStageExecution
end
CoordinatorStagesScheduler->>CoordinatorStagesScheduler: initialize, add listeners to each stageExecution
CoordinatorStagesScheduler ->>-PipelinedQueryScheduler: return CoordinatorStagesScheduler
PipelinedQueryScheduler->>PipelinedQueryScheduler: start
PipelinedQueryScheduler ->> DistributedStagesScheduler: create
loop every distributedStage
DistributedStagesScheduler ->> DistributedStagesScheduler: createPipelinedStageExecution
DistributedStagesScheduler->>PipelinedStageExecution: initialize
PipelinedStageExecution ->> DistributedStagesScheduler: return PipelinedStageExecution
end
loop every StageExecution
DistributedStagesScheduler ->> StageScheduler: createStageScheduler
StageScheduler->>DistributedStagesScheduler: return StageScheduler
end
DistributedStagesScheduler ->>PipelinedQueryScheduler: return DistributedStagesScheduler
PipelinedQueryScheduler ->>+ CoordinatorStagesScheduler: schedule
CoordinatorStagesScheduler->>- PipelinedQueryScheduler: return
PipelinedQueryScheduler ->>+ DistributedStagesScheduler: schedule(Async)
DistributedStagesScheduler ->>- PipelinedQueryScheduler: return
```

### Generating SqlStages

```java
static StageManager create(... ...)
{
    ... ...
    for (SubPlan planNode : Traverser.forTree(SubPlan::getChildren).breadthFirst(planTree)) {
        PlanFragment fragment = planNode.getFragment();
        SqlStage stage = createSqlStage(
                getStageId(session.getQueryId(), fragment.getId()),
                fragment,
                TableInfo.extract(session, metadata, fragment),
                taskFactory,
                session,
                summarizeTaskInfo,
                nodeTaskMap,
                executor,
                schedulerStats);
        StageId stageId = stage.getStageId();
        stages.put(stageId, stage);
        stagesInTopologicalOrder.add(stage);
        if (fragment.getPartitioning().isCoordinatorOnly()) {
            coordinatorStagesInTopologicalOrder.add(stage);
        }
        else {
            distributedStagesInTopologicalOrder.add(stage);
        }
        if (rootStageId == null) {
            rootStageId = stageId;
        }
        Set<StageId> childStageIds = planNode.getChildren().stream()
                .map(childStage -> getStageId(session.getQueryId(), childStage.getFragment().getId()))
                .collect(toImmutableSet());
        children.put(stageId, childStageIds);
        childStageIds.forEach(child -> parents.put(child, stageId));
    }
    StageManager stageManager = new StageManager(... ...);
    return stageManager;
}
```
### Creating a SqlStage from a SubPlan

```java
public static SqlStage createSqlStage(... ...)
{
    ... ...
    SqlStage sqlStage = new SqlStage(
            session,
            new StageStateMachine(stageId, fragment, tables, executor, schedulerStats),
            remoteTaskFactory,
            nodeTaskMap,
            summarizeTaskInfo);
    sqlStage.initialize();
    return sqlStage;
}
```
### Creating a StageExecution from a SqlStage

```java
public static PipelinedStageExecution createPipelinedStageExecution(
        SqlStage stage,
        Map<PlanFragmentId, PipelinedOutputBufferManager> outputBufferManagers,
        TaskLifecycleListener taskLifecycleListener,
        FailureDetector failureDetector,
        Executor executor,
        Optional<int[]> bucketToPartition,
        int attempt)
{
    PipelinedStageStateMachine stateMachine = new PipelinedStageStateMachine(stage.getStageId(), executor);
    ImmutableMap.Builder<PlanFragmentId, RemoteSourceNode> exchangeSources = ImmutableMap.builder();
    for (RemoteSourceNode remoteSourceNode : stage.getFragment().getRemoteSourceNodes()) {
        for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
            exchangeSources.put(planFragmentId, remoteSourceNode);
        }
    }
    PipelinedStageExecution execution = new PipelinedStageExecution(
            stateMachine,
            stage,
            outputBufferManagers,
            taskLifecycleListener,
            failureDetector,
            bucketToPartition,
            exchangeSources.buildOrThrow(),
            attempt);
    execution.initialize();
    return execution;
}
```
### coordinatorStage scheduling

A coordinatorStage is a stage that runs on the coordinator. For example, when the EXPLAIN ANALYZE command is used, the coordinatorStage looks like the diagram below.
```mermaid
flowchart LR
subgraph coordinatorStage
subgraph planFragment
OutputNode0(OutputNode)-->|source|ExplainAnalyzedNode0(ExplainAnalyzedNode)
ExplainAnalyzedNode0-->|source|ExchangeNode0(ExchangeNode0)
ExchangeNode0-->|source|RemoteSourceNode01(RemoteSourceNode)
end
end
```

The coordinatorStage processing flow is as follows:
- A coordinatorStage is a stage with COORDINATOR_ONLY partitioning.
- When the CoordinatorStagesScheduler is created, StageExecutions are built from the SqlStages.
- When the CoordinatorStagesScheduler is initialized, listeners are attached to the StageExecutions (a simplified sketch follows this list):
    - each StageExecution gets its own state listener;
    - when a StageExecution is done, its child StageExecutions are terminated;
    - when the root (first) StageExecution changes state, the change is propagated to the QueryState;
    - when the last StageExecution finishes, the distributedStagesScheduler is terminated.
- When CoordinatorStagesScheduler#schedule runs, it calls StageExecution#scheduleTask to create the RemoteTasks used for execution.
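The listener wiring can be pictured with a small, self-contained sketch. The types below are simplified stand-ins, not Trino's actual StageExecution API: a parent execution registers a state listener, and once it reaches a done state its children are cancelled.

```java
import java.util.*;
import java.util.function.Consumer;

// Minimal model of the listener wiring; Trino's StageExecution and state machine are far richer.
class MiniStageExecution
{
    private final String id;
    private final List<MiniStageExecution> children = new ArrayList<>();
    private final List<Consumer<String>> stateListeners = new ArrayList<>();

    MiniStageExecution(String id) { this.id = id; }

    void addChild(MiniStageExecution child) { children.add(child); }

    void addStateChangeListener(Consumer<String> listener) { stateListeners.add(listener); }

    void transitionTo(String state)
    {
        System.out.println(id + " -> " + state);
        stateListeners.forEach(listener -> listener.accept(state));
    }

    void cancel() { System.out.println(id + " cancelled"); }

    // Mirrors the wiring done during scheduler initialization:
    // once a stage execution is done, its child executions are terminated.
    void wireCancelChildrenWhenDone()
    {
        addStateChangeListener(state -> {
            if (state.equals("FINISHED") || state.equals("FAILED")) {
                children.forEach(MiniStageExecution::cancel);
            }
        });
    }

    public static void main(String[] args)
    {
        MiniStageExecution parent = new MiniStageExecution("stage0");
        MiniStageExecution child = new MiniStageExecution("stage1");
        parent.addChild(child);
        parent.wireCancelChildrenWhenDone();
        parent.transitionTo("FINISHED"); // prints that stage1 is cancelled
    }
}
```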
### distributedStage

A distributedStage is a stage that does not run on the coordinator, i.e. one whose partitioning is not COORDINATOR_ONLY. Its processing flow is:

- When the DistributedStagesScheduler is created, StageExecutions are built from the SqlStages.
- Once the StageExecutions are built, a StageScheduler is created for each stage and its children to drive stage scheduling.
- When the DistributedStagesScheduler is initialized, listeners are attached to the StageExecutions: each StageExecution gets its own state listener, and when a StageExecution is done its child StageExecutions are terminated.
- When DistributedStagesScheduler#schedule runs, it starts all StageSchedulers, then loops according to the execution schedule policy, fetching the schedulable StageExecutions and their StageSchedulers and calling StageScheduler#schedule until every StageExecution has completed.
    - If a stage is blocked and the reason is WRITER_SCALING, WAITING_FOR_SOURCE, or SPLIT_QUEUES_FULL, it is added to the blocked list; otherwise an exception is thrown.
    - For all blocked stages, the scheduler waits up to 1s and then calls cancel on the blocked futures (for Guava's AbstractFuture this cancel is effectively a no-op by default).

### ExecutionSchedule

ExecutionSchedule is the scheduling policy for StageExecutions.
```java
public interface ExecutionSchedule
{
    StagesScheduleResult getStagesToSchedule();

    boolean isFinished();
}
```
ExecutionSchedule has two implementations:
- AllAtOnceExecutionSchedule: returns all currently schedulable StageExecutions on every call.
- PhasedExecutionSchedule: the default mode. It returns the executable stages according to the dependency order between StageExecutions, preventing blocking caused by inter-stage dependencies.

For a query with a join:
```sql
select t.*, n.*
from system.runtime.tasks t
left join system.runtime.nodes n
    on t.node_id = n.node_id
where t.state = 'FINISHED'
```
the generated query structure is as follows:
```mermaid
flowchart
subgraph stage0
subgraph planFragment0
OutputNode0(OutputNode)-->|source|JoinNode0(JoinNode)
JoinNode0-->|left source|RemoteSourceNode0(RemoteSourceNode)
JoinNode0-->|right source|ExchangeNode0(ExchangeNode0)
ExchangeNode0-->|source|RemoteSourceNode01(RemoteSourceNode)
end
end
subgraph stage1
subgraph planFragment1
ProjectNode1(ProjectNode)-->|source|FilterNode1(FilterNode)
FilterNode1-->|source|TableScanNode1(TableScanNode,system.runtime.tasks)
end
end
subgraph stage2
subgraph planFragment2
ProjectNode2(ProjectNode)-->|source|TableScanNode2(TableScanNode,system.runtime.nodes)
end
end
RemoteSourceNode0-.->ProjectNode1
RemoteSourceNode01-.->ProjectNode2
```

PhasedExecutionSchedule computes the dependencies between these stages.
Trino splits join execution into a build side and a probe side. The build side is the smaller table (the right table in Trino), which is loaded into memory; the probe side is the larger table, which is streamed through. This creates an implicit dependency: the build-side stage and its upstream stages must be scheduled before the probe-side stage, which yields the dependencies above.
Because of these dependencies, the stages scheduled first are stage0 and stage2.
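A rough way to picture this behaviour, purely illustrative and not the real PhasedExecutionSchedule implementation, is a loop that only releases stages whose build-side dependencies have already been scheduled. The stage ids below follow the join example above.

```java
import java.util.*;

public class PhasedScheduleSketch
{
    public static void main(String[] args)
    {
        // Probe-side stages wait for their build-side stages:
        // stage1 scans the probe side and depends on stage2, the build side.
        Map<String, Set<String>> dependencies = new LinkedHashMap<>();
        dependencies.put("stage0", Set.of());
        dependencies.put("stage1", Set.of("stage2"));
        dependencies.put("stage2", Set.of());

        Set<String> scheduled = new HashSet<>();
        int round = 1;
        while (scheduled.size() < dependencies.size()) {
            // Release every stage whose dependencies were all scheduled in earlier rounds.
            List<String> ready = dependencies.entrySet().stream()
                    .filter(e -> !scheduled.contains(e.getKey()) && scheduled.containsAll(e.getValue()))
                    .map(Map.Entry::getKey)
                    .toList();
            System.out.println("round " + round++ + ": " + ready); // round 1: [stage0, stage2], round 2: [stage1]
            scheduled.addAll(ready);
        }
    }
}
```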
## StageScheduler

As the distributedStage processing flow above shows, StageExecutions are ultimately scheduled through a StageScheduler.
```java
public interface StageScheduler
        extends Closeable
{
    default void start() {}

    ScheduleResult schedule();

    @Override
    default void close() {}
}
```
StageScheduler has four implementations:
- the anonymous class inside newSourcePartitionedSchedulerAsStageScheduler: used when the partitioning is SOURCE_DISTRIBUTION;
- ScaledWriterScheduler: used when the partitioning is SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
- FixedCountScheduler: used when the stage has no local sources, only remote sources;
- FixedSourcePartitionedScheduler: used when the stage has local sources.

FixedCountScheduler and FixedSourcePartitionedScheduler use the NodePartitioningManager described above to set up partitioning (the dispatch is sketched below).
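The selection can be summarized as a small, self-contained sketch. This is illustrative only; the real decision is made when the DistributedStagesScheduler builds a StageScheduler for each stage and involves more inputs than shown here.

```java
public class StageSchedulerDispatchSketch
{
    // Illustrative dispatch based on the list above; the real logic works on
    // PartitioningHandle and the fragment's partitioned (local) sources.
    static String chooseScheduler(String partitioning, boolean hasLocalSplitSources)
    {
        if (partitioning.equals("SOURCE_DISTRIBUTION")) {
            return "SourcePartitionedScheduler (wrapped as StageScheduler)";
        }
        if (partitioning.equals("SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION")) {
            return "ScaledWriterScheduler";
        }
        return hasLocalSplitSources ? "FixedSourcePartitionedScheduler" : "FixedCountScheduler";
    }

    public static void main(String[] args)
    {
        System.out.println(chooseScheduler("SOURCE_DISTRIBUTION", true));
        System.out.println(chooseScheduler("FIXED_HASH_DISTRIBUTION", false));
    }
}
```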
Let's look at FixedCountScheduler first.
### FixedCountScheduler

```java
public class FixedCountScheduler
        implements StageScheduler
{
    public interface TaskScheduler
    {
        Optional<RemoteTask> scheduleTask(InternalNode node, int partition);
    }

    private final TaskScheduler taskScheduler;
    private final List<InternalNode> partitionToNode;

    public FixedCountScheduler(StageExecution stageExecution, List<InternalNode> partitionToNode)
    {
        requireNonNull(stageExecution, "stage is null");
        this.taskScheduler = (node, partition) -> stageExecution.scheduleTask(node, partition, ImmutableMultimap.of());
        this.partitionToNode = requireNonNull(partitionToNode, "partitionToNode is null");
    }

    @Override
    public ScheduleResult schedule()
    {
        List<RemoteTask> newTasks = IntStream.range(0, partitionToNode.size())
                .mapToObj(partition -> taskScheduler.scheduleTask(partitionToNode.get(partition), partition))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toImmutableList());

        return new ScheduleResult(true, newTasks, 0);
    }
}
```
### FixedSourcePartitionedScheduler

FixedSourcePartitionedScheduler handles the case where the stage has local sources.
```java
public FixedSourcePartitionedScheduler(... ...)
{
    ... ...
    BucketedSplitPlacementPolicy splitPlacementPolicy = new BucketedSplitPlacementPolicy(nodeSelector, nodes, bucketNodeMap, stageExecution::getAllTasks);

    ArrayList<SourceScheduler> sourceSchedulers = new ArrayList<>();

    partitionIdAllocator = new PartitionIdAllocator();
    scheduledTasks = new HashMap<>();
    for (PlanNodeId planNodeId : schedulingOrder) {
        SplitSource splitSource = splitSources.get(planNodeId);

        SourceScheduler sourceScheduler = newSourcePartitionedSchedulerAsSourceScheduler(
                stageExecution,
                planNodeId,
                splitSource,
                splitPlacementPolicy,
                splitBatchSize,
                dynamicFilterService,
                tableExecuteContextManager,
                () -> true,
                partitionIdAllocator,
                scheduledTasks);

        sourceSchedulers.add(sourceScheduler);
    }
    this.sourceSchedulers = new ArrayDeque<>(sourceSchedulers);
}
```
- If the ExchangeType of every RemoteSource is REPLICATE (a REPLICATE exchange usually only appears for small-table joins, so the stage reads only local sources; in Trino the small table is the build side), the node mapping is derived from the input buckets via nodePartitioningManager#getBucketNodeMap.
- Otherwise, for stages that also have remote sources, partitioning is done with nodePartitioningManager#getNodePartitioningMap.

```java
public ScheduleResult schedule()
{
    List<RemoteTask> newTasks = ImmutableList.of();
    if (scheduledTasks.isEmpty()) {
        ImmutableList.Builder<RemoteTask> newTasksBuilder = ImmutableList.builder();
        for (InternalNode node : nodes) {
            Optional<RemoteTask> task = stageExecution.scheduleTask(node, partitionIdAllocator.getNextId(), ImmutableMultimap.of());
            if (task.isPresent()) {
                scheduledTasks.put(node, task.get());
                newTasksBuilder.add(task.get());
            }
        }
        newTasks = newTasksBuilder.build();
    }

    ListenableFuture<Void> blocked = immediateFuture(null);
    ScheduleResult.BlockedReason blockedReason = null;
    int splitsScheduled = 0;
    while (!sourceSchedulers.isEmpty()) {
        SourceScheduler scheduler = sourceSchedulers.peek();
        ScheduleResult schedule = scheduler.schedule();
        splitsScheduled += schedule.getSplitsScheduled();
        blocked = schedule.getBlocked();
        if (schedule.getBlockedReason().isPresent()) {
            blockedReason = schedule.getBlockedReason().get();
        }
        else {
            blockedReason = null;
        }
        if (!blocked.isDone() || !schedule.isFinished()) {
            break;
        }
        stageExecution.schedulingComplete(scheduler.getPlanNodeId());
        sourceSchedulers.remove().close();
    }

    if (blockedReason != null) {
        return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, blocked, blockedReason, splitsScheduled);
    }
    checkState(blocked.isDone(), "blockedReason not provided when scheduler is blocked");
    return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, splitsScheduled);
}
```
Note that if the returned ScheduleResult is not finished, the stageExecution's state is not transitioned to SCHEDULED, so the next scheduling loop can pick up this stageExecution again.
```java
if (result.isFinished()) {
    stageExecution.schedulingComplete();
}

public void schedulingComplete()
{
    if (!stateMachine.transitionToScheduled()) {
        return;
    }

    if (isStageFlushing()) {
        stateMachine.transitionToFlushing();
    }
    if (isStageFinished()) {
        stateMachine.transitionToFinished();
    }

    for (PlanNodeId partitionedSource : stage.getFragment().getPartitionedSources()) {
        schedulingComplete(partitionedSource);
    }
}
```
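The effect is that an unfinished stage keeps reappearing in the DistributedStagesScheduler loop. A minimal model of that behaviour, with a stand-in type rather than the real scheduler classes, might look like this:

```java
public class SchedulingLoopSketch
{
    // Minimal stand-in for a stage scheduler that needs several passes to finish.
    interface MiniStageScheduler
    {
        boolean schedule(); // returns true once the stage is fully scheduled
    }

    public static void main(String[] args)
    {
        // A stage that only finishes on its third schedule() call, e.g. because splits
        // arrive in batches or writer tasks are added incrementally.
        MiniStageScheduler stage = new MiniStageScheduler()
        {
            private int calls;

            @Override
            public boolean schedule()
            {
                calls++;
                System.out.println("schedule() call #" + calls);
                return calls >= 3;
            }
        };

        // Unfinished stages are not transitioned to SCHEDULED, so the loop picks them up again.
        boolean finished = false;
        while (!finished) {
            finished = stage.schedule();
        }
        System.out.println("stage transitioned to SCHEDULED");
    }
}
```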
### ScaledWriterScheduler

```java
public ScheduleResult schedule()
{
    List<RemoteTask> writers = scheduleTasks(getNewTaskCount());

    future.set(null);
    future = SettableFuture.create();
    executor.schedule(() -> future.set(null), 200, MILLISECONDS);

    return new ScheduleResult(done.get(), writers, future, WRITER_SCALING, 0);
}

private int getNewTaskCount()
{
    if (scheduledNodes.isEmpty()) {
        return 1;
    }

    double fullTasks = sourceTasksProvider.get().stream()
            .filter(task -> !task.getState().isDone())
            .map(TaskStatus::getOutputBufferStatus)
            .map(OutputBufferStatus::isOverutilized)
            .mapToDouble(full -> full ? 1.0 : 0.0)
            .average().orElse(0.0);

    long writtenBytes = writerTasksProvider.get().stream()
            .map(TaskStatus::getPhysicalWrittenDataSize)
            .mapToLong(DataSize::toBytes)
            .sum();

    if ((fullTasks >= 0.5) && (writtenBytes >= (writerMinSizeBytes * maxTaskWriterCount * scheduledNodes.size()))) {
        return 1;
    }

    return 0;
}

private List<RemoteTask> scheduleTasks(int count)
{
    if (count == 0) {
        return ImmutableList.of();
    }

    List<InternalNode> nodes = nodeSelector.selectRandomNodes(count, scheduledNodes);

    checkCondition(!scheduledNodes.isEmpty() || !nodes.isEmpty(), NO_NODES_AVAILABLE, "No nodes available to run query");

    ImmutableList.Builder<RemoteTask> tasks = ImmutableList.builder();
    for (InternalNode node : nodes) {
        Optional<RemoteTask> remoteTask = stage.scheduleTask(node, scheduledNodes.size(), ImmutableMultimap.of());
        remoteTask.ifPresent(task -> {
            tasks.add(task);
            scheduledNodes.add(node);
        });
    }

    return tasks.build();
}
```
By default, the number of writer tasks is static. Enabling writer scaling allows Presto to scale the number of writer tasks dynamically instead of allocating a fixed number. Additional tasks are added when the average amount of physical data written per writer is above a minimum threshold, but only if the query is bottlenecked on writing.
Tasks can be added dynamically for the same reason as before: the stageExecution is not yet in the SCHEDULED state, so the schedule method keeps being re-entered. The ScaledWriterScheduler is constructed as follows:
```java
if (partitioningHandle.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
    Supplier<Collection<TaskStatus>> sourceTasksProvider = () -> childStageExecutions.stream()
            .map(StageExecution::getTaskStatuses)
            .flatMap(List::stream)
            .collect(toImmutableList());

    Supplier<Collection<TaskStatus>> writerTasksProvider = stageExecution::getTaskStatuses;

    ScaledWriterScheduler scheduler = new ScaledWriterScheduler(
            stageExecution,
            sourceTasksProvider,
            writerTasksProvider,
            nodeScheduler.createNodeSelector(session, Optional.empty()),
            executor,
            getWriterMinSize(session),
            isTaskScaleWritersEnabled(session) ? getTaskScaleWritersMaxWriterCount(session) : getTaskWriterCount(session));

    whenAllStages(childStageExecutions, StageExecution.State::isDone)
            .addListener(scheduler::finish, directExecutor());

    return scheduler;
}
```
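To make the scale-up condition in getNewTaskCount concrete, here is a small example with made-up numbers; the writer minimum size, writer count, and node count are sample values only (in reality they come from the session, see getWriterMinSize above).

```java
public class WriterScalingExample
{
    public static void main(String[] args)
    {
        // Sample values only; real values come from session properties and task status.
        double fullTasks = 0.6;                      // 60% of upstream tasks report over-utilized output buffers
        long writtenBytes = 600L * 1024 * 1024;      // 600 MB written so far
        long writerMinSizeBytes = 32L * 1024 * 1024; // assumed writer minimum size of 32 MB
        int maxTaskWriterCount = 4;
        int scheduledNodes = 3;

        boolean addWriterTask = fullTasks >= 0.5
                && writtenBytes >= writerMinSizeBytes * maxTaskWriterCount * scheduledNodes;
        // 600 MB >= 32 MB * 4 * 3 = 384 MB and 0.6 >= 0.5, so one more writer task would be scheduled.
        System.out.println(addWriterTask);
    }
}
```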
### newSourcePartitionedSchedulerAsStageScheduler

newSourcePartitionedSchedulerAsStageScheduler produces the scheduler that handles a SplitSource.
```java
public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
        StageExecution stageExecution,
        PlanNodeId partitionedNode,
        SplitSource splitSource,
        SplitPlacementPolicy splitPlacementPolicy,
        int splitBatchSize,
        DynamicFilterService dynamicFilterService,
        TableExecuteContextManager tableExecuteContextManager,
        BooleanSupplier anySourceTaskBlocked)
{
    SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(
            stageExecution,
            partitionedNode,
            splitSource,
            splitPlacementPolicy,
            splitBatchSize,
            dynamicFilterService,
            tableExecuteContextManager,
            anySourceTaskBlocked,
            new PartitionIdAllocator(),
            new HashMap<>());

    return new StageScheduler()
    {
        @Override
        public void start()
        {
            sourcePartitionedScheduler.start();
        }

        @Override
        public ScheduleResult schedule()
        {
            return sourcePartitionedScheduler.schedule();
        }

        @Override
        public void close()
        {
            sourcePartitionedScheduler.close();
        }
    };
}
```
This stage scheduler's schedule method actually delegates to a SourcePartitionedScheduler, which has four states:
```java
private enum State
{
    INITIALIZED,
    SPLITS_ADDED,
    SPLITS_SCHEDULED,
    FINISHED
}
```
A SplitSource is a source that produces splits, usually a ConnectorAwareSplitSource. The scheduler fetches SplitBatches (batches of splits) from the SplitSource and adds all of their splits to pendingSplits.
```java
else if (pendingSplits.isEmpty()) {
    if (nextSplitBatchFuture == null) {
        nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);

        long start = System.nanoTime();
        addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
    }

    if (nextSplitBatchFuture.isDone()) {
        SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture);
        nextSplitBatchFuture = null;
        pendingSplits.addAll(nextSplits.getSplits());
        if (nextSplits.isLastBatch()) {
            if (state == State.INITIALIZED && pendingSplits.isEmpty()) {
                pendingSplits.add(new Split(
                        splitSource.getCatalogHandle(),
                        new EmptySplit(splitSource.getCatalogHandle())));
            }
            log.debug("stage id: %s, node: %s; transitioning to SPLITS_SCHEDULED", stageExecution.getStageId(), partitionedNode);
            state = State.SPLITS_SCHEDULED;
        }
    }
    else {
        blockedFuture = Optional.of(asVoid(nextSplitBatchFuture));
        blockedOnNextSplitBatch = true;
        log.debug("stage id: %s, node: %s; blocked on next split batch", stageExecution.getStageId(), partitionedNode);
    }
}
```
Once pendingSplits is non-empty, the scheduler state becomes SPLITS_ADDED:
```java
if (!pendingSplits.isEmpty() && state == State.INITIALIZED) {
    log.debug("stage id: %s, node: %s; transitioning to SPLITS_ADDED", stageExecution.getStageId(), partitionedNode);
    state = State.SPLITS_ADDED;
}
```
Next, placements are computed for all pending splits, and the splits are assigned to tasks according to the computed placements:
```java
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments();
splitAssignment.values().forEach(pendingSplits::remove);
overallSplitAssignmentCount += splitAssignment.size();

if (!pendingSplits.isEmpty()) {
    placementFuture = splitPlacementResult.getBlocked();
    blockedFuture = Optional.of(placementFuture);
    blockedOnPlacements = true;
}

overallNewTasks.addAll(assignSplits(splitAssignment));

private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> splitAssignment)
{
    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();

    ImmutableSet<InternalNode> nodes = ImmutableSet.copyOf(splitAssignment.keySet());
    for (InternalNode node : nodes) {
        ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
                .putAll(partitionedNode, splitAssignment.get(node))
                .build();
        RemoteTask task = scheduledTasks.get(node);
        if (task != null) {
            task.addSplits(splits);
        }
        else {
            scheduleTask(node, splits).ifPresent(newTasks::add);
        }
    }
    return newTasks.build();
}
```
When pendingSplits is empty and the state is SPLITS_SCHEDULED, the state transitions to FINISHED and the stage execution is marked as scheduled. In every other case the scheduler keeps re-entering the schedule method.
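To wrap up, here is a compressed, purely illustrative walk through that state progression. The batches and placement below are faked; the real SourcePartitionedScheduler interleaves this with blocking futures and task assignment.

```java
import java.util.*;

public class SourceSchedulerStateSketch
{
    enum State { INITIALIZED, SPLITS_ADDED, SPLITS_SCHEDULED, FINISHED }

    public static void main(String[] args)
    {
        // Two fake split batches, the second being the SplitSource's last batch.
        Deque<List<String>> batches = new ArrayDeque<>(List.of(List.of("split-1", "split-2"), List.of("split-3")));
        Deque<String> pendingSplits = new ArrayDeque<>();
        State state = State.INITIALIZED;

        while (state != State.FINISHED) {                             // each iteration models one schedule() call
            if (pendingSplits.isEmpty() && !batches.isEmpty()) {
                pendingSplits.addAll(batches.poll());                 // fetch the next SplitBatch
                if (batches.isEmpty()) {
                    state = State.SPLITS_SCHEDULED;                   // last batch received
                }
            }
            if (!pendingSplits.isEmpty() && state == State.INITIALIZED) {
                state = State.SPLITS_ADDED;
            }
            while (!pendingSplits.isEmpty()) {
                System.out.println("assign " + pendingSplits.poll()); // placement and task assignment
            }
            if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) {
                state = State.FINISHED;                               // stage execution can be marked scheduled
            }
            System.out.println("state = " + state);
        }
    }
}
```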