Pytorch 多GPU训练

Stella981
• 阅读 829

概述

Pytorch多GPU训练本质上是数据并行,每个GPU上拥有整个模型的参数,将一个batch的数据均分成N份,每个GPU处理一份数据,然后将每个GPU上的梯度进行整合得到整个batch的梯度,用整合后的梯度更新所有GPU上的参数,完成一次迭代。

其中多gpu训练的方案有两种,一种是利用nn.DataParallel实现,这种方法是最早引入pytorch的,使用简单方便,不涉及多进程。另一种是用torch.nn.parallel.DistributedDataParalleltorch.utils.data.distributed.DistributedSampler 结合多进程实现,第二种方式效率更高,参考,但是实现起来稍难, 第二种方式同时支持多节点分布式实现。方案二的效率要比方案一高,即使是在单运算节点上,参考pytorch doc:

In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data parallelism, including torch.nn.DataParallel():

本篇文章将详细介绍这两种方式的实现,只限于单机上实现,分布式较为复杂,下一篇文章再介绍。 参考:

方案一

步骤

  • 将model用nn.DataParallel wrap.

    model = nn.DataParallel(model)

  • os.environ["CUDA_VISIBLE_DEVICES"]="0"指定当前程序可以使用GPU设备号,如果不指定将会使用设备上所有的GPU设备。

    os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2" #使用3个GPU

  • model.cuda()或者model.to("cuda")和data.cuda()或者data.to("cuda")将模型和数据放入GPU上。

训练过程与使用单GPU一致,使用这种方法,pytorch会自动的将batch数据拆分为N份(N是用os.environ指定的GPU数量),分别forward,backward,然后自动整合每个GPU上的梯度,在一块GPU上update参数,最后将参数广播给其他GPU,完成一次迭代。

测试

代码:

展开 \`\`\`python import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader import os

dataset

class RandomDataset(Dataset):

def __init__(self, size, length):
    self.len = length
    self.data = torch.randn(length, size)

def __getitem__(self, index):
    return self.data[index]

def __len__(self):
    return self.len

model define

class Model(nn.Module): # Our model

def __init__(self, input_size, output_size):
    super(Model, self).__init__()
    self.fc = nn.Linear(input_size, output_size)

def forward(self, input):
    output = self.fc(input)
    print("\tIn Model: input size", input.size(),
          "output size", output.size())

    return output

if name=="main": # Parameters input_size = 5 output_size = 2

batch_size = 30
data_size = 100

dataset = RandomDataset(input_size, data_size)
# dataloader define
rand_loader = DataLoader(dataset=dataset,
                        batch_size=batch_size, shuffle=True)

# model init
model = Model(input_size, output_size)

# cuda devices
os.environ["CUDA_VISIBLE_DEVICES"]="0,1"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
    model = nn.DataParallel(model)

model.to(device)

for data in rand_loader:
    input = data.to(device)
    output = model(input)
    # loss

    # backward

    #update
    
    time.sleep(1)#模拟一个比较长的batch时间
    print("Outside: input size", input.size(),
        "output_size", output.size())

torch.save(model.module.state_dict(), "model.pth")


</details>

- 如果使用一块GPU,则测试结果为如下,可以看出模型内部与外部输入输出是一致的。


    In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])

Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2]) In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2]) Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2]) In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2]) Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2]) In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2]) Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

- 如果使用两块GPU,则测试结果如下,可以看出自动进行batch的拆分。


    In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
    In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])

Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2]) In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2]) In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2]) Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2]) In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2]) In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2]) Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2]) In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2]) In Model: input size torch.Size([5, 5]) output size torch.Size([5, 2]) Outside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

### 注意
- 多个GPU有主次之分,主GPU负责梯度的整合和参数的更新,将更新后的参数传递给其他GPU,完成一次迭代,所以这个过程中GPU之间既有参数传递,又有参数的梯度传递。
- 关于Batch Norm的问题,[参考](https://discuss.pytorch.org/t/how-does-dataparallel-handels-batch-norm/14040)。因为一个大的batch被拆分成了多个minibatch,所以normalization只会在minibatch上做,最后测试时用的Noramlization layer的参数是主GPU上的。如果想实现多个GPU上同步Normalization需要用[sync norm](https://github.com/vacancy/Synchronized-BatchNorm-PyTorch)实现。这个normalization的问题在分布式训练中依然存在。

## 方案二
方案二是用多进程来实现的,其实分布式就是多进程的意思,分布在多个机器上的进程,利用网络通信协调彼此。关于分布式的处理下一篇文章再详细介绍。这里主要介绍单机上方案二与方案一的不同。首先每个进程都有独立的训练过程,一次迭代后share梯度,整合梯度,独立更新参数。迭代过程中不会进行参数的传递(初始化时会同步所有进程上的参数)。其次进程之间的通信采用了NCCL,当然NCCL已经是pytorch内部支持了,所以一般情况下不用理这个。分布式的细节参考下一篇文章,这里只给出最简单的实现。

### 步骤
- 需要先初始化进程组,这里采用默认的方式初始化,对于单节点来说这也是最方便的一种初始化方式,初始化的目的是让所有的进程彼此建立联系,即知道彼此的位置,状态等信息。
- dataset prepare,增加`torch.utils.data.distributed.DistributedSampler`. 具体使用参见测试部分的代码。
- model prepare, 增加`torch.nn.parallel.DistributedDataParallel`. 具体使用参见测试部分的代码。
- 训练过程与方案一一致,想象同时有多个进程在同时运行training的代码即可。

### 测试
代码与方案一类似,需要初始化进程组,表示本程序是分布式训练的。多进程的创建通过指定`python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1`来实现的,nnodes为1,因为这里我们是一个计算节点,`nproc_per_node=2`表示需要创建两个进程来训练,然后每个进程都获得分配给它rank号,rank唯一标识一个进程,rank 0为master,其他是slave。当然一般是需要两个GPU的,测试程序中是根据rank来指定进程使用GPU,即rank 0使用GPU0,rank 1进程使用GPU1。需要根据数据集创建一个分布式的sampler,初始化dataloader的时候要指定这个sampler,模型分布式封装详见代码。
代码:
<details> 
<summary>展开</summary>  
```python
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import torch.distributed as dist
import torch.utils.data.distributed
import sys
import time


# dataset
class RandomDataset(Dataset):

    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

# model define
class Model(nn.Module):
    # Our model

    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        # print("\tIn Model: input size", input.size(),
        #       "output size", output.size())

        return output

if __name__=="__main__":
    # Parameters
    input_size = 5
    output_size = 2

    batch_size = 30
    data_size = 100

    # check the nccl backend
    if not dist.is_nccl_available():
        print("Error: nccl backend not available.")
        sys.exit(1)

    # init group
    dist.init_process_group(backend="nccl", init_method="env://")

    # get the process rank and the world size
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # prepare the dataset
    dataset = RandomDataset(input_size, data_size)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    

    rand_loader = DataLoader(dataset, batch_size=batch_size//world_size, 
                              shuffle=(train_sampler is None), 
                              sampler=train_sampler)

    # dataloader define
    # rand_loader = DataLoader(dataset=dataset,
    #                         batch_size=batch_size, shuffle=True)

    # model init
    model = Model(input_size, output_size)

    # cuda devices
    # os.environ["CUDA_VISIBLE_DEVICES"]="0"
    # device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # if torch.cuda.device_count() > 1:
    #     print("Let's use", torch.cuda.device_count(), "GPUs!")
    #     # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
    #     model = nn.DataParallel(model)
    # model.to(device)

    # distribute model define
    device = torch.device('cuda', rank)
    model = model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank], output_device=rank)
    print("From rank %d: start training, time:%s"%(rank, time.strftime("%Y-%m-%d %H:%M:%S")))
    for data in rand_loader:
        input = data.to(device)
        output = model(input)
        # loss

        # backward

        #update
        
        time.sleep(1)#模拟一个比较长的batch时间
        print("From rank %d: Outside: input size %s, output size %s"%(rank, str(input.size()), str(output.size())),flush=True)
    torch.save(model.module.state_dict(), "model_%d.pth"%rank)
    print("From rank %d: end training, time: %s"%(rank, time.strftime("%Y-%m-%d %H:%M:%S")))
