Datasets 文档

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

数据集流式传输使您无需下载即可使用数据集。数据在您迭代数据集时进行流式传输。这在以下情况尤其有用:

  • 您不想等待下载一个非常大的数据集。
  • 数据集大小超过了您计算机上的可用磁盘空间。
  • 您想快速探索数据集的几个样本。

例如,HuggingFaceFW/fineweb 数据集的英文部分为 45TB,但您可以通过流式传输即时使用它。通过在 load_dataset() 中设置 streaming=True 来流式传输数据集,如下所示:

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> print(next(iter(dataset)))
{'text': 'How AP reported in all formats from tornado-stricken regionsMarch 8, 2012\nWhen the first serious bout of tornadoes of 2012 blew through middle America in the middle of the night, they touched down in places hours from any AP bureau...', ...,
 'language_score': 0.9721424579620361, 'token_count': 717}

数据集流式传输还允许您处理由本地文件组成的数据集,而无需进行任何转换。在这种情况下,数据会在您迭代数据集时从本地文件流式传输。这在以下情况尤其有用:

  • 您不想等待将非常大的本地数据集转换为 Arrow。
  • 转换后的文件大小会超过您计算机上的可用磁盘空间。
  • 您想快速探索数据集的几个样本。
  • 您只想加载某些列或高效地过滤 Parquet 数据集。

例如,您可以流式传输一个包含数百个压缩 JSONL 文件的本地数据集,例如 oscar-corpus/OSCAR-2201,并即时使用它。

>>> from datasets import load_dataset
>>> data_files = {'train': 'path/to/OSCAR-2201/compressed/en_meta/*.jsonl.gz'}
>>> dataset = load_dataset('json', data_files=data_files, split='train', streaming=True)
>>> print(next(iter(dataset)))
{'id': 0, 'text': 'Founded in 2015, Golden Bees is a leading programmatic recruitment platform dedicated to employers, HR agencies and job boards. The company has developed unique HR-custom technologies and predictive algorithms to identify and attract the best candidates for a job opportunity.', ...

Parquet 是一种列式格式,允许您流式传输和加载列的子集,并忽略不需要的列。Parquet 还存储元数据,例如列统计信息(在文件和行组级别),从而实现高效过滤。使用 datasets.packaged_modules.parquet.ParquetConfigcolumnsfilters 参数来流式传输 Parquet 数据集,选择列,并应用过滤器。

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True, columns=["url", "date"])
>>> print(next(iter(dataset)))
{'url': 'http://%20jwashington@ap.org/Content/Press-Release/2012/How-AP-reported-in-all-formats-from-tornado-stricken-regions', 'date': '2013-05-18T05:48:54Z'}
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True, filters=[("language_score", ">=", 0.99)])
>>> print(next(iter(dataset)))
{'text': 'Everyone wishes for something. And lots of people believe they know how to make their wishes come true with magical thinking.\nWhat is it? "Magical thinking is a belief in forms of causation, with no known physical basis," said Professor Emily Pronin of Princeton...', ...,
 'language_score': 0.9900368452072144, 'token_count': 716}

以流式传输模式加载数据集会创建一个新的数据集类型实例(而不是经典的 Dataset 对象),称为 IterableDataset。这种特殊类型的数据集具有自己的一套处理方法,如下所示。

一个 IterableDataset 对于像训练模型这样的迭代任务很有用。您不应该将 IterableDataset 用于需要随机访问样本的任务,因为您必须使用 for 循环遍历它。获取可迭代数据集中的最后一个样本将需要您遍历所有之前的样本。您可以在 Dataset vs. IterableDataset 指南中找到更多详细信息。

列索引

有时按特定列的值进行迭代很方便。幸运的是,IterableDataset 支持列索引。

>>> from datasets import load_dataset
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> print(next(iter(dataset["text"])))
Beginners BBQ Class Taking Place in Missoula!...

从 Dataset 转换

如果您有一个现有的 Dataset 对象,您可以使用 to_iterable_dataset() 函数将其转换为 IterableDataset。这实际上比在 load_dataset() 中设置 streaming=True 参数更快,因为数据是从本地文件流式传输的。

>>> from datasets import load_dataset

# faster 🐇
>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset()

# slower 🐢
>>> iterable_dataset = load_dataset("ethz/food101", streaming=True)

