人工智能:Pytorch DataLoader

PyTorch 数据加载实用程序的核心是 torch.utils.data.DataLoader 类。它表示数据集上的 Python 可迭代对象,支持:映射式和可迭代式数据集;自定义数据加载顺序;自动批处理;单进程和多进程数据加载;自动内存固定。这些选项由 DataLoader 的构造函数参数配置,其签名为:

1
2
3
4
5
DataLoader(dataset, batch_size=1, shuffle=False, sampler=None,
batch_sampler=None, num_workers=0, collate_fn=None,
pin_memory=False, drop_last=False, timeout=0,
worker_init_fn=None, *, prefetch_factor=2,
persistent_workers=False)

本文将详细介绍了这些选项的效果和用法。内容参考 Pytorch 官方文档:https://pytorch.org/docs/stable/data.html#module-torch.utils.data

数据集类型

DataLoader 构造函数最重要的参数是 dataset,它指示要从中加载数据的 “映射式数据集” 与 “可迭代式数据集”。

映射式数据集

映射式数据集是实现了 __getitem__()__len__() 协议的数据集,它表示从(可能是非整数)索引/键到数据样本的映射。

例如,当使用 dataset[idx] 访问此类数据集时,它可以从磁盘上的文件夹读取第 idx 个图像及其相应的标签文件。有关更多详细信息,请参阅 Dataset

可迭代式数据集

可迭代式数据集是 IterableDataset 子类的实例,它实现了 __iter__() 协议,

例如,当调用 iter(dataset) 时,此类数据集可以返回从数据库、远程服务器读取的数据流,甚至是实时生成的日志。有关更多详细信息,请参阅 IterableDataset

当使用多进程数据加载的 IterableDataset 时。在每个工作进程上复制相同的数据集对象,因此必须对副本进行不同的配置,以避免重复数据。请参阅 IterableDataset 文档了解如何实现这一点。

数据加载顺序和 Sampler

对于 可迭代式数据集,数据加载顺序完全由用户定义的可迭代对象控制。

本节的其余部分涉及 映射式数据集 的情况。

基于shuffle参数,DataLoader会自动构建顺序或随机采样器。 另外,用户可以使用sampler参数指定一个自定义的Sampler对象,该对象每次都会生成下一个要获取的索引/键。

一个自定义的Sampler,每次生成一个批次索引列表,可以作为batch_sampler参数传递。 也可以通过batch_size和drop_last参数启用自动批处理。 有关详细信息,请参阅下一节。

注意

sampler和batch_sampler都不兼容可迭代数据集,因为此类数据集没有键或索引的概念。

加载批处理和非批处理数据

DataLoader支持通过参数batch_size、drop_last、batch_sampler和collate_fn(具有默认函数)自动将获取的单个数据样本整理成批次。

自动批处理(默认)

这是最常见的情况,对应于获取一个数据小批量并将其整理成批处理样本,即包含一个维度为批次维度的张量(通常是第一个)。

当batch_size(默认1)不为None时,数据加载器会生成批处理样本,而不是单个样本。 batch_size和drop_last参数用于指定数据加载器如何获取数据集键的批次。 对于映射式数据集,用户可以另外指定batch_sampler,它每次生成一个键列表。

注意

batch_size和drop_last参数本质上用于从sampler构造batch_sampler。 对于映射式数据集,sampler由用户提供或根据shuffle参数构建。 对于可迭代数据集,sampler是一个虚拟的无限采样器。 有关采样器的更多详细信息,请参阅此节。

注意

当使用多进程从可迭代数据集中获取数据时,drop_last参数会删除每个工作程序的数据集副本的最后一个非满批次。

使用采样器中的索引获取样本列表后,作为collate_fn参数传递的函数用于将样本列表整理成批次。

在这种情况下,从映射式数据集加载大致等同于

1
2
for indices in batch_sampler:
yield collate_fn([dataset[i] for i in indices])

从可迭代数据集加载大致等同于

1
2
3
dataset_iter = iter(dataset)
for indices in batch_sampler:
yield collate_fn([next(dataset_iter) for _ in indices])

可以使用自定义的collate_fn来自定义整理,例如,将顺序数据填充到批次的最大长度。 有关collate_fn的更多信息,请参阅此节。

禁用自动批处理

在某些情况下,用户可能希望在数据集代码中手动处理批处理,或者只是加载单个样本。 例如,直接加载批处理数据可能更便宜(例如,从数据库中批量读取或读取连续的内存块),或者批次大小取决于数据,或者程序被设计为处理单个样本。 在这些情况下,最好不要使用自动批处理(其中collate_fn用于整理样本),而是让数据加载器直接返回dataset对象的每个成员。

