Hub 文档

Dask

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Dask

Dask 是一个并行和分布式计算库,用于扩展现有的 Python 和 PyData 生态系统。

特别是,我们可以使用 Dask DataFrame 来扩展 pandas 的工作流程。Dask DataFrame 通过并行化 pandas 来处理大型表格数据。它与 pandas 的 API 非常相似,使得从在单个数据集上进行测试过渡到处理完整数据集变得简单。Dask 在处理 Parquet 格式时尤其高效,这是 Hugging Face Datasets 上的默认格式,因为它支持丰富的数据类型、高效的列过滤和压缩。

Dask 的一个很好的实际用例是以分布式方式在数据集上运行数据处理或模型推理。例如,请参阅 Coiled 发表的关于使用 Hugging Face + Dask 扩展基于 AI 的数据处理的优秀博客文章。

读和写

由于 Dask 使用 fsspec 来读写远程数据,您可以使用 Hugging Face 路径(hf://)来在 Hub 上读写数据。

首先,您需要使用您的 Hugging Face 账户登录,例如使用

hf auth login

然后您可以创建一个数据集仓库,例如使用

from huggingface_hub import HfApi

HfApi().create_repo(repo_id="username/my_dataset", repo_type="dataset")

最后,您可以在 Dask 中使用Hugging Face 路径。Dask DataFrame 支持在 Hugging Face 上以分布式方式写入 Parquet,它使用提交来跟踪数据集的更改。

import dask.dataframe as dd

df.to_parquet("hf://datasets/username/my_dataset")

# or write in separate directories if the dataset has train/validation/test splits
df_train.to_parquet("hf://datasets/username/my_dataset/train")
df_valid.to_parquet("hf://datasets/username/my_dataset/validation")
df_test .to_parquet("hf://datasets/username/my_dataset/test")

由于这会为每个文件创建一个提交,建议在上传后压缩历史记录。

from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")

这会创建一个名为 `username/my_dataset` 的数据集仓库,其中包含您的 Dask 数据集(Parquet 格式)。您稍后可以重新加载它。

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/username/my_dataset")

# or read from separate directories if the dataset has train/validation/test splits
df_train = dd.read_parquet("hf://datasets/username/my_dataset/train")
df_valid = dd.read_parquet("hf://datasets/username/my_dataset/validation")
df_test  = dd.read_parquet("hf://datasets/username/my_dataset/test")

有关 Hugging Face 路径及其实现方式的更多信息,请参阅客户端库中关于 HfFileSystem 的文档

处理数据

要使用 Dask 并行处理数据集,您可以首先为 pandas DataFrame 或 Series 定义您的数据处理函数,然后使用 Dask 的 `map_partitions` 函数将此函数并行地应用于数据集的所有分区。

def dummy_count_words(texts):
    return pd.Series([len(text.split(" ")) for text in texts])

或使用 pandas 字符串方法的类似函数(速度更快)

def dummy_count_words(texts):
    return texts.str.count(" ")

在 pandas 中,您可以在文本列上使用此函数

# pandas API
df["num_words"] = dummy_count_words(df.text)

在 Dask 中,您可以在每个分区上运行此函数

# Dask API: run the function on every partition
df["num_words"] = df.text.map_partitions(dummy_count_words, meta=int)

请注意,您还需要提供 `meta`,即函数输出中 pandas Series 或 DataFrame 的类型。这是必需的,因为 Dask DataFrame 使用惰性 API。由于 Dask 只有在调用 `.compute()` 时才会运行数据处理,因此它需要 `meta` 参数来在此期间了解新列的类型。

谓词和投影下推

从 Hugging Face 读取 Parquet 数据时,Dask 会自动利用 Parquet 文件中的元数据来跳过不需要的整个文件或行组。例如,如果您对 Parquet 格式的 Hugging Face 数据集应用筛选(谓词)或选择列的子集(投影),Dask 将读取 Parquet 文件的元数据,以丢弃不需要的部分,而无需下载它们。

这得益于对Dask DataFrame API 的重新实现,以支持查询优化,这使得 Dask 更快、更稳健。

例如,FineWeb-Edu 的这个子集包含许多 Parquet 文件。如果您可以筛选数据集以保留最近的 CC 转储中的文本,Dask 将跳过大部分文件,只下载与筛选条件匹配的数据。

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/HuggingFaceFW/fineweb-edu/sample/10BT/*.parquet")

# Dask will skip the files or row groups that don't
# match the query without downloading them.
df = df[df.dump >= "CC-MAIN-2023"]

Dask 也只会读取计算所需的列,并跳过其余的列。例如,如果您在代码后期删除了某一列,如果不需要该列,它就不会在管道的早期加载它。这在您想要操作列的子集或进行分析时非常有用。

# Dask will download the 'dump' and 'token_count' needed
# for the filtering and computation and skip the other columns.
df.token_count.mean().compute()

客户端

`dask` 中的大多数功能都针对集群或本地 `Client` 进行了优化,以启动并行计算。

import dask.dataframe as dd
from distributed import Client

if __name__ == "__main__":  # needed for creating new processes
    client = Client()
    df = dd.read_parquet(...)
    ...

对于本地使用,`Client` 默认使用带有 `LocalCluster` 的多进程。您可以手动配置 `LocalCluster` 的多进程:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=8, threads_per_worker=8)
client = Client(cluster)

请注意,如果您在本地不使用 `Client` 的情况下使用默认的线程调度器,DataFrame 在某些操作后可能会变慢(更多详情请见此处)。

有关设置本地或云集群的更多信息,请参阅部署 Dask 文档

< > 在 GitHub 上更新