文章摘要
飞宇 GPT

HDFS 小文件问题:成因、影响与生产治理实战

引言

在维护某数千万级文件量的大数据集群时,最让我刻骨铭心的一次故障,是凌晨 NameNode Full GC 导致整个 HDFS 不可写长达十几分钟。复盘之后根因并不复杂——某条新上线的业务流把 Kafka 数据按分钟粒度切分落 HDFS,半个月内往集群灌进去了将近一亿个小文件。小文件问题(Small Files Problem)是 Hadoop 生态最经典、也最顽固的痛点,几乎每一个上了规模的 HDFS 集群都会和它反复交手。这篇复盘,我想把这几年治理小文件踩过的坑、沉淀下来的分层方案,系统地写出来,给同样在和它搏斗的同学一个参照。

一、什么是小文件、为什么是问题

1.1 定义

所谓小文件,Hadoop 社区给出的口径是:文件大小远小于 HDFS 的 block size(默认 128MB,很多生产集群调到 256MB)。按这个口径,一个 1KB 的文件和一个 10MB 的文件,本质上都是”小文件”——它们都浪费了一个完整的 block 额度,却只贡献了极小的存储。

理解这个问题的关键在于 HDFS 的元数据架构。HDFS 是一个典型的”元数据 / 数据”分离架构:NameNode(NN)只维护命名空间的元数据(目录树、文件到块的映射、块到 DataNode 的映射),真正的数据块躺在 DataNode 上。这意味着不管文件本身是 1KB 还是 128MB,在 NameNode 内存里占用的元数据开销几乎是一样的。

1.2 元数据内存估算

社区和 Cloudera 都给过经验值:每个文件/块对象在 NameNode 内存中大约占用 150–300 字节(取决于版本和字段数)。但这是”裸对象”的估算,实际运行在 JVM 堆里还要考虑对象头、对齐填充、引用链等开销,业界普遍按 每个文件 ≈ 600B–1KB 的真实占用做容量规划

举一个具体的账。假设一个集群有:

  • 文件数:2 亿
  • 平均每文件 1.2 个块(小文件多,副本未均摊)
  • 单条元数据真实占用按 800B 估算

那么仅元数据一项,NameNode 堆内存占用大约是:

1
2 亿文件 × 1.2 块 × 800B ≈ 192 GB

再加上 namespace 的目录树 inode、租约、快照等结构,NN 堆内存轻松突破 200GB。这就是为什么小文件多了之后,NN 的 GC 会变得极其痛苦——一个几百 GB 的堆,做一次 Mixed GC 的停顿可能是秒级。

经验法则:单集群文件数控制在 1 亿以内是相对健康的,超过 3 亿就要认真治理,超过 5 亿基本是”病态”

二、成因分析:小文件从哪儿来

把锅扣给”业务乱写”很容易,但真正治理它,必须把成因分类清楚。我在项目里见到的小文件来源,大致可以归为五类。

2.1 高频小批写入

最朴素也最常见。业务侧为了”实时性”,每分钟甚至每几秒就 flush 一个文件到 HDFS。例如埋点日志按分钟滚动、监控指标按批落盘。这些文件单个可能就几百 KB,但频率极高,一天就能产生几十万个小文件。

流处理引擎是重灾区。Flink 把状态按 checkpoint 持久化到 HDFS(如 FsStateBackend 或 RocksDB 的增量 checkpoint),每个 subtask 每次 checkpoint 都会写若干 sst 文件。一个 200 并发的 Flink 作业,checkpoint interval 设为 1 分钟,一天下来的小文件数量是几何级数增长的

1
2
3
// Flink 典型配置,checkpoint interval 过短是小文件爆炸的常见来源
env.enableCheckpointing(60000); // 1 分钟 <!-- 校准:请按真实经历核实/替换 -->
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

2.3 Hive 动态分区过度切片

Hive 动态分区是另一个”小文件工厂”。一旦分区键粒度过细(比如 dt/yyyy-MM-dd/HH/mm),或者 reducer 数量配置不合理,每个 reducer 都会在每个分区下写一个独立的文件。结果是:

  • 分区数 × reducer 数 = 文件数

