Trino源码学习-Trino启动
Trino源码学习-Trino启动
本篇文章开始,我们将进行Trino的源码分析,一切还是从启动开始看起。
启动流程
sequenceDiagram TrinoServer->>Server: start Server->>+EmbedVersion: 嵌入Trino版本信息 EmbedVersion-->>-Server: doStart Server->>Server: 验证JVM参数和系统时间 Server->>+Bootstrap: 构建Bootstrap并initialize Bootstrap-->>-Server: airlift初始化完成 Server->>+PluginManager: loadPlugins PluginManager-->>-Server: 插件加载成功 Server->>+ConnectorServiceProvider: 初始化catalog Server->>Server: 加载其他组件配置 Server->>Announcer: 启动服务发现 Announcer->>Server: 返回 Server->>StartupStatus: 修改启动状态为启动成功
EmbedVersion
EmbedVersion会动态生成一个类,类名会带上当前的版本:
- DevelopmentServer是dev
- TrinoServer是Jar包中
META-INF/MANIFEST.MF
文件里的Implementation-Version
属性
生成的类是Runnable, Callable 的包装类,代码结构如下:
1 | package io.trino.$gen; |
BootStrap
Trino使用Airlift框架,所以整个系统是通过BootStrap启动起来的。这里就不赘述了,可以查看前面的Airlift源码分析。
PluginManager
在查看Plugin加载的源码前,我们先回顾下trino server的代码结构:
1 | . |
PluginManager加载插件的时序图如下:
sequenceDiagram PluginManager->>SeverPluginProvider: loadPlugins SeverPluginProvider->>SeverPluginProvider: 遍历plugin目录下的子目录 SeverPluginProvider->>PluginClassLoader: 为每个插件构建ClassLoader PluginClassLoader-->>PluginManager: ClassLoader构建成功 PluginManager->>ServiceLoader: 使用SPI机制加载Plugin的实现类型 ServiceLoader-->>PluginManager: 加载Plugin成功 PluginManager->>PluginManager: 安装plugin并注册插件中的组件
在PluginManager installPlugin的过程中,会注册ConnectorFactory。后面会在Catalog初始化中使用。
Catalog
Catalog通过CatalogManagerModule配置。
1 | public class CatalogManagerModule |
有上面可知CatalogManager 有两类:
- Static(默认): Catalog是静态的,修改需重启服务
- Dynamic: 支持动态的Catalog管理
我们先来看下StaticCatalogManager的逻辑:
- 初始化实例
- 注入StaticCatalogManagerConfig配置和CatalogFactory
- 从配置中获取catalog目录地址,默认是
etc/catalog/
目录 - 遍历目录获取所有Properties文件,文件名就是catalog名(
system
被用作内部使用,不能使用) - 过滤所有禁用的catalog,然后将所有catalog配置保存
- loadInitialCatalogs(Server启动时触发)
- 对于所有catalog配置,使用catalogFactory(由plugin加载)创建catalogConnector
- 将创建的catalogConnector存储在内部的ConcurrentMap中
- stop(生命周期方法)
- 关闭所有catalogConnector
StaticCatalogManager的逻辑很简单。下面我们来看下DynamicCatalogManager的逻辑。
1 | public class DynamicCatalogManagerModule |
DynamicCatalogManager有两种:
- CoordinatorDynamicCatalogManager
- WorkerDynamicCatalogManager
根据节点的角色是Coordinator还是Worker分别初始化。
- CoordinatorDynamicCatalogManager
- 初始化时注入CatalogStore和catalogFactory
- 从注入的CatalogStore中获取catalog配置信息。
- loadInitialCatalogs时,对于CatalogStore中的所有catalog配置,使用catalogFactory(由plugin加载)创建catalogConnector,然后将创建的catalogConnector存储在内部的ConcurrentMap中。
- 支持动态创建Catalog(createCatalog方法)
- WorkerDynamicCatalogManager
- 初始化时注入catalogFactory
- loadInitialCatalogs时什么都不做
- 在执行任务时,会通过ensureCatalogsLoaded方法检查CatalogProperties有没有被加载,如果没有加载会通过catalogFactory创建catalogConnector,然后将创建的catalogConnector存储在内部的ConcurrentMap中。(实现了从Coordinator中分发catalog配置)。
当然当前Dynamic catalog的实现还不是很完善。后续的进度可以通过github issue追踪。
Connector
Connector的ConnectorFactory由Plugin加载并注册,然后在catalog加载时会创建对应的connector。
sequenceDiagram CatalogManager->>CatalogFactory: createCatalog CatalogFactory->>CatalogFactory: createConnector CatalogFactory->>ConnectorContext: new ConnectorContextInstance ConnectorContext->>CatalogFactory: return Connector创建上下文 CatalogFactory->>ConnectorFactory: create ConnectorFactory->>CatalogFactory: return Connector
服务发现
Trino的服务发现是基于Airlift的服务发现。由于Trino只有一个Coordinator,如果是内嵌方式部署的话,服务发现会有单点问题,可以考虑使用独立部署。
HeartbeatFailureDetector
除了Airlift的服务发现外,Trino还是实现了心跳检测器(HeartbeatFailureDetector)用作故障检测。
1 | // io.trino.server.CoordinatorModule |
FailureDetector 主要是定义了状态的一些枚举,具体健康检查是在 HeartbeatFailureDetector 中。HeartbeatFailureDetector 会以一定的间隔向 worker nodes 发送 HEAD 请求来进行健康检查。
1 | // io.trino.failuredetector |
默认情况下,健康检查的间隔是500ms, 1分钟的失败率超过0.1就视为异常,这些都写在 FailureDetectorConfig 配置中了。
1 | public class FailureDetectorConfig |
判断是否出现异常的逻辑
1 | private synchronized void updateState() |
DiscoveryNodeManager
在 DiscoveryNodeManager 类中实现了管理 Coordinator 使用的 worker 节点列表。DiscoveryNodeManager 每 5 秒运行一次 pollWorkers 来更新节点列表和每个节点的状态。
1 | //ServerMainModule中会绑定DiscoveryNodeManager |
pollWorkers的流程
1 | // io.trino.metadata.DiscoveryNodeManager |
pollWorkers的时序逻辑如下:
sequenceDiagram DiscoveryNodeManager->>DiscoveryNodeManager: pollWorkers loop Every NodeState DiscoveryNodeManager->>RemoteNodeState: asyncRefresh RemoteNodeState-->>DiscoveryNodeManager: end DiscoveryNodeManager->>DiscoveryNodeManager: refreshNodesInternal DiscoveryNodeManager->>+MergingServiceSelector: selectAllServices MergingServiceSelector->>+Announcer: getServiceAnnouncements Announcer -->>-MergingServiceSelector: ServiceAnnouncement set MergingServiceSelector->>+ CachingServiceSelector:selectAllServices CachingServiceSelector-->>-MergingServiceSelector: ServiceDescriptor List MergingServiceSelector->>MergingServiceSelector: merge MergingServiceSelector-->>-DiscoveryNodeManager:ServiceDescriptor List loop Every ServiceDescriptor DiscoveryNodeManager->>HeartbeatFailureDetector: filter Failed HeartbeatFailureDetector-->>DiscoveryNodeManager: dd end loop Every NodeState DiscoveryNodeManager->>RemoteNodeState: asyncRefresh RemoteNodeState-->>DiscoveryNodeManager: end DiscoveryNodeManager->>DiscoveryNodeManager: update node info DiscoveryNodeManager-->>DiscoveryNodeManager: Async notify all listener
当先前注册的节点消失时,DiscoveryNodeManager 会将以前的活动节点丢失记录到日志。这种情况,会有以下几种可能性:
- discovery server 已经超过 30 秒没有收到来自 Worker 的 PUT 请求。
- Worker 节点最后一分钟健康检查失败的概率超过 10%
在第一种情况下,应该检查 Coordinator 的 http-request.log,看看是否有worker节点对/v1/announcement/{node_id}的请求。如果没有来自该 Worker 节点的请求日志,则 Worker 节点很可能已经出现了问题。第二种情况,可以查看 Worker 节点的 http-request.log,看是否有HEAD /的请求。