文章摘要
飞宇 GPT

引言:一次 hdfs dfs -put 背后发生了什么

我第一次认真读 HDFS 写路径源码,是因为某集群上一次 flume 写入在 close 时 hang 了 30 秒。表面看是网络抖动,但只有真正把 DFSOutputStream 的 data queue / ack queue、packet 切分、pipeline 重建这一条链路在脑子里跑一遍,才能判断到底是 client 侧重试、DataNode 侧心跳超时,还是 NameNode 侧租约没释放。本篇不重复架构篇讲过的 NameNode/DataNode 角色,而是把放大镜对准”一个字节从 client 走到三副本落盘”的整条写路径,到 packet 头、checksum、ack seq 的粒度。

我们以 hdfs dfs -put localfile /data/x.log 为例,它内部走的是 FileSystem.create(...)DFSOutputStream,和 Spark/Flume 用 Java API 写 HDFS 是同一条路。所以理解了这条路径,所有上层写行为都能解释。

建立写流程:create → NameNode 条目 → pipeline 分配

写路径的第一步不是发数据,而是 client 通过 RPC 调用 NameNode 的 create 方法。NameNode 在这里要做四件事,缺一不可:

  1. 权限与路径校验:检查父目录存在、写权限、是否已有同名文件(除非 overwrite)。
  2. 创建 INodeFile under construction:在目录树里挂一个新文件 inode,状态标记为”构建中”。这个状态非常关键——它意味着文件已经出现在命名空间,但还没有任何完整的 block,只有正在写的 block。
  3. 租约授予:NameNode 给 client 发一个 Lease(租约),软超时 60s、硬超时 1h 。租约是 HDFS 实现”写者互斥”的核心:同一个文件只能有一个持有写租约的 writer,别人想写得先回收租约。这也是很多 client crash 后 “Cannot obtain block length” 报错的根因——租约没还,NameNode 要等硬超时才能强制回收。
  4. 分配 blockId 与 pipeline 成员:这是写路径最有意思的一步。

NameNode 不是随便挑三个 DataNode。它会调用 BlockPlacementPolicyDefault 的机架感知策略,目标副本放置顺序大致是:

  • 第 1 个副本:优先和 client 同节点(写本地省一跳),client 不在集群内则随机选一个;
  • 第 2 个副本:放在不同机架(rack-aware,防整架故障);
  • 第 3 个副本:与第 2 个同机架不同节点(降低跨架带宽)。

这个顺序 dn1, dn2, dn3 就是 pipeline 的数据流顺序。注意 NameNode 此时只返回这个有序列表,pipeline 还没真正建立,连接是 client 自己去连的。

返回给 client 的是 LocatedBlock,里面有 blockIdgenerationStampExtendedBlock 信息和这三个 DataNode 的地址。client 拿到后构造 DFSOutputStream,进入下一阶段。

Pipeline 建立与数据流

这是写路径的真正核心。我画一张 pipeline 的数据流图:

1
2
3
4
5
6
7
8
9
10
11
12
13
       (1) create() RPC
Client ──────────────────────► NameNode : 分配 blockId, dn列表, Lease
◄────────────────────────── (LocatedBlock)

(2) 建立 pipeline (DataXceiver)
Client ──── writeBlock(dn1) ──► dn1 ──── writeBlock(dn2) ──► dn2 ──── writeBlock(dn3) ──► dn3
◄───── pipeline ack (from dn3 回流) ─────────────────────────────────────────────────────

(3) 数据流 (单向往下游,全双工 ack)
data packet data packet data packet
Client ───────► dn1 ───────► dn2 ───────► dn3
▲ ack seq (dn3 起逐跳回) │
└────────────────────────────────────────────────────────┘

Pipeline 连接建立

client 不是直接连三个 DataNode。它向 dn1 发起 writeBlockDataXceiver 请求,请求里带着完整的 pipeline 列表 [dn1, dn2, dn3]。dn1 收到后,自己作为第一个节点,再向 dn2 发同样的 writeBlock,dn2 再向 dn3 发。一直到 dn3(最后一个)建立成功,ack 沿着 dn3 → dn2 → dn1 → client 回流,pipeline 才算建好。这是一个链式握手,任何一环失败,pipeline 建立就失败,client 会回到 NameNode 要求换节点。

