Spark SQL 与 Catalyst 优化器:从 SQL 文本到向量化执行的全链路复盘
引言
去年我把一组跑在 Spark 2.4 的离线报表作业迁到 3.3,同一份数据、同一个 SQL,整体 TPE 降了 38% 。改的只是几个参数和 join hint,但执行计划变了一大片:原来 14 个 stage 缩成 9 个,explain formatted 里多了一堆 *(1) Scan 与 *(2) Filter——那个 * 就是 whole-stage codegen 的标记。这次迁移让我意识到,Spark SQL 真正的护城河不是 SQL 语法,而是 Catalyst 优化器叠加 Codegen、向量化 reader 这条贯穿到底的执行链路。
很多人把 Catalyst 当成”黑盒优化器”用,写完 SQL 就 spark.sql("...").show(),执行计划看不懂、慢了只会调 spark.sql.shuffle.partitions 或者无脑加 repartition。但 Spark SQL 的每一处性能差异——为什么同样 join 有时走 BroadcastHash 有时走 SortMerge、为什么同一张表有时扫 200 列有时只扫 5 列、为什么有时候 SQL 跑得比手写 RDD 还快——背后都是 Catalyst 的决策。本篇我把这条链路从 SQL 文本一直拆到向量化 reader,把 TreeNode/RuleExecutor 的 fixPoint 迭代、RBO 与 CBO 的分工、Codegen 的边界与塌陷条件都讲清楚,并给出我自己写 Catalyst Rule 时踩过的坑。
一、SQL 文本到物理计划:五个 phase
Spark SQL 一条 SQL 的生命周期,本质是五次”形态变化”。我画一张总览图,这也是看懂 explain() 输出顺序的地图:
1 | SQL 文本 "SELECT a, sum(b) FROM t WHERE c > 10 GROUP BY a" |
这五步对应 QueryExecution 里五个 lazy 字段:analyzed、withCachedData、optimizedPlan、sparkPlan、executedPlan。explain() 默认打的就是前面四张加 executed。3.0 之后多了 explain formatted,把 plan 折叠成算子层级缩进,看动辄几十个算子的大查询特别舒服;3.2 又加了 explain codegen,直接把生成的 Java 源码吐出来,对调 Codegen 边界很有用。我自己排查性能问题,第一步永远是 explain formatted,第二步是看 stage 监控里 * 标记的连续性,第三步才是看 task 级指标。
二、Parser:ANTLR4 生成 Unresolved 逻辑计划
Spark 3.x 用 ANTLR4 根据 SqlBaseParser.g4 直接生成词法、语法分析器,得到 parse tree,再由 AstBuilder 把它翻译成 LogicalPlan。这一步产物叫 Unresolved Logical Plan——表名是 UnresolvedRelation、列名是 UnresolvedAttribute,全都是字符串标识,没绑 schema,没类型信息。
为什么要先 unresolved?因为这一步还不知道有哪些 catalog、哪些表,需要 Analyzer 在有 SessionState 的上下文里去查。Parser 阶段只负责语法正确性,SELECT 拼错成 SELCT 在这里就报错;但表名拼错、列名拼错,Parser 是不管的,那是 Analyzer 的事。这一点和 Hive 的 Antlr parser 思路一致,但 Spark 的 g4 维护得更勤,3.x 里 INSERT INTO、MERGE INTO、PIVOT、LATERAL、ANSI 模式下的强类型转换这些都陆续进了语法。AstBuilder 本身是一个 visitor,遍历 parse tree 把每个非终结符翻译成对应的 TreeNode,比如 SELECT ... FROM ... WHERE ... 翻成 Project(Filter(UnresolvedRelation)),结构上和 SQL 的语法树基本同构。理解这点的好处是:报”语法错误”时,能从报错行号倒推是哪条 g4 规则没匹配上;报”分析错误”时,则是 Analyzer 阶段的事,两者日志栈完全不同。
explain 最上面那段就是它:
1 | == Parsed Logical Plan == |
撇号 ' 就是 unresolved 的标记,对应 UnresolvedAttribute,没 exprId、没 dataType。
三、Analyzer:用 Catalog 把”字符串”绑成”实体”
Analyzer 的输入是 Unresolved 计划,输出是 Resolved 计划。它靠 SessionCatalog(Hive 元数据或 V2 Catalog)做四件事:
| 解析对象 | 做的事 | 触发报错 |
|---|---|---|
| 表名 | ResolveRelations:UnresolvedRelation -> Relation 带实际 schema |
table not found |
| 列名 | ResolveReferences:UnresolvedAttribute -> AttributeReference 带 exprId |
cannot resolve column |
| 函数 | ResolveFunctions:函数名 -> Expression 或聚合函数 |
undefined function |
| 类型 | TypeCoercion:隐式类型转换,如 int+double 自动 promote |
type mismatch |
Analyzer 自己也是一个 RuleExecutor[LogicalPlan],跑一批 resolve 规则,跑到全部 unresolved 消失为止。跑完后 analyzed 计划里所有撇号消失:
1 | == Analyzed Logical Plan == |
这里有个细节常被忽略:SubqueryAlias 是为了给表起一个逻辑别名,保证 a#1 这种 exprId 在多表自连接时唯一。类型提升也藏在这步,c > 10 如果 c 是 long,10 会被 promote 成 long,避免后续比较时反复转换。Analyzer 解决的是”对不对”的问题,Optimizer 才解决”快不快”的问题。值得提的是 Catalog 的两层结构:V1 的 SessionCatalog 走 HiveMetastoreClient,元数据兼容 Hive;V2 的 TableCatalog 接口是 3.0 后的新设计,支持多 catalog、命名空间隔离,Iceberg/Delta/Hudi 都基于 V2 接入。Analyzer 在解析 UnresolvedRelation 时会按 spark.sql.catalog.<name> 配置决定走 V1 还是 V2,两条路径产物都归一到 LogicalPlan,对下游透明。迁移到 V2 后,分区裁剪、谓词下推的契约更清晰,但也踩过 V2 catalog 在某些版本对 ALTER TABLE 支持不全的坑。
四、Optimizer:RBO 与 CBO 双驱动
这是 Catalyst 的心脏。Optimizer 输入 Resolved 计划,输出 Optimized 计划。它跑两套规则:RBO(Rule-Based) 和 CBO(Cost-Based),两者都通过 RuleExecutor 的 fixPoint 迭代执行。
TreeNode 与 RuleExecutor 的 fixPoint
LogicalPlan 和 Expression 都继承自 TreeNode,提供 transformUp/transformDown——本质是后序/前序遍历,用偏函数把匹配的子树替换掉,没匹配的原样返回。RuleExecutor 把规则按 Batch 分组,每个 batch 跑到不动点,伪码大概是:
1 | // RuleExecutor 关键循环(简化) |
maxIterations 默认 100 ,跑超会 warning 但不报错。Optimizer 内部把规则切成多个 batch,顺序大致是:Finish Analysis(移除 AnalysisBarriers 等)、Substitution(去冗余算子)、Operator Optimization Before Inferring Filters、Infer Filters、Operator Optimization After Inferring Filters、Early Filter and Projection Push-Down、Join Reorder、DecideOutputRelation。每个 batch 独立跑到不动点,前一个 batch 的产物是后一个的输入。规则之间可能反复触发:PredicatePushDown 把 Filter 推下去后,ConstantFolding 又把常量折叠掉,反过来又给 PredicatePushDown 新机会,直到没有任何规则再产生变化。这也是为什么写自定义 Rule 必须幂等——不幂等的规则会让 batch 永远不收敛,触发 maxIterations warning,等于每条 SQL 多烧一截 CPU。
RBO 核心规则
| 规则 | 作用 | 我的实战体感 |
|---|---|---|
PredicatePushDown |
把 Filter 推到数据源/叶子节点 | Parquet 下推后 IO 减少 60% |
ConstantFolding |
编译期算常量,1+1 -> 2 |
干掉 UDF 误用引起的常量表达式 |
ProjectionPruning |
只读用到的列 | 列裁剪,省 IO 的最大头 |
NullPropagation |
isnull(null) -> true |
避免 null 在表达式里反复求值 |
BooleanSimplification |
a OR true -> true |
动态 SQL 拼出来的复杂条件很顶用 |
PushDownPredicates(Aggregate) |
having 能下推的转 where |
配合分区裁剪效果显著 |
ColumnPruning(与 ProjectionPruning 协同) |
把不需要的列从子树剪掉 | 配合向量化 reader 收益叠加 |
PredicatePushDown 有两层:Catalyst 层把 Filter 算子下移,Source 层把 Filter 翻成 Parquet/ORC 的 pushdown 谓词。两层配合,扫 Parquet 时能跳过整个 row group,省的是磁盘 IO 和解码 CPU,这是大数据作业最大的成本项。
CBO 与统计信息
RBO 是”语法匹配就优化”,不关心数据量。但 A join B join C,A 1MB、B 100GB、C 10GB,join 顺序就决定生死。CBO 就是干这个:
1 | spark.sql.cbo.enabled=true |
CBO 依赖 ANALYZE TABLE t COMPUTE STATISTICS(表级)和 COMPUTE STATISTICS FOR COLUMNS(列级)收集的行数、列基数、直方图。没统计信息它会 fallback 到 RBO 或 HDFS 文件大小估算。直方图用等高桶(equi-height histogram,默认 254 桶 ),对范围谓词 > '2026-01-01' 的选择率估算很关键;没有直方图时 CBO 只能用 NDV 平均估算,偏斜列误差能到几倍。我踩过坑:一张表迁了分区没收统计,CBO 估算行数偏小,选了 BroadcastHashJoin 结果 map 端 OOM,收完统计才换回 SortMergeJoin 。所以团队约定:每次大分区变更后强制重跑 ANALYZE,否则 CBO 反而比 RBO 更危险。另外 CBO 的代价模型目前只算 cardinality,不真正考虑 IO/CPU 成本,把它当成”join 顺序决策器”而不是万能优化器更合适。
五、物理计划生成:SparkPlanner 的 Strategies
Optimizer 出来还是逻辑计划。SparkPlanner 用一组 Strategy 把逻辑算子翻译成物理算子,比如 Join 翻成 BroadcastHashJoin / SortMergeJoin / BroadcastNestedLoopJoin / CartesianProduct 中的某一个。
关键策略:
JoinSelection:按表大小与 hint 选 join 物理实现。小表 <spark.sql.autoBroadcastJoinThreshold(默认 10MB )就走 BroadcastHashJoin;等值连接且任一侧可广播走 Broadcast,否则等值连接走 SortMergeJoin(要求两侧有序,由 EnsureRequirements 补 Sort);非等值连接只能走 BroadcastNestedLoopJoin 或 CartesianProduct,性能最差。/*+ BROADCAST(t) *///*+ MERGE(t) */这类 hint 也是在这里生效,会绕过代价估算强制选择。Aggregation:HashAggregate优先,内存不够 fallbackSortAggregate。FileSourceStrategy:把LogicalRelation翻成FileSourceScanExec,同时把可下推的谓词、需要的列交给 source 层。
物理计划生成后还有 preparedForExecution,跑 EnsureRequirements——给 SortMergeJoin 补 Sort、给 Exchange 补 Shuffle、给 Broadcast 补 BroadcastExchange——把物理计划”补齐”成可执行形态。这一步很关键,因为前面 Strategy 阶段只决定 join 类型,不保证输入有序,SortMergeJoin 要求两边按 key 排好序,缺的 Sort 就在这里补。这一步还会处理 Distribution(数据分布要求)与 Ordering(排序要求)的匹配:如果一个算子的子计划分布不满足要求,就插入 Exchange;排序不满足就插入 Sort。理解这一步的价值在于,看 explain 时如果发现 Sort 出现得很意外,往往是上游 join 选型或分区策略导致的,根因在前面而不是这个 Sort 本身。
六、whole-stage codegen:把一串算子折成一个函数
这是 Spark 2.0 引入、3.x 持续打磨的核心加速器。物理计划里相邻的几个算子如果不跨 shuffle,会被折叠进同一个 Java 方法,消除虚函数调用与 boxing。从执行模型上,它把经典的 Volcano 迭代器模型(next() 套 next())压成了 pipeline 化的循环:
1 | 物理计划片段 |
explain 里 * 前缀就是 codegen 标记,数字是 stage 编号。explain codegen 直接把生成的 Java 源码打出来:
1 | == Final Plan == |
迁移那次让我印象最深:Spark 2.4 同一查询 codegen 出来是 14 段,3.3 优化成 9 段,原因是 3.x 把 Filter + Aggregate 也折进了同一 stage,虚函数调用少了 ~40% 。Codegen 的实现是 doProduce/doConsume 调用链:叶子算子(Scan)调 doProduce 生成”产生一行”的代码,父算子把”消费一行”的逻辑注入进去,层层嵌套,最后拼成一个大的 while 循环。但 Codegen 不是万能:跨 shuffle 切断(Exchange 是 codegen 边界)、某些算子(如 SortAggregate 的 sort 阶段、HashJoin 的 build 侧需要物化整张 hash 表)不支持 codegen,会被切成多段;UDF 太复杂、生成的方法超过 Janino 的单方法 64KB 限制也会 fallback 回 interpreted 模式。看 explain 时如果发现一个本应连起来的 pipeline 里夹杂了不带 * 的算子,多半就是 codegen 塌陷了,这时候优化方向就变成了”消除那个不支持 codegen 的算子”——比如把 SortAggregate 换成 HashAggregate,把 sort merge join 换成 broadcast。
GenerateUnsafeProjection
Codegen 背后有一组 Generate* 类,对应不同表达式求值器:
GenerateUnsafeProjection:把表达式求值编译成直接写 UnsafeRow 的字节,零 boxing。GenerateMutableProjection:可变 row,多用于聚合中间态。GenerateOrdering:编译 sort comparator。GeneratePredicate:编译 Filter 谓词。
UnsafeRow 是 Spark 自己的二进制行格式,定长 8 字节槽 + 变长区,getInt(2) 直接是 Platform.getLong(base, offset + 2*8) 级别的内存访问。这一层把 JVM 对象开销彻底干掉,是 Spark 处理效率能压 Hive 一头的底层原因。Hive 默认走 Text 或 Java 对象,光 boxing 和 GC 压力就拉开了差距。
七、运行时向量化:Parquet/ORC 的 columnar batch
逻辑上 Spark 是行模型(InternalRow),但读 Parquet/ORC 时可以走向量化 reader,一次读一批(一个 ColumnarBatch,默认 1024 行 ),按列连续存放。FileSourceScanExec 用 *(1) ColumnarToRow 把列式 batch 转回行模型喂给 codegen:
1 | spark.sql.parquet.enableVectorizedReader=true |
向量化 reader 的收益来自两点:CPU cache 命中率(按列连续读,prefetch 友好,不再被无关列污染 cache line)、SIMD 友好(同类型批量运算,JIT 后能用 AVX 指令)。在向量化路径上,读出来的 ColumnarBatch 里的每一列是 ColumnVector,可以是 on-heap 的 byte 数组,也可以是 off-heap 的直接内存。我曾对一张 200 列的大宽表只读 5 列做聚合,开 vectorizedReader 比 row reader 快 2.1 倍 ,瓶颈从 CPU 直接挪到 IO。但要留意:向量化 reader 只在读侧,codegen 后的运算仍是行模型,ColumnarToRow 本身有开销;如果查询只是简单 select 全表全列,向量化收益会缩水。另外某些复杂类型(嵌套 struct、map)在某些版本上会 fallback 到 row reader,排查时可以临时关掉看是否回退。Spark 3.x 还在做向量化表达式求值(ColumnarToRow 之后的运算也保持列式),但目前为止行模型 + codegen 仍是主力路径,向量化更多是”读侧加速器”。
八、自定义 Catalyst Rule:把优化器当扩展点
Catalyst 最被低估的能力是可扩展。比如我们有一类报表,CAST(date_str AS DATE) 后跟一个常量比较,希望提前折叠掉这些常量转换。可以写一个 Rule:
1 | import org.apache.spark.sql.catalyst.rules.Rule |
注意三点:一是 transformDown/transformExpressionsUp 的方向选对,下推类规则一般 down,折叠类一般 up,方向不对可能漏掉嵌套表达式;二是 Rule 必须幂等,跑两次和跑一次结果一样,否则会拖死 fixPoint 收敛;三是 extraOptimizations 是开发态入口,正式上线最好走 SparkSessionExtensions 注入,通过 withExtensions 配置,这样能进生产配置而不是硬编码在代码里。生产里我建议先在测试环境跑、看 spark.sql.optimizer.maxIterations 日志确认迭代次数合理再上,我见过一个非幂等规则把单 SQL 解析时间从 200ms 拉到 3s 的案例,定位手段就是开 org.apache.spark.sql.catalyst.rules 的 DEBUG 日志,看每轮 batch 跑了多少次。还有一个反模式:在 Rule 里做 IO 或 RPC(比如查外部配置中心决定下推策略),这会把优化器拖成毫秒级,Rule 必须是纯函数,需要的上下文应该提前物化。
九、与 Hive SQL 的差异
把团队从 Hive 迁到 Spark SQL 时,下面这张表我给团队讲过很多遍:
| 维度 | Hive SQL | Spark SQL |
|---|---|---|
| 优化器 | 简单 RBO,无 CBO(4.0 才引入更完善 CBO) | Catalyst,RBO+CBO+AQE |
| 执行引擎 | MapReduce / Tez | DAG + Stage |
| 表达式执行 | 解释执行 | whole-stage codegen |
| 行格式 | Java 对象/Text | UnsafeRow 二进制 |
| 列式读 | ORC Vectorized(Hive 自带) | Parquet/ORC 向量化 reader |
| 函数语义 | 强 Hive 兼容 | 兼容 Hive 模式,靠 spark.sql.hive.* 开关 |
| 谓词下推 | Storage Handler 层 | Catalyst + Source 层双下推 |
| 中间结果落盘 | MR 每个 stage 落 HDFS | shuffle 落本地盘,可 cache 内存 |
实战中迁移最容易踩的几个坑:一是 Hive 的 INSERT OVERWRITE 语义与 Spark 略不同(Spark 写临时目录再 rename,分区清理和锁的时机不一样,并发写同一分区会冲突,建议配合 spark.sql.sources.partitionOverwriteMode=dynamic 控制);二是 Hive 某些 UDF 在 Spark 下走 interpreted 模式,codegen 不生效,性能反而比 Hive 还差——这种要么重写 UDF 为 Catalyst 原生表达式,要么接受慢;三是 LEFT JOIN 的 ON 与 WHERE 在 Spark 下语义和 Hive 一致但优化器处理时机不同,过滤条件下推深度有差异,复杂多表 join 后结果对不上的情况要查这;四是 Decimal 精度:Hive 与 Spark 的隐式精度提升规则不完全一致,迁库时涉及金额的报表要逐张对账,必要时显式 CAST 锁定精度。
小结
Catalyst 把 SQL 从”字符串”一路编译到”Janino 生成的 Java 字节码”,每一步都有扩展点:Parser 可改 g4、Analyzer 可挂 Catalog、Optimizer 可加 Rule、Planner 可加 Strategy、Codegen 可加 CodegenFallback 之外的扩展。这是 Spark SQL 能压过 Hive 的根本,也是 Spark 能把 SQL、DataFrame、Dataset 三套 API 统一到同一执行路径的根本——DataFrame 在底层就是一棵构造好的 LogicalPlan,绕过 Parser 直接喂给 Analyzer,享受和 SQL 完全一样的优化。这也是后续 AQE(Adaptive Query Execution)能登场的舞台——AQE 本质就是让优化器在执行期、拿到真实 shuffle 数据量后再跑一轮,把 StaticPlan 阶段估不准的 join 顺序、分区数、倾斜处理重新决策。
下一篇我会专门写 AQE 与数据倾斜:为什么 spark.sql.adaptive.enabled=true 一行参数能把倾斜 join 从 6 小时压到 40 分钟 ,以及 AdaptiveSkewJoin 拆分逻辑、DynamicPartitionPruning 的运行期裁剪、CoalesceShufflePartitions 的合并阈值怎么调,把这条”运行时再优化”的链路也拆透。