当batch_size和batch_sampler都为None(batch_sampler的默认值为None)时,自动批处理将被禁用。 从dataset中获取的每个样本都会使用作为collate_fn参数传递的函数进行处理。

禁用自动批处理时,默认的collate_fn只会将NumPy数组转换为PyTorch张量,其他一切保持不变。

在这种情况下,从映射式数据集加载大致等同于

1
2
for index in sampler:
yield collate_fn(dataset[index])

从可迭代数据集加载大致等同于

1
2
for data in iter(dataset):
yield collate_fn(data)

有关collate_fn的更多信息,请参阅此节。

使用collate_fn

启用或禁用自动批处理时,collate_fn的使用略有不同。

禁用自动批处理时,collate_fn会针对每个单独的数据样本进行调用,并从数据加载器迭代器中生成输出。 在这种情况下,默认的collate_fn只会将NumPy数组转换为PyTorch张量。

启用自动批处理时,collate_fn会每次针对一个数据样本列表进行调用。 它应该将输入样本整理成一个批次,以便从数据加载器迭代器中生成。 本节的其余部分描述了默认collate_fn(default_collate())的行为。

例如,如果每个数据样本包含一个 3 通道图像和一个整型类标签,即数据集的每个元素返回一个元组(image, class_index),则默认的collate_fn会将此类元组列表整理成一个包含批处理图像张量和批处理类标签张量的单个元组。 特别是,默认的collate_fn具有以下属性

它始终将一个新的维度作为批处理维度前置。

它会自动将NumPy数组和Python数值转换为PyTorch张量。

它会保留数据结构,例如,如果每个样本都是一个字典,它会输出一个具有相同键集但批处理张量作为值的字典(如果值不能转换为张量,则为列表)。 list、tuple、namedtuple等也是如此。

用户可以使用自定义的collate_fn来实现自定义批处理,例如,沿除第一个维度以外的维度整理,填充不同长度的序列,或添加对自定义数据类型的支持。

如果您遇到DataLoader的输出具有与预期不同的维度或类型的情况,您可能需要检查您的collate_fn。

单进程和多进程数据加载

DataLoader默认使用单进程数据加载。

在 Python 进程中,全局解释器锁 (GIL)阻止了跨线程真正地完全并行化 Python 代码。 为了避免数据加载阻塞计算代码,PyTorch 提供了一个简单的开关来执行多进程数据加载,只需将参数num_workers设置为正整数即可。

单进程数据加载(默认)

在此模式下,数据获取在DataLoader初始化的同一个进程中完成。 因此,数据加载可能会阻塞计算。 但是,当用于在进程之间共享数据的资源(例如,共享内存、文件描述符)有限,或者当整个数据集很小且可以完全加载到内存中时,此模式可能更受欢迎。 此外,单进程加载通常显示更易读的错误跟踪,因此对调试很有用。

多进程数据加载

将参数num_workers设置为正整数将打开使用指定数量的加载器工作程序进程的多进程数据加载。

警告

经过多次迭代后,加载器工作进程将消耗与父进程相同的 CPU 内存,用于父进程中所有从工作进程访问的 Python 对象。如果数据集包含大量数据(例如,您在数据集构建时加载了非常大的文件名列表)和/或您使用了很多工作进程(总内存使用量为 number of workers * size of parent process),这可能会带来问题。最简单的解决方法是用非引用计数表示替换 Python 对象,例如 Pandas、Numpy 或 PyArrow 对象。查看 问题 #13246 了解更多关于为什么会出现这种情况以及如何解决这些问题的示例代码。

在此模式下,每次创建 DataLoader 的迭代器(例如,当您调用 enumerate(dataloader) 时),都会创建 num_workers 个工作进程。此时,dataset、collate_fn 和 worker_init_fn 会传递给每个工作进程,并在其中用于初始化和获取数据。这意味着数据集访问及其内部 IO、转换(包括 collate_fn)将在工作进程中运行。

torch.utils.data.get_worker_info() 在工作进程中返回各种有用的信息(包括工作进程 ID、数据集副本、初始种子等),并在主进程中返回 None。用户可以在数据集代码和/或 worker_init_fn 中使用此函数来单独配置每个数据集副本,并确定代码是否在工作进程中运行。例如,这在对数据集进行分片时尤其有用。

对于映射式数据集,主进程使用 sampler 生成索引,并将其发送给工作进程。因此,任何洗牌随机化操作都在主进程中完成,该操作通过分配要加载的索引来指导加载。

