public void start()
{
    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        // to start we just need to trigger an update
        started.set(true);
        triggerUpdate(); // triggers task creation or update

        dynamicFiltersFetcher.start();
        taskStatusFetcher.start();
        taskInfoFetcher.start();
    }
}

private void triggerUpdate()
{
    if (!started.get()) {
        // task has not started yet
        return;
    }
    if (pendingRequestsCounter.getAndIncrement() == 0) {
        // schedule update if this is the first update requested
        scheduleUpdate();
    }
}

private void scheduleUpdate()
{
    executor.execute(this::sendUpdate);
}

private void sendUpdate()
{
    ... ...
    HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
    Request request = preparePost()
            .setUri(uriBuilder.build())
            .setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
            .setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
            .build();

    ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
    Futures.addCallback(
            future,
            new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion(), System.nanoTime(), currentPendingRequestsCounter), request.getUri(), stats),
            executor);
}
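The pendingRequestsCounter is what turns many triggerUpdate() calls into a single in-flight HTTP request: only the caller that raises the counter from 0 to 1 schedules sendUpdate(), and when the response comes back the handler subtracts the requests it covered and reschedules if more triggers arrived in the meantime. A minimal, standalone sketch of this coalescing pattern (the sendUpdate()/onUpdateFinished() bodies here are hypothetical stand-ins, not the actual HttpRemoteTask code):

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class UpdateCoalescer
{
    private final AtomicLong pendingRequestsCounter = new AtomicLong();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public void triggerUpdate()
    {
        if (pendingRequestsCounter.getAndIncrement() == 0) {
            // this was the first pending trigger, so schedule exactly one request
            executor.execute(this::sendUpdate);
        }
        // otherwise an update is already scheduled or in flight; it will pick up our change
    }

    private void sendUpdate()
    {
        long pending = pendingRequestsCounter.get();
        // ... build and send one request covering all 'pending' triggers (hypothetical) ...
        onUpdateFinished(pending);
    }

    private void onUpdateFinished(long processed)
    {
        // subtract the triggers this request covered; if new ones arrived, go again
        if (pendingRequestsCounter.addAndGet(-processed) > 0) {
            executor.execute(this::sendUpdate);
        }
    }
}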
// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get()
        ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation())
        : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
        session.toSessionRepresentation(),
        session.getIdentity().getExtraCredentials(),
        fragment,
        splitAssignments,
        outputBuffers.get(),
        dynamicFilterDomains.getDynamicFilterDomains(),
        session.getExchangeEncryptionKey());
/**
 * Declare that createOperator will not be called any more and release
 * any resources associated with this factory.
 * <p>
 * This method will be called only once.
 * Implementation doesn't need to worry about duplicate invocations.
 */
void noMoreOperators();
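In practice a factory uses this hook to release state it shares across all the operators it created, for example a build-side table. A minimal sketch of a factory honoring that contract (the class and field names here are hypothetical, not a real Trino factory):

class ExampleOperatorFactory
{
    // e.g. a build-side table shared by all operators this factory created (hypothetical)
    private final AutoCloseable sharedResource;
    private boolean closed;

    ExampleOperatorFactory(AutoCloseable sharedResource)
    {
        this.sharedResource = sharedResource;
    }

    public synchronized void noMoreOperators()
    {
        // called once by the engine after the last createOperator call;
        // release everything the factory holds on behalf of its operators
        if (closed) {
            return;
        }
        closed = true;
        try {
            sharedResource.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}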
sequenceDiagram
SqlTaskExecution->>SqlTaskExecution: addSplitAssignments
SqlTaskExecution->>SqlTaskExecution: updateSplitAssignments
loop every new SplitAssignment
alt driverRunnerFactoriesWithSplitLifeCycle contains planNodeId
SqlTaskExecution->>SqlTaskExecution:schedulePartitionedSource
else
SqlTaskExecution->>DriverSplitRunnerFactory:enqueueSplits
SqlTaskExecution->>updatedUnpartitionedSources: added
updatedUnpartitionedSources->>SqlTaskExecution: return
end
end
loop every planNodeId in updatedUnpartitionedSources
SqlTaskExecution->>+DriverSplitRunnerFactory: scheduleSplits
end
// first remove any split that was already acknowledged
long currentMaxAcknowledgedSplit = this.maxAcknowledgedSplit;
splitAssignments = splitAssignments.stream()
        .map(assignment -> new SplitAssignment(
                assignment.getPlanNodeId(),
                assignment.getSplits().stream()
                        .filter(scheduledSplit -> scheduledSplit.getSequenceId() > currentMaxAcknowledgedSplit)
                        .collect(toImmutableSet()),
                assignment.isNoMoreSplits()))
        // drop assignments containing no unacknowledged splits
        // the noMoreSplits signal acknowledgement is not tracked but it is okay to deliver it more than once
        .filter(assignment -> !assignment.getSplits().isEmpty() || assignment.isNoMoreSplits())
        .collect(toList());
// update task with new assignments
for (SplitAssignment assignment : splitAssignments) {
    if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(assignment.getPlanNodeId())) {
        // local (partitioned) source
        schedulePartitionedSource(assignment);
    }
    else {
        // remote source: only these are added to updatedUnpartitionedSources
        // tell existing drivers about the new splits
        DriverSplitRunnerFactory factory = driverRunnerFactoriesWithRemoteSource.get(assignment.getPlanNodeId());
        factory.enqueueSplits(assignment.getSplits(), assignment.isNoMoreSplits());
        updatedUnpartitionedSources.add(assignment.getPlanNodeId());
    }
}
// Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
    // create a new driver for the split
    // i.e. the DriverRunner that will actually execute it
    runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit));
}
// hand the DriverRunners over to the TaskExecutor
enqueueDriverSplitRunner(false, runners.build());
// If all driver runners have been enqueued for this plan node and driver life cycle combination,
// move on to the next plan node.
if (pendingSplits.getState() != NO_MORE_SPLITS) {
    break;
}
partitionedDriverRunnerFactory.noMoreDriverRunner();
pendingSplits.markAsCleanedUp();
if (taskHandle.isDestroyed()) {
    // If the handle is destroyed, we destroy the task splits to complete the future
    splitsToDestroy.add(prioritizedSplitRunner);
}
else if (intermediate) {
    // intermediate splits are started immediately
    // Note: we do not record queued time for intermediate splits
    startIntermediateSplit(prioritizedSplitRunner);
    // add the runner to the handle so it can be destroyed if the task is canceled
    taskHandle.recordIntermediateSplit(prioritizedSplitRunner);
}
else {
    // enqueue the split
    taskHandle.enqueueSplit(prioritizedSplitRunner);
    // if the task is running fewer splits than its guarantee (guaranteedNumberOfDriversPerTask), try to start one
    scheduleTaskIfNecessary(taskHandle);
    // if there is spare global capacity (fewer running splits than minimumNumberOfDrivers), the TaskExecutor starts more splits globally
    addNewEntrants();
}
public void offer(PrioritizedSplitRunner split)
{
    checkArgument(split != null, "split is null");

    split.setReady();
    int level = split.getPriority().getLevel();
    lock.lock();
    try {
        if (levelWaitingSplits[level].isEmpty()) {
            // Accesses to levelScheduledTime are not synchronized, so we have a data race
            // here - our level time math will be off. However, the staleness is bounded by
            // the fact that only running splits that complete during this computation
            // can update the level time. Therefore, this is benign.
            long level0Time = getLevel0TargetTime();
            long levelExpectedTime = (long) (level0Time / Math.pow(levelTimeMultiplier, level));
            long delta = levelExpectedTime - levelScheduledTime[level].get();
            levelScheduledTime[level].addAndGet(delta);
        }
// a task normally slowly accrues scheduled time in a level and then moves to the next, but
// if the split had a particularly long quanta, accrue time to each level as if it had run
// in that level up to the level limit.
for (int currentLevel = oldLevel; currentLevel < newLevel; currentLevel++) {
    // contribute time to each level the task passed through
    long timeAccruedToLevel = Math.min(
            SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[currentLevel + 1] - LEVEL_THRESHOLD_SECONDS[currentLevel]),
            remainingLevelContribution);
    addLevelTime(currentLevel, timeAccruedToLevel);
    remainingLevelContribution -= timeAccruedToLevel;
    remainingTaskTime -= timeAccruedToLevel;
}
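The levels correspond to cumulative scheduled-time thresholds, so a long quantum is charged to each level the task passes through, up to that level's capacity, and whatever is left lands in the level the task ends up in. A standalone sketch of that arithmetic, assuming the default thresholds of 0, 1, 10, 60 and 300 seconds and a task that starts with no previously accrued time:

import static java.util.concurrent.TimeUnit.SECONDS;

class LevelAccrualSketch
{
    static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};

    // returns, per level, the nanoseconds charged when a task jumps from oldLevel to newLevel
    static long[] splitAcrossLevels(int oldLevel, int newLevel, long quantumNanos)
    {
        long[] accrued = new long[LEVEL_THRESHOLD_SECONDS.length];
        long remaining = quantumNanos;
        for (int level = oldLevel; level < newLevel; level++) {
            // each intermediate level absorbs at most its own capacity
            long levelCapacity = SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[level + 1] - LEVEL_THRESHOLD_SECONDS[level]);
            long contribution = Math.min(levelCapacity, remaining);
            accrued[level] = contribution;
            remaining -= contribution;
        }
        // whatever is left belongs to the level the task ends up in
        accrued[newLevel] = remaining;
        return accrued;
    }

    public static void main(String[] args)
    {
        // a 15 second quantum for a task that was at level 0 with nothing accrued:
        // 1s is charged to level 0, 9s to level 1, and the remaining 5s to level 2
        long[] perLevel = splitAcrossLevels(0, 2, SECONDS.toNanos(15));
        for (int level = 0; level < perLevel.length; level++) {
            System.out.printf("level %d: %d ns%n", level, perLevel[level]);
        }
    }
}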
@Override
public ListenableFuture<Void> processFor(Duration duration)
{
    Driver driver;
    synchronized (this) {
        // if close() was called before we get here, there's no point in even creating the driver
        if (closed) {
            return immediateVoidFuture();
        }
if (partitionedSplit != null) {
    // TableScanOperator requires partitioned split to be added before the first call to process
    driver.updateSplitAssignment(new SplitAssignment(partitionedSplit.getPlanNodeId(), ImmutableSet.of(partitionedSplit), true));
}
private ListenableFuture<Void> processInternal(OperationTimer operationTimer)
{
    checkLockHeld("Lock must be held to call processInternal");

    // handle memory revocation requests
    handleMemoryRevoke();

    // process newly added SplitAssignments
    processNewSources();
// if we got an output page, add it to the next operator
if (page != null && page.getPositionCount() != 0) {
    // feed the current operator's output to the next operator as its input
    next.addInput(page);
    next.getOperatorContext().recordAddInput(operationTimer, page);
    // record that a page moved
    movedPage = true;
}
// if current operator is finished...
if (current.isFinished()) {
    // tell the next operator there will be no more input
    next.finish();
    next.getOperatorContext().recordFinish(operationTimer);
}
}
for (int index = activeOperators.size() - 1; index >= 0; index--) {
    // if the current operator is finished, every operator before it must be finished as well
    if (activeOperators.get(index).isFinished()) {
        // close and remove this operator and all source operators
        List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
        // close and destroy the finished operators
        Throwable throwable = closeAndDestroyOperators(finishedOperators);
        // remove this sublist from activeOperators
        finishedOperators.clear();
        if (throwable != null) {
            throwIfUnchecked(throwable);
            throw new RuntimeException(throwable);
        }
        // Finish the next operator, which is now the first operator.
        if (!activeOperators.isEmpty()) {
            // tell the new root operator there will be no more input
            Operator newRootOperator = activeOperators.get(0);
            newRootOperator.finish();
            newRootOperator.getOperatorContext().recordFinish(operationTimer);
        }
        break;
    }
}
// if no page was moved, check whether the driver is blocked
if (!movedPage) {
    List<Operator> blockedOperators = new ArrayList<>();
    List<ListenableFuture<Void>> blockedFutures = new ArrayList<>();
    for (Operator operator : activeOperators) {
        Optional<ListenableFuture<Void>> blocked = getBlockedFuture(operator);
        if (blocked.isPresent()) {
            blockedOperators.add(operator);
            blockedFutures.add(blocked.get());
        }
    }

    // some operators really are blocked
    if (!blockedFutures.isEmpty()) {
        // allow for operators to unblock drivers when they become finished
        // so the driver is also unblocked when any operator finishes
        for (Operator operator : activeOperators) {
            operator.getOperatorContext().getFinishedFuture().ifPresent(blockedFutures::add);
        }
        // wrap all blocked futures: when any of them completes, the blocked state is cleared
        ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
        // driver records serial blocked time
        driverContext.recordBlocked(blocked);
        // each blocked operator is responsible for blocking the execution
        // until one of the operators can continue
        for (Operator operator : blockedOperators) {
            operator.getOperatorContext().recordBlocked(blocked);
        }
        // return the blocked future
        return blocked;
    }
}

// not blocked
return NOT_BLOCKED;
}
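firstFinishedFuture collapses all the collected blocked futures into a single future that completes as soon as any one of them does, so the driver wakes up as soon as any operator unblocks or finishes. A minimal sketch of that idea using Guava (an illustration only, not the actual implementation in Trino's MoreFutures):

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;

final class FirstFinished
{
    private FirstFinished() {}

    // completes as soon as any of the given futures completes, successfully or not
    static ListenableFuture<Void> firstFinishedFuture(List<? extends ListenableFuture<?>> futures)
    {
        SettableFuture<Void> result = SettableFuture.create();
        for (ListenableFuture<?> future : futures) {
            future.addListener(() -> result.set(null), MoreExecutors.directExecutor());
        }
        return result;
    }
}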
private void destroyIfNecessary()
{
    checkLockHeld("Lock must be held to call destroyIfNecessary");

    if (!state.compareAndSet(State.NEED_DESTRUCTION, State.DESTROYED)) {
        return;
    }

    // if we get an error while closing a driver, record it and we will throw it at the end
    Throwable inFlightException = null;
    try {
        inFlightException = closeAndDestroyOperators(activeOperators);
        if (driverContext.getMemoryUsage() > 0) {
            log.error("Driver still has memory reserved after freeing all operator memory.");
        }
        if (driverContext.getRevocableMemoryUsage() > 0) {
            log.error("Driver still has revocable memory reserved after freeing all operator memory. Freeing it.");
        }
        driverContext.finished();
    }
    catch (Throwable t) {
        // this shouldn't happen but be safe
        inFlightException = addSuppressedException(
                inFlightException,
                t,
                "Error destroying driver for task %s",
                driverContext.getTaskId());
    }

    if (inFlightException != null) {
        // this will always be an Error or Runtime
        throwIfUnchecked(inFlightException);
        throw new RuntimeException(inFlightException);
    }
}
private DriverSplitRunnerFactory(DriverFactory driverFactory, boolean partitioned)
{
    this.driverFactory = driverFactory;
    this.pipelineContext = taskContext.addPipelineContext(driverFactory.getPipelineId(), driverFactory.isInputDriver(), driverFactory.isOutputDriver(), partitioned);
}

public DriverSplitRunner createDriverRunner(@Nullable ScheduledSplit partitionedSplit)
{
    checkState(!noMoreDriverRunner.get(), "noMoreDriverRunner is set");
    pendingCreations.incrementAndGet();
    // create driver context immediately so the driver existence is recorded in the stats
    // the number of drivers is used to balance work across nodes
    long splitWeight = partitionedSplit == null ? 0 : partitionedSplit.getSplit().getSplitWeight().getRawValue();
    DriverContext driverContext = pipelineContext.addDriverContext(splitWeight);
    return new DriverSplitRunner(this, driverContext, partitionedSplit);
}