一张按分钟分区的表,一天有 1440 个分区,10 个 reducer 就是 14400 个文件,且每个都极小

2.4 Kafka 落地

Kafka → HDFS 的落地方案(自研消费程序、Flume、Camus、Landoop 等)如果攒批策略不对,consumer poll 一批就写一个文件,文件大小完全取决于 poll 间隔和流量。低峰期流量小,写出来的就是几 KB 的小文件。

2.5 日志按时间切片

很多离线作业把日志按小时甚至按分钟切分后归档,配合压缩格式不当(比如逐文件 gzip 而非可拆分的列式格式),既产生了小文件,又失去了后续并行计算的能力。

三、影响量化:小文件到底伤了什么

讲清楚影响,才能争取到治理的资源和优先级。小文件的危害是全方位的,我从四个维度量化。

3.1 NameNode 内存与 RPC 压力

如前文所述,元数据内存占用是直接且可估算的。除此之外,NN 还要为每个文件维护租约(Lease)、副本状态机等,RPC 队列也会因为 getListinggetFileInfo 等元数据操作变长。客户端一次 listStatus 在百万文件目录下可能要花几秒甚至几十秒。

3.2 MapReduce / Spark 启动开销

这是计算侧最痛的点。Hadoop 的输入分片(InputSplit)逻辑是:一个 split 默认对应一个文件(或一个文件的一部分),一个 split 对应一个 Map task。所以小文件多 = split 多 = task 多。

假设有 100 万个 1MB 的小文件要处理,意味着要启动 100 万个 Map task。每个 task 的启动开销(容器分配、JVM 启动、初始化)在 YARN 下大约是几秒到十几秒。即使不算实际计算,光启动开销就是:

1
100 万 task × 2 秒 ≈ 23 天(CPU 时间)

这是典型的”启动风暴”——task 还没干活,资源就已经被调度开销吃光了。

3.3 启动风暴(Task Storm)

集中调度场景下,ApplicationMaster 一次性向 RM 申请成千上万个容器,RM 的调度器压力剧增,整个集群的吞吐会被拖垮。我们见过最夸张的一次,一个作业 8 万个 map task 全部卡在 SCHEDULED 状态,RM RPC 排队超过 30 秒

3.4 NameNode GC 与堆压力

堆内存一大,GC 停顿就成了悬在头顶的剑。G1GC 在百 GB 堆上做 Mixed GC 的停顿可以控制在数百毫秒,但如果对象晋升速率过高(典型于频繁创建/销毁 lease 对象),Mixed GC 可能退化成几秒甚至十几秒的停顿。一旦超过 dfs.namenode.heartbeat.recheck-interval(默认 2 分钟),DataNode 还会被判定为 dead,引发副本补充风暴。

影响维度 小文件场景下的表现 量级
NN 堆内存 元数据膨胀 每文件 600B–1KB
NN RPC listStatus/getFileInfo 变慢 单次秒级
MR/Spark 启动 split 数 ≈ 文件数,task 启动风暴 单 task 2–10 秒
NN GC 堆压力 → 长停顿 → DN 心跳超时 停顿秒级
HDFS 读吞吐 随机读多,DN 磁盘 seek 占比高 吞吐下降数倍

四、诊断手段:先看清病,再治病

治理前先量化。我们项目里固化了一套诊断工具链。

4.1 hdfs fsck

1
2
# 统计整个命名空间的文件数、块数、平均块大小
hdfs fsck / | grep -E "Total files|Total blocks|Average block"

重点关注 Average block size。如果远小于 block size(比如只有几 MB),基本可以确诊。

更精细的做法是按目录扫描小文件分布:

1
2
# 统计某目录下小于 10MB 的文件数
hadoop fs -count -h -q -t 10485760 /path/to/dir

4.2 NameNode Web UI