对于可迭代式数据集,由于每个工作进程都获得了 dataset 对象的副本,因此简单的多进程加载通常会导致数据重复。使用 torch.utils.data.get_worker_info() 和/或 worker_init_fn,用户可以独立地配置每个副本。(请参阅 IterableDataset 文档了解如何实现这一点。)出于类似原因,在多进程加载中,drop_last 参数将丢弃每个工作进程的可迭代式数据集副本的最后一个非完整批次。

当迭代结束时或当迭代器被垃圾回收时,工作进程将关闭。

警告

一般不建议在多进程加载中返回 CUDA 张量,因为在多进程中使用 CUDA 和共享 CUDA 张量时存在许多细微之处(请参阅 多进程中的 CUDA)。相反,我们建议使用 自动内存锁定(即,设置 pin_memory=True),它可以实现快速将数据传输到支持 CUDA 的 GPU。

特定于平台的行为
由于工作进程依赖于 Python multiprocessing,因此工作进程启动行为在 Windows 上与 Unix 上不同。

在 Unix 上,fork() 是默认的 multiprocessing 启动方法。使用 fork(),子工作进程通常可以通过克隆的地址空间直接访问 dataset 和 Python 参数函数。

在 Windows 或 MacOS 上,spawn() 是默认的 multiprocessing 启动方法。使用 spawn(),会启动另一个解释器来运行您的主脚本,然后是接收 dataset、collate_fn 和其他参数的内部工作进程函数,这些参数通过 pickle 序列化。

这种单独的序列化意味着您应该采取两个步骤来确保在使用多进程数据加载时与 Windows 兼容。

将您的大部分主脚本代码包装在 if name == ‘main‘: 块中,以确保它在每个工作进程启动时不会再次运行(很可能生成错误)。您可以将您的数据集和 DataLoader 实例创建逻辑放在这里,因为它不需要在工作进程中重新执行。

确保任何自定义 collate_fn、worker_init_fn 或 dataset 代码都声明为顶层定义,位于 main 检查之外。这确保它们在工作进程中可用。(这是必要的,因为函数仅以引用形式被腌制,而不是 bytecode。)

多进程数据加载中的随机性
默认情况下,每个工作进程的 PyTorch 种子将设置为 base_seed + worker_id,其中 base_seed 是主进程使用其 RNG 生成的长整型数(因此,强制性地消耗 RNG 状态)或指定的 generator。但是,在初始化工作进程时,其他库的种子可能会重复,导致每个工作进程返回相同的随机数。(请参阅常见问题解答中的 此部分。)

在 worker_init_fn 中,您可以使用 torch.utils.data.get_worker_info().seed 或 torch.initial_seed() 访问为每个工作进程设置的 PyTorch 种子,并在数据加载之前使用它来设置其他库的种子。

内存锁定

当主机到 GPU 的复制来自锁定页面(页面锁定)内存时,速度会快得多。请参阅 使用锁定页面内存缓冲区,了解何时以及如何一般性地使用锁定页面内存。

对于数据加载,将 pin_memory=True 传递给 DataLoader 将自动将获取的数据张量放入锁定页面内存中,从而可以更快地将数据传输到支持 CUDA 的 GPU。

默认内存锁定逻辑仅识别张量以及包含张量的映射和可迭代对象。默认情况下,如果锁定页面逻辑看到一个自定义类型(如果您的 collate_fn 返回自定义批次类型,就会发生这种情况),或者如果批次的每个元素都是自定义类型,则锁定页面逻辑将无法识别它们,并且它将返回该批次(或这些元素)而不锁定页面内存。要为自定义批次或数据类型启用内存锁定,请在自定义类型上定义 pin_memory() 方法。

请参阅下面的示例。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class SimpleCustomBatch:
def __init__(self, data):
transposed_data = list(zip(*data))
self.inp = torch.stack(transposed_data[0], 0)
self.tgt = torch.stack(transposed_data[1], 0)

# custom memory pinning method on custom type
def pin_memory(self):
self.inp = self.inp.pin_memory()
self.tgt = self.tgt.pin_memory()
return self

def collate_wrapper(batch):
return SimpleCustomBatch(batch)

inps = torch.arange(10 * 5, dtype=torch.float32).view(10, 5)
tgts = torch.arange(10 * 5, dtype=torch.float32).view(10, 5)
dataset = TensorDataset(inps, tgts)

loader = DataLoader(dataset, batch_size=2, collate_fn=collate_wrapper,
pin_memory=True)

