流
数据集流使您无需下载即可使用数据集。数据将在您遍历数据集时进行流式传输。这在以下情况下特别有用
- 您不想等待一个非常大的数据集下载。
- 数据集大小超过了您计算机上可用的磁盘空间。
- 您希望快速浏览数据集的几个样本。
例如,oscar-corpus/OSCAR-2201 数据集的英文拆分大小为 1.2 TB,但您可以使用流式传输立即使用它。通过在 load_dataset() 中设置 streaming=True
来流式传输数据集,如下所示
>>> from datasets import load_dataset
>>> dataset = load_dataset('oscar-corpus/OSCAR-2201', 'en', split='train', streaming=True)
>>> print(next(iter(dataset)))
{'id': 0, 'text': 'Founded in 2015, Golden Bees is a leading programmatic recruitment platform dedicated to employers, HR agencies and job boards. The company has developed unique HR-custom technologies and predictive algorithms to identify and attract the best candidates for a job opportunity.', ...
数据集流还允许您使用由本地文件组成的数据集,而无需进行任何转换。在这种情况下,数据将在您遍历数据集时从本地文件进行流式传输。这在以下情况下特别有用
- 您不想等待一个非常大的本地数据集转换为 Arrow。
- 转换后的文件大小将超过计算机上可用磁盘空间。
- 您希望快速浏览数据集的几个样本。
例如,您可以流式传输数百个压缩 JSONL 文件的本地数据集,例如 oscar-corpus/OSCAR-2201,以便立即使用它。
>>> from datasets import load_dataset
>>> data_files = {'train': 'path/to/OSCAR-2201/compressed/en_meta/*.jsonl.gz'}
>>> dataset = load_dataset('json', data_files=data_files, split='train', streaming=True)
>>> print(next(iter(dataset)))
{'id': 0, 'text': 'Founded in 2015, Golden Bees is a leading programmatic recruitment platform dedicated to employers, HR agencies and job boards. The company has developed unique HR-custom technologies and predictive algorithms to identify and attract the best candidates for a job opportunity.', ...
以流式模式加载数据集会创建一个新的数据集类型实例(而不是经典的 Dataset 对象),称为 IterableDataset。这种特殊类型的数据集有自己的一套处理方法,如下所示。
一个 IterableDataset 对于像训练模型这样的迭代作业很有用。您不应该使用 IterableDataset 对于需要随机访问示例的作业,因为您必须使用 for 循环遍历它。获取可迭代数据集中的最后一个示例需要您遍历所有之前的示例。您可以在 Dataset vs. IterableDataset 指南 中找到更多详细信息。
从 Dataset 转换
如果您有一个现有的 Dataset 对象,您可以使用 to_iterable_dataset() 函数将其转换为 IterableDataset。这实际上比在 load_dataset() 中设置 streaming=True
参数更快,因为数据是从本地文件流式传输的。
>>> from datasets import load_dataset
# faster 🐇
>>> dataset = load_dataset("food101")
>>> iterable_dataset = dataset.to_iterable_dataset()
# slower 🐢
>>> iterable_dataset = load_dataset("food101", streaming=True)
当 IterableDataset 实例化时,to_iterable_dataset() 函数支持分片。这在处理大型数据集时非常有用,并且您希望对数据集进行洗牌或使用 PyTorch DataLoader 启用快速并行加载。
>>> import torch
>>> from datasets import load_dataset
>>> dataset = load_dataset("food101")
>>> iterable_dataset = dataset.to_iterable_dataset(num_shards=64) # shard the dataset
>>> iterable_dataset = iterable_dataset.shuffle(buffer_size=10_000) # shuffles the shards order and use a shuffle buffer when you start iterating
dataloader = torch.utils.data.DataLoader(iterable_dataset, num_workers=4) # assigns 64 / 4 = 16 shards from the shuffled list of shards to each worker when you start iterating
洗牌
就像一个普通的 Dataset 对象,您也可以使用 IterableDataset.shuffle() 对 IterableDataset 进行洗牌。
buffer_size
参数控制用于从随机样本中选择的缓冲区的尺寸。假设您的数据集有 100 万个示例,并且您将 buffer_size
设置为一万。IterableDataset.shuffle() 将从缓冲区中的前一万个示例中随机选择示例。缓冲区中选择的示例将被新的示例替换。默认情况下,缓冲区大小为 1,000。
>>> from datasets import load_dataset
>>> dataset = load_dataset('oscar', "unshuffled_deduplicated_en", split='train', streaming=True)
>>> shuffled_dataset = dataset.shuffle(seed=42, buffer_size=10_000)
IterableDataset.shuffle() 还会对分片进行洗牌,如果数据集被分片到多个文件中。
重新洗牌
有时您可能希望在每个 epoch 后重新洗牌数据集。这将要求您为每个 epoch 设置不同的种子。在 epoch 之间使用 IterableDataset.set_epoch()
告诉数据集您处于哪个 epoch。
您的种子实际上变为:初始种子 + 当前 epoch
。
>>> for epoch in range(epochs):
... shuffled_dataset.set_epoch(epoch)
... for example in shuffled_dataset:
... ...
拆分数据集
您可以通过两种方式之一拆分数据集。
- IterableDataset.take() 返回数据集中的前
n
个示例。
>>> dataset = load_dataset('oscar', "unshuffled_deduplicated_en", split='train', streaming=True)
>>> dataset_head = dataset.take(2)
>>> list(dataset_head)
[{'id': 0, 'text': 'Mtendere Village was...'}, {'id': 1, 'text': 'Lily James cannot fight the music...'}]
- IterableDataset.skip() 省略数据集中的前
n
个示例并返回剩余的示例。
>>> train_dataset = shuffled_dataset.skip(1000)
take
和 skip
阻止对 shuffle
的未来调用,因为它们锁定分片的顺序。您应该在拆分数据集之前对其进行洗牌。
交织
interleave_datasets() 可以将 IterableDataset 与其他数据集组合。组合后的数据集从每个原始数据集返回交替的示例。
>>> from datasets import interleave_datasets
>>> en_dataset = load_dataset('oscar', "unshuffled_deduplicated_en", split='train', streaming=True, trust_remote_code=True)
>>> fr_dataset = load_dataset('oscar', "unshuffled_deduplicated_fr", split='train', streaming=True, trust_remote_code=True)
>>> multilingual_dataset = interleave_datasets([en_dataset, fr_dataset])
>>> list(multilingual_dataset.take(2))
[{'text': 'Mtendere Village was inspired by the vision...'}, {'text': "Média de débat d'idées, de culture et de littérature..."}]
定义来自每个原始数据集的采样概率,以更好地控制每个数据集的采样和组合方式。使用您想要的采样概率设置 probabilities
参数。
>>> multilingual_dataset_with_oversampling = interleave_datasets([en_dataset, fr_dataset], probabilities=[0.8, 0.2], seed=42)
>>> list(multilingual_dataset_with_oversampling.take(2))
[{'text': 'Mtendere Village was inspired by the vision...'}, {'text': 'Lily James cannot fight the music...'}]
最终数据集约 80% 由 en_dataset
组成,20% 由 fr_dataset
组成。
您还可以指定 stopping_strategy
。默认策略 first_exhausted
是一种二次采样策略,即数据集构造一旦某个数据集样本耗尽就停止。您可以指定 stopping_strategy=all_exhausted
来执行过采样策略。在这种情况下,数据集构造一旦每个数据集中的每个样本至少被添加一次就停止。实际上,这意味着如果一个数据集被耗尽,它将返回到该数据集的开头,直到停止条件被满足。请注意,如果没有指定采样概率,新数据集将有 max_length_datasets*nb_dataset 个样本
。
重命名、删除和转换
以下方法允许您修改数据集的列。这些方法对于重命名或删除列以及将列更改为一组新的特征很有用。
重命名
当您需要重命名数据集中的列时,请使用 IterableDataset.rename_column()。与原始列关联的特征实际上会移动到新的列名下,而不是仅就地替换原始列。
为 IterableDataset.rename_column() 提供原始列的名称和新的列名。
>>> from datasets import load_dataset
>>> dataset = load_dataset('mc4', 'en', streaming=True, split='train', trust_remote_code=True)
>>> dataset = dataset.rename_column("text", "content")
删除
当您需要删除一个或多个列时,请为 IterableDataset.remove_columns() 提供要删除的列的名称。通过提供列名列表来删除多个列。
>>> from datasets import load_dataset
>>> dataset = load_dataset('mc4', 'en', streaming=True, split='train', trust_remote_code=True)
>>> dataset = dataset.remove_columns('timestamp')
转换
IterableDataset.cast() 更改一个或多个列的特征类型。此方法以新的 Features
作为参数。以下示例代码展示了如何更改 ClassLabel
和 Value
的特征类型
>>> from datasets import load_dataset
>>> dataset = load_dataset('glue', 'mrpc', split='train', streaming=True)
>>> dataset.features
{'sentence1': Value(dtype='string', id=None),
'sentence2': Value(dtype='string', id=None),
'label': ClassLabel(num_classes=2, names=['not_equivalent', 'equivalent'], names_file=None, id=None),
'idx': Value(dtype='int32', id=None)}
>>> from datasets import ClassLabel, Value
>>> new_features = dataset.features.copy()
>>> new_features["label"] = ClassLabel(names=['negative', 'positive'])
>>> new_features["idx"] = Value('int64')
>>> dataset = dataset.cast(new_features)
>>> dataset.features
{'sentence1': Value(dtype='string', id=None),
'sentence2': Value(dtype='string', id=None),
'label': ClassLabel(num_classes=2, names=['negative', 'positive'], names_file=None, id=None),
'idx': Value(dtype='int64', id=None)}
转换仅在原始特征类型和新特征类型兼容的情况下才有效。例如,如果原始列仅包含 1 和 0,则可以将特征类型为 Value('int32')
的列转换为 Value('bool')
。
使用 IterableDataset.cast_column() 更改单个列的特征类型。将列名及其新特征类型作为参数传递。
>>> dataset.features
{'audio': Audio(sampling_rate=44100, mono=True, id=None)}
>>> dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
>>> dataset.features
{'audio': Audio(sampling_rate=16000, mono=True, id=None)}
映射
类似于 Dataset.map() 函数,用于处理一个常规 Dataset,🤗 Datasets 为处理 IterableDataset 提供了 IterableDataset.map()。 IterableDataset.map() 在流式传输示例时即时应用处理。
它允许您将处理函数应用于数据集中的每个示例,独立地或以批次的形式。此函数甚至可以创建新行和新列。
以下示例演示了如何对 IterableDataset 进行标记化。该函数需要接受并输出一个 dict
。
>>> def add_prefix(example):
... example['text'] = 'My text: ' + example['text']
... return example
接下来,使用 IterableDataset.map() 将此函数应用于数据集。
>>> from datasets import load_dataset
>>> dataset = load_dataset('oscar', 'unshuffled_deduplicated_en', streaming=True, split='train', trust_remote_code=True)
>>> updated_dataset = dataset.map(add_prefix)
>>> list(updated_dataset.take(3))
[{'id': 0, 'text': 'My text: Mtendere Village was inspired by...'},
{'id': 1, 'text': 'My text: Lily James cannot fight the music...'},
{'id': 2, 'text': 'My text: "I\'d love to help kickstart...'}]
让我们看看另一个示例,除了这次,您将使用 IterableDataset.map() 删除一列。当您删除一列时,只有在将示例提供给映射函数后才会将其删除。这使得映射函数可以在删除列之前使用列内容。
在 IterableDataset.map() 中使用 remove_columns
参数指定要删除的列。
>>> updated_dataset = dataset.map(add_prefix, remove_columns=["id"])
>>> list(updated_dataset.take(3))
[{'text': 'My text: Mtendere Village was inspired by...'},
{'text': 'My text: Lily James cannot fight the music...'},
{'text': 'My text: "I\'d love to help kickstart...'}]
批处理
IterableDataset.map() 也支持处理批次示例。通过设置 batched=True
来对批次进行操作。默认批次大小为 1000,但您可以使用 batch_size
参数进行调整。这为许多有趣的应用打开了大门,例如标记化、将长句子拆分为更短的块以及数据增强。
标记化
>>> from datasets import load_dataset
>>> from transformers import AutoTokenizer
>>> dataset = load_dataset("mc4", "en", streaming=True, split="train", trust_remote_code=True)
>>> tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
>>> def encode(examples):
... return tokenizer(examples['text'], truncation=True, padding='max_length')
>>> dataset = dataset.map(encode, batched=True, remove_columns=["text", "timestamp", "url"])
>>> next(iter(dataset))
{'input_ids': [101, 8466, 1018, 1010, 4029, 2475, 2062, 18558, 3100, 2061, ...,1106, 3739, 102],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..., 1, 1]}
在 批处理映射处理 文档中查看批处理的其他示例。它们对可迭代数据集也一样有效。
过滤
您可以使用 Dataset.filter() 基于谓词函数过滤数据集中的行。它返回匹配指定条件的行。
>>> from datasets import load_dataset
>>> dataset = load_dataset('oscar', 'unshuffled_deduplicated_en', streaming=True, split='train', trust_remote_code=True)
>>> start_with_ar = dataset.filter(lambda example: example['text'].startswith('Ar'))
>>> next(iter(start_with_ar))
{'id': 4, 'text': 'Are you looking for Number the Stars (Essential Modern Classics)?...'}
如果您设置 with_indices=True
,Dataset.filter() 还可以通过索引进行过滤。
>>> even_dataset = dataset.filter(lambda example, idx: idx % 2 == 0, with_indices=True)
>>> list(even_dataset.take(3))
[{'id': 0, 'text': 'Mtendere Village was inspired by the vision of Chief Napoleon Dzombe, ...'},
{'id': 2, 'text': '"I\'d love to help kickstart continued development! And 0 EUR/month...'},
{'id': 4, 'text': 'Are you looking for Number the Stars (Essential Modern Classics)? Normally, ...'}]
批次
batch
方法将您的 IterableDataset
转换为批次的可迭代对象。当您希望在训练循环中使用批次或使用期望批次输入的框架时,这特别有用。
使用 map
函数对数据批次应用函数时,也有一个“批处理”选项,如上面 映射部分 中所述。此处描述的 batch
方法不同,它提供了一种更直接的方法来从您的数据集中创建批次。
您可以像这样使用 batch
方法
from datasets import load_dataset
# Load a dataset in streaming mode
dataset = load_dataset("some_dataset", split="train", streaming=True)
# Create batches of 32 samples
batched_dataset = dataset.batch(batch_size=32)
# Iterate over the batched dataset
for batch in batched_dataset:
print(batch)
break
在这个例子中,batched_dataset
仍然是一个 IterableDataset,但现在每个生成的项都是由 32 个样本组成的批次,而不是单个样本。这种批处理是在您遍历数据集时即时完成的,保留了 IterableDataset 的内存效率。
batch
方法还提供了一个 drop_last_batch
参数。当设置为 True 时,如果最后一个批次小于指定的 batch_size
,它将丢弃最后一个批次。这在您的下游处理要求所有批次大小相同的情况下很有用。
batched_dataset = dataset.batch(batch_size=32, drop_last_batch=True)
在训练循环中流式传输
IterableDataset 可以集成到训练循环中。首先,对数据集进行洗牌。
>>> seed, buffer_size = 42, 10_000
>>> dataset = dataset.shuffle(seed, buffer_size=buffer_size)
最后,创建一个简单的训练循环并开始训练。
>>> import torch
>>> from torch.utils.data import DataLoader
>>> from transformers import AutoModelForMaskedLM, DataCollatorForLanguageModeling
>>> from tqdm import tqdm
>>> dataset = dataset.with_format("torch")
>>> dataloader = DataLoader(dataset, collate_fn=DataCollatorForLanguageModeling(tokenizer))
>>> device = 'cuda' if torch.cuda.is_available() else 'cpu'
>>> model = AutoModelForMaskedLM.from_pretrained("distilbert-base-uncased")
>>> model.train().to(device)
>>> optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)
>>> for epoch in range(3):
... dataset.set_epoch(epoch)
... for i, batch in enumerate(tqdm(dataloader, total=5)):
... if i == 5:
... break
... batch = {k: v.to(device) for k, v in batch.items()}
... outputs = model(**batch)
... loss = outputs[0]
... loss.backward()
... optimizer.step()
... optimizer.zero_grad()
... if i % 10 == 0:
... print(f"loss: {loss}")
保存数据集检查点并恢复迭代
如果您的训练循环停止,您可能希望从停止的地方重新开始训练。为此,您可以保存模型和优化器的检查点,以及数据加载器。
Iterable 数据集不提供对特定示例索引的随机访问以从中恢复,但您可以使用 IterableDataset.state_dict() 和 IterableDataset.load_state_dict() 从检查点恢复,类似于您对模型和优化器可以做的事情。
>>> iterable_dataset = Dataset.from_dict({"a": range(6)}).to_iterable_dataset(num_shards=3)
>>> for idx, example in enumerate(iterable_dataset):
... print(example)
... if idx == 2:
... state_dict = iterable_dataset.state_dict()
... print("checkpoint")
... break
>>> iterable_dataset.load_state_dict(state_dict)
>>> print(f"restart from checkpoint")
>>> for example in iterable_dataset:
... print(example)
返回
{'a': 0}
{'a': 1}
{'a': 2}
checkpoint
restart from checkpoint
{'a': 3}
{'a': 4}
{'a': 5}
在内部,可迭代数据集会跟踪当前读取的分片以及当前分片中的示例索引,并将此信息存储在 state_dict
中。
为了从检查点恢复,数据集会跳过所有之前读取的分片,以从当前分片重新开始。然后,它会读取分片并跳过示例,直到它到达检查点中的确切示例。
因此,重新启动数据集速度非常快,因为它不会重新读取已经迭代过得分片。但是,重新启动数据集通常不会立即完成,因为它需要从当前分片的开头重新开始读取并跳过示例,直到它到达检查点位置。
这可以与 torchdata
中的 StatefulDataLoader
一起使用。
>>> from torchdata.stateful_dataloader import StatefulDataLoader
>>> iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> dataloader = StatefulDataLoader(iterable_dataset, batch_size=32, num_workers=4)
>>> # checkpoint
>>> state_dict = dataloader.state_dict() # uses iterable_dataset.state_dict() under the hood
>>> # resume from checkpoint
>>> dataloader.load_state_dict(state_dict) # uses iterable_dataset.load_state_dict() under the hood
恢复会返回检查点保存的确切位置,除非使用 .shuffle()
:从洗牌缓冲区中的示例在恢复时会丢失,并且缓冲区会用新数据重新填充。