to_iterable_dataset() 函数支持分片(sharding),当 IterableDataset 被实例化时。这在使用大型数据集时非常有用,并且您希望对数据集进行混洗(shuffle)或使用 PyTorch DataLoader 实现快速并行加载。

>>> import torch
>>> from datasets import load_dataset

>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset(num_shards=64) # shard the dataset
>>> iterable_dataset = iterable_dataset.shuffle(buffer_size=10_000)  # shuffles the shards order and use a shuffle buffer when you start iterating
dataloader = torch.utils.data.DataLoader(iterable_dataset, num_workers=4)  # assigns 64 / 4 = 16 shards from the shuffled list of shards to each worker when you start iterating

混洗

与常规的 Dataset 对象一样,您也可以使用 IterableDataset.shuffle() 来混洗 IterableDataset

buffer_size 参数控制用于随机采样样本的缓冲区大小。假设您的数据集有 100 万个样本,并且您将 buffer_size 设置为 10,000。 IterableDataset.shuffle() 将从缓冲区的前 10,000 个样本中随机选择样本。缓冲区中的选定样本会被新样本替换。默认情况下,缓冲区大小为 1,000。

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> shuffled_dataset = dataset.shuffle(seed=42, buffer_size=10_000)

IterableDataset.shuffle() 还会混洗分片(shards)的顺序,如果数据集被分成多个文件。

重新混洗

有时您可能希望在每个 epoch(周期)后重新混洗数据集。这将需要您为每个 epoch 设置不同的种子。在 epoch 之间使用 IterableDataset.set_epoch() 来告知数据集当前是哪个 epoch。

您的种子实际上是:初始种子 + 当前 epoch

>>> for epoch in range(epochs):
...     shuffled_dataset.set_epoch(epoch)
...     for example in shuffled_dataset:
...         ...

拆分数据集

您可以按照两种方式之一拆分数据集:

>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> dataset_head = dataset.take(2)
>>> list(dataset_head)
[{'text': "How AP reported in all formats from tor...},
 {'text': 'Did you know you have two little yellow...}]
>>> train_dataset = shuffled_dataset.skip(1000)

takeskip 会阻止后续对 shuffle 的调用,因为它们会固定分片(shards)的顺序。您应该在拆分数据集之前对其进行 shuffle

分片

🤗 Datasets 支持分片,将非常大的数据集分割成预定义的块。在 shard() 中指定 num_shards 参数来确定要将数据集分割成的分片数量。您还需要使用 index 参数提供您想要返回的分片。

例如,amazon_polarity 数据集有 4 个分片(在本例中是 4 个 Parquet 文件)。

>>> from datasets import load_dataset
>>> dataset = load_dataset("fancyzhx/amazon_polarity", split="train", streaming=True)
>>> print(dataset)
IterableDataset({
    features: ['label', 'title', 'content'],
    num_shards: 4
})

将数据集分片成两块后,第一个将只有 2 个分片。

>>> dataset.shard(num_shards=2, index=0)
IterableDataset({
    features: ['label', 'title', 'content'],
    num_shards: 2
})

如果您的数据集具有 dataset.num_shards==1,则应使用 IterableDataset.skip()IterableDataset.take() 来对其进行分块。

交织

interleave_datasets() 可以将一个 IterableDataset 与其他数据集合并。合并后的数据集会交替返回原始数据集中每个数据集的样本。

>>> from datasets import interleave_datasets
>>> es_dataset = load_dataset('allenai/c4', 'es', split='train', streaming=True)
>>> fr_dataset = load_dataset('allenai/c4', 'fr', split='train', streaming=True)

>>> multilingual_dataset = interleave_datasets([es_dataset, fr_dataset])
>>> list(multilingual_dataset.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
 {'text': 'Le sacre de philippe ier, 23 mai 1059 - Compte Rendu...'}]

为每个原始数据集定义采样概率,以更好地控制它们的采样和组合方式。使用所需的采样概率设置 probabilities 参数。

>>> multilingual_dataset_with_oversampling = interleave_datasets([es_dataset, fr_dataset], probabilities=[0.8, 0.2], seed=42)
>>> list(multilingual_dataset_with_oversampling.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
 {'text': 'Chevrolet Cavalier Usados en Bogota - Carros en Vent...'}]

最终数据集中约 80% 来自 es_dataset,20% 来自 fr_dataset

您还可以指定 stopping_strategy。默认策略 first_exhausted 是一种欠采样策略,即当其中一个数据集样本耗尽时,数据集的构建就停止。您可以指定 stopping_strategy=all_exhausted 来执行一种过采样策略。在这种情况下,数据集的构建将在每个数据集中的所有样本至少被添加一次后停止。实际上,这意味着如果一个数据集耗尽了,它会回到该数据集的开头,直到达到停止条件。请注意,如果没有指定采样概率,新数据集将包含 max_length_datasets*nb_dataset 个样本。还有 stopping_strategy=all_exhausted_without_replacement 来确保每个样本都被精确地看到一次。

重命名、移除和类型转换

以下方法允许您修改数据集的列。这些方法对于重命名或移除列以及将列更改为一组新特征非常有用。

重命名

当您需要重命名数据集中的列时,请使用 IterableDataset.rename_column()。与原始列关联的特征实际上会移动到新列名下,而不是直接替换原始列。

IterableDataset.rename_column() 提供原始列名和新列名。

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.rename_column("text", "content")

移除

当您需要移除一个或多个列时,请将 IterableDataset.remove_columns() 的参数设置为要移除的列名。通过提供列名列表来移除多个列。

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.remove_columns('timestamp')

类型转换

IterableDataset.cast() 更改一个或多个列的特征类型。此方法以您的新 Features 作为参数。下面的示例代码显示了如何更改 ClassLabelValue 的特征类型。

>>> from datasets import load_dataset
>>> dataset = load_dataset('nyu-mll/glue', 'mrpc', split='train', streaming=True)
>>> dataset.features
{'sentence1': Value('string'),
'sentence2': Value('string'),
'label': ClassLabel(names=['not_equivalent', 'equivalent']),
'idx': Value('int32')}

>>> from datasets import ClassLabel, Value
>>> new_features = dataset.features.copy()
>>> new_features["label"] = ClassLabel(names=['negative', 'positive'])
>>> new_features["idx"] = Value('int64')
>>> dataset = dataset.cast(new_features)
>>> dataset.features
{'sentence1': Value('string'),
'sentence2': Value('string'),
'label': ClassLabel(names=['negative', 'positive']),
'idx': Value('int64')}

类型转换仅在原始特征类型和新特征类型兼容时有效。例如,您可以将具有特征类型 Value('int32') 的列转换为 Value('bool'),前提是原始列仅包含 0 和 1。

使用 IterableDataset.cast_column() 来更改单个列的特征类型。将列名及其新特征类型作为参数传入。

>>> dataset.features
{'audio': Audio(sampling_rate=44100, mono=True)}

>>> dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
>>> dataset.features
{'audio': Audio(sampling_rate=16000, mono=True)}

映射

与常规 DatasetDataset.map() 函数类似,🤗 Datasets 提供了 IterableDataset.map() 来处理 IterableDatasetIterableDataset.map() 在样本流式传输时即时应用处理。

它允许您将处理函数应用于数据集中的每个样本,独立地或批量处理。此函数甚至可以创建新的行和列。

以下示例演示了如何对 IterableDataset 进行分词(tokenize)。函数需要接受并输出一个 dict

>>> def add_prefix(example):
...     example['text'] = 'My text: ' + example['text']
...     return example

接下来,使用 IterableDataset.map() 将此函数应用于数据集。

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> updated_dataset = dataset.map(add_prefix)
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...',
  'timestamp': '2019-04-25 12:57:54',
  'url': 'https://klyq.com/beginners-bbq-class-taking-place-in-missoula/'},
 {'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...',
  'timestamp': '2019-04-21 10:07:13',
  'url': 'https://forums.macrumors.com/threads/restore-from-larger-disk-to-smaller-disk.1311329/'},
 {'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...',
  'timestamp': '2019-04-25 10:40:23',
  'url': 'https://awishcometrue.com/Catalogs/Clearance/Tweens/V1960-Find-A-Way'}]

让我们看另一个例子,这次您将使用 IterableDataset.map() 移除列。当您移除一列时,它仅在样本已提供给映射函数后才被移除。这允许映射函数在使用它们被移除之前使用这些列的内容。

IterableDataset.map() 中使用 remove_columns 参数指定要移除的列。

>>> updated_dataset = dataset.map(add_prefix, remove_columns=["timestamp", "url"])
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...'},
 {'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...'},
 {'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...'}]