for batch_ndx, sample in enumerate(loader):
print(sample.inp.is_pinned())
print(sample.tgt.is_pinned())
1
Class torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=None, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None, generator=None, *, prefetch_factor=None, persistent_workers=False, pin_memory_device='')[source]

数据加载器将数据集和采样器组合在一起,并提供一个遍历给定数据集的可迭代对象。

DataLoader 支持映射风格和可迭代风格的数据集,包含单进程或多进程加载、自定义加载顺序以及可选的自动批处理(整理)和内存固定。

更多细节请参见 torch.utils.data 文档页面。

参数
** dataset (Dataset) – 要加载数据的 dataset。
** batch_size (int, optional) – 每次加载多少个样本(默认:1)。
** shuffle (bool, optional) – 设置为 True 则在每个 epoch 时重新洗牌数据(默认:False)。
** sampler (Sampler or Iterable, optional) – 定义从 dataset 中抽取样本的策略。可以是任何实现了 len 的 Iterable。如果指定,则不能指定 shuffle。
** batch_sampler (Sampler or Iterable, optional) – 与 sampler 相似,但一次返回一批索引。与 batch_size、shuffle、sampler 和 drop_last 互斥。
** num_workers (int, optional) – 用于数据加载的子进程数量。0 表示数据将在主进程中加载。(默认:0)
** collate_fn (Callable, optional) – 将样本列表合并成一个包含张量(Tensor)的小批量。在使用映射风格数据集的批量加载时使用。
** pin_memory (bool, optional) – 如果为 True,数据加载器将在返回之前将张量复制到设备/CUDA 固定内存中。如果你的数据元素是自定义类型,或者你的 collate_fn 返回一个自定义类型的批次,请参阅下面的示例。
** drop_last (bool, optional) – 设置为 True 以丢弃最后一个不完整的批次,如果数据集的大小不能被批次大小整除。如果为 False 且数据集大小不能被批次大小整除,则最后一个批次将更小。(默认:False)
** timeout (numeric, optional) – 如果为正数,则为从工作进程收集批次的超时值。应该始终为非负数。(默认:0)
** worker_init_fn (Callable, optional) – 如果不为 None,这将在每个工作进程子进程上调用,并将工作进程 ID([0, num_workers - 1] 中的整数)作为输入,在播种和数据加载之前。(默认:None)
** multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – 如果为 None,将使用操作系统的默认 多进程上下文。(默认:None)
** generator (torch.Generator, optional) – 如果不为 None,则随机采样器将使用此 RNG 生成随机索引,多进程将使用此 RNG 为工作进程生成 base_seed。(默认:None)
** prefetch_factor (int, optional, keyword-only arg) – 每个工作进程预先加载的批次数量。2 表示所有工作进程总共预取 2 * num_workers 个批次。(默认值取决于 num_workers 的设置值。如果 num_workers = 0,则默认值为 ** None。否则,如果 num_workers > 0,则默认值为 2)。
** persistent_workers (bool, optional) – 如果为 True,数据加载器在数据集被消费一次后不会关闭工作进程。这允许将工作进程的 Dataset 实例保持活动。(默认:False)
** pin_memory_device (str, optional) – 如果 pin_memory 为 True,则为要 pin_memory 的设备。

警告

如果使用 spawn 启动方法,则 worker_init_fn 不能是不可腌制的对象,例如 lambda 函数。有关 PyTorch 中多进程的更多详细信息,请参阅 多进程最佳实践。

警告

len(dataloader) 启发式基于所用采样器的长度。当 dataset 是一个 IterableDataset 时,它将返回一个基于 len(dataset) / batch_size 的估计值,并根据 drop_last 进行适当的舍入,而与多进程加载配置无关。这代表了 PyTorch 可以做出的最佳猜测,因为 PyTorch 相信用户 dataset 代码在正确处理多进程加载以避免重复数据方面。

但是,如果分片导致多个工作进程具有不完整的最后一个批次,则此估计仍然可能不准确,因为 (1) 一个原本完整的批次可以被分成多个批次,(2) 当设置了 drop_last 时,可以丢弃多个批次。不幸的是,PyTorch 通常无法检测到此类情况。

有关这两种类型的数据集以及 IterableDataset 如何与 多进程数据加载 相互作用的更多详细信息,请参阅 数据集类型。

警告

有关随机种子相关问题的更多信息,请参阅 可重复性,以及 我的数据加载器工作进程返回相同的随机数 和 多进程数据加载中的随机性 说明。

classtorch.utils.data.Dataset(*args, **kwds)[source]
一个表示 Dataset 的抽象类。

