Hub 文档

Spark

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

Spark

Spark 支持在分布式环境中进行实时、大规模的数据处理。

你可以使用 pyspark_huggingface 通过 “huggingface” 数据源在 PySpark 中访问 Hugging Face 的数据集仓库。

请在 Hugging Face Spaces 上试用 Spark Notebooks,以获取预装了 PySpark 和 pyspark_huggingface 的 Notebooks。

设置

安装

要能够读写 Hugging Face 数据集,你需要安装 pyspark_huggingface 库。

pip install pyspark_huggingface

这也会安装所需的依赖项,如用于身份验证的 huggingface_hub,以及用于读写数据集的 pyarrow

身份验证

你需要通过 Hugging Face 的身份验证才能读取私有/门控的数据集仓库或向你自己的数据集仓库写入数据。

你可以使用命令行界面(CLI)进行验证:

hf auth login

也可以通过 HF_TOKEN 环境变量提供你的 Hugging Face 令牌,或者将 token 选项传递给读取器。有关身份验证的更多详细信息,请查看此指南

启用 “huggingface” 数据源

PySpark 4 引入了新的数据源 API,允许使用来自自定义源的数据集。如果安装了 pyspark_huggingface,PySpark 会自动导入它并启用 “huggingface” 数据源。

该库还为 PySpark 3.5、3.4 和 3.3 向后移植了 “huggingface” 数据源的 API。但在这种情况下,需要显式导入 pyspark_huggingface 来激活向后移植的功能并启用 “huggingface” 数据源。

>>> import pyspark_huggingface
huggingface datasource enabled for pyspark 3.x.x (backport from pyspark 4)

读取

“huggingface” 数据源允许从 Hugging Face 读取数据集,其底层使用 pyarrow 来流式传输 Arrow 数据。这与 Hugging Face 上所有支持格式的数据集兼容,例如 Parquet 数据集。

例如,以下是如何加载 stanfordnlp/imdb 数据集:

>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").load("stanfordnlp/imdb")

这是另一个使用 BAAI/Infinity-Instruct 数据集的例子。这是一个门控仓库,用户在访问之前必须接受使用条款。它还有多个子集,即“3M”和“7M”。因此,我们需要指定加载哪一个。

我们使用 .format() 函数来指定 “huggingface” 数据源,并使用 .load() 来加载数据集(更确切地说,是名为 “7M” 的配置或子集,包含 700 万个样本)。然后,我们计算每种语言的对话数量并对数据集进行过滤。

登录以访问门控仓库后,我们可以运行:

>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
>>> df.show()
+---+----------------------------+-----+----------+--------------------+        
| id|               conversations|label|langdetect|              source|
+---+----------------------------+-----+----------+--------------------+
|  0|        [{human, def exti...|     |        en|      code_exercises|
|  1|        [{human, See the ...|     |        en|                flan|
|  2|        [{human, This is ...|     |        en|                flan|
|  3|        [{human, If you d...|     |        en|                flan|
|  4|        [{human, In a Uni...|     |        en|                flan|
|  5|        [{human, Read the...|     |        en|                flan|
|  6|        [{human, You are ...|     |        en|          code_bagel|
|  7|        [{human, I want y...|     |        en|          Subjective|
|  8|        [{human, Given th...|     |        en|                flan|
|  9|[{human, 因果联系原则是法...|     |     zh-cn|          Subjective|
| 10|        [{human, Provide ...|     |        en|self-oss-instruct...|
| 11|        [{human, The univ...|     |        en|                flan|
| 12|        [{human, Q: I am ...|     |        en|                flan|
| 13|        [{human, What is ...|     |        en|      OpenHermes-2.5|
| 14|        [{human, In react...|     |        en|                flan|
| 15|        [{human, Write Py...|     |        en|      code_exercises|
| 16|        [{human, Find the...|     |        en|            MetaMath|
| 17|        [{human, Three of...|     |        en|            MetaMath|
| 18|        [{human, Chandra ...|     |        en|            MetaMath|
| 19|[{human, 用经济学知识分析...|     |     zh-cn|          Subjective|
+---+----------------------------+-----+----------+--------------------+

这将以流式方式加载数据集,并且输出的 DataFrame 中每个数据文件对应一个分区,以实现高效的分布式处理。

为了计算每种语言的对话数量,我们运行这段使用 columns 选项和 groupBy() 操作的代码。columns 选项很有用,因为它只加载我们需要的数据,因为 PySpark 的数据源 API 不支持谓词下推。还有一个 filters 选项,可以只加载值在特定范围内的数据。

>>> df_langdetect_only = (
...     spark.read.format("huggingface")
...     .option("config", "7M")
...     .option("columns", '["langdetect"]')
...     .load("BAAI/Infinity-Instruct")
... )
>>> df_langdetect_only.groupBy("langdetect").count().show()
+----------+-------+                                                            
|langdetect|  count|
+----------+-------+
|        en|6697793|
|     zh-cn| 751313|
+----------+-------+

要过滤数据集,只保留中文对话:

>>> df_chinese_only = (
...     spark.read.format("huggingface")
...     .option("config", "7M")
...     .option("filters", '[("langdetect", "=", "zh-cn")]')
...     .load("BAAI/Infinity-Instruct")
... )
>>> df_chinese_only.show()
+---+----------------------------+-----+----------+----------+                  
| id|               conversations|label|langdetect|    source|
+---+----------------------------+-----+----------+----------+
|  9|[{human, 因果联系原则是法...|     |     zh-cn|Subjective|
| 19|[{human, 用经济学知识分析...|     |     zh-cn|Subjective|
| 38| [{human, 某个考试共有A、...|     |     zh-cn|Subjective|
| 39|[{human, 撰写一篇关于斐波...|     |     zh-cn|Subjective|
| 57|[{human, 总结世界历史上的...|     |     zh-cn|Subjective|
| 61|[{human, 生成一则广告词。...|     |     zh-cn|Subjective|
| 66|[{human, 描述一个有效的团...|     |     zh-cn|Subjective|
| 94|[{human, 如果比利和蒂芙尼...|     |     zh-cn|Subjective|
|102|[{human, 生成一句英文名言...|     |     zh-cn|Subjective|
|106|[{human, 写一封感谢信,感...|     |     zh-cn|Subjective|
|118| [{human, 生成一个故事。}...|     |     zh-cn|Subjective|
|174|[{human, 高胆固醇水平的后...|     |     zh-cn|Subjective|
|180|[{human, 基于以下角色信息...|     |     zh-cn|Subjective|
|192|[{human, 请写一篇文章,概...|     |     zh-cn|Subjective|
|221|[{human, 以诗歌形式表达对...|     |     zh-cn|Subjective|
|228|[{human, 根据给定的指令,...|     |     zh-cn|Subjective|
|236|[{human, 打开一个新的生成...|     |     zh-cn|Subjective|
|260|[{human, 生成一个有关未来...|     |     zh-cn|Subjective|
|268|[{human, 如果有一定数量的...|     |     zh-cn|Subjective|
|273| [{human, 题目:小明有5个...|     |     zh-cn|Subjective|
+---+----------------------------+-----+----------+----------+

也可以在加载的 DataFrame 上应用过滤器或删除列,但在加载时进行这些操作效率更高,尤其是在 Parquet 数据集上。这是因为 Parquet 在文件和行组级别包含了元数据,这使得可以跳过不包含满足条件样本的整个数据集部分。Parquet 中的列也可以独立加载,这允许跳过排除的列并避免加载不必要的数据。

选项

以下是可传递给 read..option() 的选项列表:

  • config (string):选择一个数据集子集/配置
  • split (string):选择一个数据集划分(默认为 “train”)
  • token (string):你的 Hugging Face 令牌

对于 Parquet 数据集

  • columns (string):选择要加载的列的子集,例如 '["id"]'
  • filters (string):用于跳过不符合条件的文件和行组,例如 '["source", "=", "code_exercises"]'。过滤器会传递给 pyarrow.parquet.ParquetDataset

任何其他选项都会作为参数传递给 [datasets.load_dataset] (https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset)

运行 SQL 查询

一旦你的 PySpark DataFrame 准备就绪,你就可以使用 spark.sql 运行 SQL 查询。

>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = (
...     spark.read.format("huggingface")
...     .option("config", "7M")
...     .option("columns", '["source"]')
...     .load("BAAI/Infinity-Instruct")
... )
>>> spark.sql("SELECT source, count(*) AS total FROM {df} GROUP BY source ORDER BY total DESC", df=df).show()
+--------------------+-------+
|              source|  total|
+--------------------+-------+
|                flan|2435840|
|          Subjective|1342427|
|      OpenHermes-2.5| 855478|
|            MetaMath| 690138|
|      code_exercises| 590958|
|Orca-math-word-pr...| 398168|
|          code_bagel| 386649|
|        MathInstruct| 329254|
|python-code-datas...|  88632|
|instructional_cod...|  82920|
|        CodeFeedback|  79513|
|self-oss-instruct...|  50467|
|Evol-Instruct-Cod...|  43354|
|CodeExercise-Pyth...|  27159|
|code_instructions...|  23130|
|  Code-Instruct-700k|  10860|
|Glaive-code-assis...|   9281|
|python_code_instr...|   2581|
|Python-Code-23k-S...|   2297|
+--------------------+-------+

再次强调,指定 columns 选项不是必须的,但它有助于避免加载不必要的数据,从而使查询更快。

写入

你可以使用 “huggingface” 数据源将 PySpark DataFrame 写入 Hugging Face。它以分布式方式并行上传 Parquet 文件,并且只有在所有文件都上传完毕后才提交。工作方式如下:

>>> import pyspark_huggingface
>>> df.write.format("huggingface").save("username/dataset_name")

下面是我们如何使用这个函数将过滤后的 BAAI/Infinity-Instruct 数据集写回 Hugging Face 的例子。

首先你需要创建一个数据集仓库,例如 username/Infinity-Instruct-Chinese-Only(如果你愿意,可以将其设为私有)。然后,确保你已经通过身份验证并且可以使用 “huggingface” 数据源,将 mode 设置为 “overwrite”(或 “append” 如果你想扩展现有数据集),然后使用 .save() 推送到 Hugging Face。

>>> df_chinese_only.write.format("huggingface").mode("overwrite").save("username/Infinity-Instruct-Chinese-Only")

模式

将数据集推送到 Hugging Face 时有两种模式可用:

  • “overwrite”:如果数据集已存在,则覆盖它
  • “append”:将数据集追加到现有数据集中

选项

以下是可传递给 write.option() 的选项列表:

  • token (string):你的 Hugging Face 令牌

欢迎贡献代码以在此处添加更多选项,特别是 subsetsplit

< > 在 GitHub 上更新