HDFS 写数据全流程:从 create 到 pipeline ack 的一致性语义
引言:一次 hdfs dfs -put 背后发生了什么
我第一次认真读 HDFS 写路径源码,是因为某集群上一次 flume 写入在 close 时 hang 了 30 秒。表面看是网络抖动,但只有真正把 DFSOutputStream 的 data queue / ack queue、packet 切分、pipeline 重建这一条链路在脑子里跑一遍,才能判断到底是 client 侧重试、DataNode 侧心跳超时,还是 NameNode 侧租约没释放。本篇不重复架构篇讲过的 NameNode/DataNode 角色,而是把放大镜对准”一个字节从 client 走到三副本落盘”的整条写路径,到 packet 头、checksum、ack seq 的粒度。
我们以 hdfs dfs -put localfile /data/x.log 为例,它内部走的是 FileSystem.create(...) → DFSOutputStream,和 Spark/Flume 用 Java API 写 HDFS 是同一条路。所以理解了这条路径,所有上层写行为都能解释。
建立写流程:create → NameNode 条目 → pipeline 分配
写路径的第一步不是发数据,而是 client 通过 RPC 调用 NameNode 的 create 方法。NameNode 在这里要做四件事,缺一不可:
- 权限与路径校验:检查父目录存在、写权限、是否已有同名文件(除非 overwrite)。
- 创建 INodeFile under construction:在目录树里挂一个新文件 inode,状态标记为”构建中”。这个状态非常关键——它意味着文件已经出现在命名空间,但还没有任何完整的 block,只有正在写的 block。
- 租约授予:NameNode 给 client 发一个 Lease(租约),软超时 60s、硬超时 1h 。租约是 HDFS 实现”写者互斥”的核心:同一个文件只能有一个持有写租约的 writer,别人想写得先回收租约。这也是很多 client crash 后 “Cannot obtain block length” 报错的根因——租约没还,NameNode 要等硬超时才能强制回收。
- 分配 blockId 与 pipeline 成员:这是写路径最有意思的一步。
NameNode 不是随便挑三个 DataNode。它会调用 BlockPlacementPolicyDefault 的机架感知策略,目标副本放置顺序大致是:
- 第 1 个副本:优先和 client 同节点(写本地省一跳),client 不在集群内则随机选一个;
- 第 2 个副本:放在不同机架(rack-aware,防整架故障);
- 第 3 个副本:与第 2 个同机架不同节点(降低跨架带宽)。
这个顺序 dn1, dn2, dn3 就是 pipeline 的数据流顺序。注意 NameNode 此时只返回这个有序列表,pipeline 还没真正建立,连接是 client 自己去连的。
返回给 client 的是 LocatedBlock,里面有 blockId、generationStamp、ExtendedBlock 信息和这三个 DataNode 的地址。client 拿到后构造 DFSOutputStream,进入下一阶段。
Pipeline 建立与数据流
这是写路径的真正核心。我画一张 pipeline 的数据流图:
1 | (1) create() RPC |
Pipeline 连接建立
client 不是直接连三个 DataNode。它向 dn1 发起 writeBlock 的 DataXceiver 请求,请求里带着完整的 pipeline 列表 [dn1, dn2, dn3]。dn1 收到后,自己作为第一个节点,再向 dn2 发同样的 writeBlock,dn2 再向 dn3 发。一直到 dn3(最后一个)建立成功,ack 沿着 dn3 → dn2 → dn1 → client 回流,pipeline 才算建好。这是一个链式握手,任何一环失败,pipeline 建立就失败,client 会回到 NameNode 要求换节点。
Packet 切分:data queue 与 ack queue
client 侧的 DFSOutputStream 内部有两个关键队列,这是理解写行为所有”卡顿/丢失/可见性”问题的钥匙:
- data queue:待发送的 packet,消费者是往 dn1 发数据的 sender 线程。
- ack queue:已发出但还没收到 ack 的 packet,消费者是 ack processor 线程。
应用层 write(bytes) 进来的数据,先被攒到 BufferedOutputStream(chunkSize 通常是 512B),攒够一个 packet 才入 data queue。默认 packet size 是 64KB(io.bytes.per.checksum × 128 左右,见 DFSClient.conf) 。这个”攒包”动作解释了一个常见现象:写完不 flush,对端 reader 可能完全看不到数据。
Packet 结构:到字节级
一个 packet 由 header + checksum + data 组成,我把它列成表:
| 字段 | 长度 | 含义 |
|---|---|---|
packetHeaderLen (4B) |
4 | header 自身长度 |
offsetInBlock (8B) |
8 | 该 packet 在 block 中的字节偏移 |
seqNo (8B) |
8 | packet 序号,单调递增,ack 比对的就是它 |
lastPacketInBlock (1B) |
1 | 是否是最后一个 packet(close 时) |
dataLen (4B) |
4 | 后面实际数据长度 |
| checksums | N×4B | 每个 chunk 一个 CRC32C,每 chunk 512B |
| data | dataLen | 真正的业务字节 |
这里有个细节:checksum 是按 512B 的 chunk 算的,不是按 packet 算。所以一个 64KB 的 packet 有 128 个 checksum。DN 落盘时把数据和 checksum 分开存(blk_xxx.data 和 blk_xxx.meta),读时按 chunk 校验,坏了只丢一个 chunk 而不是整个 block。
Ack 机制:从 pipeline 尾端回传
数据是单向从 client → dn1 → dn2 → dn3,但 ack 是反向从 dn3 开始回流。dn3 收到一个完整 packet 并校验 checksum 通过后,向 dn2 回 ack;dn2 校验自己的副本也 OK,再向 dn1 回;dn1 回给 client。
ack packet 里带的就是 seqNo,client 的 ack processor 拿到后,从 ack queue 里把所有 seqNo ≤ 该值的 packet 移除。注意是”≤”,因为 ack 是累积的——收到 ack(seqNo=100) 意味着 1~100 全部三副本都写成功了。
如果 ack 超时(默认 dfs.client.datanode-restart.timeout / 写 socket 超时,约 60~120s) ,或 checksum 不匹配,对应的 packet 会留在 ack queue,触发 pipeline 故障处理流程。
这里要强调一个常被忽略的点:ack 代表三副本都写到了 DN 的 OS page cache,不代表落盘。落到磁盘的 fsync 是另一个语义,下面讲 flush 时再区分。
Flush 语义层级:write / hflush / hsync / close
这是写路径里最容易踩坑、面试也最爱问的部分。HDFS 提供了四级”写完成”的承诺,从弱到强:
| API | 语义 | 可见性 | 持久性 |
|---|---|---|---|
write(byte[]) |
只进 client 内存缓冲,可能还没成 packet | 不可见 | 不保证 |
hflush() |
把缓冲切 packet 全部发完并等 pipeline ack | 其他 reader 可见 | 不落盘,DN crash 仍可能丢 |
hsync() |
hflush + 让 DN 对副本做 fsync | 可见 | 落盘(DN 进程 crash 不丢,整机掉电看存储) |
close() |
发 last packet + finalize | 可见 + 持久 + NameNode 登记 | 完整提交 |
一个典型误区是:很多人以为 hflush 之后数据就稳了。不是。hflush 只保证 packet 流到 DN 的 OS 缓存和 DFSOutputStream 的 ack queue 清空,DN 进程 crash 不丢,但 DN 整机断电,OS page cache 里的数据照样丢。要真正”落盘”,得调 hsync,它会触发 DN 侧 fsync_data,代价是吞吐显著下降(某次 benchmark 里我们测过,每条 record 都 hsync,写吞吐从 200MB/s 掉到 15MB/s) 。
生产里典型的折中是:N 秒一次 hsync + 中间用 hflush,既控制 crash 时的丢失窗口(RPO),又不把吞吐打没。这正是 Spark Structured Streaming 写 HDFS sink 时 commitsPerBatch / flush 策略要调的东西。
Java API 示例
1 | Configuration conf = new Configuration(); |
Pipeline 故障处理:摘除、重建、续写
这是写路径里最”硬核”的一段。假设 dn2 写超时或 socket reset,HDFS 不会直接让整次写失败,而是尝试降级续写。
故障检测
每个 packet 发出后会进 ack queue。DataStreamer 线程发现某个 packet 在 ack queue 里停留超过 dfs.client.datanode-restart.timeout(或 socket read 超时抛 IOException),就判定 pipeline 出问题了。
摘除坏节点与重新建 pipeline
client 的处理逻辑大致是:
- 标记坏掉的 DN(设为 dn2),把它从 pipeline 摘除,新 pipeline =
[dn1, dn3]。 - 把 ack queue 里的 packet 全部倒回 data queue(这些是没确认的,要重发)。
- 调用 NameNode 的
updatePipelineRPC:告诉 NameNode 这个 block 的新副本分布,NameNode 更新BlockInfo的expectedLocations,并记录generationStamp自增。 - client 用新的 pipeline 列表重新发起
writeBlock链式握手,从当前已 ack 的 offset 续写,而不是从头来。
这一步有个关键边界:新的 pipeline 必须满足 minReplication(默认 1) 。如果坏到只剩 1 个副本还能续写,但此时这个 block 处于”under replication”状态,NameNode 会异步补副本。如果连 minReplication 都凑不齐(比如三副本里坏了俩),client 这次 write 直接失败,应用层抛 IOException。
updatePipeline 这个 RPC 很精妙——它让 block 的元信息变更和 client 续写解耦:client 不用停,NameNode 后台慢慢补副本。我见过一个真实 case,某集群因为一台 ToR 交换机抖动,dn2、dn3 交替掉,导致一个 block 的 generationStamp 一晚上涨了 47 次,最终文件还是写成功,但 NameNode 上 getGenerationStampV2 计数器飙升,事后看 metrics 才发现。
Block 关闭与 finalize
数据写完,应用调 close()。client 的收尾流程:
DataStreamer把当前未满的 packet(哪怕不足 64KB)加上lastPacketInBlock=true标志发出。- 等最后一个 packet 的 ack 回来,pipeline 上三个 DN 都收到 last packet。
- 每个 DN 对该 block 做 finalize:把 block 从
RBW(Replica Being Written)状态转为FINALIZED,meta 文件落盘,此时该副本可以被 reader 读到完整长度。 - client 调 NameNode 的
completeFileRPC(带最后一个 block 的 length)。NameNode 校验所有 block 都满足副本数、都在 FINALIZED 状态,把 INodeFile 从 under construction 转正常,回收租约。 - NameNode 异步触发
BlockManager把 under-replicated 的 block 补到目标副本数。
只有 completeFile 返回 true,这次写才算真正落地。如果 client 在 close 前崩溃,文件会停在 under construction 状态,已 hflush 的数据对 reader 可见但租约还在,要等 NameNode 的 LeaseManager 硬超时回收后才能被别人 append。
一致性边界:什么时候会丢数据
把上面的语义汇总成几条实战铁律,这是我踩坑后刻在脑子里的:
- 未 hflush 的 write,client crash = 全丢。数据还在 client JVM 堆里,没成 packet,没进 pipeline,谁也救不了。
- hflush 但没 hsync,DN 整机断电 = 丢。OS page cache 没落盘。
- hsync 之后,三副本里即使坏两个,NameNode 也会从存活的那个副本补齐(前提是该 packet 已 ack)。
- append 的语义:append 复用同一条 pipeline,但 client 必须先拿到租约;如果上一个 writer 异常退出没 close,append 会触发
recoverLease,等租约回收后才能写。这就是为什么 Spark 任务重启写同一文件常报Failed to replace a bad datanode或Cannot obtain block length的根因。 - 可见性 ≠ 持久性:
getFileLength在 under construction 时能读到 hflush 过的长度,但这部分数据的持久性仍取决于是否 hsync。
理解了这几条边界,写任何 HDFS sink(Kafka Connect / Flume / Flink checkpoint)都不会再混淆”flush 完了是不是就安全了”。
生产调优:packet、重试、副本与吞吐
最后落到调参。我按影响维度列一张实战清单:
| 参数 | 默认 | 调优建议 | 权衡点 |
|---|---|---|---|
io.bytes.per.checksum |
512B | 一般不动 | chunk 粒度,影响校验恢复粒度 |
packet size(writePacketSize) |
64KB | 高吞吐场景可到 256KB | 大 packet 减小 ack 开销,但丢一个 packet 重发成本高 |
dfs.client.block.write.retries |
3 | 网络不稳可调到 5 | 重试次数,太大掩盖底层问题 |
dfs.client.write.max.paths-to-cache |
10 | 写大量小文件时调高 | 影响 client 侧 path→stream 缓存 |
| 副本数 | 3 | 热数据可 2,冷数据可 2/EC | 副本少→ack 路径短→吞吐高,但可靠性下降 |
dfs.client.block.write.locateFollowingBlock.retries |
5 | NameNode 抖动时调高 | 分配下一个 block 的重试 |
几个我真正在产线改过的:
- packet size 调到 256KB:某集群写 1GB 大文件时 ack 包太多,CPU 成了瓶颈,加大 packet 后吞吐从 180MB/s 涨到 320MB/s 。代价是 GC 抖动时丢的更多。
- 副本数从 3 降到 2(中间结果目录):写吞吐提升约 30%,但必须配套 EC 或冷备,否则坏一个盘就 under-replication。
- 关闭 prefetch:
dfs.client.write.prefetch在某些版本会预取下一个 block 的位置,写小文件反而拖慢,关闭后小文件场景 RTT 下降明显 。
这些数字都是我反复压测得来的,没有万能参数——packet 变大对小文件是负担,副本变少对关键数据是灾难。调优的本质是把”丢多少数据(RPO)/写多快(吞吐)/多稳(可靠性)”这三个轴子按业务显式权衡。
小结
把 HDFS 写路径从 create 到 pipeline ack 串下来,你会发现它本质上是一套**”双队列 + 累积 ack + pipeline 降级续写”**的协议:
- **双队列(data queue / ack queue)**解决了”边发边确认”的流水线化,ack queue 倒回是故障重发的基石;
- packet + chunk checksum 把可靠性粒度做到 512B,坏一个 chunk 不毁整个 block;
- 累积 ack 从 pipeline 尾端回流,保证 ack 语义清晰;
updatePipeline+ minReplication 让写过程对单点 DN 故障免疫,能续写不中断;- write/hflush/hsync/close 四级语义把”可见性”和”持久性”拆开,让上层按需选择。
真正在读路径篇之外把写路径吃透,意味着你能解释为什么 Spark checkpoint 要 hsync、为什么 Flume 的 batch 模式不会丢数据、为什么 client crash 后文件还能 append。这些不是面试八股,是大数据/AI 系统在 HDFS 上构建可靠 sink 的底层地基。下一篇我会拆读路径,把 cache、短路读、EC 读的细节再深挖一遍。