- 运行命令 \`python -m torch.distributed.launch --nproc\_per\_node=2 --nnodes=1 simple\_test.py\` - 结果 \`\`\` python From rank 0: start training, time:2019-09-26 13:20:13 From rank 1: start training, time:2019-09-26 13:20:13 From rank 0: Outside: input size torch.Size(\[15, 5\]), output size torch.Size(\[15, 2\]) From rank 1: Outside: input size torch.Size(\[15, 5\]), output size torch.Size(\[15, 2\]) From rank 0: Outside: input size torch.Size(\[15, 5\]), output size torch.Size(\[15, 2\]) From rank 1: Outside: input size torch.Size(\[15, 5\]), output size torch.Size(\[15, 2\]) From rank 1: Outside: input size torch.Size(\[15, 5\]), output size torch.Size(\[15, 2\])From rank 0: Outside: input size torch.Size(\[15, 5\]), output size torch.Size(\[15, 2\])

From rank 0: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2]) From rank 0: end training, time: 2019-09-26 13:20:17 From rank 1: Outside: input size torch.Size([5, 5]), output size torch.Size([5, 2]) From rank 1: end training, time: 2019-09-26 13:20:17


Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.


我直接将测试果贴上来,可以看出有点乱,是由于多进程并行导致的问题,仔细看可以看出有两个进程并行训练,每个进程处理半个batch数据。最后的OMP_NUM_THREADS 信息是pytorch lanch的时候打印的,翻译过来就是我没有指定OMP多线程的数目,它为了防止系统过负荷,所以贴心的帮我设置为了1,原码[参考](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py#L214).



## One more Thing
模型的保存与加载,与单GPU的方式有所不同。这里通通将参数以cpu的方式save进存储, 因为如果是保存的GPU上参数,pth文件中会记录参数属于的GPU号,则加载时会加载到相应的GPU上,这样就会导致如果你GPU数目不够时会在加载模型时报错,像下面这样:
``` shell
RuntimeError: Attempting to deserialize object on CUDA device 1 but torch.cuda.device_count() is 1. Please use torch.load with map_location to map your storages to an existing device.

模型保存都是一致的,不过时刻记住方案二中你有多个进程在同时跑,所以会保存多个模型到存储上,如果使用共享存储就要注意文件名的问题,当然一般只在rank0进程上保存参数即可,因为所有进程的模型参数是同步的。

torch.save(model.module.cpu().state_dict(), "model.pth")

模型的加载:

param=torch.load("model.pth")

好了今天就写到这儿,好久没有这么认真的写篇博客了。当然还是有一些地方不够完善,比如关于模型参数同步的检验。如果你有什么问题,或者觉得哪里有不对的地方请在评论区给出,蟹蟹 ^=^。

