Datasets 文档

流式传输

Hugging Face's logo
加入 Hugging Face 社区

并获得增强的文档体验

开始使用

流式传输

数据集流式传输允许您在不下载数据集的情况下处理数据集。数据在您迭代数据集时进行流式传输。这在以下情况下特别有用:

  • 您不想等待超大型数据集下载完成。
  • 数据集大小超出计算机上的可用磁盘空间。
  • 您只想快速浏览数据集中的少量样本。

例如,HuggingFaceFW/fineweb 数据集的英文拆分是 45 TB,但您可以通过流式传输立即使用它。在 `load_dataset()` 中设置 `streaming=True` 来流式传输数据集,如下所示

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> print(next(iter(dataset)))
{'text': "How AP reported in all formats from tornado-stricken regionsMarch 8, 2012\nWhen the first serious bout of tornadoes of 2012 blew through middle America in the middle of the night, they touched down in places hours from any AP bureau...

数据集流式传输还允许您处理由本地文件组成的数据集,而无需进行任何转换。在这种情况下,数据在您迭代数据集时从本地文件流式传输。这在以下情况下特别有用:

  • 您不想等待超大型本地数据集转换为 Arrow。
  • 转换后的文件大小会超出计算机上的可用磁盘空间。
  • 您只想快速浏览数据集中的少量样本。

例如,您可以流式传输数百个压缩 JSONL 文件的本地数据集,例如 oscar-corpus/OSCAR-2201,以便立即使用它

>>> from datasets import load_dataset
>>> data_files = {'train': 'path/to/OSCAR-2201/compressed/en_meta/*.jsonl.gz'}
>>> dataset = load_dataset('json', data_files=data_files, split='train', streaming=True)
>>> print(next(iter(dataset)))
{'id': 0, 'text': 'Founded in 2015, Golden Bees is a leading programmatic recruitment platform dedicated to employers, HR agencies and job boards. The company has developed unique HR-custom technologies and predictive algorithms to identify and attract the best candidates for a job opportunity.', ...

以流式模式加载数据集会创建一个新的数据集类型实例(而不是经典的 `Dataset` 对象),称为 `IterableDataset`。这种特殊类型的数据集有自己的一套处理方法,如下所示。

`IterableDataset` 对于像训练模型这样的迭代作业很有用。您不应该将 `IterableDataset` 用于需要随机访问示例的作业,因为您必须使用 for 循环遍历所有示例。获取可迭代数据集中的最后一个示例需要您遍历所有先前的示例。您可以在Dataset 与 IterableDataset 指南中找到更多详细信息。

列索引

有时方便地迭代特定列的值。幸运的是,`IterableDataset` 支持列索引

>>> from datasets import load_dataset
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> print(next(iter(dataset["text"])))
Beginners BBQ Class Taking Place in Missoula!...

从数据集转换

如果您有一个现有的 `Dataset` 对象,您可以使用 `to_iterable_dataset()` 函数将其转换为 `IterableDataset`。这实际上比在 `load_dataset()` 中设置 `streaming=True` 参数更快,因为数据是从本地文件流式传输的。

>>> from datasets import load_dataset

# faster 🐇
>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset()

# slower 🐢
>>> iterable_dataset = load_dataset("ethz/food101", streaming=True)

`IterableDataset` 实例化时,`to_iterable_dataset()` 函数支持分片。当处理大型数据集时,这非常有用,您可能希望打乱数据集或使用 PyTorch DataLoader 启用快速并行加载。

>>> import torch
>>> from datasets import load_dataset

>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset(num_shards=64) # shard the dataset
>>> iterable_dataset = iterable_dataset.shuffle(buffer_size=10_000)  # shuffles the shards order and use a shuffle buffer when you start iterating
dataloader = torch.utils.data.DataLoader(iterable_dataset, num_workers=4)  # assigns 64 / 4 = 16 shards from the shuffled list of shards to each worker when you start iterating

打乱

与常规的 `Dataset` 对象一样,您也可以使用 `IterableDataset.shuffle()` 来打乱 `IterableDataset`

`buffer_size` 参数控制用于随机采样示例的缓冲区大小。假设您的数据集有一百万个示例,并且您将 `buffer_size` 设置为一万。 `IterableDataset.shuffle()` 将从缓冲区中的前一万个示例中随机选择示例。缓冲区中选定的示例将被新示例替换。默认情况下,缓冲区大小为 1,000。

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> shuffled_dataset = dataset.shuffle(seed=42, buffer_size=10_000)

如果数据集分片为多个文件,`IterableDataset.shuffle()` 还会打乱分片的顺序。

重新洗牌

有时您可能希望在每个 epoch 之后重新打乱数据集。这将要求您为每个 epoch 设置不同的种子。在 epoch 之间使用 `IterableDataset.set_epoch()` 来告诉数据集您当前在哪个 epoch。

您的种子实际上变为:`初始种子 + 当前 epoch`。

>>> for epoch in range(epochs):
...     shuffled_dataset.set_epoch(epoch)
...     for example in shuffled_dataset:
...         ...

拆分数据集

您可以通过两种方式拆分数据集

>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> dataset_head = dataset.take(2)
>>> list(dataset_head)
[{'text': "How AP reported in all formats from tor...},
 {'text': 'Did you know you have two little yellow...}]
>>> train_dataset = shuffled_dataset.skip(1000)

`take` 和 `skip` 会阻止将来对 `shuffle` 的调用,因为它们会锁定分片的顺序。您应该在拆分数据集之前对其进行 `shuffle`。

分片

🤗 Datasets 支持分片,将非常大的数据集划分为预定义数量的块。在 `shard()` 中指定 `num_shards` 参数以确定将数据集拆分为多少个分片。您还需要使用 `index` 参数提供您要返回的分片。

例如,amazon_polarity 数据集有 4 个分片(在这种情况下它们是 4 个 Parquet 文件)

>>> from datasets import load_dataset
>>> dataset = load_dataset("amazon_polarity", split="train", streaming=True)
>>> print(dataset)
IterableDataset({
    features: ['label', 'title', 'content'],
    num_shards: 4
})

将数据集分片成两个块后,第一个块将只有 2 个分片

>>> dataset.shard(num_shards=2, index=0)
IterableDataset({
    features: ['label', 'title', 'content'],
    num_shards: 2
})

如果您的数据集有 `dataset.num_shards==1`,您应该使用 `IterableDataset.skip()``IterableDataset.take()` 来分块。

交错

`interleave_datasets()` 可以将 `IterableDataset` 与其他数据集结合。组合后的数据集会从每个原始数据集中返回交替的示例。

>>> from datasets import interleave_datasets
>>> es_dataset = load_dataset('allenai/c4', 'es', split='train', streaming=True)
>>> fr_dataset = load_dataset('allenai/c4', 'fr', split='train', streaming=True)

>>> multilingual_dataset = interleave_datasets([es_dataset, fr_dataset])
>>> list(multilingual_dataset.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
 {'text': 'Le sacre de philippe ier, 23 mai 1059 - Compte Rendu...'}]

定义每个原始数据集的采样概率,以更好地控制它们的采样和组合方式。使用 `probabilities` 参数设置您所需的采样概率

>>> multilingual_dataset_with_oversampling = interleave_datasets([es_dataset, fr_dataset], probabilities=[0.8, 0.2], seed=42)
>>> list(multilingual_dataset_with_oversampling.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
 {'text': 'Chevrolet Cavalier Usados en Bogota - Carros en Vent...'}]

最终数据集约 80% 由 `es_dataset` 组成,20% 由 `fr_dataset` 组成。

您还可以指定 `stopping_strategy`。默认策略 `first_exhausted` 是一种子采样策略,即一旦其中一个数据集的样本用尽,数据集构建就会停止。您可以指定 `stopping_strategy=all_exhausted` 来执行过采样策略。在这种情况下,一旦每个数据集中的所有样本都至少添加了一次,数据集构建就会停止。实际上,这意味着如果一个数据集用尽,它将返回到该数据集的开头,直到达到停止条件。请注意,如果未指定采样概率,则新数据集将包含 `max_length_datasets*nb_dataset` 个样本。

重命名、删除和转换

以下方法允许您修改数据集的列。这些方法对于重命名或删除列以及将列更改为一组新的特征很有用。

重命名

当您需要重命名数据集中的列时,请使用 `IterableDataset.rename_column()`。与原始列关联的特征实际上会移动到新的列名下,而不是仅仅替换原始列。

`IterableDataset.rename_column()` 提供原始列名和新列名

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.rename_column("text", "content")

删除

当您需要删除一个或多个列时,向 `IterableDataset.remove_columns()` 提供要删除的列名。通过提供列名列表来删除多个列

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.remove_columns('timestamp')

类型转换

`IterableDataset.cast()` 更改一个或多个列的特征类型。此方法将您的新 `Features` 作为其参数。以下示例代码显示了如何更改 `ClassLabel` 和 `Value` 的特征类型

>>> from datasets import load_dataset
>>> dataset = load_dataset('nyu-mll/glue', 'mrpc', split='train', streaming=True)
>>> dataset.features
{'sentence1': Value('string'),
'sentence2': Value('string'),
'label': ClassLabel(names=['not_equivalent', 'equivalent']),
'idx': Value('int32')}

>>> from datasets import ClassLabel, Value
>>> new_features = dataset.features.copy()
>>> new_features["label"] = ClassLabel(names=['negative', 'positive'])
>>> new_features["idx"] = Value('int64')
>>> dataset = dataset.cast(new_features)
>>> dataset.features
{'sentence1': Value('string'),
'sentence2': Value('string'),
'label': ClassLabel(names=['negative', 'positive']),
'idx': Value('int64')}

类型转换仅在原始特征类型和新特征类型兼容时才有效。例如,如果原始列只包含 1 和 0,您可以将特征类型为 `Value('int32')` 的列转换为 `Value('bool')`。

使用 `IterableDataset.cast_column()` 更改单个列的特征类型。将列名及其新特征类型作为参数传递

>>> dataset.features
{'audio': Audio(sampling_rate=44100, mono=True)}

>>> dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
>>> dataset.features
{'audio': Audio(sampling_rate=16000, mono=True)}

映射

类似于常规 `Dataset``Dataset.map()` 函数,🤗 Datasets 提供了 `IterableDataset.map()` 来处理 `IterableDataset``IterableDataset.map()` 在示例流式传输时即时应用处理。

它允许您将处理函数应用于数据集中的每个示例,无论是独立应用还是批量应用。此函数甚至可以创建新的行和列。

以下示例演示了如何对 `IterableDataset` 进行分词。该函数需要接受并输出一个 `dict`

>>> def add_prefix(example):
...     example['text'] = 'My text: ' + example['text']
...     return example

接下来,使用 `IterableDataset.map()` 将此函数应用于数据集

>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> updated_dataset = dataset.map(add_prefix)
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...',
  'timestamp': '2019-04-25 12:57:54',
  'url': 'https://klyq.com/beginners-bbq-class-taking-place-in-missoula/'},
 {'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...',
  'timestamp': '2019-04-21 10:07:13',
  'url': 'https://forums.macrumors.com/threads/restore-from-larger-disk-to-smaller-disk.1311329/'},
 {'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...',
  'timestamp': '2019-04-25 10:40:23',
  'url': 'https://awishcometrue.com/Catalogs/Clearance/Tweens/V1960-Find-A-Way'}]

让我们看看另一个例子,这次您将使用 `IterableDataset.map()` 删除列。当您删除列时,它只在示例提供给映射函数后才删除。这允许映射函数在使用列内容后才删除它们。

`IterableDataset.map()` 中使用 `remove_columns` 参数指定要删除的列

>>> updated_dataset = dataset.map(add_prefix, remove_columns=["timestamp", "url"])
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...'},
 {'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...'},
 {'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...'}]

批量处理

`IterableDataset.map()` 也支持处理批量示例。通过设置 `batched=True` 来批量操作。默认批量大小为 1000,但您可以通过 `batch_size` 参数进行调整。这为许多有趣的应用程序打开了大门,例如分词、将长句拆分为较短的块以及数据增强。

分词

>>> from datasets import load_dataset
>>> from transformers import AutoTokenizer
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
>>> def encode(examples):
...     return tokenizer(examples['text'], truncation=True, padding='max_length')
>>> dataset = dataset.map(encode, batched=True, remove_columns=["text", "timestamp", "url"])
>>> next(iter(dataset))
{'input_ids': [101, 4088, 16912, 22861, 4160, 2465, 2635, 2173, 1999, 3335, ..., 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..., 0, 0]}

请参阅 批量映射处理 文档中的其他批量处理示例。它们对可迭代数据集的工作方式相同。

筛选

您可以使用 `Dataset.filter()` 根据谓词函数过滤数据集中的行。它返回符合指定条件的行

>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', streaming=True, split='train')
>>> start_with_ar = dataset.filter(lambda example: example['text'].startswith('San Francisco'))
>>> next(iter(start_with_ar))
{'text': 'San Francisco 49ers cornerback Shawntae Spencer will miss the rest of the sea...}

如果您设置 `with_indices=True`,`Dataset.filter()` 也可以通过索引进行过滤

>>> even_dataset = dataset.filter(lambda example, idx: idx % 2 == 0, with_indices=True)
>>> list(even_dataset.take(3))
[{'text': 'How AP reported in all formats from tornado-stricken regionsMarch 8, 2012 Whe...},
 {'text': 'Car Wash For Clara! Now is your chance to help! 2 year old Clara Woodward has...},
 {'text': 'Log In Please enter your ECode to log in. Forgotten your eCode? If you create...}]

批量

`batch` 方法将您的 `IterableDataset` 转换为一个批量迭代器。这在您希望在训练循环中使用批量或在使用需要批量输入的框架时特别有用。

在使用 `map` 函数对数据批量应用函数时,也有一个“批量处理”选项,这在上面的映射部分中讨论过。此处描述的 `batch` 方法不同,它提供了从数据集创建批量的更直接方式。

您可以这样使用 `batch` 方法

from datasets import load_dataset

# Load a dataset in streaming mode
dataset = load_dataset("some_dataset", split="train", streaming=True)

# Create batches of 32 samples
batched_dataset = dataset.batch(batch_size=32)

# Iterate over the batched dataset
for batch in batched_dataset:
    print(batch)
    break

在此示例中,`batched_dataset` 仍然是一个 `IterableDataset`,但现在生成的每个项目都是 32 个样本的批次,而不是单个样本。这种批处理在您遍历数据集时即时完成,保留了 `IterableDataset` 的内存高效特性。

`batch` 方法还提供了一个 `drop_last_batch` 参数。当设置为 `True` 时,如果最后一个批次小于指定的 `batch_size`,它将丢弃最后一个批次。这在您的下游处理要求所有批次大小相同的情况下很有用

batched_dataset = dataset.batch(batch_size=32, drop_last_batch=True)

在训练循环中流式传输

`IterableDataset` 可以集成到训练循环中。首先,打乱数据集

Pytorch
隐藏 Pytorch 内容
>>> seed, buffer_size = 42, 10_000
>>> dataset = dataset.shuffle(seed, buffer_size=buffer_size)

最后,创建一个简单的训练循环并开始训练

>>> import torch
>>> from torch.utils.data import DataLoader
>>> from transformers import AutoModelForMaskedLM, DataCollatorForLanguageModeling
>>> from tqdm import tqdm
>>> dataset = dataset.with_format("torch")
>>> dataloader = DataLoader(dataset, collate_fn=DataCollatorForLanguageModeling(tokenizer))
>>> device = 'cuda' if torch.cuda.is_available() else 'cpu' 
>>> model = AutoModelForMaskedLM.from_pretrained("distilbert-base-uncased")
>>> model.train().to(device)
>>> optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)
>>> for epoch in range(3):
...     dataset.set_epoch(epoch)
...     for i, batch in enumerate(tqdm(dataloader, total=5)):
...         if i == 5:
...             break
...         batch = {k: v.to(device) for k, v in batch.items()}
...         outputs = model(**batch)
...         loss = outputs[0]
...         loss.backward()
...         optimizer.step()
...         optimizer.zero_grad()
...         if i % 10 == 0:
...             print(f"loss: {loss}")

保存数据集检查点并恢复迭代

如果您的训练循环停止,您可能希望从停止的地方重新开始训练。为此,您可以保存模型和优化器的检查点,以及数据加载器。

可迭代数据集不提供对特定示例索引的随机访问以从中恢复,但您可以使用 `IterableDataset.state_dict()``IterableDataset.load_state_dict()` 从检查点恢复,类似于您对模型和优化器所做的操作

>>> iterable_dataset = Dataset.from_dict({"a": range(6)}).to_iterable_dataset(num_shards=3)
>>> for idx, example in enumerate(iterable_dataset):
...     print(example)
...     if idx == 2:
...         state_dict = iterable_dataset.state_dict()
...         print("checkpoint")
...         break
>>> iterable_dataset.load_state_dict(state_dict)
>>> print(f"restart from checkpoint")
>>> for example in iterable_dataset:
...     print(example)

返回

{'a': 0}
{'a': 1}
{'a': 2}
checkpoint
restart from checkpoint
{'a': 3}
{'a': 4}
{'a': 5}

在底层,可迭代数据集跟踪当前正在读取的分片以及当前分片中的示例索引,并将其信息存储在 `state_dict` 中。

要从检查点恢复,数据集会跳过所有先前读取的分片,从当前分片重新开始。然后它读取分片并跳过示例,直到它到达检查点中的确切示例。

因此,重新启动数据集非常快,因为它不会重新读取已迭代的分片。尽管如此,恢复数据集通常不是即时的,因为它必须从当前分片的开头重新读取并跳过示例,直到到达检查点位置。

这可以与 `torchdata` 中的 `StatefulDataLoader` 一起使用

>>> from torchdata.stateful_dataloader import StatefulDataLoader
>>> iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> dataloader = StatefulDataLoader(iterable_dataset, batch_size=32, num_workers=4)
>>> # checkpoint
>>> state_dict = dataloader.state_dict()  # uses iterable_dataset.state_dict() under the hood
>>> # resume from checkpoint
>>> dataloader.load_state_dict(state_dict)  # uses iterable_dataset.load_state_dict() under the hood

恢复会准确返回检查点保存的位置,除非使用了 `shuffle()`:恢复时会丢失 shuffle 缓冲区中的示例,并且缓冲区会重新填充新数据。

保存

一旦您的可迭代数据集准备就绪,您可以将其保存为 Parquet 格式的 Hugging Face 数据集,并稍后通过 `load_dataset()` 重用。

通过提供您希望将其保存到的 Hugging Face 数据集仓库的名称给 `push_to_hub()` 来保存您的数据集。这会迭代数据集并逐步将数据上传到 Hugging Face

dataset.push_to_hub("username/my_dataset")

如果数据集包含多个分片(`dataset.num_shards > 1`),您可以使用多个进程并行上传。如果您应用了 `map()` 或 `filter()` 步骤,这将特别有用,因为它们将更快地并行运行

dataset.push_to_hub("username/my_dataset", num_proc=8)

使用 `load_dataset()` 函数重新加载数据集

from datasets import load_dataset
reloaded_dataset = load_dataset("username/my_dataset")

导出

🤗 Datasets 也支持导出,因此您可以在其他应用程序中使用数据集。下表显示了当前支持导出的文件格式

文件类型 导出方法
CSV IterableDataset.to_csv()
JSON IterableDataset.to_json()
Parquet IterableDataset.to_parquet()
SQL IterableDataset.to_sql()
内存中的 Python 对象 `IterableDataset.to_pandas()`、`IterableDataset.to_polars()` 或 `IterableDataset.to_dict()`

例如,将数据集导出到 CSV 文件,如下所示

>>> dataset.to_csv("path/of/my/dataset.csv")

如果您的数据集很大,您可以按分片保存一个文件,例如

>>> num_shards = dataset.num_shards
>>> for index in range(num_shards):
...     shard = dataset.shard(index, num_shards)
...     shard.to_parquet(f"path/of/my/dataset/data-{index:05d}.parquet")
< > 在 GitHub 上更新