Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Dask

Dask 是一个并行和分布式计算库,可以扩展现有的 Python 和 PyData 生态系统。

特别是,我们可以使用 Dask DataFrame 来扩展 pandas 工作流程。Dask DataFrame 并行化 pandas 以处理大型表格数据。它紧密地镜像了 pandas API,使得从在单个数据集上测试到处理完整数据集的过渡变得简单。Dask 在处理 Parquet 格式时尤其有效,Parquet 是 Hugging Face Datasets 上的默认格式,因为它支持丰富的数据类型、高效的列式过滤和压缩。

Dask 的一个很好的实际用例是以分布式方式在数据集上运行数据处理或模型推理。例如,请参阅 Coiled’s 关于 使用 Hugging Face + Dask 扩展基于 AI 的数据处理 的优秀博文。

读取和写入

由于 Dask 使用 fsspec 来读取和写入远程数据,因此您可以使用 Hugging Face 路径 (hf://) 在 Hub 上读取和写入数据。

首先,您需要使用您的 Hugging Face 帐户登录,例如使用

huggingface-cli login

然后您可以创建一个数据集仓库,例如使用

from huggingface_hub import HfApi

HfApi().create_repo(repo_id="username/my_dataset", repo_type="dataset")

最后,您可以在 Dask 中使用Hugging Face 路径。Dask DataFrame 支持分布式写入 Hugging Face 上的 Parquet,它使用提交来跟踪数据集更改

import dask.dataframe as dd

df.to_parquet("hf://datasets/username/my_dataset")

# or write in separate directories if the dataset has train/validation/test splits
df_train.to_parquet("hf://datasets/username/my_dataset/train")
df_valid.to_parquet("hf://datasets/username/my_dataset/validation")
df_test .to_parquet("hf://datasets/username/my_dataset/test")

由于这会为每个文件创建一个提交,因此建议在上传后压缩历史记录

from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")

这将创建一个名为 username/my_dataset 的数据集仓库,其中包含 Parquet 格式的 Dask 数据集。您可以稍后重新加载它

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/username/my_dataset")

# or read from separate directories if the dataset has train/validation/test splits
df_train = dd.read_parquet("hf://datasets/username/my_dataset/train")
df_valid = dd.read_parquet("hf://datasets/username/my_dataset/validation")
df_test  = dd.read_parquet("hf://datasets/username/my_dataset/test")

有关 Hugging Face 路径以及它们如何实现的更多信息,请参阅客户端库关于 HfFileSystem 的文档

处理数据

要使用 Dask 并行处理数据集,您可以首先为 pandas DataFrame 或 Series 定义您的数据处理函数,然后使用 Dask 的 map_partitions 函数将此函数并行应用于数据集的所有分区

def dummy_count_words(texts):
    return pd.Series([len(text.split(" ")) for text in texts])

在 pandas 中,您可以在文本列上使用此函数

# pandas API
df["num_words"] = dummy_count_words(df.text)

在 Dask 中,您可以在每个分区上运行此函数

# Dask API: run the function on every partition
df["num_words"] = df.text.map_partitions(dummy_count_words, meta=int)

请注意,您还需要提供 meta,它是函数输出中 pandas Series 或 DataFrame 的类型。这是必需的,因为 Dask DataFrame 使用延迟 API。由于 Dask 只会在调用 .compute() 后运行数据处理,因此它需要 meta 参数来了解此时新列的类型。

谓词和投影下推

当从 Hugging Face 读取 Parquet 数据时,Dask 会自动利用 Parquet 文件中的元数据来跳过不需要的整个文件或行组。例如,如果您在 Parquet 格式的 Hugging Face 数据集上应用过滤器(谓词),或者如果您选择列的子集(投影),Dask 将读取 Parquet 文件的元数据以丢弃不需要的部分,而无需下载它们。

这要归功于 Dask DataFrame API 的重新实现,以支持查询优化,这使得 Dask 更快、更强大。

例如,FineWeb-Edu 的这个子集包含许多 Parquet 文件。如果您可以过滤数据集以保留来自最近 CC 转储的文本,Dask 将跳过大多数文件,并且只下载与过滤器匹配的数据

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/HuggingFaceFW/fineweb-edu/sample/10BT/*.parquet")

# Dask will skip the files or row groups that don't
# match the query without downloading them.
df = df[df.dump >= "CC-MAIN-2023"]

Dask 也只会读取您的计算所需的列,并跳过其余列。例如,如果您在代码后期删除一列,如果不需要,它不会费心在管道早期加载它。当您想要操作列的子集或进行分析时,这非常有用

# Dask will download the 'dump' and 'token_count' needed
# for the filtering and computation and skip the other columns.
df.token_count.mean().compute()
< > 在 GitHub 上更新