使用 Hugging Face + Dask 扩展基于 AI 的数据处理
Hugging Face 平台拥有众多数据集和预训练模型,使得使用和训练最先进的机器学习模型变得越来越容易。然而,扩展 AI 任务可能很困难,因为 AI 数据集通常很大(从几百 GB 到几 TB 不等),并且使用 Hugging Face Transformers 进行模型推理有时计算成本很高。
Dask 是一个用于分布式计算的 Python 库,它通过将数据集分解成可管理的块来处理核外计算(即处理无法装入内存的数据)。这使得完成以下任务变得容易:
- 通过一个模仿 pandas 且易于使用的 API,高效加载和预处理 TB 级数据集
- 并行模型推理(可选择多节点 GPU 推理)
在这篇文章中,我们将展示一个来自 FineWeb 数据集的数据处理示例,使用 FineWeb-Edu 分类器来识别具有高教育价值的网页。我们将展示:
- 如何使用 pandas 在本地处理 100 行数据
- 如何使用 Dask 在云端多个 GPU 上扩展到 2.11 亿行数据
使用 Pandas 处理 100 行数据
FineWeb 数据集 包含来自 Common Crawl 的 15 万亿个英文网页数据 token。Common Crawl 是一个非营利组织,托管着每月更新的公共网络爬取数据集。该数据集常用于各种任务,如大型语言模型训练、分类、内容过滤以及跨多个行业的信息检索。
在笔记本电脑上使用 pandas 下载并读入单个文件可能需要超过 1 分钟。
import pandas as pd
df = pd.read_parquet(
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)
接下来,我们将使用 HF 的 FineWeb-Edu 分类器来评估我们数据集中网页的教育价值。网页的评分范围从 0 到 5,其中 0 表示没有教育价值,5 表示具有很高的教育价值。我们可以使用 pandas 对一个较小的、100 行的数据子集进行此操作,在带有 GPU 的 M1 Mac 上大约需要 10 秒。
from transformers import pipeline
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device
)
results = pipe(
texts.to_list(),
batch_size=25, # Choose batch size based on data size and hardware
padding="longest",
truncation=True,
function_to_apply="none"
)
return pd.Series([r["score"] for r in results])
df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]
请注意,我们还在 compute_scores
函数中增加了一个检查可用硬件的步骤,因为在下一步使用 Dask 进行扩展时,该函数将被分发执行。这使得从在单台机器上(无论是 CPU 还是带 Apple silicon GPU 的 MacBook)进行本地测试,到扩展到多台机器(如 NVIDIA GPU)上的分布式执行变得容易。
使用 Dask 扩展至 2.11 亿行数据
整个 2024 年 2 月/3 月的爬取数据在磁盘上大小为 432 GB,在内存中约为 715 GB,分布在 250 个 Parquet 文件中。即使在一台有足够内存容纳整个数据集的机器上,串行处理也会非常缓慢。
为了进行扩展,我们可以使用 Dask DataFrame,它通过并行化 pandas 来帮助您处理大型表格数据。它的 API 与 pandas 非常相似,使得从在单个数据集上测试到扩展到完整数据集变得容易。Dask 与 Hugging Face 数据集的默认格式 Parquet 配合得很好,可以实现丰富的数据类型、高效的列式过滤和压缩。
import dask.dataframe as dd
df = dd.read_parquet(
# Load the full dataset lazily with Dask
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
我们将使用 map_partitions
在 Dask DataFrame 上并行应用用于文本分类的 compute_scores
函数。map_partitions
会在更大的 Dask DataFrame 中的每个 pandas DataFrame 上并行应用我们的函数。meta
参数是 Dask 特有的,用于指定输出的数据结构(列名和数据类型)。
from transformers import pipeline
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device,
)
results = pipe(
texts.to_list(),
batch_size=768,
padding="longest",
truncation=True,
function_to_apply="none",
)
return pd.Series([r["score"] for r in results])
min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
请注意,我们为这个示例选择了一个效果不错的 batch_size
,但您可能需要根据自己工作流中的硬件、数据和模型来自定义这个值(请参阅 HF 关于 pipeline 批处理的文档)。
现在我们已经确定了我们感兴趣的数据集行,我们可以保存结果以供其他下游分析使用。Dask DataFrame 自动支持分布式写入 Parquet。Hugging Face 使用提交(commit)来跟踪数据集的更改,并允许并行写入 Dask DataFrame。
repo_id = "<your-hf-user>/<your-dataset-name>" # Update with your dataset location
df.to_parquet(f"hf://datasets/{repo_id}")
由于这会为每个文件创建一个提交,建议在上传后压缩历史记录。
from huggingface_hub import HfApi
HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")
或者,您可以使用这个自定义函数,它可以在每次提交中上传多个文件。
多 GPU 并行模型推理
有多种方法可以在各种硬件上部署 Dask。在这里,我们将使用 Coiled 在云上部署 Dask,这样我们就可以按需启动虚拟机,并在完成后清理它们。
cluster = coiled.Cluster(
region="us-east-1", # Same region as data
n_workers=100,
spot_policy="spot_with_fallback", # Use spot instances, if available
worker_vm_types="g5.xlarge", # NVIDIA A10 Tensor Core GPU
worker_options={"nthreads": 1},
)
client = cluster.get_client()
在幕后,Coiled 会处理:
- 配置带有 GPU 硬件的云虚拟机。在本例中,是 AWS 上的 g5.xlarge 实例。
- 设置相应的 NVIDIA 驱动、CUDA 运行时等。
- 通过包同步功能,在云虚拟机上自动安装与您本地相同的包。这包括您工作目录中的 Python 文件。
该工作流耗时约 5 小时完成,并且我们获得了良好的 GPU 硬件利用率。

总而言之,以下是完整的工作流程
import dask.dataframe as dd
from transformers import pipeline
from huggingface_hub import HfApi
import os
import coiled
cluster = coiled.Cluster(
region="us-east-1",
n_workers=100,
spot_policy="spot_with_fallback",
worker_vm_types="g5.xlarge",
worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs(
{"HF_TOKEN": "<your-hf-token>"} # Send credentials over encrypted connection
)
df = dd.read_parquet(
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device
)
results = pipe(
texts.to_list(),
batch_size=768,
padding="longest",
truncation=True,
function_to_apply="none"
)
return pd.Series([r["score"] for r in results])
min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
repo_id = "<your-hf-user>/<your-dataset-name>" # Replace with your dataset location
df.to_parquet(f"hf://datasets/{repo_id}")
HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset") # optional: squash commit history
结论
Hugging Face + Dask 是一个强大的组合。在本例中,我们通过使用 Dask + Coiled 在云端多个 GPU 上并行运行工作流,将分类任务从 100 行扩展到了 2.11 亿行。
同样类型的工作流也可以用于其他用例,例如:
- 过滤基因组数据以选择感兴趣的基因
- 从非结构化文本中提取信息并将其转化为结构化数据集
- 清理从互联网或 Common Crawl 抓取的文本数据
- 运行多模态模型推理以分析大型音频、图像或视频数据集