所有表示从键到数据样本的映射的数据集都应该继承它。所有子类都应该覆盖 getitem(),支持获取给定键的数据样本。子类还可以选择覆盖 len(),这将返回许多 Sampler 实现和 DataLoader 的默认选项的数据集大小。子类还可以选择实现 getitems(),以加快批处理样本加载速度。此方法接受批次样本索引列表并返回样本列表。

注意

DataLoader 默认情况下会构造一个生成整数索引的索引采样器。要使其与具有非整数索引/键的映射式数据集一起使用,必须提供自定义采样器。

1
class torch.utils.data.IterableDataset(*args, **kwds)[source]

一个可迭代的 Dataset。

所有表示数据样本可迭代的数据集都应该继承它。这种形式的数据集在数据来自流时特别有用。

所有子类都应该覆盖 iter(),它将返回此数据集中样本的迭代器。

当子类与 DataLoader 一起使用时,数据集中的每个项目都将从 DataLoader 迭代器中生成。当 num_workers > 0 时,每个工作进程将拥有数据集对象的副本,因此通常需要独立配置每个副本,以避免工作进程返回重复数据。 get_worker_info(),在工作进程中调用时,返回有关工作进程的信息。它可以在数据集的 iter() 方法或 DataLoader 的 worker_init_fn 选项中使用,以修改每个副本的行为。

示例 1:在 iter() 中将工作负载分配到所有工作进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
>>> class MyIterableDataset(torch.utils.data.IterableDataset):
... def __init__(self, start, end):
... super(MyIterableDataset).__init__()
... assert end > start, "this example code only works with end >= start"
... self.start = start
... self.end = end
...
... def __iter__(self):
... worker_info = torch.utils.data.get_worker_info()
... if worker_info is None: # single-process data loading, return the full iterator
... iter_start = self.start
... iter_end = self.end
... else: # in a worker process
... # split workload
... per_worker = int(math.ceil((self.end - self.start) / float(worker_info.num_workers)))
... worker_id = worker_info.id
... iter_start = self.start + worker_id * per_worker
... iter_end = min(iter_start + per_worker, self.end)
... return iter(range(iter_start, iter_end))
...
>>> # should give same set of data as range(3, 7), i.e., [3, 4, 5, 6].
>>> ds = MyIterableDataset(start=3, end=7)

>>> # Single-process loading
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=0)))
[tensor([3]), tensor([4]), tensor([5]), tensor([6])]

>>> # Mult-process loading with two worker processes
>>> # Worker 0 fetched [3, 4]. Worker 1 fetched [5, 6].
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=2)))
[tensor([3]), tensor([5]), tensor([4]), tensor([6])]

>>> # With even more workers
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=12)))
[tensor([3]), tensor([5]), tensor([4]), tensor([6])]

示例 2:使用 worker_init_fn 将工作负载分配到所有工作进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
>>> class MyIterableDataset(torch.utils.data.IterableDataset):
... def __init__(self, start, end):
... super(MyIterableDataset).__init__()
... assert end > start, "this example code only works with end >= start"
... self.start = start
... self.end = end
...
... def __iter__(self):
... return iter(range(self.start, self.end))
...
>>> # should give same set of data as range(3, 7), i.e., [3, 4, 5, 6].
>>> ds = MyIterableDataset(start=3, end=7)

>>> # Single-process loading
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=0)))
[3, 4, 5, 6]
>>>
>>> # Directly doing multi-process loading yields duplicate data
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=2)))
[3, 3, 4, 4, 5, 5, 6, 6]

>>> # Define a `worker_init_fn` that configures each dataset copy differently
>>> def worker_init_fn(worker_id):
... worker_info = torch.utils.data.get_worker_info()
... dataset = worker_info.dataset # the dataset copy in this worker process
... overall_start = dataset.start
... overall_end = dataset.end
... # configure the dataset to only process the split workload
... per_worker = int(math.ceil((overall_end - overall_start) / float(worker_info.num_workers)))
... worker_id = worker_info.id
... dataset.start = overall_start + worker_id * per_worker
... dataset.end = min(dataset.start + per_worker, overall_end)
...

>>> # Mult-process loading with the custom `worker_init_fn`
>>> # Worker 0 fetched [3, 4]. Worker 1 fetched [5, 6].
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=2, worker_init_fn=worker_init_fn)))
[3, 5, 4, 6]

>>> # With even more workers
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=12, worker_init_fn=worker_init_fn)))
[3, 4, 5, 6]
1
class torch.utils.data.TensorDataset(*tensors)[source]

包装张量的 Dataset。

每个样本将通过沿第一维索引张量来检索。