Packet 切分:data queue 与 ack queue

client 侧的 DFSOutputStream 内部有两个关键队列,这是理解写行为所有”卡顿/丢失/可见性”问题的钥匙:

  • data queue:待发送的 packet,消费者是往 dn1 发数据的 sender 线程。
  • ack queue:已发出但还没收到 ack 的 packet,消费者是 ack processor 线程。

应用层 write(bytes) 进来的数据,先被攒到 BufferedOutputStreamchunkSize 通常是 512B),攒够一个 packet 才入 data queue。默认 packet size 是 64KBio.bytes.per.checksum × 128 左右,见 DFSClient.conf。这个”攒包”动作解释了一个常见现象:写完不 flush,对端 reader 可能完全看不到数据。

Packet 结构:到字节级

一个 packet 由 header + checksum + data 组成,我把它列成表:

字段 长度 含义
packetHeaderLen (4B) 4 header 自身长度
offsetInBlock (8B) 8 该 packet 在 block 中的字节偏移
seqNo (8B) 8 packet 序号,单调递增,ack 比对的就是它
lastPacketInBlock (1B) 1 是否是最后一个 packet(close 时)
dataLen (4B) 4 后面实际数据长度
checksums N×4B 每个 chunk 一个 CRC32C,每 chunk 512B
data dataLen 真正的业务字节

这里有个细节:checksum 是按 512B 的 chunk 算的,不是按 packet 算。所以一个 64KB 的 packet 有 128 个 checksum。DN 落盘时把数据和 checksum 分开存(blk_xxx.datablk_xxx.meta),读时按 chunk 校验,坏了只丢一个 chunk 而不是整个 block。

Ack 机制:从 pipeline 尾端回传

数据是单向从 client → dn1 → dn2 → dn3,但 ack 是反向从 dn3 开始回流。dn3 收到一个完整 packet 并校验 checksum 通过后,向 dn2 回 ack;dn2 校验自己的副本也 OK,再向 dn1 回;dn1 回给 client。

ack packet 里带的就是 seqNo,client 的 ack processor 拿到后,从 ack queue 里把所有 seqNo ≤ 该值的 packet 移除。注意是”≤”,因为 ack 是累积的——收到 ack(seqNo=100) 意味着 1~100 全部三副本都写成功了。

如果 ack 超时(默认 dfs.client.datanode-restart.timeout / 写 socket 超时,约 60~120s) ,或 checksum 不匹配,对应的 packet 会留在 ack queue,触发 pipeline 故障处理流程。

这里要强调一个常被忽略的点:ack 代表三副本都写到了 DN 的 OS page cache,不代表落盘。落到磁盘的 fsync 是另一个语义,下面讲 flush 时再区分。

Flush 语义层级:write / hflush / hsync / close

这是写路径里最容易踩坑、面试也最爱问的部分。HDFS 提供了四级”写完成”的承诺,从弱到强:

API 语义 可见性 持久性
write(byte[]) 只进 client 内存缓冲,可能还没成 packet 不可见 不保证
hflush() 把缓冲切 packet 全部发完并等 pipeline ack 其他 reader 可见 不落盘,DN crash 仍可能丢
hsync() hflush + 让 DN 对副本做 fsync 可见 落盘(DN 进程 crash 不丢,整机掉电看存储)
close() 发 last packet + finalize 可见 + 持久 + NameNode 登记 完整提交

一个典型误区是:很多人以为 hflush 之后数据就稳了。不是。hflush 只保证 packet 流到 DN 的 OS 缓存和 DFSOutputStream 的 ack queue 清空,DN 进程 crash 不丢,但 DN 整机断电,OS page cache 里的数据照样丢。要真正”落盘”,得调 hsync,它会触发 DN 侧 fsync_data,代价是吞吐显著下降(某次 benchmark 里我们测过,每条 record 都 hsync,写吞吐从 200MB/s 掉到 15MB/s)