NN 的 Web UI(默认 http://nn:9870,2.x 老 版本是 50070)首页就有 Files and Blocks CountTotal FileSystem Capacity。盯住 Files 这个数字的趋势,是发现小文件增长最直接的手段。

4.3 JMX 指标

NN 暴露的 JMX 是监控的核心。几个关键指标:

1
curl http://nn:9870/jmx?qname=Hadoop:service=NameNode,name=FSNamesystemState

需要关注:

  • NumFiles —— 文件总数
  • BlocksTotal —— 块总数
  • CapacityUsed —— 实际存储
  • FilesTotal / CapacityUsed 的比值 —— 单位存储承载的文件数,反映”小文件密度”

4.4 监控告警阈值

我们项目里沉淀的告警经验值(仅供参考):

指标 告警阈值 说明
NN 堆内存使用率 > 70% 留 buffer 给 GC
单目录文件数 > 100 万 listStatus 性能拐点
集群总文件数周增长率 > 5% 排查新增小文件源
平均 block size < 32MB 确诊小文件

五、分层治理方案

小文件治理没有银弹,必须写入侧、存储侧、运行时、离线合并四层齐下。

5.1 写入侧:从源头掐住

这是性价比最高的一层。一旦写入规范定下来,后续的治理压力会小很多。

(1)攒批写入。 任何写 HDFS 的程序,都应该有一个”攒到一定大小或一定时间再 flush”的策略。推荐的经验值:单文件不小于 128MB 或一个 block size。Kafka 落地程序尤其要配好攒批参数。

(2)输出列式格式。 ORC / Parquet 相比 TextFile / SequenceFile,不仅压缩率高、可拆分,还天然鼓励”一个大文件覆盖多个 block”的写入习惯。Hive 表强烈建议默认 ORC:

1
2
3
4
5
6
7
8
9
<!-- hive-site.xml -->
<property>
<name>hive.default.fileformat</name>
<value>ORC</value>
</property>
<property>
<name>hive.exec.orc.compression.strategy</name>
<value>COMPRESSION</value>
</property>

(3)控制 Flink checkpoint interval。 RocksDB 增量 checkpoint 是小文件大户。生产环境 checkpoint interval 不建议低于 5 分钟,且要配合 state.backend.incremental 和合理的 sst compaction

(4)Hive 合并配置。 Hive 自带的小文件合并参数,能在作业末尾自动把小文件 merge 成大文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>hive.merge.mapfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.mapredfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.smallfiles.avgsize</name>
<value>16000000</value> <!-- 16MB 以下触发合并 <!-- 校准:请按真实经历核实/替换 --> -->
</property>
<property>
<name>hive.merge.size.per.task</name>
<value>256000000</value> <!-- 合并后单文件目标大小 <!-- 校准:请按真实经历核实/替换 --> -->
</property>

(5)分区粒度收敛。 把分钟级分区收敛到小时级或天级,是消除”分区切片”类小文件的最快路径。

5.2 存储侧:HAR 与纠删码

写入侧无法完全避免的归档类冷数据,可以走存储侧治理。

HAR(Hadoop Archive)。 HAR 是 Hadoop 早期提供的小文件归档方案,把一批小文件打包成一个 .har 文件,对外仍保持原目录结构可读。优点是显著降低 NN 元数据(一堆小文件 → 一个 HAR 文件);缺点是不可变、不支持追加,且访问有额外开销。适合归档场景:

1
2
# 把 /data/logs 归档成 /archive/logs.har
hadoop archive -archiveName logs.har -p /data/logs /archive

纠删码(Erasure Coding, EC)。 Hadoop 3.x 引入 EC,主要目标是省存储(RS-3-2 比三副本省约 50%)。但 EC 对小文件是把双刃剑:EC 本身不减少 NN 元数据,而且 EC 的条带化写入对随机写不友好,更适合”已经是大文件”的冷数据。把一堆小文件直接放 EC 目录并不能解决元数据压力,反而会增加读放大。

结论:HAR 用来压冷小文件元数据,EC 用来省大文件存储,别混用。

5.3 运行时:CombineFileInputFormat

面对已经存在的小文件,计算侧的”急救”手段是 CombineFileInputFormat。它能把多个小文件合并成一个 split,从而让一个 Map task 处理多个文件,显著降低启动开销。

1
2
3
4
5
6
7
8
<property>
<name>mapreduce.job.input.format.class</name>
<value>org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>268435456</value> <!-- 256MB,控制单个 split 上限 <!-- 校准:请按真实经历核实/替换 --> -->
</property>

split.maxsize 是核心参数——它决定了多少个小文件会被塞进同一个 split。设小了合并效果弱,设大了单 task 内存压力大。我们通常设成一个 block size 左右

需要注意的是,CombineFileInputFormat 是治标不治本:它降低了计算侧启动开销,但 NameNode 元数据压力依旧存在。

5.4 离线合并作业:Spark 定时合并

这是治理的”重武器”。对于历史小文件堆积的目录,我们写了一个 Spark 定时作业,按分区扫描、合并小文件、写出大文件、再清理旧文件。下面是核心实现。

合并策略

  1. 按分区遍历目标表的所有分区目录。
  2. 对每个分区,统计文件数和总大小;若文件数超过阈值(如 10)或平均文件大小低于阈值(如 32MB),则触发合并
  3. 用 Spark 读取该分区数据,coalesce 到目标文件数(按总大小 / 256MB 估算),以 ORC 格式写出到一个临时目录。
  4. 校验临时目录数据完整(行数比对),通过后原子替换原分区目录(重命名),失败则回滚。
  5. 整个作业幂等:重跑不会产生副作用,因为合并后文件大小满足阈值就不会再触发。

为什么用 coalesce 而不是 repartition

这是合并作业最关键的一个选型。repartition 会触发 full shuffle——所有数据要重新分布,代价极高;而 coalesce(默认 shuffle=false)只是在现有分区上做合并,不触发 shuffle,开销极小。

合并小文件这件事,我们并不关心数据的分布(不需要按 key 重新 hash),只关心”把 N 个小文件变成 M 个大文件”,这正是 coalesce 的天然适用场景。一个直观对比:合并 1TB 数据时,repartition 要把 1TB 全量 shuffle 一遍(网络 + 磁盘 IO),coalesce 几乎零额外开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SmallFileCompactor {

val TargetFileSize: Long = 256L * 1024 * 1024 // 256MB <!-- 校准:请按真实经历核实/替换 -->
val SmallFileThreshold: Long = 32L * 1024 * 1024 // 32MB <!-- 校准:请按真实经历核实/替换 -->
val MaxFilesPerPartition: Int = 10 // 文件数超过此值触发合并 <!-- 校准:请按真实经历核实/替换 -->

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HDFS-SmallFile-Compactor")
.getOrCreate()

val tablePath = args(0) // 例如 hdfs:///warehouse/ods/log
val partitionCol = args(1) // 例如 dt
val retentionDays = args(2).toInt // 只处理最近 N 天之外的分 区 <!-- 校准:请按真实经历核实/替换 -->

import spark.implicits._

// 1. 枚举所有分区目录
val partitions = listPartitions(spark, tablePath, partitionCol, retentionDays)

partitions.foreach { case (partitionValue, partitionPath) =>
val files = listFiles(spark, partitionPath)
val totalSize = files.map(_.getSize).sum
val fileCount = files.size
val avgSize = if (fileCount > 0) totalSize / fileCount else 0L

// 2. 判断是否需要合并
val needCompact = fileCount > MaxFilesPerPartition || avgSize < SmallFileThreshold
if (!needCompact) {
println(s"[SKIP] $partitionPath: fileCount=$fileCount, avgSize=$avgSize")
} else {
println(s"[COMPACT] $partitionPath: fileCount=$fileCount, avgSize=$avgSize")

// 3. 估算目标文件数,用 coalesce 而非 repartition(避免 shuffle)
val targetFiles = math.max(1, (totalSize / TargetFileSize).toInt)

val stagingPath = s"$partitionPath.__staging_${System.currentTimeMillis()}"

// 4. 读取 -> coalesce -> 写出 ORC(保持原 schema)
val df = spark.read.orc(partitionPath)
val rowCountBefore = df.count()

df.coalesce(targetFiles)
.write
.mode("overwrite")
.option("compression", "zlib")
.orc(stagingPath)

// 5. 校验:合并前后行数必须一致
val rowCountAfter = spark.read.orc(stagingPath).count()
require(rowCountBefore == rowCountAfter,
s"Row count mismatch: before=$rowCountBefore, after=$rowCountAfter, abort.")

// 6. 原子替换:先备份旧目录,再重命名 staging -> 正式目录
val backupPath = s"$partitionPath.__backup_${System.currentTimeMillis()}"
renameHdfs(partitionPath, backupPath)
renameHdfs(stagingPath, partitionPath)
deleteHdfs(backupPath) // 校验通过后删除备份

println(s"[DONE] $partitionPath: $fileCount -> $targetFiles files")
}
}

spark.stop()
}

// 简化的 HDFS 文件列举 / 重命名 / 删除工具方法,省略实现细节
def listPartitions(spark: SparkSession, path: String, col: String, retention: Int): Array[(String, String)] = { ??? }
def listFiles(spark: SparkSession, path: String): Array[FileStatus] = { ??? }
def renameHdfs(src: String, dst: String): Unit = { ??? }
def deleteHdfs(path: String): Unit = { ??? }
}