批量处理

IterableDataset.map() 也支持处理样本批次。通过设置 batched=True 来操作批次。默认批次大小为 1000,但您可以使用 batch_size 参数进行调整。这为许多有趣的应用打开了大门,例如分词、将长句子拆分成更短的块以及数据增强。

分词

>>> from datasets import load_dataset
>>> from transformers import AutoTokenizer
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
>>> def encode(examples):
...     return tokenizer(examples['text'], truncation=True, padding='max_length')
>>> dataset = dataset.map(encode, batched=True, remove_columns=["text", "timestamp", "url"])
>>> next(iter(dataset))
{'input_ids': [101, 4088, 16912, 22861, 4160, 2465, 2635, 2173, 1999, 3335, ..., 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..., 0, 0]}

有关批量处理的其他示例,请参阅 批量映射处理 文档。它们对于可迭代数据集的工作方式相同。

过滤

您可以使用 Dataset.filter() 根据谓词函数过滤数据集中的行。它返回满足指定条件的行。

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', streaming=True, split='train')
>>> start_with_ar = dataset.filter(lambda example: example['text'].startswith('San Francisco'))
>>> next(iter(start_with_ar))
{'text': 'San Francisco 49ers cornerback Shawntae Spencer will miss the rest of the sea...}

Dataset.filter() 也可以通过索引进行过滤,如果您设置 with_indices=True

>>> even_dataset = dataset.filter(lambda example, idx: idx % 2 == 0, with_indices=True)
>>> list(even_dataset.take(3))
[{'text': 'How AP reported in all formats from tornado-stricken regionsMarch 8, 2012 Whe...},
 {'text': 'Car Wash For Clara! Now is your chance to help! 2 year old Clara Woodward has...},
 {'text': 'Log In Please enter your ECode to log in. Forgotten your eCode? If you create...}]

批处理

batch 方法将您的 IterableDataset 转换为一个可迭代的批次。当您想在训练循环中使用批次,或使用需要批次输入的框架时,这尤其有用。

在使用 map 函数将函数应用于数据批次时,还有一个“批处理”选项,这在上面的 Map 部分中有讨论。这里描述的 batch 方法是不同的,它提供了更直接的方式来从您的数据集中创建批次。

您可以使用 batch 方法如下

from datasets import load_dataset

# Load a dataset in streaming mode
dataset = load_dataset("some_dataset", split="train", streaming=True)

# Create batches of 32 samples
batched_dataset = dataset.batch(batch_size=32)

# Iterate over the batched dataset
for batch in batched_dataset:
    print(batch)
    break

在此示例中,batched_dataset 仍然是一个 IterableDataset,但每个生成的项现在是 32 个样本的批次,而不是单个样本。此批处理是按需进行的,当您迭代数据集时,它会保留 IterableDataset 的内存效率。

batch 方法还提供了 drop_last_batch 参数。当设置为 True 时,它将丢弃最后一个批次,如果它小于指定的 batch_size。这在下游处理需要所有批次大小相同的场景中非常有用。

batched_dataset = dataset.batch(batch_size=32, drop_last_batch=True)

在训练循环中流式传输

IterableDataset 可以集成到训练循环中。首先,对数据集进行shuffle。

Pytorch
隐藏 Pytorch 内容
>>> seed, buffer_size = 42, 10_000
>>> dataset = dataset.shuffle(seed, buffer_size=buffer_size)

最后,创建一个简单的训练循环并开始训练。

>>> import torch
>>> from torch.utils.data import DataLoader
>>> from transformers import AutoModelForMaskedLM, DataCollatorForLanguageModeling
>>> from tqdm import tqdm
>>> dataset = dataset.with_format("torch")
>>> dataloader = DataLoader(dataset, collate_fn=DataCollatorForLanguageModeling(tokenizer))
>>> device = 'cuda' if torch.cuda.is_available() else 'cpu' 
>>> model = AutoModelForMaskedLM.from_pretrained("distilbert-base-uncased")
>>> model.train().to(device)
>>> optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)
>>> for epoch in range(3):
...     dataset.set_epoch(epoch)
...     for i, batch in enumerate(tqdm(dataloader, total=5)):
...         if i == 5:
...             break
...         batch = {k: v.to(device) for k, v in batch.items()}
...         outputs = model(**batch)
...         loss = outputs[0]
...         loss.backward()
...         optimizer.step()
...         optimizer.zero_grad()
...         if i % 10 == 0:
...             print(f"loss: {loss}")

保存数据集检查点并恢复迭代

如果您的训练循环停止了,您可能希望从停止的地方重新开始训练。要做到这一点,您可以保存模型的检查点和优化器,以及您的数据加载器。

可迭代数据集不提供对特定示例索引的随机访问来恢复,但您可以使用 IterableDataset.state_dict()IterableDataset.load_state_dict() 来从检查点恢复,这与您可以对模型和优化器做的一样。

>>> iterable_dataset = Dataset.from_dict({"a": range(6)}).to_iterable_dataset(num_shards=3)
>>> for idx, example in enumerate(iterable_dataset):
...     print(example)
...     if idx == 2:
...         state_dict = iterable_dataset.state_dict()
...         print("checkpoint")
...         break
>>> iterable_dataset.load_state_dict(state_dict)
>>> print(f"restart from checkpoint")
>>> for example in iterable_dataset:
...     print(example)

返回

{'a': 0}
{'a': 1}
{'a': 2}
checkpoint
restart from checkpoint
{'a': 3}
{'a': 4}
{'a': 5}

在底层,可迭代数据集会跟踪正在读取的当前分片和当前分片中的示例索引,并将此信息存储在 state_dict 中。

要从检查点恢复,数据集会跳过所有先前读取的分片,以从当前分片重新开始。然后它会读取该分片并跳过示例,直到达到检查点中的确切示例。

因此,重新启动数据集的速度很快,因为它不会重新读取已经迭代过的分片。尽管如此,恢复数据集通常不是瞬时的,因为它必须从当前分片开始重新读取并跳过示例,直到达到检查点位置。

这可以与 torchdata 中的 StatefulDataLoader 一起使用。

>>> from torchdata.stateful_dataloader import StatefulDataLoader
>>> iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> dataloader = StatefulDataLoader(iterable_dataset, batch_size=32, num_workers=4)
>>> # checkpoint
>>> state_dict = dataloader.state_dict()  # uses iterable_dataset.state_dict() under the hood
>>> # resume from checkpoint
>>> dataloader.load_state_dict(state_dict)  # uses iterable_dataset.load_state_dict() under the hood

恢复会精确地回到检查点保存的位置,除非使用了 .shuffle():shuffle 缓冲区中的示例在恢复时会丢失,并且缓冲区会重新填充新数据。

保存

一旦您的可迭代数据集准备就绪,您就可以将其保存为 Hugging Face 数据集,格式为 Parquet,并稍后使用 load_dataset() 重用它。

通过提供您希望保存到的 Hugging Face 数据集存储库的名称,使用 push_to_hub() 保存您的数据集。这将迭代数据集并将数据逐步上传到 Hugging Face。

dataset.push_to_hub("username/my_dataset")

如果数据集包含多个分片(dataset.num_shards > 1),您可以使用多个进程并行上传。如果应用了 map()filter() 步骤,这会特别有用,因为它们将并行运行得更快。

dataset.push_to_hub("username/my_dataset", num_proc=8)

使用 load_dataset() 函数重新加载数据集。

from datasets import load_dataset
reloaded_dataset = load_dataset("username/my_dataset")

导出

🤗 Datasets 也支持导出,这样您就可以在其他应用程序中使用您的数据集。下表显示了当前支持的您可以导出到的文件格式。

文件类型 导出方法
CSV IterableDataset.to_csv()
JSON IterableDataset.to_json()
Parquet IterableDataset.to_parquet()
SQL IterableDataset.to_sql()
内存中的 Python 对象 IterableDataset.to_pandas(), IterableDataset.to_polars()IterableDataset.to_dict()

例如,像这样将您的数据集导出到 CSV 文件

>>> dataset.to_csv("path/of/my/dataset.csv")

如果您有一个大型数据集,您可以为每个分片保存一个文件,例如:

>>> num_shards = dataset.num_shards
>>> for index in range(num_shards):
...     shard = dataset.shard(index, num_shards)
...     shard.to_parquet(f"path/of/my/dataset/data-{index:05d}.parquet")
在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.