与 PyTorch 结合使用
本文档简要介绍了如何将 datasets
与 PyTorch 结合使用,特别关注如何从数据集获取 torch.Tensor
对象,以及如何使用 PyTorch DataLoader
和 Hugging Face Dataset
获得最佳性能。
数据集格式
默认情况下,数据集返回普通的 Python 对象:整数、浮点数、字符串、列表等。
要获取 PyTorch 张量,可以使用 Dataset.with_format() 将数据集的格式设置为 pytorch
。
>>> from datasets import Dataset
>>> data = [[1, 2],[3, 4]]
>>> ds = Dataset.from_dict({"data": data})
>>> ds = ds.with_format("torch")
>>> ds[0]
{'data': tensor([1, 2])}
>>> ds[:2]
{'data': tensor([[1, 2],
[3, 4]])}
Dataset 对象是 Arrow 表的包装器,它允许快速将数据集中的数组零拷贝读取到 PyTorch 张量中。
要在 GPU 上加载数据作为张量,请指定 device
参数
>>> import torch
>>> device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
>>> ds = ds.with_format("torch", device=device)
>>> ds[0]
{'data': tensor([1, 2], device='cuda:0')}
N 维数组
如果您的数据集包含 N 维数组,您会发现默认情况下,如果形状固定,则它们被认为是相同的张量
>>> from datasets import Dataset
>>> data = [[[1, 2],[3, 4]],[[5, 6],[7, 8]]] # fixed shape
>>> ds = Dataset.from_dict({"data": data})
>>> ds = ds.with_format("torch")
>>> ds[0]
{'data': tensor([[1, 2],
[3, 4]])}
>>> from datasets import Dataset
>>> data = [[[1, 2],[3]],[[4, 5, 6],[7, 8]]] # varying shape
>>> ds = Dataset.from_dict({"data": data})
>>> ds = ds.with_format("torch")
>>> ds[0]
{'data': [tensor([1, 2]), tensor([3])]}
但是,这种逻辑通常需要缓慢的形状比较和数据复制。为了避免这种情况,您必须显式使用 Array
特征类型并指定张量的形状
>>> from datasets import Dataset, Features, Array2D
>>> data = [[[1, 2],[3, 4]],[[5, 6],[7, 8]]]
>>> features = Features({"data": Array2D(shape=(2, 2), dtype='int32')})
>>> ds = Dataset.from_dict({"data": data}, features=features)
>>> ds = ds.with_format("torch")
>>> ds[0]
{'data': tensor([[1, 2],
[3, 4]])}
>>> ds[:2]
{'data': tensor([[[1, 2],
[3, 4]],
[[5, 6],
[7, 8]]])}
其他特征类型
ClassLabel 数据将被正确转换为张量
>>> from datasets import Dataset, Features, ClassLabel
>>> labels = [0, 0, 1]
>>> features = Features({"label": ClassLabel(names=["negative", "positive"])})
>>> ds = Dataset.from_dict({"label": labels}, features=features)
>>> ds = ds.with_format("torch")
>>> ds[:3]
{'label': tensor([0, 0, 1])}
字符串和二进制对象保持不变,因为 PyTorch 仅支持数字。
要使用 Image 特征类型,您需要安装 vision
扩展,例如 pip install datasets[vision]
。
>>> from datasets import Dataset, Features, Audio, Image
>>> images = ["path/to/image.png"] * 10
>>> features = Features({"image": Image()})
>>> ds = Dataset.from_dict({"image": images}, features=features)
>>> ds = ds.with_format("torch")
>>> ds[0]["image"].shape
torch.Size([512, 512, 4])
>>> ds[0]
{'image': tensor([[[255, 215, 106, 255],
[255, 215, 106, 255],
...,
[255, 255, 255, 255],
[255, 255, 255, 255]]], dtype=torch.uint8)}
>>> ds[:2]["image"].shape
torch.Size([2, 512, 512, 4])
>>> ds[:2]
{'image': tensor([[[[255, 215, 106, 255],
[255, 215, 106, 255],
...,
[255, 255, 255, 255],
[255, 255, 255, 255]]]], dtype=torch.uint8)}
要使用 Audio 特征类型,您需要安装 audio
扩展,例如 pip install datasets[audio]
。
>>> from datasets import Dataset, Features, Audio, Image
>>> audio = ["path/to/audio.wav"] * 10
>>> features = Features({"audio": Audio()})
>>> ds = Dataset.from_dict({"audio": audio}, features=features)
>>> ds = ds.with_format("torch")
>>> ds[0]["audio"]["array"]
tensor([ 6.1035e-05, 1.5259e-05, 1.6785e-04, ..., -1.5259e-05,
-1.5259e-05, 1.5259e-05])
>>> ds[0]["audio"]["sampling_rate"]
tensor(44100)
数据加载
与 torch.utils.data.Dataset
对象类似,Dataset 可以直接传递给 PyTorch DataLoader
>>> import numpy as np
>>> from datasets import Dataset
>>> from torch.utils.data import DataLoader
>>> data = np.random.rand(16)
>>> label = np.random.randint(0, 2, size=16)
>>> ds = Dataset.from_dict({"data": data, "label": label}).with_format("torch")
>>> dataloader = DataLoader(ds, batch_size=4)
>>> for batch in dataloader:
... print(batch)
{'data': tensor([0.0047, 0.4979, 0.6726, 0.8105]), 'label': tensor([0, 1, 0, 1])}
{'data': tensor([0.4832, 0.2723, 0.4259, 0.2224]), 'label': tensor([0, 0, 0, 0])}
{'data': tensor([0.5837, 0.3444, 0.4658, 0.6417]), 'label': tensor([0, 1, 0, 0])}
{'data': tensor([0.7022, 0.1225, 0.7228, 0.8259]), 'label': tensor([1, 1, 1, 1])}
优化数据加载
您可以通过多种方式提高数据加载速度,这可以为您节省时间,尤其是在处理大型数据集时。PyTorch 提供了并行数据加载、检索批次索引而不是单个索引以及流式传输以在不下载到磁盘的情况下遍历数据集。
使用多个工作进程
您可以使用 PyTorch DataLoader
的 num_workers
参数并行化数据加载,并获得更高的吞吐量。
在幕后,DataLoader
启动 num_workers
个进程。每个进程重新加载传递给 DataLoader
的数据集,并用于查询示例。在工作进程中重新加载数据集不会占用您的 RAM,因为它只是从您的磁盘再次内存映射数据集。
>>> import numpy as np
>>> from datasets import Dataset, load_from_disk
>>> from torch.utils.data import DataLoader
>>> data = np.random.rand(10_000)
>>> Dataset.from_dict({"data": data}).save_to_disk("my_dataset")
>>> ds = load_from_disk("my_dataset").with_format("torch")
>>> dataloader = DataLoader(ds, batch_size=32, num_workers=4)
流式传输数据
通过将数据集加载为 IterableDataset 来流式传输数据集。这使您能够逐步遍历远程数据集,而无需将其下载到磁盘或遍历本地数据文件。在 选择常规数据集还是可迭代数据集 指南中详细了解哪种数据集最适合您的用例。
datasets
中的可迭代数据集继承自 torch.utils.data.IterableDataset
,因此您可以将其传递给 torch.utils.data.DataLoader
>>> import numpy as np
>>> from datasets import Dataset, load_dataset
>>> from torch.utils.data import DataLoader
>>> data = np.random.rand(10_000)
>>> Dataset.from_dict({"data": data}).push_to_hub("<username>/my_dataset") # Upload to the Hugging Face Hub
>>> my_iterable_dataset = load_dataset("<username>/my_dataset", streaming=True, split="train")
>>> dataloader = DataLoader(my_iterable_dataset, batch_size=32)
如果数据集被拆分成多个分片(即如果数据集由多个数据文件组成),那么您可以使用 num_workers
并行流式传输
>>> my_iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> my_iterable_dataset.n_shards
39
>>> dataloader = DataLoader(my_iterable_dataset, batch_size=32, num_workers=4)
在这种情况下,每个工作进程都会获得要从中流式传输的分片列表的子集。
检查点和恢复
如果您需要可以在训练中途进行检查点和恢复的 DataLoader,可以使用 torchdata 中的 StatefulDataLoader
>>> from torchdata.stateful_dataloader import StatefulDataLoader
>>> my_iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> dataloader = StatefulDataLoader(my_iterable_dataset, batch_size=32, num_workers=4)
>>> # save in the middle of training
>>> state_dict = dataloader.state_dict()
>>> # and resume later
>>> dataloader.load_state_dict(state_dict)
这得益于 IterableDataset.state_dict() 和 IterableDataset.load_state_dict()。
分布式
要将您的数据集拆分到您的训练节点上,可以使用 datasets.distributed.split_dataset_by_node()
import os
from datasets.distributed import split_dataset_by_node
ds = split_dataset_by_node(ds, rank=int(os.environ["RANK"]), world_size=int(os.environ["WORLD_SIZE"]))
这适用于映射风格数据集和可迭代数据集。数据集针对大小为 world_size
的节点池中排名为 rank
的节点进行拆分。
对于映射风格数据集
每个节点都被分配了一块数据,例如排名为 0 的节点被分配了数据集的第一块。
对于可迭代数据集
如果数据集的分片数量是world_size
的倍数(即dataset.n_shards % world_size == 0
),那么这些分片将被均匀地分配到各个节点,这是最优化的方式。否则,每个节点会从world_size
中获取一个示例,并跳过其他示例。
如果希望每个节点使用多个工作进程来加载数据,也可以将此与torch.utils.data.DataLoader
结合使用。