几点工程上要特别强调的细节:

  • 幂等:合并判定基于”文件数 / 平均大小”,合并后再次跑同一分区会被 SKIP,所以这个作业可以反复重跑、补跑。
  • 分区保留retentionDays 参数保证只处理冷分区(比如 7 天前),避免和在线写入抢锁,也避免合并正在写的分区导致数据丢失
  • 原子替换 + 备份:合并写出到 __staging,校验通过后先把原目录改名为 __backup,再把 staging 改名为正式目录,最后删 backup。任何一步失败都能回滚。
  • 行数校验:合并前后的行数必须严格相等,这是数据正确性的底线。生产中我们额外还做了主键唯一性校验。
  • coalesce 而非 repartition:再强调一遍,这是这个作业能低成本跑起来的关键。

这个作业我们挂在 DolphinScheduler 上每天凌晨跑一次,半年时间把集群文件数从 4 亿降到了 1.2 亿,NN 堆内存占用下降约 60%

六、长效机制:把治理变成纪律

离线合并只是”还债”,要避免旧债没完没了地堆,必须有长效机制。

(1)写入规范 SLA。 我们定了一条硬规范:任何写 HDFS 的程序,单文件不得小于一个 block size(除特殊场景审批)。新作业上线前 review 写入逻辑,不达标不允许发布。

(2)监控大盘。 在 Grafana 上建了小文件治理大盘,核心指标包括:集群总文件数、平均 block size、Top 100 大目录文件数、NN 堆内存使用率。趋势一旦抬头,立刻定位到业务线。

(3)责任到业务线。 每条业务流的小文件增长归口到具体的团队和 owner,治理效果纳入季度 review。技术手段再好,最终拼的还是组织纪律。

(4)定期”还债”窗口。 每月一个固定的治理窗口,强制合并 Top N 小文件目录,避免技术债积累到爆发。

小结

回头看这些年和小文件的反复较量,我最深的体会是:小文件治理从来不是一次性的技术动作,而是一种工程纪律。

它不是你写一个合并脚本就万事大吉的事情——今天合并完,明天新的写入又会产生小文件。真正解决它,要靠”写入侧规范 + 存储侧归档 + 运行时优化 + 离线合并 + 长效监控”这一整套机制常年运转。NameNode 的元数据架构决定了 HDFS 天生不适合海量小文件,理解这个底层约束、把它转化为团队的开发习惯,比任何黑科技都管用。

把治理做成纪律,小文件就不再是灾难,而是日常。