参数
** tensors (Tensor) – 具有相同第一维大小的张量。

1
classtorch.utils.data.StackDataset(*args, **kwargs)[source]

将多个数据集堆叠在一起的 Dataset。

此类对于将作为数据集给出的复杂输入数据的不同部分组合在一起非常有用。

示例

1
2
3
4
5
6
>>> images = ImageDataset()
>>> texts = TextDataset()
>>> tuple_stack = StackDataset(images, texts)
>>> tuple_stack[0] == (images[0], texts[0])
>>> dict_stack = StackDataset(image=images, text=texts)
>>> dict_stack[0] == {'image': images[0], 'text': texts[0]}

参数
** args (Dataset) – 堆叠的数据集,作为元组返回。
** kwargs (Dataset) – 堆叠的数据集,作为字典返回。

1
class torch.utils.data.ConcatDataset(datasets)[source]

将多个数据集连接在一起的 Dataset。

此类对于将不同的现有数据集组合在一起非常有用。

参数
** datasets (sequence) – 要连接的数据集列表

1
class torch.utils.data.ChainDataset(datasets)[source]

用于将多个 IterableDataset 连接在一起的 Dataset。

此类对于将不同的现有数据集流组合在一起非常有用。连接操作是在运行时完成的,因此使用此类连接大型数据集将非常高效。

参数
** datasets (iterable of IterableDataset) – 要连接在一起的数据集

1
class torch.utils.data.Subset(dataset, indices)[source]

在指定索引处的数据集的子集。

参数
** dataset (Dataset) – 整个 Dataset
** indices (sequence) – 为子集选择的整个集合中的索引

1
torch.utils.data._utils.collate.collate(batch, *, collate_fn_map=None)[source]

处理每个批次中元素的集合类型的通用整理函数。

该函数还打开函数注册表以处理特定的元素类型。 default_collate_fn_map 为张量、NumPy 数组、数字和字符串提供默认的整理函数。

参数
** batch – 要整理的单个批次

collate_fn_map (Optional[Dict[Union[Type, Tuple[Type, …]], Callable]]) – 可选字典,将元素类型映射到相应的整理函数。如果元素类型不在此字典中,则此函数将按插入顺序遍历字典中的每个键,如果元素类型是键的子类,则调用相应的整理函数。

示例

def collate_tensor_fn(batch, *, collate_fn_map):
… # Extend this function to handle batch of tensors
… return torch.stack(batch, 0)
def custom_collate(batch):
… collate_map = {torch.Tensor: collate_tensor_fn}
… return collate(batch, collate_fn_map=collate_map)

Extend default_collate by in-place modifying default_collate_fn_map

default_collate_fn_map.update({torch.Tensor: collate_tensor_fn})
注意

每个整理函数都需要一个批次的定位参数和一个整理函数字典的关键字参数,作为 collate_fn_map。

torch.utils.data.default_collate(batch)[source]
接收一批数据,并将批次中的元素放入一个具有额外的外部维度(批次大小)的张量中。

确切的输出类型可以是 torch.Tensor、Sequence 的 torch.Tensor、torch.Tensor 的集合,或者保持不变,具体取决于输入类型。 当在 DataLoader 中定义了 batch_size 或 batch_sampler 时,这将用作合并的默认函数。

以下是通用输入类型(基于批次内元素的类型)到输出类型映射

torch.Tensor -> torch.Tensor(添加了外部维度批次大小)

NumPy 数组 -> torch.Tensor

float -> torch.Tensor

int -> torch.Tensor

str -> str(保持不变)

bytes -> bytes(保持不变)

Mapping[K, V_i] -> Mapping[K, default_collate([V_1, V_2, …])]

NamedTuple[V1_i, V2_i, …] -> NamedTuple[default_collate([V1_1, V1_2, …]), default_collate([V2_1, V2_2, …]), …]

Sequence[V1_i, V2_i, …] -> Sequence[default_collate([V1_1, V1_2, …]), default_collate([V2_1, V2_2, …]), …]

参数
batch – 要整理的单个批次

示例

Example with a batch of ints:

default_collate([0, 1, 2, 3])
tensor([0, 1, 2, 3])

Example with a batch of strs:

default_collate([‘a’, ‘b’, ‘c’])
[‘a’, ‘b’, ‘c’]

Example with Map inside the batch:

default_collate([{‘A’: 0, ‘B’: 1}, {‘A’: 100, ‘B’: 100}])
{‘A’: tensor([ 0, 100]), ‘B’: tensor([ 1, 100])}

Example with NamedTuple inside the batch:

Point = namedtuple(‘Point’, [‘x’, ‘y’])
default_collate([Point(0, 0), Point(1, 1)])
Point(x=tensor([0, 1]), y=tensor([0, 1]))

Example with Tuple inside the batch:

default_collate([(0, 1), (2, 3)])
[tensor([0, 2]), tensor([1, 3])]

Example with List inside the batch:

default_collate([[0, 1], [2, 3]])
[tensor([0, 2]), tensor([1, 3])]

Two options to extend default_collate to handle specific type

Option 1: Write custom collate function and invoke default_collate

def custom_collate(batch):
… elem = batch[0]
… if isinstance(elem, CustomType): # Some custom condition
… return …
… else: # Fall back to default_collate
… return default_collate(batch)

Option 2: In-place modify default_collate_fn_map

def collate_customtype_fn(batch, *, collate_fn_map=None):
… return …
default_collate_fn_map.update(CustomType, collate_customtype_fn)
default_collate(batch) # Handle CustomType automatically
torch.utils.data.default_convert(data)[source]
将每个 NumPy 数组元素转换为 torch.Tensor。

如果输入是 Sequence、Collection 或 Mapping,它会尝试将内部的每个元素转换为 torch.Tensor。 如果输入不是 NumPy 数组,则保持不变。 当在 DataLoader 中未定义 batch_sampler 和 batch_size 时,这将用作合并的默认函数。

通用输入类型到输出类型映射类似于 default_collate()。 有关更多详细信息,请参阅那里的描述。

参数
data – 要转换的单个数据点

示例

Example with int

default_convert(0)
0

Example with NumPy array

default_convert(np.array([0, 1]))
tensor([0, 1])

Example with NamedTuple

Point = namedtuple(‘Point’, [‘x’, ‘y’])
default_convert(Point(0, 0))
Point(x=0, y=0)
default_convert(Point(np.array(0), np.array(0)))
Point(x=tensor(0), y=tensor(0))

Example with List

default_convert([np.array([0, 1]), np.array([2, 3])])
[tensor([0, 1]), tensor([2, 3])]
torch.utils.data.get_worker_info()[source]
返回有关当前 DataLoader 迭代器工作进程的信息。

在工作进程中调用时,这将返回一个保证具有以下属性的对象

id:当前工作进程 ID。

num_workers:工作进程的总数。

seed:为当前工作进程设置的随机种子。 此值由主进程 RNG 和工作进程 ID 确定。 有关更多详细信息,请参阅 DataLoader 的文档。

dataset:此进程中数据集对象的副本。 请注意,这将在与主进程不同的进程中成为不同的对象。

在主进程中调用时,这将返回 None。

注意

当在传递给 DataLoader 的 worker_init_fn 中使用时,此方法可用于以不同的方式设置每个工作进程,例如,使用 worker_id 配置 dataset 对象以仅读取分片数据集的特定部分,或使用 seed 为数据集代码中使用的其他库设置种子。

返回类型
Optional[WorkerInfo]

torch.utils.data.random_split(dataset, lengths, generator=<torch._C.Generator object>)[source]
将数据集随机拆分为给定长度的非重叠新数据集。

如果给定一个加起来为 1 的分数列表,则长度将自动计算为每个提供的分数的 floor(frac * len(dataset))。

计算长度后,如果存在任何余数,将以循环方式将 1 个计数分配给长度,直到没有余数为止。

可以选择固定生成器以获得可重复的结果,例如:

示例

generator1 = torch.Generator().manual_seed(42)
generator2 = torch.Generator().manual_seed(42)
random_split(range(10), [3, 7], generator=generator1)
random_split(range(30), [0.3, 0.3, 0.4], generator=generator2)
参数
dataset (Dataset) – 要拆分的Dataset

lengths (sequence) – 要生成的拆分的长度或分数

generator (Generator) – 用于随机排列的生成器。

返回类型
List[Subset[T]]

classtorch.utils.data.Sampler(data_source=None)[source]
所有 Sampler 的基类。

每个 Sampler 子类都必须提供一个 iter() 方法,提供一种方法来迭代数据集元素的索引或索引列表(批次),并且可以提供一个 len() 方法,该方法返回返回的迭代器的长度。

参数
data_source (Dataset) – 此参数未使用,将在 2.2.0 中删除。 您可能仍然拥有利用它的自定义实现。

示例

class AccedingSequenceLengthSampler(Sampler[int]):
def init(self, data: List[str]) -> None:
self.data = data

def __len__(self) -> int:
    return len(self.data)

def __iter__(self) -> Iterator[int]:
    sizes = torch.tensor([len(x) for x in self.data])
    yield from torch.argsort(sizes).tolist()

