Parquet 内容定义分块
通过利用新的 Xet 存储层和 Apache Arrow 的 Parquet 内容定义分块 (CDC) 功能,减少在 Hugging Face Hub 上传和下载 Parquet 文件的时间,从而实现更高效、更具可扩展性的数据工作流。
摘要: PyArrow 和 Pandas 现已支持 Parquet 内容定义分块 (CDC),可在 Hugging Face 的 Xet 存储层等内容可寻址存储系统上实现 Parquet 文件的高效去重。CDC 通过只上传或下载已更改的数据块,极大地降低了数据传输和存储成本。通过传递 use_content_defined_chunking
参数来启用 CDC。
import pandas as pd
import pyarrow.parquet as pq
df.to_parquet("hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
pq.write_table(table, "hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
目录
简介
Apache Parquet 是一种列式存储格式,在数据工程社区中被广泛使用。
截至今日,Hugging Face 托管了近 21 PB 的数据集,其中仅 Parquet 文件就占用了超过 4 PB 的存储空间。因此,优化 Parquet 存储是一项高度优先的任务。Hugging Face 引入了一个名为 Xet 的新存储层,它利用内容定义分块来高效地对数据块进行去重,从而降低存储成本并提高下载/上传速度。
虽然 Xet 是格式无关的,但 Parquet 的布局和基于列块(数据页)的压缩方式,可能会因微小的数据变化而产生完全不同的字节级表示,从而导致去重性能不佳。为了解决这个问题,Parquet 文件的写入方式应该尽量减少相似数据之间的字节级差异,而这正是内容定义分块 (CDC) 发挥作用的地方。
让我们来探讨一下新的 Parquet CDC 功能与 Hugging Face 的 Xet 存储层结合使用所带来的性能优势。
数据准备
为了演示,我们将使用 OpenOrca 数据集的一个大小适中的子集。
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from huggingface_hub import hf_hub_download
def shuffle_table(table, seed=40):
rng = np.random.default_rng(seed)
indices = rng.permutation(len(table))
return table.take(indices)
# download the dataset from Hugging Face Hub into local cache
path = hf_hub_download(
repo_id="Open-Orca/OpenOrca",
filename="3_5M-GPT3_5-Augmented.parquet",
repo_type="dataset"
)
# read the cached parquet file into a PyArrow table
orca = pq.read_table(path, schema=pa.schema([
pa.field("id", pa.string()),
pa.field("system_prompt", pa.string()),
pa.field("question", pa.large_string()),
pa.field("response", pa.large_string()),
]))
# augment the table with some additional columns
orca = orca.add_column(
orca.schema.get_field_index("question"),
"question_length",
pc.utf8_length(orca["question"])
)
orca = orca.add_column(
orca.schema.get_field_index("response"),
"response_length",
pc.utf8_length(orca["response"])
)
# shuffle the table to make it unique to the Xet storage
orca = shuffle_table(orca)
# limit the table to the first 100,000 rows
table = orca[:100_000]
# take a look at the first 3 rows of the table
table[:3].to_pandas()
id | system_prompt | question_length | question | response_length | response | |
---|---|---|---|---|---|---|
0 | cot.64099 | 你是一个帮助人们找到……的 AI 助手。 | 241 | 思考这个问题。幼发拉底河是…… | 1663 | 问题是问幼发拉底河…… |
1 | flan.1206442 | 你是一个 AI 助手。你会得到一个…… | 230 | 单选/多选题:是否可以…… | 751 | 无法断定牛仔…… |
2 | t0.1170225 | 你是一个 AI 助手。用户会给你…… | 1484 | 问:我正在参加考试,必须猜对…… | 128 | 这段话主要告诉我们哪些东西很重要…… |
将表格作为 Parquet 文件上传到 Hugging Face Hub
自从 pyarrow>=21.0.0 版本以来,我们可以在 pyarrow
函数中使用 Hugging Face URI,通过 hf://
URI 方案直接向 Hub 读写 parquet (及其他格式) 文件。
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 96.1MB / 96.1MB, 48.0kB/s
Total Bytes: 96.1M
Total Transfer: 96.1M
我们可以看到表格已全部作为新数据上传(总字节数 == 总传输量),因为 Xet 存储层尚不知道此数据。现在将其读回为一个 pyarrow
表格。
downloaded_table = pq.read_table("hf://datasets/kszucs/pq/orca.parquet")
assert downloaded_table.equals(table)
请注意,所有接受文件路径的 pyarrow
函数也都接受 Hugging Face URI,例如 pyarrow datasets、CSV 函数、增量 Parquet 写入器或只读取 parquet 元数据。
pq.read_metadata("hf://datasets/kszucs/pq/orca.parquet")
<pyarrow._parquet.FileMetaData object at 0x16ebfa980>
created_by: parquet-cpp-arrow version 21.0.0-SNAPSHOT
num_columns: 6
num_rows: 100000
num_row_groups: 1
format_version: 2.6
serialized_size: 4143
Parquet 去重的不同用例
为了展示内容定义分块功能的有效性,我们将尝试它在以下情况下的表现:
- 重新上传表格的精确副本
- 在表格中添加/删除列
- 在表格中更改列类型
- 追加新行和连接表格
- 在表格中插入/删除行
- 更改表格的行组大小
- 使用不同的文件级拆分
1. 重新上传完全相同的表格副本
虽然这个用例听起来微不足道,但传统文件系统不会对文件进行去重,导致数据的完全重新上传和重新下载。相比之下,一个利用内容定义分块的系统可以识别出文件内容是相同的,从而避免不必要的数据传输。
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-copy.parquet")
New Data Upload: | | 0.00B / 0.00B, 0.00B/s
Total Bytes: 96.1M
Total Transfer: 0.00
我们可以看到没有新数据被上传,操作是瞬时完成的。现在让我们看看如果我们将相同的文件再次上传到不同的仓库会发生什么。
>>> pq.write_table(table, "hf://datasets/kszucs/pq-copy/orca-copy-again.parquet")
New Data Upload: | | 0.00B / 0.00B, 0.00B/s
Total Bytes: 96.1M
Total Transfer: 0.00
上传再次瞬时完成,因为去重也适用于跨仓库。这是 Xet 存储层的一个关键特性,能够实现高效的数据共享和协作。你可以在从分块到数据块:加速 Hub 上的上传和下载这篇博客文章中阅读更多关于细节和扩展挑战的内容。
2. 在表格中添加和删除列
首先将原始表格和修改后的表格写入本地 parquet 文件,以查看它们的大小。
table_with_new_columns = table.add_column(
table.schema.get_field_index("response"),
"response_short",
pc.utf8_slice_codeunits(table["response"], 0, 10)
)
table_with_removed_columns = table.drop(["response"])
pq.write_table(table, "/tmp/original.parquet")
pq.write_table(table_with_new_columns, "/tmp/with-new-columns.parquet")
pq.write_table(table_with_removed_columns, "/tmp/with-removed-columns.parquet")
!ls -lah /tmp/*.parquet
-rw-r--r-- 1 kszucs wheel 92M Jul 22 14:47 /tmp/original.parquet
-rw-r--r-- 1 kszucs wheel 92M Jul 22 14:47 /tmp/with-new-columns.parquet
-rw-r--r-- 1 kszucs wheel 67M Jul 22 14:47 /tmp/with-removed-columns.parquet
现在将它们上传到 Hugging Face,看看实际传输了多少数据。
>>> pq.write_table(table_with_new_columns, "hf://datasets/kszucs/pq/orca-added-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 575kB / 575kB, 288kB/s
Total Bytes: 96.6M
Total Transfer: 575k
我们可以看到,只有新增的列和位于文件尾部的新 parquet 元数据被上传,而原始数据没有再次传输。这是 Xet 存储层的一大优势,因为它允许我们高效地添加新列而无需再次传输整个数据集。
同样的情况也适用于删除列,如下所示。
>>> pq.write_table(table_with_removed_columns, "hf://datasets/kszucs/pq/orca-removed-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 37.7kB / 37.7kB, 27.0kB/s
Total Bytes: 70.6M
Total Transfer: 37.7k
为了更好地理解上传了什么,我们可以使用去重估算工具来可视化两个 parquet 文件之间的差异。
from de import visualize
visualize(table, table_with_new_columns, title="With New Columns", prefix="orca")
添加新列
添加两列新的数据意味着我们有未见过的数据页需要传输(用红色高亮显示),但其余数据保持不变(用绿色高亮显示),因此不会再次传输。注意文件尾部元数据中的小红色区域,这在我们修改 parquet 文件时几乎总是会改变。去重统计显示为 <去重后大小> / <总大小> = <去重率>
,其中比率越小意味着去重性能越高。
同样可视化删除一列后的差异。
visualize(table, table_with_removed_columns, title="With Removed Columns", prefix="orca")
删除列
由于我们正在删除整个列,我们只能看到文件尾部元数据的变化,所有其他列都保持不变并且已经存在于存储层中,因此它们不会被再次传输。
3. 在表格中更改列类型
另一个常见的用例是更改表中的列类型,例如,为了减少存储大小或为特定查询优化数据。让我们将 question_length
列从 int64
数据类型更改为 int32
,然后看看传输了多少数据。
# first make the table much smaller by removing the large string columns
# to highlight the differences better
table_without_text = table_with_new_columns.drop(["question", "response"])
# cast the question_length column to int64
table_with_casted_column = table_without_text.set_column(
table_without_text.schema.get_field_index("question_length"),
"question_length",
table_without_text["question_length"].cast("int32")
)
>>> pq.write_table(table_with_casted_column, "hf://datasets/kszucs/pq/orca-casted-column.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 181kB / 181kB, 113kB/s
Total Bytes: 1.80M
Total Transfer: 181k
同样,我们可以看到只有新的列和更新后的 parquet 元数据被上传。现在可视化去重热图。
visualize(table_without_text, table_with_casted_column, title="With Casted Column", prefix="orca")
转换类型后的列
第一个红色区域表示新添加的列,而第二个红色区域表示页脚中更新的元数据。其余数据保持不变,不会再次传输。
4. 追加新行和连接表格
我们将通过将原始数据集的另一个切片连接到表中来追加新行。
table = orca[:100_000]
next_10k_rows = orca[100_000:110_000]
table_with_appended_rows = pa.concat_tables([table, next_10k_rows])
assert len(table_with_appended_rows) == 110_000
现在检查是否只有新行被上传,因为原始数据已经被 Xet 存储层所知。
>>> pq.write_table(table_with_appended_rows, "hf://datasets/kszucs/pq/orca-appended-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 10.3MB / 10.3MB, 1.36MB/s
Total Bytes: 106M
Total Transfer: 10.3M
visualize(table, table_with_appended_rows, title="With Appended Rows", prefix="orca")
追加行
由于每一列都获得了新数据,我们可以看到多个红色区域。这是由于实际的 parquet 文件规范,其中整个列是依次排列的(在每个行组内)。
5. 在表格中插入/删除行
这里是困难的部分,因为插入和删除会移动现有的行,这会导致在 parquet 术语中称为列块或数据页的不同。由于每个数据页都是独立压缩的,即使是单行的插入或删除也可能导致从编辑的行到 parquet 文件末尾的字节级表示完全不同。
这个 Parquet 特有的问题不能仅由 Xet 存储层解决,parquet 文件本身需要以一种即使有插入或删除的行也能最小化数据页差异的方式写入。
让我们尝试使用现有的机制,看看它的表现如何。
table = orca[:100_000]
# remove 4k rows from two places
table_with_deleted_rows = pa.concat_tables([
orca[:15_000],
orca[18_000:60_000],
orca[61_000:100_000]
])
# add 1k rows at the first third of the table
table_with_inserted_rows = pa.concat_tables([
orca[:10_000],
orca[100_000:101_000],
orca[10_000:50_000],
orca[101_000:103_000],
orca[50_000:100_000],
])
assert len(table) == 100_000
assert len(table_with_deleted_rows) == 96_000
assert len(table_with_inserted_rows) == 103_000
>>> pq.write_table(table_with_inserted_rows, "hf://datasets/kszucs/pq/orca-inserted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 89.8MB / 89.8MB, 42.7kB/s
Total Bytes: 99.1M
Total Transfer: 89.8M
>>> pq.write_table(table_with_deleted_rows, "hf://datasets/kszucs/pq/orca-deleted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 78.2MB / 78.2MB, 46.5kB/s
Total Bytes: 92.2M
Total Transfer: 78.2M
同时可视化两种情况以查看差异。
visualize(table, table_with_deleted_rows, title="Deleted Rows", prefix="orca")
visualize(table, table_with_inserted_rows, title="Inserted Rows", prefix="orca")
已删除的行
已插入的行
我们可以看到去重性能显著下降(比率更高),去重热图显示压缩后的 parquet 文件彼此之间有很大差异。这是因为插入和删除的行移动了现有的行,导致从编辑的行到 parquet 文件末尾的数据页不同。
我们可以通过写入具有新的pyarrow 功能,称为内容定义分块 (CDC) 的 parquet 文件来解决此问题。此功能确保列始终根据其内容被分块到数据页中,类似于 Xet 存储层去重数据的方式,但在任何序列化或压缩发生之前应用于列的逻辑值。
该功能可以通过将 use_content_defined_chunking=True
传递给 write_parquet
函数来启用。
import pyarrow.parquet as pq
pq.write_table(table, "hf://user/repo/filename.parquet", use_content_defined_chunking=True)
Pandas 也支持这项新功能。
df.to_parquet("hf://user/repo/filename.parquet", use_content_defined_chunking=True)
让我们可视化使用 Parquet CDC 功能前后的去重差异。
visualize(table, table_with_deleted_rows, title="With Deleted Rows", prefix="orca", with_cdc=True)
visualize(table, table_with_inserted_rows, title="With Inserted Rows", prefix="orca", with_cdc=True)
已删除的行
压缩 | 原生 Parquet | CDC Parquet |
---|---|---|
无 | ![]() |
![]() |
去重统计 | 185.3 MB / 306.8 MB = 60% | 162.9 MB / 307.2 MB = 53% |
Snappy 压缩 | ![]() |
![]() |
去重统计 | 174.4 MB / 188.3 MB = 92% | 104.3 MB / 188.8 MB = 55% |
已插入的行
压缩 | 原生 Parquet | CDC Parquet |
---|---|---|
无 | ![]() |
![]() |
去重统计 | 190.1 MB / 318.0 MB = 59% | 164.1 MB / 318.4 MB = 51% |
Snappy 压缩 | ![]() |
![]() |
去重统计 | 186.2 MB / 195.2 MB = 95% | 102.8 MB / 195.7 MB = 52% |
看起来好多了!俗话说,实践是检验真理的唯一标准,让我们实际使用内容定义分块 parquet 功能上传表格,看看传输了多少数据。
请注意,我们需要首先上传启用了内容定义分块的原始表格。
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-cdc.parquet", use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 94.5MB / 94.5MB, 46.5kB/s
Total Bytes: 96.4M
Total Transfer: 94.5M
>>> pq.write_table(
... table_with_inserted_rows,
... "hf://datasets/kszucs/pq/orca-inserted-rows-cdc.parquet",
... use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 6.00MB / 6.00MB, 1.00MB/s
Total Bytes: 99.3M
Total Transfer: 6.00M
>>> pq.write_table(
... table_with_deleted_rows,
... "hf://datasets/kszucs/pq/orca-deleted-rows-cdc.parquet",
... use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 7.57MB / 7.57MB, 1.35MB/s
Total Bytes: 92.4M
Total Transfer: 7.57M
上传的数据明显小于之前,显示出更好的去重性能,如上面的热图所示。
需要注意的是,使用 huggingface_hub.hf_hub_download()
和 datasets.load_dataset()
函数下载时,同样适用这些性能优势。
6. 使用不同的行组大小
根据读取器/写入器的限制,在某些情况下,较大或较小的行组大小可能更有利。parquet 写入器实现默认使用固定大小的行组,对于 pyarrow,默认值为 1 百万行。数据集写入器可能会减小行组大小以提高随机访问性能或减少读取器应用程序的内存占用。
更改行组大小将在行组之间移动行,从而在数据页之间移动值,因此我们遇到了与插入或删除行类似的问题。让我们使用 parquet CDC 功能比较不同行组大小之间的去重性能。
from de import visualize
# pick a larger subset of the dataset to have enough rows for the row group size tests
table = orca[2_000_000:3_000_000]
visualize(table, (table, {"row_group_size": 128 * 1024}), title="Small Row Groups", with_cdc=True, prefix="orca")
visualize(table, (table, {"row_group_size": 256 * 1024}), title="Medium Row Groups", with_cdc=True, prefix="orca")
小行组
压缩 | 原生 Parquet | CDC Parquet |
---|---|---|
无 | ![]() |
![]() |
去重统计 | 1.6 GB / 3.1 GB = 52% | 1.6 GB / 3.1 GB = 50% |
Snappy 压缩 | ![]() |
![]() |
去重统计 | 1.1 GB / 1.9 GB = 59% | 995.0 MB / 1.9 GB = 51% |
中等行组
压缩 | 原生 Parquet | CDC Parquet |
---|---|---|
无 | ![]() |
![]() |
去重统计 | 1.6 GB / 3.1 GB = 51% | 1.6 GB / 3.1 GB = 50% |
Snappy 压缩 | ![]() |
![]() |
去重统计 | 1.1 GB / 1.9 GB = 57% | 976.5 MB / 1.9 GB = 50% |
7. 使用不同的文件级拆分
数据集通常被分割成多个文件以提高并行性和随机访问。Parquet CDC 结合 Xet 存储层可以有效地对多个文件中的数据进行去重,即使数据是在不同的边界处分割的。
让我们用三种不同的文件级拆分方式写出数据集,然后比较去重性能。
from pathlib import Path
from de import estimate
def write_dataset(table, base_dir, num_shards, **kwargs):
"""Simple utility to write a pyarrow table to multiple Parquet files."""
# ensure that directory exists
base_dir = Path(base_dir)
base_dir.mkdir(parents=True, exist_ok=True)
# split and write the table into multiple files
rows_per_file = len(table) / num_shards
for i in range(num_shards):
start = i * rows_per_file
end = min((i + 1) * rows_per_file, len(table))
shard = table.slice(start, end - start)
path = base_dir / f"part-{i}.parquet"
pq.write_table(shard, path, **kwargs)
write_dataset(orca, "orca5-cdc", num_shards=5, use_content_defined_chunking=True)
write_dataset(orca, "orca10-cdc", num_shards=10, use_content_defined_chunking=True)
write_dataset(orca, "orca20-cdc", num_shards=20, use_content_defined_chunking=True)
estimate("orca5-cdc/*.parquet", "orca10-cdc/*.parquet", "orca20-cdc/*.parquet")
Total size: 9.3 GB
Chunk size: 3.2 GB
尽管我们用三种不同的分片配置上传了数据集,但总上传大小将仅略大于原始数据集大小。
使用 Pandas 的 Parquet CDC 功能
到目前为止,我们一直使用 PyArrow,现在让我们通过下载、筛选然后上传启用了内容定义分块功能的数据集,来探索如何将相同的 CDC 功能与 Pandas 结合使用。
import pandas as pd
src = "hf://datasets/teknium/OpenHermes-2.5/openhermes2_5.json"
df = pd.read_json(src)
>>> dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc.parquet"
>>> df.to_parquet(dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 799MB / 799MB, 197kB/s
Total Bytes: 799M
Total Transfer: 799M
>>> short_df = df[[len(c) < 10 for c in df.conversations]]
>>> short_dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc-short.parquet"
>>> short_df.to_parquet(short_dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 21.9MB / 21.9MB, 45.4kB/s
Total Bytes: 801M
Total Transfer: 21.9M
import pyarrow as pa
from de import visualize
visualize(
pa.Table.from_pandas(df),
pa.Table.from_pandas(short_df),
title="Hermes 2.5 Short Conversations",
with_cdc=True,
prefix="hermes"
)
Hermes 2.5 短对话
压缩 | 原生 Parquet | CDC Parquet |
---|---|---|
无 | ![]() |
![]() |
去重统计 | 1.9 GB / 3.2 GB = 58% | 1.6 GB / 3.2 GB = 51% |
Snappy 压缩 | ![]() |
![]() |
去重统计 | 1.5 GB / 1.6 GB = 94% | 821.1 MB / 1.6 GB = 51% |
由于 Parquet CDC 应用于 parquet 数据页级别(列块级别),去重性能取决于筛选器的选择性,或者更确切地说,是变化在整个数据集中的分布。如果大部分数据页受到影响,那么去重率将显著下降。
参考文献
关于此功能的更多详情可以在以下链接找到:
结论
我们探讨了新的 Parquet 内容定义分块功能与 Hugging Face 的 Xet 存储层结合使用的性能优势。我们展示了它如何在各种场景中高效地进行数据去重,使 parquet 操作更快、更节省存储空间。与传统的云存储解决方案相比,Xet 存储层与 Parquet CDC 可以显著减少数据传输时间和成本。
在这里将您的 Hugging Face 仓库从 Git LFS 迁移到 Xet 以享受此优势:https://huggingface.co/join/xet