Hub 文档
Spark
并获得增强的文档体验
开始使用
Spark
Spark 支持在分布式环境中进行实时、大规模的数据处理。
你可以使用 pyspark_huggingface
通过 “huggingface” 数据源在 PySpark 中访问 Hugging Face 的数据集仓库。
请在 Hugging Face Spaces 上试用 Spark Notebooks,以获取预装了 PySpark 和 pyspark_huggingface
的 Notebooks。

设置
安装
要能够读写 Hugging Face 数据集,你需要安装 pyspark_huggingface
库。
pip install pyspark_huggingface
这也会安装所需的依赖项,如用于身份验证的 huggingface_hub
,以及用于读写数据集的 pyarrow
。
身份验证
你需要通过 Hugging Face 的身份验证才能读取私有/门控的数据集仓库或向你自己的数据集仓库写入数据。
你可以使用命令行界面(CLI)进行验证:
hf auth login
也可以通过 HF_TOKEN
环境变量提供你的 Hugging Face 令牌,或者将 token
选项传递给读取器。有关身份验证的更多详细信息,请查看此指南。
启用 “huggingface” 数据源
PySpark 4 引入了新的数据源 API,允许使用来自自定义源的数据集。如果安装了 pyspark_huggingface
,PySpark 会自动导入它并启用 “huggingface” 数据源。
该库还为 PySpark 3.5、3.4 和 3.3 向后移植了 “huggingface” 数据源的 API。但在这种情况下,需要显式导入 pyspark_huggingface
来激活向后移植的功能并启用 “huggingface” 数据源。
>>> import pyspark_huggingface
huggingface datasource enabled for pyspark 3.x.x (backport from pyspark 4)
读取
“huggingface” 数据源允许从 Hugging Face 读取数据集,其底层使用 pyarrow
来流式传输 Arrow 数据。这与 Hugging Face 上所有支持格式的数据集兼容,例如 Parquet 数据集。
例如,以下是如何加载 stanfordnlp/imdb 数据集:
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").load("stanfordnlp/imdb")
这是另一个使用 BAAI/Infinity-Instruct 数据集的例子。这是一个门控仓库,用户在访问之前必须接受使用条款。它还有多个子集,即“3M”和“7M”。因此,我们需要指定加载哪一个。


