return FluentFuture.from(query.waitForDispatched()) // wait for query to be dispatched, up to the wait timeout .withTimeout(waitMillis, MILLISECONDS, timeoutExecutor) .catching(TimeoutException.class, ignored -> null, directExecutor()) // when state changes, fetch the next result .transform(ignored -> query.getQueryResults(token, uriInfo), responseExecutor) .transform(this::createQueryResultsResponse, directExecutor()); }
// io.trino.server.protocol.ExecutingStatementResource privatevoidasyncQueryResults( Query query, long token, Duration maxWait, DataSize targetResultSize, UriInfo uriInfo, AsyncResponse asyncResponse) { // 带超时时间等待结果 ListenableFuture<QueryResults> queryResultsFuture = query.waitForResults(token, uriInfo, wait, targetResultSize); // 构建Response ListenableFuture<Response> response = Futures.transform(queryResultsFuture, queryResults -> toResponse(query, queryResults), directExecutor()); // 构建异步Response bindAsyncResponse(asyncResponse, response, responseExecutor); } // io.trino.server.protocol.Query publicsynchronized ListenableFuture<QueryResults> waitForResults(long token, UriInfo uriInfo, Duration wait, DataSize targetResultSize) { // before waiting, check if this request has already been processed and cached Optional<QueryResults> cachedResult = getCachedResult(token); if (cachedResult.isPresent()) { return immediateFuture(cachedResult.get()); }
// wait for a results data or query to finish, up to the wait timeout ListenableFuture<Void> futureStateChange = addTimeout( getFutureStateChange(), () -> null, wait, timeoutExecutor);
// when state changes, fetch the next result // 当QueryState变化,或者Query中的exchange client已经打开在等待数据 // 会触发getNextResult方法获取QueryResult return Futures.transform(futureStateChange, ignored -> getNextResult(token, uriInfo, targetResultSize), resultsProcessorExecutor); }
// fetch result data from exchange QueryResultRowsresultRows= removePagesFromExchange(queryInfo, targetResultSize.toBytes()); ... ... // advance next token only return a next url if // (1) the query is not done AND the query state is not FAILED // OR // (2) there is more data to send (due to buffering) // OR // (3) cached query result needs client acknowledgement to discard if (queryInfo.getState() != FAILED && (!queryInfo.isFinalQueryInfo() || !exchangeDataSource.isFinished() || (queryInfo.getOutputStage().isPresent() && !resultRows.isEmpty()))) { // 增加Token 下次继续轮询 nextToken = OptionalLong.of(token + 1); } else { nextToken = OptionalLong.empty(); // the client is not coming back, make sure the exchange is closed exchangeDataSource.close(); }
// io.trino.dispatcher.DispatchManager

/**
 * Validates the arguments and schedules creation of the query on {@code dispatchExecutor}.
 *
 * <p>Null arguments are rejected with {@code requireNonNull}. An empty query string, or a
 * {@code queryId} already known to {@code queryTracker}, fails {@code checkArgument}. The returned
 * future is set to {@code null} in a {@code finally} block after {@code createQueryInternal}
 * returns or throws.
 *
 * @return a future that completes once the creation task has run
 */
public ListenableFuture<Void> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
{
    requireNonNull(queryId, "queryId is null");
    requireNonNull(sessionContext, "sessionContext is null");
    requireNonNull(query, "query is null");
    checkArgument(!query.isEmpty(), "query must not be empty string");
    checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

    // It is important to return a future implementation which ignores cancellation request.
    // Using NonCancellationPropagatingFuture is not enough; it does not propagate cancel to wrapped future
    // but it would still return true on call to isCancelled() after cancel() is called on it.
    DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
    dispatchExecutor.execute(() -> {
        try {
            // Submit the query for creation.
            createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
        }
        finally {
            // Complete the creation future whether or not creation succeeded.
            queryCreationFuture.set(null);
        }
    });
    return queryCreationFuture;
}

/**
 * Creates and registers a dispatch query with the query tracker. This method will never fail to register a query with the query
 * tracker. If an error occurs while creating a dispatch query, a failed dispatch will be created and registered.
 */
// NOTE(review): this excerpt is abridged (see the `... ...` elision below) and truncated after the
// executor.submit call; stateMachine, warningCollector, queryMonitor, executor and
// executionFactories are not declared in the visible portion. Confirm against the full source.
private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
{
    Session session = null;
    PreparedQuery preparedQuery = null;
    try {
        ... ...
        // decode session
        session = sessionSupplier.createSession(queryId, sessionContext);

        // It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below
        // can result in delivering query-created event after query analysis has already started.
        // That can result in misbehaviour of plugins called during analysis phase (e.g. access control auditing)
        // which depend on the contract that event was already delivered.
        //
        // Note that for immediate and in-order delivery of query events we depend on synchronous nature of
        // QueryMonitor and EventListenerManager.
        queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));

        // Future for the query execution, created asynchronously on `executor`.
        ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(() -> {
            // Look up the execution factory registered for this statement's class.
            QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(preparedQuery.getStatement().getClass());
            if (queryExecutionFactory == null) {
                throw new TrinoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatement().getClass().getSimpleName());
            }

            try {
                return queryExecutionFactory.createQueryExecution(preparedQuery, stateMachine, slug, warningCollector);
            }
            catch (Throwable e) {
                if (e instanceof Error) {
                    if (e instanceof StackOverflowError) {
                        log.error(e, "Unhandled StackOverFlowError; should be handled earlier; to investigate full stacktrace you may need to enable -XX:MaxJavaStackTraceDepth=0 JVM flag");
                    }
                    else {
                        log.error(e, "Unhandled Error");
                    }
                    // wrapping as RuntimeException to guard us from problem that code downstream which investigates queryExecutionFuture may not necessarily handle
                    // Error subclass of Throwable well.
                    RuntimeException wrappedError = new RuntimeException(e);
                    stateMachine.transitionToFailed(wrappedError);
                    throw wrappedError;
                }
                // Non-Error throwables: mark the query failed, then rethrow unchanged.
                stateMachine.transitionToFailed(e);
                throw e;
            }
        });
protectedbooleaninternalStartNext() { checkState(Thread.holdsLock(root), "Must hold lock to find next query"); synchronized (root) { if (!canRunMore()) { returnfalse; } ManagedQueryExecutionquery= queuedQueries.poll(); if (query != null) { startInBackground(query); // 启动查询 returntrue; }
// Remove even if the sub group still has queued queries, so that it goes to the back of the queue InternalResourceGroupsubGroup= eligibleSubGroups.poll(); if (subGroup == null) { returnfalse; } booleanstarted= subGroup.internalStartNext(); // 启动子group的query checkState(started, "Eligible sub group had no queries to run");
descendantQueuedQueries--; // Don't call updateEligibility here, as we're in a recursive call, and don't want to repeatedly update our ancestors. if (subGroup.isEligibleToStartNext()) { // 如果subGroup还能继续运行Query,再次加入到eligibleSubGroups addOrUpdateSubGroup(subGroup); } returntrue; } }
// io.trino.execution.SqlQueryManager @Override publicvoidcreateQuery(QueryExecution queryExecution) { requireNonNull(queryExecution, "queryExecution is null");
if (!queryTracker.addQuery(queryExecution)) { thrownewTrinoException(GENERIC_INTERNAL_ERROR, format("Query %s already registered", queryExecution.getQueryId())); }
queryExecution.addFinalQueryInfoListener(finalQueryInfo -> { // execution MUST be added to the expiration queue or there will be a leak queryTracker.expireQuery(queryExecution.getQueryId()); });