Spark
Spark enables real-time, large-scale data processing in a distributed environment.
In particular, you can use huggingface_hub to access Hugging Face dataset repositories in PySpark.
Installation
To be able to read and write to Hugging Face URLs (e.g. hf://datasets/username/dataset/data.parquet), you need to install the huggingface_hub library:
pip install huggingface_hub
You also need to install pyarrow to read/write Parquet, JSON, CSV, etc. files using the filesystem API provided by huggingface_hub:
pip install pyarrow
Authentication
You need to authenticate to Hugging Face to read private/gated dataset repositories or to write to your dataset repositories.
You can use the CLI, for example:
huggingface-cli login
You can also provide your Hugging Face token with the HF_TOKEN environment variable, or by passing the storage_options argument to the helper functions below:
storage_options = {"token": "hf_xxx"}
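For reference, here is a minimal sketch of the programmatic equivalents (the token value hf_xxx is a placeholder): logging in with huggingface_hub.login, or passing the token directly to HfFileSystem, which is what the helper functions below do with storage_options.

from huggingface_hub import HfFileSystem, login

# Programmatic equivalent of `huggingface-cli login` (placeholder token).
login(token="hf_xxx")

# Or pass the token straight to the filesystem; the helpers below expand
# storage_options into HfFileSystem(**storage_options) internally.
fs = HfFileSystem(token="hf_xxx")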
For more details about authentication, check out this guide.
Read
Hugging Face paths are not officially supported by PySpark, so we provide a helper function to read datasets in a distributed manner.
For instance, you can read Parquet files from Hugging Face in an optimized way using PyArrow by defining this read_parquet helper function:
from functools import partial
from typing import Iterator, Optional, Union
import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import HfFileSystem
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.pandas.types import from_arrow_schema

def _read(iterator: Iterator[pa.RecordBatch], columns: Optional[list[str]], filters: Optional[Union[list[tuple], list[list[tuple]]]], **kwargs) -> Iterator[pa.RecordBatch]:
    # Each incoming batch holds Parquet file paths; read them with PyArrow and stream record batches back to Spark.
    for batch in iterator:
        paths = batch[0].to_pylist()
        ds = pq.ParquetDataset(paths, **kwargs)
        yield from ds._dataset.to_batches(columns=columns, filter=pq.filters_to_expression(filters) if filters else None)


def read_parquet(
    path: str,
    columns: Optional[list[str]] = None,
    filters: Optional[Union[list[tuple], list[list[tuple]]]] = None,
    **kwargs,
) -> DataFrame:
    """
    Loads Parquet files from Hugging Face using PyArrow, returning a PySpark `DataFrame`.

    It reads Parquet files in a distributed manner.

    Access private or gated repositories using `huggingface-cli login` or passing a token
    using the `storage_options` argument: `storage_options={"token": "hf_xxx"}`

    Parameters
    ----------
    path : str
        Path to the file. Prefix with a protocol like `hf://` to read from Hugging Face.
        You can read from multiple files if you pass a globstring.
    columns : list, default None
        If not None, only these columns will be read from the file.
    filters : List[Tuple] or List[List[Tuple]], default None
        To filter out data.
        Filter syntax: [[(column, op, val), ...],...]
        where op is [==, =, >, >=, <, <=, !=, in, not in]
        The innermost tuples are transposed into a set of filters applied
        through an `AND` operation.
        The outer list combines these sets of filters through an `OR`
        operation.
        A single list of tuples can also be used, meaning that no `OR`
        operation between set of filters is to be conducted.
    **kwargs
        Any additional kwargs are passed to pyarrow.parquet.ParquetDataset.

    Returns
    -------
    DataFrame
        DataFrame based on parquet file.

    Examples
    --------
    >>> path = "hf://datasets/username/dataset/data.parquet"
    >>> pd.DataFrame({"foo": range(5), "bar": range(5, 10)}).to_parquet(path)
    >>> read_parquet(path).show()
    +---+---+
    |foo|bar|
    +---+---+
    |  0|  5|
    |  1|  6|
    |  2|  7|
    |  3|  8|
    |  4|  9|
    +---+---+
    >>> read_parquet(path, columns=["bar"]).show()
    +---+
    |bar|
    +---+
    |  5|
    |  6|
    |  7|
    |  8|
    |  9|
    +---+
    >>> sel = [("foo", ">", 2)]
    >>> read_parquet(path, filters=sel).show()
    +---+---+
    |foo|bar|
    +---+---+
    |  3|  8|
    |  4|  9|
    +---+---+
    """
    filesystem: HfFileSystem = kwargs.pop("filesystem") if "filesystem" in kwargs else HfFileSystem(**kwargs.pop("storage_options", {}))
    paths = filesystem.glob(path)
    if not paths:
        raise FileNotFoundError(f"Couldn't find any file at {path}")
    # One Spark task per Parquet file: each task receives a single path and reads it with PyArrow.
    rdd = spark.sparkContext.parallelize([{"path": path} for path in paths], len(paths))
    df = spark.createDataFrame(rdd)
    arrow_schema = pq.read_schema(filesystem.open(paths[0]))
    schema = pa.schema([field for field in arrow_schema if (columns is None or field.name in columns)], metadata=arrow_schema.metadata)
    return df.mapInArrow(
        partial(_read, columns=columns, filters=filters, filesystem=filesystem, schema=arrow_schema, **kwargs),
        from_arrow_schema(schema),
    )
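For a private or gated repository, you can also skip the CLI login and pass your token through storage_options. A small illustrative call (the token value and repository path are placeholders):

>>> df = read_parquet(
...     "hf://datasets/username/private-dataset/*.parquet",
...     storage_options={"token": "hf_xxx"},
... )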
Here is how to use it on the BAAI/Infinity-Instruct dataset. It is a gated repository: users have to accept the terms of use before accessing it.
We use the read_parquet function to read data from the dataset, compute the number of dialogues per language and filter the dataset.
After logging in to access the gated repository, we can run:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet")
>>> df.show()
+---+----------------------------+-----+----------+--------------------+
| id| conversations|label|langdetect| source|
+---+----------------------------+-----+----------+--------------------+
| 0| [{human, def exti...| | en| code_exercises|
| 1| [{human, See the ...| | en| flan|
| 2| [{human, This is ...| | en| flan|
| 3| [{human, If you d...| | en| flan|
| 4| [{human, In a Uni...| | en| flan|
| 5| [{human, Read the...| | en| flan|
| 6| [{human, You are ...| | en| code_bagel|
| 7| [{human, I want y...| | en| Subjective|
| 8| [{human, Given th...| | en| flan|
| 9|[{human, 因果联系原则是法...| | zh-cn| Subjective|
| 10| [{human, Provide ...| | en|self-oss-instruct...|
| 11| [{human, The univ...| | en| flan|
| 12| [{human, Q: I am ...| | en| flan|
| 13| [{human, What is ...| | en| OpenHermes-2.5|
| 14| [{human, In react...| | en| flan|
| 15| [{human, Write Py...| | en| code_exercises|
| 16| [{human, Find the...| | en| MetaMath|
| 17| [{human, Three of...| | en| MetaMath|
| 18| [{human, Chandra ...| | en| MetaMath|
| 19|[{human, 用经济学知识分析...| | zh-cn| Subjective|
+---+----------------------------+-----+----------+--------------------+
To compute the number of dialogues per language, we run the code below. The columns argument is useful to only load the data we need, since PySpark doesn't enable predicate push-down in this case. There is also a filters argument to only load data with values within a certain range.
>>> df_langdetect_only = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", columns=["langdetect"])
>>> df_langdetect_only.groupBy("langdetect").count().show()
+----------+-------+
|langdetect|  count|
+----------+-------+
|        en|6697793|
|     zh-cn| 751313|
+----------+-------+
To filter the dataset and keep only dialogues in Chinese:
>>> criteria = [("langdetect", "=", "zh-cn")]
>>> df_chinese_only = read_parquet("hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet", filters=criteria)
>>> df_chinese_only.show()
+---+----------------------------+-----+----------+----------+
| id| conversations|label|langdetect| source|
+---+----------------------------+-----+----------+----------+
| 9|[{human, 因果联系原则是法...| | zh-cn|Subjective|
| 19|[{human, 用经济学知识分析...| | zh-cn|Subjective|
| 38| [{human, 某个考试共有A、...| | zh-cn|Subjective|
| 39|[{human, 撰写一篇关于斐波...| | zh-cn|Subjective|
| 57|[{human, 总结世界历史上的...| | zh-cn|Subjective|
| 61|[{human, 生成一则广告词。...| | zh-cn|Subjective|
| 66|[{human, 描述一个有效的团...| | zh-cn|Subjective|
| 94|[{human, 如果比利和蒂芙尼...| | zh-cn|Subjective|
|102|[{human, 生成一句英文名言...| | zh-cn|Subjective|
|106|[{human, 写一封感谢信,感...| | zh-cn|Subjective|
|118| [{human, 生成一个故事。}...| | zh-cn|Subjective|
|174|[{human, 高胆固醇水平的后...| | zh-cn|Subjective|
|180|[{human, 基于以下角色信息...| | zh-cn|Subjective|
|192|[{human, 请写一篇文章,概...| | zh-cn|Subjective|
|221|[{human, 以诗歌形式表达对...| | zh-cn|Subjective|
|228|[{human, 根据给定的指令,...| | zh-cn|Subjective|
|236|[{human, 打开一个新的生成...| | zh-cn|Subjective|
|260|[{human, 生成一个有关未来...| | zh-cn|Subjective|
|268|[{human, 如果有一定数量的...| | zh-cn|Subjective|
|273| [{human, 题目:小明有5个...| | zh-cn|Subjective|
+---+----------------------------+-----+----------+----------+
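The columns and filters arguments can also be combined. For instance, an illustrative snippet (not part of the original walkthrough) that loads only the langdetect and source columns of the Chinese dialogues and counts them per source:

>>> df_chinese_sources = read_parquet(
...     "hf://datasets/BAAI/Infinity-Instruct/7M/*.parquet",
...     columns=["langdetect", "source"],
...     filters=[("langdetect", "=", "zh-cn")],
... )
>>> df_chinese_sources.groupBy("source").count().show()  # output omitted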
Write
We also provide a helper function to write datasets to Hugging Face repositories in a distributed manner.
You can write a PySpark DataFrame to Hugging Face using this write_parquet helper function based on the huggingface_hub API. In particular, it uses the preupload_lfs_files utility to upload Parquet files in parallel in a distributed manner, and only commits the files once they're all uploaded:
import math
import pickle
import tempfile
from functools import partial
from typing import Iterator, Optional
import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import CommitOperationAdd, HfFileSystem
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema

def _preupload(iterator: Iterator[pa.RecordBatch], path: str, schema: pa.Schema, filesystem: HfFileSystem, row_group_size: Optional[int] = None, **kwargs) -> Iterator[pa.RecordBatch]:
    # Write this task's batches to a temporary Parquet file, preupload it as LFS, and yield the pickled commit operation.
    resolved_path = filesystem.resolve_path(path)
    with tempfile.NamedTemporaryFile(suffix=".parquet") as temp_file:
        with pq.ParquetWriter(temp_file.name, schema=schema, **kwargs) as writer:
            for batch in iterator:
                writer.write_batch(batch, row_group_size=row_group_size)
        addition = CommitOperationAdd(path_in_repo=temp_file.name, path_or_fileobj=temp_file.name)
        filesystem._api.preupload_lfs_files(repo_id=resolved_path.repo_id, additions=[addition], repo_type=resolved_path.repo_type, revision=resolved_path.revision)
        yield pa.record_batch({"addition": [pickle.dumps(addition)]}, schema=pa.schema({"addition": pa.binary()}))


def _commit(iterator: Iterator[pa.RecordBatch], path: str, filesystem: HfFileSystem, max_operations_per_commit=50) -> Iterator[pa.RecordBatch]:
    # Gather all preuploaded files, rename them to their final shard paths, and commit them in batches.
    resolved_path = filesystem.resolve_path(path)
    additions: list[CommitOperationAdd] = [pickle.loads(addition) for addition in pa.Table.from_batches(iterator, schema=pa.schema({"addition": pa.binary()}))[0].to_pylist()]
    num_commits = math.ceil(len(additions) / max_operations_per_commit)
    for shard_idx, addition in enumerate(additions):
        addition.path_in_repo = resolved_path.path_in_repo.replace("{shard_idx:05d}", f"{shard_idx:05d}")
    for i in range(0, num_commits):
        operations = additions[i * max_operations_per_commit : (i + 1) * max_operations_per_commit]
        commit_message = "Upload using PySpark" + (f" (part {i:05d}-of-{num_commits:05d})" if num_commits > 1 else "")
        filesystem._api.create_commit(repo_id=resolved_path.repo_id, repo_type=resolved_path.repo_type, revision=resolved_path.revision, operations=operations, commit_message=commit_message)
        yield pa.record_batch({"path": [addition.path_in_repo for addition in operations]}, schema=pa.schema({"path": pa.string()}))


def write_parquet(df: DataFrame, path: str, **kwargs) -> None:
    """
    Write Parquet files to Hugging Face using PyArrow.

    It uploads Parquet files in a distributed manner in two steps:

    1. Preupload the Parquet files in parallel in a distributed manner
    2. Commit the preuploaded files

    Authenticate using `huggingface-cli login` or passing a token
    using the `storage_options` argument: `storage_options={"token": "hf_xxx"}`

    Parameters
    ----------
    path : str
        Path of the file or directory. Prefix with a protocol like `hf://` to write to Hugging Face.
        It writes Parquet files in the form "part-xxxxx.parquet", or to a single file if `path` ends with ".parquet".
    **kwargs
        Any additional kwargs are passed to pyarrow.parquet.ParquetWriter.

    Returns
    -------
    None

    Examples
    --------
    >>> df = spark.createDataFrame(pd.DataFrame({"foo": range(5), "bar": range(5, 10)}))
    >>> # Save to one file
    >>> write_parquet(df, "hf://datasets/username/dataset/data.parquet")
    >>> # OR save to a directory (possibly in many files)
    >>> write_parquet(df, "hf://datasets/username/dataset")
    """
    filesystem: HfFileSystem = kwargs.pop("filesystem", HfFileSystem(**kwargs.pop("storage_options", {})))
    if path.endswith(".parquet") or path.endswith(".pq"):
        # Single-file destination: write everything from one task.
        df = df.coalesce(1)
    else:
        path += "/part-{shard_idx:05d}.parquet"
    # Step 1: each task preuploads its partition as a Parquet file;
    # Step 2: a single task collects the preuploaded files and commits them.
    df.mapInArrow(
        partial(_preupload, path=path, schema=to_arrow_schema(df.schema), filesystem=filesystem, **kwargs),
        from_arrow_schema(pa.schema({"addition": pa.binary()})),
    ).repartition(1).mapInArrow(
        partial(_commit, path=path, filesystem=filesystem),
        from_arrow_schema(pa.schema({"path": pa.string()})),
    ).collect()
Here is how to use this function to write the filtered version of the BAAI/Infinity-Instruct dataset back to Hugging Face.
First you need to create a dataset repository, e.g. username/Infinity-Instruct-Chinese-Only (you can set it to private if you want; it can also be created programmatically, as sketched after the example below). Then, make sure you are authenticated and you can run:
>>> write_parquet(df_chinese_only, "hf://datasets/username/Infinity-Instruct-Chinese-Only")
tmph9jwu9py.parquet: 100%|██████████| 50.5M/50.5M [00:03<00:00, 14.6MB/s]
tmp0oqt99nc.parquet: 100%|██████████| 50.8M/50.8M [00:02<00:00, 17.9MB/s]
tmpgnizkwqp.parquet: 100%|██████████| 50.5M/50.5M [00:02<00:00, 19.6MB/s]
tmpanm04k4n.parquet: 100%|██████████| 51.4M/51.4M [00:02<00:00, 22.9MB/s]
tmp14uy9oqb.parquet: 100%|██████████| 50.4M/50.4M [00:02<00:00, 23.0MB/s]
tmpcp8t_qdl.parquet: 100%|██████████| 50.4M/50.4M [00:02<00:00, 23.5MB/s]
tmpjui5mns8.parquet: 100%|██████████| 50.3M/50.3M [00:02<00:00, 24.1MB/s]
tmpydqh6od1.parquet: 100%|██████████| 50.9M/50.9M [00:02<00:00, 23.8MB/s]
tmp52f2t8tu.parquet: 100%|██████████| 50.5M/50.5M [00:02<00:00, 23.7MB/s]
tmpg7egv3ye.parquet: 100%|██████████| 50.1M/50.1M [00:06<00:00, 7.68MB/s]
tmp2s0fq2hm.parquet: 100%|██████████| 50.8M/50.8M [00:02<00:00, 18.1MB/s]
tmpmj97ab30.parquet: 100%|██████████| 71.3M/71.3M [00:02<00:00, 23.9MB/s]
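As mentioned above, the target dataset repository can also be created programmatically rather than on the website. A minimal sketch using create_repo from huggingface_hub (with the same placeholder username):

from huggingface_hub import create_repo

# Create the dataset repository before writing to it (set private=True to keep it private).
create_repo("username/Infinity-Instruct-Chinese-Only", repo_type="dataset", private=True, exist_ok=True)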
Run in JupyterLab on Hugging Face Spaces
You can duplicate the Spark on HF JupyterLab Space to get a Notebook with PySpark and these helper functions pre-installed.
Click on "Duplicate Space", choose a name for your Space, select your hardware and you are ready to go.