数据集文档

流式

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

流式

数据集流式处理允许您在不下载数据集的情况下使用数据集。数据在您迭代数据集时进行流式传输。当以下情况时,这尤其有帮助:

  • 您不想等待极其庞大的数据集下载完成。
  • 数据集大小超过您计算机上可用的磁盘空间量。
  • 您只想快速浏览数据集的几个样本。

例如,HuggingFaceFW/fineweb 数据集的英文拆分有 45 TB,但您可以通过流式处理立即使用它。通过在 load_dataset() 中设置 streaming=True 来流式传输数据集,如下所示

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> print(next(iter(dataset)))
{'text': "How AP reported in all formats from tornado-stricken regionsMarch 8, 2012\nWhen the first serious bout of tornadoes of 2012 blew through middle America in the middle of the night, they touched down in places hours from any AP bureau...

数据集流式处理还允许您处理由本地文件组成的数据集,而无需进行任何转换。在这种情况下,数据在您迭代数据集时从本地文件流式传输。当以下情况时,这尤其有帮助:

  • 您不想等待将极其庞大的本地数据集转换为 Arrow。
  • 转换后的文件大小将超过您计算机上可用的磁盘空间量。
  • 您只想快速浏览数据集的几个样本。

例如,您可以流式传输由数百个压缩 JSONL 文件组成的本地数据集,例如 oscar-corpus/OSCAR-2201,以便立即使用它

>>> from datasets import load_dataset
>>> data_files = {'train': 'path/to/OSCAR-2201/compressed/en_meta/*.jsonl.gz'}
>>> dataset = load_dataset('json', data_files=data_files, split='train', streaming=True)
>>> print(next(iter(dataset)))
{'id': 0, 'text': 'Founded in 2015, Golden Bees is a leading programmatic recruitment platform dedicated to employers, HR agencies and job boards. The company has developed unique HR-custom technologies and predictive algorithms to identify and attract the best candidates for a job opportunity.', ...

以流式模式加载数据集会创建一个新的数据集类型实例(而不是经典的 Dataset 对象),称为 IterableDataset。这种特殊类型的数据集有其自身的一组处理方法,如下所示。

IterableDataset 对于迭代作业(如训练模型)非常有用。您不应将 IterableDataset 用于需要随机访问示例的作业,因为您必须使用 for 循环遍历它。要获取可迭代数据集中的最后一个示例,您需要遍历所有之前的示例。您可以在 Dataset vs. IterableDataset 指南中找到更多详细信息。

从 Dataset 转换

如果您有一个现有的 Dataset 对象,您可以使用 to_iterable_dataset() 函数将其转换为 IterableDataset。这实际上比在 load_dataset() 中设置 streaming=True 参数更快,因为数据是从本地文件流式传输的。

>>> from datasets import load_dataset

# faster 🐇
>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset()

# slower 🐢
>>> iterable_dataset = load_dataset("ethz/food101", streaming=True)

当实例化 IterableDataset 时,to_iterable_dataset() 函数支持分片。当处理大型数据集,并且您希望对数据集进行洗牌或使用 PyTorch DataLoader 启用快速并行加载时,这非常有用。

>>> import torch
>>> from datasets import load_dataset

>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset(num_shards=64) # shard the dataset
>>> iterable_dataset = iterable_dataset.shuffle(buffer_size=10_000)  # shuffles the shards order and use a shuffle buffer when you start iterating
dataloader = torch.utils.data.DataLoader(iterable_dataset, num_workers=4)  # assigns 64 / 4 = 16 shards from the shuffled list of shards to each worker when you start iterating

洗牌

与常规 Dataset 对象类似,您也可以使用 IterableDataset.shuffle()IterableDataset 进行洗牌。

buffer_size 参数控制从缓冲区中随机抽取示例的大小。假设您的数据集有一百万个示例,并且您将 buffer_size 设置为一万。IterableDataset.shuffle() 将从缓冲区中的前一万个示例中随机选择示例。缓冲区中选定的示例将替换为新的示例。默认情况下,缓冲区大小为 1,000。

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> shuffled_dataset = dataset.shuffle(seed=42, buffer_size=10_000)

如果数据集被分片到多个文件中,IterableDataset.shuffle() 也会对分片的顺序进行洗牌。

重新洗牌

有时您可能希望在每个 epoch 后重新洗牌数据集。这将需要您为每个 epoch 设置不同的种子。在 epoch 之间使用 IterableDataset.set_epoch() 来告知数据集您正在进行的 epoch。

您的种子实际上变为:初始种子 + 当前 epoch

>>> for epoch in range(epochs):
...     shuffled_dataset.set_epoch(epoch)
...     for example in shuffled_dataset:
...         ...

拆分数据集

您可以通过以下两种方式之一拆分数据集

>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> dataset_head = dataset.take(2)
>>> list(dataset_head)
[{'text': "How AP reported in all formats from tor...},
 {'text': 'Did you know you have two little yellow...}]
>>> train_dataset = shuffled_dataset.skip(1000)

takeskip 会阻止将来调用 shuffle,因为它们会锁定分片的顺序。您应该在拆分数据集之前 shuffle 您的数据集。

分片

🤗 Datasets 支持分片,将非常大的数据集划分为预定义数量的块。在 shard() 中指定 num_shards 参数以确定将数据集拆分成的分片数。您还需要使用 index 参数提供您要返回的分片。

例如,amazon_polarity 数据集有 4 个分片(在这种情况下,它们是 4 个 Parquet 文件)

>>> from datasets import load_dataset
>>> dataset = load_dataset("amazon_polarity", split="train", streaming=True)
>>> print(dataset)
IterableDataset({
    features: ['label', 'title', 'content'],
    num_shards: 4
})

将数据集分片为两个块后,第一个块将只有 2 个分片

>>> dataset.shard(num_shards=2, index=0)
IterableDataset({
    features: ['label', 'title', 'content'],
    num_shards: 2
})

如果您的数据集有 dataset.num_shards==1,您应该使用 IterableDataset.skip()IterableDataset.take() 对其进行分块。

交错

interleave_datasets() 可以将 IterableDataset 与其他数据集组合。组合后的数据集从每个原始数据集中返回交替的示例。

>>> from datasets import interleave_datasets
>>> es_dataset = load_dataset('allenai/c4', 'es', split='train', streaming=True)
>>> fr_dataset = load_dataset('allenai/c4', 'fr', split='train', streaming=True)

>>> multilingual_dataset = interleave_datasets([es_dataset, fr_dataset])
>>> list(multilingual_dataset.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
 {'text': 'Le sacre de philippe ier, 23 mai 1059 - Compte Rendu...'}]

定义每个原始数据集的采样概率,以便更好地控制如何对每个数据集进行采样和组合。使用您所需的采样概率设置 probabilities 参数

>>> multilingual_dataset_with_oversampling = interleave_datasets([es_dataset, fr_dataset], probabilities=[0.8, 0.2], seed=42)
>>> list(multilingual_dataset_with_oversampling.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
 {'text': 'Chevrolet Cavalier Usados en Bogota - Carros en Vent...'}]

最终数据集约 80% 由 en_dataset 组成,20% 由 fr_dataset 组成。

您还可以指定 stopping_strategy。默认策略 first_exhausted 是一种子采样策略,即一旦其中一个数据集用完样本,数据集构建就会停止。您可以指定 stopping_strategy=all_exhausted 来执行过采样策略。在这种情况下,一旦每个数据集中的每个样本都至少添加过一次,数据集构建就会停止。实际上,这意味着如果一个数据集已耗尽,它将返回到该数据集的开头,直到达到停止标准。请注意,如果未指定采样概率,则新数据集将具有 max_length_datasets*nb_dataset samples

重命名、删除和转换

以下方法允许您修改数据集的列。这些方法对于重命名或删除列以及将列更改为一组新特征非常有用。

重命名

当您需要重命名数据集中的列时,请使用 IterableDataset.rename_column()。与原始列关联的特征实际上是移动到新的列名下,而不是仅仅替换原始列。

IterableDataset.rename_column() 提供原始列的名称和新列名

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.rename_column("text", "content")

删除

当您需要删除一列或多列时,请向 IterableDataset.remove_columns() 提供要删除的列的名称。要删除多列,请提供列名列表

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.remove_columns('timestamp')

类型转换

IterableDataset.cast() 更改一个或多个列的特征类型。此方法将您的新 Features 作为其参数。以下示例代码展示了如何更改 ClassLabelValue 的特征类型

>>> from datasets import load_dataset
>>> dataset = load_dataset('nyu-mll/glue', 'mrpc', split='train', streaming=True)
>>> dataset.features
{'sentence1': Value(dtype='string', id=None),
'sentence2': Value(dtype='string', id=None),
'label': ClassLabel(names=['not_equivalent', 'equivalent'], id=None),
'idx': Value(dtype='int32', id=None)}

>>> from datasets import ClassLabel, Value
>>> new_features = dataset.features.copy()
>>> new_features["label"] = ClassLabel(names=['negative', 'positive'])
>>> new_features["idx"] = Value('int64')
>>> dataset = dataset.cast(new_features)
>>> dataset.features
{'sentence1': Value(dtype='string', id=None),
'sentence2': Value(dtype='string', id=None),
'label': ClassLabel(names=['negative', 'positive'], id=None),
'idx': Value(dtype='int64', id=None)}

只有当原始特征类型和新特征类型兼容时,类型转换才有效。例如,如果原始列仅包含 1 和 0,则可以将特征类型为 Value('int32') 的列转换为 Value('bool')

使用 IterableDataset.cast_column() 来更改仅一列的特征类型。将列名及其新的特征类型作为参数传递

>>> dataset.features
{'audio': Audio(sampling_rate=44100, mono=True, id=None)}

>>> dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
>>> dataset.features
{'audio': Audio(sampling_rate=16000, mono=True, id=None)}

映射

类似于常规 DatasetDataset.map() 函数,🤗 Datasets 提供了 IterableDataset.map() 用于处理 IterableDatasetIterableDataset.map() 会在示例被流式传输时即时应用处理。

它允许您将处理函数应用于数据集中的每个示例,可以独立应用,也可以批量应用。此函数甚至可以创建新的行和列。

以下示例演示了如何对 IterableDataset 进行分词。该函数需要接受并输出一个 dict

>>> def add_prefix(example):
...     example['text'] = 'My text: ' + example['text']
...     return example

接下来,使用 IterableDataset.map() 将此函数应用于数据集

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> updated_dataset = dataset.map(add_prefix)
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...',
  'timestamp': '2019-04-25 12:57:54',
  'url': 'https://klyq.com/beginners-bbq-class-taking-place-in-missoula/'},
 {'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...',
  'timestamp': '2019-04-21 10:07:13',
  'url': 'https://forums.macrumors.com/threads/restore-from-larger-disk-to-smaller-disk.1311329/'},
 {'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...',
  'timestamp': '2019-04-25 10:40:23',
  'url': 'https://awishcometrue.com/Catalogs/Clearance/Tweens/V1960-Find-A-Way'}]

让我们看另一个例子,这次,您将使用 IterableDataset.map() 删除列。当您删除列时,只有在示例提供给映射函数后才会被删除。这允许映射函数在使用列的内容之后再删除它们。

使用 IterableDataset.map() 中的 remove_columns 参数指定要删除的列

>>> updated_dataset = dataset.map(add_prefix, remove_columns=["timestamp", "url"])
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...'},
 {'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...'},
 {'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...'}]

批量处理

IterableDataset.map() 也支持处理批量的示例。通过设置 batched=True 来对批量进行操作。默认的批量大小为 1000,但您可以使用 batch_size 参数进行调整。这为许多有趣的应用打开了大门,例如分词、将长句分割成较短的块以及数据增强。

分词

>>> from datasets import load_dataset
>>> from transformers import AutoTokenizer
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
>>> def encode(examples):
...     return tokenizer(examples['text'], truncation=True, padding='max_length')
>>> dataset = dataset.map(encode, batched=True, remove_columns=["text", "timestamp", "url"])
>>> next(iter(dataset))
{'input_ids': [101, 4088, 16912, 22861, 4160, 2465, 2635, 2173, 1999, 3335, ..., 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..., 0, 0]}

批量映射处理 文档中查看批量处理的其他示例。它们对于可迭代数据集的工作方式相同。

过滤

您可以使用 Dataset.filter() 基于谓词函数来过滤数据集中的行。它返回与指定条件匹配的行

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', streaming=True, split='train')
>>> start_with_ar = dataset.filter(lambda example: example['text'].startswith('San Francisco'))
>>> next(iter(start_with_ar))
{'text': 'San Francisco 49ers cornerback Shawntae Spencer will miss the rest of the sea...}

如果您设置 with_indices=TrueDataset.filter() 也可以按索引进行过滤

>>> even_dataset = dataset.filter(lambda example, idx: idx % 2 == 0, with_indices=True)
>>> list(even_dataset.take(3))
[{'text': 'How AP reported in all formats from tornado-stricken regionsMarch 8, 2012 Whe...},
 {'text': 'Car Wash For Clara! Now is your chance to help! 2 year old Clara Woodward has...},
 {'text': 'Log In Please enter your ECode to log in. Forgotten your eCode? If you create...}]

批量

batch 方法将您的 IterableDataset 转换为批量的可迭代对象。当您想在训练循环中使用批量或在使用期望批量输入的框架时,这尤其有用。

当使用 map 函数将函数应用于批量数据时,还有一个“批量处理”选项,这在上面的 Map 部分 中讨论过。此处描述的 batch 方法是不同的,它提供了一种更直接的方式从您的数据集创建批量。

您可以像这样使用 batch 方法

from datasets import load_dataset

# Load a dataset in streaming mode
dataset = load_dataset("some_dataset", split="train", streaming=True)

# Create batches of 32 samples
batched_dataset = dataset.batch(batch_size=32)

# Iterate over the batched dataset
for batch in batched_dataset:
    print(batch)
    break

在此示例中,batched_dataset 仍然是一个 IterableDataset,但是现在每次迭代返回的项目是一个包含 32 个样本的批量,而不是单个样本。这种批量处理是在您迭代数据集时即时完成的,从而保留了 IterableDataset 的内存高效特性。

batch 方法还提供了一个 drop_last_batch 参数。当设置为 True 时,如果最后一个批量小于指定的 batch_size,它将丢弃最后一个批量。这在下游处理需要所有批量大小相同的场景中非常有用。

batched_dataset = dataset.batch(batch_size=32, drop_last_batch=True)

在训练循环中使用流

IterableDataset 可以集成到训练循环中。首先,打乱数据集

Pytorch
隐藏 Pytorch 内容
>>> seed, buffer_size = 42, 10_000
>>> dataset = dataset.shuffle(seed, buffer_size=buffer_size)

最后,创建一个简单的训练循环并开始训练。

>>> import torch
>>> from torch.utils.data import DataLoader
>>> from transformers import AutoModelForMaskedLM, DataCollatorForLanguageModeling
>>> from tqdm import tqdm
>>> dataset = dataset.with_format("torch")
>>> dataloader = DataLoader(dataset, collate_fn=DataCollatorForLanguageModeling(tokenizer))
>>> device = 'cuda' if torch.cuda.is_available() else 'cpu' 
>>> model = AutoModelForMaskedLM.from_pretrained("distilbert-base-uncased")
>>> model.train().to(device)
>>> optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)
>>> for epoch in range(3):
...     dataset.set_epoch(epoch)
...     for i, batch in enumerate(tqdm(dataloader, total=5)):
...         if i == 5:
...             break
...         batch = {k: v.to(device) for k, v in batch.items()}
...         outputs = model(**batch)
...         loss = outputs[0]
...         loss.backward()
...         optimizer.step()
...         optimizer.zero_grad()
...         if i % 10 == 0:
...             print(f"loss: {loss}")

保存数据集检查点并恢复迭代

如果您的训练循环停止,您可能希望从停止的位置重新开始训练。为此,您可以保存模型和优化器的检查点,以及您的数据加载器。

IterableDataset 不提供对特定示例索引的随机访问以从中恢复,但您可以改用 IterableDataset.state_dict()IterableDataset.load_state_dict() 从检查点恢复,类似于您可以对模型和优化器执行的操作。

>>> iterable_dataset = Dataset.from_dict({"a": range(6)}).to_iterable_dataset(num_shards=3)
>>> for idx, example in enumerate(iterable_dataset):
...     print(example)
...     if idx == 2:
...         state_dict = iterable_dataset.state_dict()
...         print("checkpoint")
...         break
>>> iterable_dataset.load_state_dict(state_dict)
>>> print(f"restart from checkpoint")
>>> for example in iterable_dataset:
...     print(example)

返回

{'a': 0}
{'a': 1}
{'a': 2}
checkpoint
restart from checkpoint
{'a': 3}
{'a': 4}
{'a': 5}

在底层,可迭代数据集会跟踪当前正在读取的分片和当前分片中的示例索引,并将此信息存储在 state_dict 中。

要从检查点恢复,数据集会跳过之前读取的所有分片,以从当前分片重新开始。然后它读取分片并跳过示例,直到到达检查点中的确切示例。

因此,重新启动数据集非常快,因为它不会重新读取已迭代的分片。尽管如此,恢复数据集通常不是瞬时的,因为它必须从当前分片的开头重新开始读取并跳过示例,直到到达检查点位置。

这可以与 torchdata 中的 StatefulDataLoader 一起使用

>>> from torchdata.stateful_dataloader import StatefulDataLoader
>>> iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> dataloader = StatefulDataLoader(iterable_dataset, batch_size=32, num_workers=4)
>>> # checkpoint
>>> state_dict = dataloader.state_dict()  # uses iterable_dataset.state_dict() under the hood
>>> # resume from checkpoint
>>> dataloader.load_state_dict(state_dict)  # uses iterable_dataset.load_state_dict() under the hood

恢复操作会返回到检查点保存的确切位置,除非使用了 .shuffle():恢复时会丢失来自 shuffle 缓冲区的示例,并且缓冲区会用新数据重新填充。

< > 更新 在 GitHub 上