生产里典型的折中是:N 秒一次 hsync + 中间用 hflush,既控制 crash 时的丢失窗口(RPO),又不把吞吐打没。这正是 Spark Structured Streaming 写 HDFS sink 时 commitsPerBatch / flush 策略要调的东西。

Java API 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://namenode:8020");
FileSystem fs = FileSystem.get(conf);

Path path = new Path("/data/write-demo.log");
try (FSDataOutputStream out = fs.create(
path, (short) 3, 64 * 1024)) { // 副本3, 64KB buffer
for (int i = 0; i < 1000; i++) {
out.write(("line-" + i + "\n").getBytes(StandardCharsets.UTF_8));
if (i % 100 == 0) {
out.hflush(); // 让下游 reader 能读到这批
}
if (i % 500 == 0) {
out.hsync(); // 关键 checkpoint 点,落盘
}
}
}
// close() 触发 last packet + finalize + 归还租约
fs.close();

Pipeline 故障处理:摘除、重建、续写

这是写路径里最”硬核”的一段。假设 dn2 写超时或 socket reset,HDFS 不会直接让整次写失败,而是尝试降级续写

故障检测

每个 packet 发出后会进 ack queue。DataStreamer 线程发现某个 packet 在 ack queue 里停留超过 dfs.client.datanode-restart.timeout(或 socket read 超时抛 IOException),就判定 pipeline 出问题了。

摘除坏节点与重新建 pipeline

client 的处理逻辑大致是:

  1. 标记坏掉的 DN(设为 dn2),把它从 pipeline 摘除,新 pipeline = [dn1, dn3]
  2. 把 ack queue 里的 packet 全部倒回 data queue(这些是没确认的,要重发)。
  3. 调用 NameNode 的 updatePipeline RPC:告诉 NameNode 这个 block 的新副本分布,NameNode 更新 BlockInfoexpectedLocations,并记录 generationStamp 自增。
  4. client 用新的 pipeline 列表重新发起 writeBlock 链式握手,从当前已 ack 的 offset 续写,而不是从头来。

这一步有个关键边界:新的 pipeline 必须满足 minReplication(默认 1) 。如果坏到只剩 1 个副本还能续写,但此时这个 block 处于”under replication”状态,NameNode 会异步补副本。如果连 minReplication 都凑不齐(比如三副本里坏了俩),client 这次 write 直接失败,应用层抛 IOException

updatePipeline 这个 RPC 很精妙——它让 block 的元信息变更和 client 续写解耦:client 不用停,NameNode 后台慢慢补副本。我见过一个真实 case,某集群因为一台 ToR 交换机抖动,dn2、dn3 交替掉,导致一个 block 的 generationStamp 一晚上涨了 47 次,最终文件还是写成功,但 NameNode 上 getGenerationStampV2 计数器飙升,事后看 metrics 才发现。

Block 关闭与 finalize

数据写完,应用调 close()。client 的收尾流程:

  1. DataStreamer 把当前未满的 packet(哪怕不足 64KB)加上 lastPacketInBlock=true 标志发出。
  2. 等最后一个 packet 的 ack 回来,pipeline 上三个 DN 都收到 last packet。
  3. 每个 DN 对该 block 做 finalize:把 block 从 RBW(Replica Being Written)状态转为 FINALIZED,meta 文件落盘,此时该副本可以被 reader 读到完整长度。
  4. client 调 NameNode 的 completeFile RPC(带最后一个 block 的 length)。NameNode 校验所有 block 都满足副本数、都在 FINALIZED 状态,把 INodeFile 从 under construction 转正常,回收租约
  5. NameNode 异步触发 BlockManager 把 under-replicated 的 block 补到目标副本数。

只有 completeFile 返回 true,这次写才算真正落地。如果 client 在 close 前崩溃,文件会停在 under construction 状态,已 hflush 的数据对 reader 可见但租约还在,要等 NameNode 的 LeaseManager 硬超时回收后才能被别人 append。