class AccedingSequenceLengthBatchSampler(Sampler[List[int]]):
def init(self, data: List[str], batch_size: int) -> None:
self.data = data
self.batch_size = batch_size

def __len__(self) -> int:
    return (len(self.data) + self.batch_size - 1) // self.batch_size

def __iter__(self) -> Iterator[List[int]]:
    sizes = torch.tensor([len(x) for x in self.data])
    for batch in torch.chunk(torch.argsort(sizes), len(self)):
        yield batch.tolist()

注意

len() 方法不是 DataLoader 严格要求的,但预期在涉及 DataLoader 长度的任何计算中。

classtorch.utils.data.SequentialSampler(data_source)[source]
按顺序采样元素,始终以相同的顺序。

参数
data_source (Dataset) – 要从中采样的数据集

classtorch.utils.data.RandomSampler(data_source, replacement=False, num_samples=None, generator=None)[source]
随机采样元素。 如果没有替换,则从洗牌后的数据集采样。

如果替换,则用户可以指定 num_samples 以进行抽取。

参数
data_source (Dataset) – 要从中采样的数据集

replacement (bool) – 如果 True,则按需从替换中抽取样本,默认值为 False

num_samples (int) – 要抽取的样本数量,默认值为 len(dataset)

generator (Generator) – 采样中使用的生成器。

classtorch.utils.data.SubsetRandomSampler(indices, generator=None)[source]
从给定的索引列表中随机采样元素,不进行替换。

参数
indices (sequence) – 索引序列

generator (Generator) – 采样中使用的生成器。

classtorch.utils.data.WeightedRandomSampler(weights, num_samples, replacement=True, generator=None)[source]
从 [0,..,len(weights)-1] 中根据给定的概率(权重)采样元素。

参数
weights (sequence) – 权重序列,不必加起来为1

num_samples (int) – 要抽取的样本数

replacement (bool) – 如果为 True,则有放回地抽取样本。否则,则无放回地抽取样本,这意味着当为一行抽取一个样本索引时,就不能再为该行抽取该索引。

generator (Generator) – 采样中使用的生成器。

示例

list(WeightedRandomSampler([0.1, 0.9, 0.4, 0.7, 3.0, 0.6], 5, replacement=True))
[4, 4, 1, 4, 5]
list(WeightedRandomSampler([0.9, 0.4, 0.05, 0.2, 0.3, 0.1], 5, replacement=False))
[0, 1, 4, 3, 2]
classtorch.utils.data.BatchSampler(sampler, batch_size, drop_last)[source]
封装另一个采样器以生成一个索引小批量。

参数
sampler (Sampler or Iterable) – 基本采样器。可以是任何可迭代对象

batch_size (int) – 小批量的尺寸。

drop_last (bool) – 如果为 True,则采样器将丢弃最后一个小批量,如果其尺寸小于 batch_size

示例

list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=False))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
list(BatchSampler(SequentialSampler(range(10)), batch_size=3, drop_last=True))
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
classtorch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)[source]
限制数据加载到数据集子集的采样器。

它在与 torch.nn.parallel.DistributedDataParallel 结合使用时特别有用。在这种情况下,每个进程都可以将 DistributedSampler 实例作为 DataLoader 采样器传递,并加载一个独属于它的原始数据集的子集。

注意

假设数据集大小恒定,并且任何实例都始终以相同的顺序返回相同的元素。

参数
dataset – 用于采样的数据集。

num_replicas (int, optional) – 参与分布式训练的进程数。默认情况下,world_size 从当前分布式组中获取。

rank (int, optional) – 当前进程在 num_replicas 中的排名。默认情况下,rank 从当前分布式组中获取。

shuffle (bool, optional) – 如果为 True(默认),则采样器将随机打乱索引。

seed (int, optional) – 如果 shuffle=True,则用于随机打乱采样器的随机种子。此数字在分布式组中的所有进程中应该相同。默认值:0。

drop_last (bool, optional) – 如果为 True,则采样器将丢弃数据的尾部,使其在副本数上均匀可分。如果为 False,则采样器将添加额外的索引以使数据在副本上均匀可分。默认值:False。

警告

在分布式模式下,在每个纪元开始时调用 set_epoch() 方法,在创建 DataLoader 迭代器之前,对于在多个纪元中使随机打乱正常工作是必要的。否则,将始终使用相同的排序。

示例

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None),
… sampler=sampler)
for epoch in range(start_epoch, n_epochs):
… if is_distributed:
… sampler.set_epoch(epoch)
… train(loader)