⭐ PySpark 和 🤗 Hugging Face Parquet 文件
社区文章 发布于2024年8月13日
简介
欢迎阅读本指南,了解如何使用 PySpark 加载和处理来自 Hugging Face 数据集的 Parquet 文件!我们将引导您设置 Spark 会话、加载 Parquet 文件并执行基本数据操作,所有这些都以葡萄酒评论数据集为例。让我们开始吧!
目录
1. 设置
开始之前,让我们先设置环境。首先,我们将安装必要的库并启动一个 Spark 会话。
pip install pyspark
接下来,我们将导入所需的依赖项并初始化 Spark 会话。Spark 会话是您使用 Spark 处理 DataFrame 的入口点。
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WineReviews").getOrCreate()
2. 显示数据集
我们可以直接从 Hugging Face 显示一个交互式查看器,以更好地理解我们的数据集。此步骤是可选的,但强烈推荐!
from IPython.display import HTML
dataset = "james-burton/wine_reviews"
iframe_html = """
<iframe src="https://huggingface.co/datasets/{dataset}/embed/viewer" width="80%" height="560px"></iframe>
""".format(dataset=dataset)
display(HTML(iframe_html))
葡萄酒评论数据集包含专业葡萄酒评论家和爱好者对葡萄酒的评论,详细介绍了各种葡萄酒特征,如品种、评分、价格和产地。
3. 加载 Hugging Face Parquet 文件
现在,让我们通过从 Hugging Face API 获取 Parquet 文件 URL 并将它们添加到 Spark 上下文中来加载数据集。
import requests
HUGGING_FACE_PARQUET_API = "https://huggingface.co/api/datasets/{dataset}/parquet"
r = requests.get(HUGGING_FACE_PARQUET_API.format(dataset=dataset))
train_parquet_files = r.json()['default']['train']
for url in train_parquet_files:
spark.sparkContext.addFile(url)
df = spark.read.parquet(SparkFiles.getRootDirectory() + "/*.parquet")
4. 探索数据
将数据加载到 DataFrame 后,我们可以探索其结构和内容。
# Shape of the dataset
print(f"Shape of the dataset: {df.count()}, {len(df.columns)}")
# Displaying first 10 rows
df.show(n=10)
# Getting a statistical summary of the data
df.describe().show()
# Print the schema of the DataFrame
df.printSchema()
5. 数据转换
让我们对数据集进行一些基本转换。
# Display all values of a particular column
df.select('country').show()
# Select multiple columns
df.select(['country','province']).show()
# Display data types of columns
df.dtypes
# Create a subset of the dataset
df1 = df.limit(5)
df1.show()
添加新列
我们将添加一个新列,将 `country` 和 `province` 的值用连字符连接起来。
from pyspark.sql.functions import concat, lit
df1 = df.withColumn("location", concat(df['country'], lit('-'), df['province']))
df1.show()
分组数据
按国家/地区对数据进行分组,并计算每个国家/地区的记录数,然后按降序对结果进行排序。
df.groupBy('country').count().orderBy('count', ascending=False).show()
执行 SQL 查询
您也可以使用 SQL 查询实现相同的结果。
df.createOrReplaceTempView("wine_reviews_table")
spark.sql("SHOW TABLES;").show()
result_df = spark.sql("SELECT country, count(*) as count from wine_reviews_table GROUP BY country ORDER BY count DESC")
result_df.show()
处理缺失值
让我们检查并处理数据集中的缺失值。
from pyspark.sql.functions import col, isnan, when, count
null_df_counts = df.select([count(when(col(c).contains('None') | \
col(c).contains('NULL') | \
(col(c) == '') | \
col(c).isNull() | \
isnan(c), c
)).alias(c)
for c in df.columns])
null_df_counts.show()
# Removing rows with any null values
df_clean = df.dropna()
df_clean.show()
您可以在此处查看包含代码的笔记本。
6. 结论
在本教程中,我们介绍了:
- 初始化 Spark 会话。
- 将 Hugging Face 的 Parquet 文件加载到 PySpark DataFrame 中。
- 探索数据结构和内容。
- 执行数据转换。
- 执行 SQL 查询。
- 处理缺失值。
这些步骤为使用 PySpark 进行更高级的数据分析和转换奠定了基础。
🤗 祝您数据处理愉快!