一致性边界:什么时候会丢数据

把上面的语义汇总成几条实战铁律,这是我踩坑后刻在脑子里的:

  • 未 hflush 的 write,client crash = 全丢。数据还在 client JVM 堆里,没成 packet,没进 pipeline,谁也救不了。
  • hflush 但没 hsync,DN 整机断电 = 丢。OS page cache 没落盘。
  • hsync 之后,三副本里即使坏两个,NameNode 也会从存活的那个副本补齐(前提是该 packet 已 ack)。
  • append 的语义:append 复用同一条 pipeline,但 client 必须先拿到租约;如果上一个 writer 异常退出没 close,append 会触发 recoverLease,等租约回收后才能写。这就是为什么 Spark 任务重启写同一文件常报 Failed to replace a bad datanodeCannot obtain block length 的根因。
  • 可见性 ≠ 持久性getFileLength 在 under construction 时能读到 hflush 过的长度,但这部分数据的持久性仍取决于是否 hsync。

理解了这几条边界,写任何 HDFS sink(Kafka Connect / Flume / Flink checkpoint)都不会再混淆”flush 完了是不是就安全了”。

生产调优:packet、重试、副本与吞吐

最后落到调参。我按影响维度列一张实战清单:

参数 默认 调优建议 权衡点
io.bytes.per.checksum 512B 一般不动 chunk 粒度,影响校验恢复粒度
packet size(writePacketSize 64KB 高吞吐场景可到 256KB 大 packet 减小 ack 开销,但丢一个 packet 重发成本高
dfs.client.block.write.retries 3 网络不稳可调到 5 重试次数,太大掩盖底层问题
dfs.client.write.max.paths-to-cache 10 写大量小文件时调高 影响 client 侧 path→stream 缓存
副本数 3 热数据可 2,冷数据可 2/EC 副本少→ack 路径短→吞吐高,但可靠性下降
dfs.client.block.write.locateFollowingBlock.retries 5 NameNode 抖动时调高 分配下一个 block 的重试

几个我真正在产线改过的:

  1. packet size 调到 256KB:某集群写 1GB 大文件时 ack 包太多,CPU 成了瓶颈,加大 packet 后吞吐从 180MB/s 涨到 320MB/s 。代价是 GC 抖动时丢的更多。
  2. 副本数从 3 降到 2(中间结果目录):写吞吐提升约 30%,但必须配套 EC 或冷备,否则坏一个盘就 under-replication。
  3. 关闭 prefetchdfs.client.write.prefetch 在某些版本会预取下一个 block 的位置,写小文件反而拖慢,关闭后小文件场景 RTT 下降明显

这些数字都是我反复压测得来的,没有万能参数——packet 变大对小文件是负担,副本变少对关键数据是灾难。调优的本质是把”丢多少数据(RPO)/写多快(吞吐)/多稳(可靠性)”这三个轴子按业务显式权衡。

小结

把 HDFS 写路径从 createpipeline ack 串下来,你会发现它本质上是一套**”双队列 + 累积 ack + pipeline 降级续写”**的协议:

  • **双队列(data queue / ack queue)**解决了”边发边确认”的流水线化,ack queue 倒回是故障重发的基石;
  • packet + chunk checksum 把可靠性粒度做到 512B,坏一个 chunk 不毁整个 block;
  • 累积 ack 从 pipeline 尾端回流,保证 ack 语义清晰;
  • updatePipeline + minReplication 让写过程对单点 DN 故障免疫,能续写不中断;
  • write/hflush/hsync/close 四级语义把”可见性”和”持久性”拆开,让上层按需选择。

真正在读路径篇之外把写路径吃透,意味着你能解释为什么 Spark checkpoint 要 hsync、为什么 Flume 的 batch 模式不会丢数据、为什么 client crash 后文件还能 append。这些不是面试八股,是大数据/AI 系统在 HDFS 上构建可靠 sink 的底层地基。下一篇我会拆读路径,把 cache、短路读、EC 读的细节再深挖一遍。