我们使用 .format()
函数来指定 “huggingface” 数据源,并使用 .load()
来加载数据集(更确切地说,是名为 “7M” 的配置或子集,包含 700 万个样本)。然后,我们计算每种语言的对话数量并对数据集进行过滤。
登录以访问门控仓库后,我们可以运行:
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
>>> df.show()
+---+----------------------------+-----+----------+--------------------+
| id| conversations|label|langdetect| source|
+---+----------------------------+-----+----------+--------------------+
| 0| [{human, def exti...| | en| code_exercises|
| 1| [{human, See the ...| | en| flan|
| 2| [{human, This is ...| | en| flan|
| 3| [{human, If you d...| | en| flan|
| 4| [{human, In a Uni...| | en| flan|
| 5| [{human, Read the...| | en| flan|
| 6| [{human, You are ...| | en| code_bagel|
| 7| [{human, I want y...| | en| Subjective|
| 8| [{human, Given th...| | en| flan|
| 9|[{human, 因果联系原则是法...| | zh-cn| Subjective|
| 10| [{human, Provide ...| | en|self-oss-instruct...|
| 11| [{human, The univ...| | en| flan|
| 12| [{human, Q: I am ...| | en| flan|
| 13| [{human, What is ...| | en| OpenHermes-2.5|
| 14| [{human, In react...| | en| flan|
| 15| [{human, Write Py...| | en| code_exercises|
| 16| [{human, Find the...| | en| MetaMath|
| 17| [{human, Three of...| | en| MetaMath|
| 18| [{human, Chandra ...| | en| MetaMath|
| 19|[{human, 用经济学知识分析...| | zh-cn| Subjective|
+---+----------------------------+-----+----------+--------------------+
这将以流式方式加载数据集,并且输出的 DataFrame 中每个数据文件对应一个分区,以实现高效的分布式处理。
为了计算每种语言的对话数量,我们运行这段使用 columns
选项和 groupBy()
操作的代码。columns
选项很有用,因为它只加载我们需要的数据,因为 PySpark 的数据源 API 不支持谓词下推。还有一个 filters
选项,可以只加载值在特定范围内的数据。
>>> df_langdetect_only = (
... spark.read.format("huggingface")
... .option("config", "7M")
... .option("columns", '["langdetect"]')
... .load("BAAI/Infinity-Instruct")
... )
>>> df_langdetect_only.groupBy("langdetect").count().show()
+----------+-------+
|langdetect| count|
+----------+-------+
| en|6697793|
| zh-cn| 751313|
+----------+-------+
要过滤数据集,只保留中文对话:
>>> df_chinese_only = (
... spark.read.format("huggingface")
... .option("config", "7M")
... .option("filters", '[("langdetect", "=", "zh-cn")]')
... .load("BAAI/Infinity-Instruct")
... )
>>> df_chinese_only.show()
+---+----------------------------+-----+----------+----------+
| id| conversations|label|langdetect| source|
+---+----------------------------+-----+----------+----------+
| 9|[{human, 因果联系原则是法...| | zh-cn|Subjective|
| 19|[{human, 用经济学知识分析...| | zh-cn|Subjective|
| 38| [{human, 某个考试共有A、...| | zh-cn|Subjective|
| 39|[{human, 撰写一篇关于斐波...| | zh-cn|Subjective|
| 57|[{human, 总结世界历史上的...| | zh-cn|Subjective|
| 61|[{human, 生成一则广告词。...| | zh-cn|Subjective|
| 66|[{human, 描述一个有效的团...| | zh-cn|Subjective|
| 94|[{human, 如果比利和蒂芙尼...| | zh-cn|Subjective|
|102|[{human, 生成一句英文名言...| | zh-cn|Subjective|
|106|[{human, 写一封感谢信,感...| | zh-cn|Subjective|
|118| [{human, 生成一个故事。}...| | zh-cn|Subjective|
|174|[{human, 高胆固醇水平的后...| | zh-cn|Subjective|
|180|[{human, 基于以下角色信息...| | zh-cn|Subjective|
|192|[{human, 请写一篇文章,概...| | zh-cn|Subjective|
|221|[{human, 以诗歌形式表达对...| | zh-cn|Subjective|
|228|[{human, 根据给定的指令,...| | zh-cn|Subjective|
|236|[{human, 打开一个新的生成...| | zh-cn|Subjective|
|260|[{human, 生成一个有关未来...| | zh-cn|Subjective|
|268|[{human, 如果有一定数量的...| | zh-cn|Subjective|
|273| [{human, 题目:小明有5个...| | zh-cn|Subjective|
+---+----------------------------+-----+----------+----------+
也可以在加载的 DataFrame 上应用过滤器或删除列,但在加载时进行这些操作效率更高,尤其是在 Parquet 数据集上。这是因为 Parquet 在文件和行组级别包含了元数据,这使得可以跳过不包含满足条件样本的整个数据集部分。Parquet 中的列也可以独立加载,这允许跳过排除的列并避免加载不必要的数据。
选项
以下是可传递给 read..option()
的选项列表:
config
(string):选择一个数据集子集/配置split
(string):选择一个数据集划分(默认为 “train”)token
(string):你的 Hugging Face 令牌
对于 Parquet 数据集
columns
(string):选择要加载的列的子集,例如'["id"]'
filters
(string):用于跳过不符合条件的文件和行组,例如'["source", "=", "code_exercises"]'
。过滤器会传递给 pyarrow.parquet.ParquetDataset。
任何其他选项都会作为参数传递给 [datasets.load_dataset] (https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset)
运行 SQL 查询
一旦你的 PySpark DataFrame 准备就绪,你就可以使用 spark.sql
运行 SQL 查询。
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = (
... spark.read.format("huggingface")
... .option("config", "7M")
... .option("columns", '["source"]')
... .load("BAAI/Infinity-Instruct")
... )
>>> spark.sql("SELECT source, count(*) AS total FROM {df} GROUP BY source ORDER BY total DESC", df=df).show()
+--------------------+-------+
| source| total|
+--------------------+-------+
| flan|2435840|
| Subjective|1342427|
| OpenHermes-2.5| 855478|
| MetaMath| 690138|
| code_exercises| 590958|
|Orca-math-word-pr...| 398168|
| code_bagel| 386649|
| MathInstruct| 329254|
|python-code-datas...| 88632|
|instructional_cod...| 82920|
| CodeFeedback| 79513|
|self-oss-instruct...| 50467|
|Evol-Instruct-Cod...| 43354|
|CodeExercise-Pyth...| 27159|
|code_instructions...| 23130|
| Code-Instruct-700k| 10860|
|Glaive-code-assis...| 9281|
|python_code_instr...| 2581|
|Python-Code-23k-S...| 2297|
+--------------------+-------+
再次强调,指定 columns
选项不是必须的,但它有助于避免加载不必要的数据,从而使查询更快。
写入
你可以使用 “huggingface” 数据源将 PySpark DataFrame 写入 Hugging Face。它以分布式方式并行上传 Parquet 文件,并且只有在所有文件都上传完毕后才提交。工作方式如下:
>>> import pyspark_huggingface
>>> df.write.format("huggingface").save("username/dataset_name")
下面是我们如何使用这个函数将过滤后的 BAAI/Infinity-Instruct 数据集写回 Hugging Face 的例子。
首先你需要创建一个数据集仓库,例如 username/Infinity-Instruct-Chinese-Only
(如果你愿意,可以将其设为私有)。然后,确保你已经通过身份验证并且可以使用 “huggingface” 数据源,将 mode
设置为 “overwrite”(或 “append” 如果你想扩展现有数据集),然后使用 .save()
推送到 Hugging Face。
>>> df_chinese_only.write.format("huggingface").mode("overwrite").save("username/Infinity-Instruct-Chinese-Only")


模式
将数据集推送到 Hugging Face 时有两种模式可用:
- “overwrite”:如果数据集已存在,则覆盖它
- “append”:将数据集追加到现有数据集中
选项
以下是可传递给 write.option()
的选项列表:
token
(string):你的 Hugging Face 令牌
欢迎贡献代码以在此处添加更多选项,特别是 subset
和 split
。