点赞
收藏
评论区
推荐文章
亚瑟 亚瑟
3年前
Flutter - Flutter渲染机制—GPU线程
基于Flutter1.5,从源码视角来深入剖析flutter渲染机制,相关源码目录见文末附录一、GPU线程渲染看Flutter的渲染绘制过程的核心过程包括在ui线程和gpu线程,上一篇文章已经详细介绍了UI线程的工作原理,本文则介绍GPU线程的工作原理,这里需要注意的是,gpu线程是指运行着GPUTaskRunner的名叫gpu
Stella981 Stella981
2年前
Nvidia GPU如何在Kubernetes 里工作
本文介绍NvidiaGPU设备如何在Kubernetes中管理调度。整个工作流程分为以下两个方面:如何在容器中使用GPUKubernetes如何调度GPU如何在容器中使用GPU想要在容器中的应用可以操作GPU,需要实两个目标1.容器中可以查看GPU设备2.容器中运行的应用,可以通过Nvidi
Stella981 Stella981
2年前
AI为Kubernetes深度学习工作负载创建首个分布式GPU共享系统
近日,AI这家虚拟化AI基础架构的公司,发布了第一个分数GPU共享系统,用于Kubernetes上的深度学习工作负载。分数GPU系统特别适用于诸如推理之类的轻量级AI任务,透明地使数据科学和AI工程团队能够在单个GPU上同时运行多个工作负载,从而使公司能够运行更多的工作负载,例如计算机视觉,语音识别和在同一硬件上进行自然语言处理,从而降低了成本。对于深度
Stella981 Stella981
2年前
Fourinone如何实现并行计算和数据库引擎
关于并行计算的概念有非常多,硬件落地其实就只有两种,CPU上的并行计算和GPU上的并行计算,GPU做点积这样的矢量计算(矩阵计算)有优势,但目前还运行不了操作系统和数据库,比较多用于研究性质的计算。在我们生产系统中运用最多的是CPU上的并行计算,其落地方式也只有两种,多线程和多进程。围绕多线程、多进程结合通信技术的灵活设计,它的应用范围非常广泛,不光用于并行
京东云开发者 京东云开发者
6个月前
记录TritonServer部署多模型到多GPU踩坑 | 京东云技术团队
一、问题是怎么发现的部署chatglm2和llama2到一个4V100的GPU机器上遇到问题config.pbtxt中设置模型分别在指定gpu上部署实例配置不生效如以下配置为在gpu0上部署本模型,部署count1个实例,在gpu1上部署本模型,部署cou
京东云开发者 京东云开发者
6个月前
DeepSpeed: 大模型训练框架 | 京东云技术团队
目前,大模型的发展已经非常火热,关于大模型的训练、微调也是各个公司重点关注方向。但是大模型训练的痛点是模型参数过大,动辄上百亿,如果单靠单个GPU来完成训练基本不可能。所以需要多卡或者分布式训练来完成这项工作。
数字先锋 | “言”之有“力”,大模型背后的算力“推手”!
在算力调度方面,天翼云通过自研的调度系统,协助思必驰DFM2大模型调度GPU、NPU、CPU等异构算力资源,大规模训练上云1个月,可以完成数十亿规模大模型所有阶段训练和效果评估。在训练能力打造方面,天翼云支持多种模型训练方式,不仅可以提升大模型训练平台的数据量,还大幅缩短了训练周期和交付进度。
NVIDIA安培架构下MIG技术分析
关键词:NVIDIA、MIG、安培一什么是MIG2020年5月,NVIDIA发布了最新的GPU架构:安培,以及基于安培架构的最新的GPU:A100。安培提供了许多新的特性,MIG是其中一项非常重要的新特性。MIG的全名是MultiInstanceGPU。NVIDIA安培架构中的MIG模式可以在A100GPU上并行运行七个作业。多实
京东云开发者 京东云开发者
7个月前
使用Triton部署chatglm2-6b模型 | 京东云技术团队
一、技术介绍NVIDIATritonInferenceServer是一个针对CPU和GPU进行优化的云端和推理的解决方案。支持的模型类型包括TensorRT、TensorFlow、PyTorch(metallama/Llama27b)、Python(cha
天翼云GPU云主机:共享信息技术与虚拟机的完美融合
GPU云主机是一种基于云计算技术的虚拟化服务器。它通过虚拟化技术将一台或多台物理服务器资源进行整合,形成一个共享资源池,从而提供弹性的云计算环境。在这个环境下,每个虚拟主机都可以独立运行,拥有自己的操作系统、存储空间和网络资源。与传统的云主机相比,GPU云主机最大的特点在于其配备了高性能的GPU计算卡,使得在进行大规模数据处理和复杂计算时能发挥出更强的计算能力。