# We retain the copyright notice by NVIDIA from the original code. However,
# we reserve our rights on the modifications based on the original code.
#
# *****************************************************************************
#  Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#      * Redistributions of source code must retain the above copyright
#        notice, this list of conditions and the following disclaimer.
#      * Redistributions in binary form must reproduce the above copyright
#        notice, this list of conditions and the following disclaimer in the
#        documentation and/or other materials provided with the distribution.
#      * Neither the name of the NVIDIA CORPORATION nor the
#        names of its contributors may be used to endorse or promote products
#        derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
#  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
#  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
#  DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
#  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
#  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
#  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
#  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
#  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# *****************************************************************************
import os
import sys
import time
import subprocess
import argparse

import torch
import torch.distributed as dist
from torch.autograd import Variable


def reduce_tensor(tensor, num_gpus):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.reduce_op.SUM)
    # rt /= (num_gpus*2)
    rt /= num_gpus
    return rt

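
# Usage sketch (illustrative, not part of the original code): once
# init_distributed() below has run, a per-GPU scalar such as the loss can be
# averaged across workers; `loss` and `num_gpus` are assumed to exist in the
# caller.
#
#     reduced_loss = reduce_tensor(loss.data, num_gpus).item()
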
def init_distributed(rank, num_gpus, group_name, dist_backend, dist_url):
    assert torch.cuda.is_available(), "Distributed mode requires CUDA."
    print("Initializing Distributed")

    # Set cuda device so everything is done on the right GPU.
    torch.cuda.set_device(rank % torch.cuda.device_count())

    # os.environ['MASTER_ADDR'] = '172.31.44.232'
    # os.environ['MASTER_PORT'] = '58217'
    # Initialize distributed communication
    dist.init_process_group(dist_backend, init_method=dist_url,
                            world_size=num_gpus, rank=rank,
                            group_name=group_name)

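
# Usage sketch (illustrative values, not from the original code): rank and
# group_name normally come from the command-line flags that main() below
# passes to each train.py worker.
#
#     init_distributed(rank=0, num_gpus=2, group_name="group_demo",
#                      dist_backend="nccl", dist_url="tcp://localhost:54321")
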
def _flatten_dense_tensors(tensors):
    """Flatten dense tensors into a contiguous 1D buffer. Assume tensors are of
    same dense type.
    Since inputs are dense, the resulting tensor will be a concatenated 1D
    buffer. Element-wise operation on this buffer will be equivalent to
    operating individually.
    Arguments:
        tensors (Iterable[Tensor]): dense tensors to flatten.
    Returns:
        A contiguous 1D buffer containing input tensors.
    """
    if len(tensors) == 1:
        return tensors[0].contiguous().view(-1)
    flat = torch.cat([t.contiguous().view(-1) for t in tensors], dim=0)
    return flat

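
# Example (sketch, not from the original code): two tensors of 6 and 5
# elements flatten into a single 11-element buffer.
#
#     a, b = torch.zeros(2, 3), torch.zeros(5)
#     flat = _flatten_dense_tensors([a, b])   # flat.shape == torch.Size([11])
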
def _unflatten_dense_tensors(flat, tensors):
    """View a flat buffer using the sizes of tensors. Assume that tensors are of
    same dense type, and that flat is given by _flatten_dense_tensors.
    Arguments:
        flat (Tensor): flattened dense tensors to unflatten.
        tensors (Iterable[Tensor]): dense tensors whose sizes will be used to
            unflatten flat.
    Returns:
        Unflattened dense tensors with sizes same as tensors and values from
        flat.
    """
    outputs = []
    offset = 0
    for tensor in tensors:
        numel = tensor.numel()
        outputs.append(flat.narrow(0, offset, numel).view_as(tensor))
        offset += numel
    return tuple(outputs)

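
# Example (sketch, not from the original code): a flatten/unflatten round trip
# restores the original shapes, which is how gradients are rebuilt after the
# coalesced all-reduce below.
#
#     flat = _flatten_dense_tensors([a, b])
#     a2, b2 = _unflatten_dense_tensors(flat, [a, b])
#     assert a2.shape == a.shape and b2.shape == b.shape
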
def apply_gradient_allreduce(module):
    """
    Modifies existing model to do gradient allreduce, but doesn't change class
    so you don't need "module"
    """
    if not hasattr(dist, '_backend'):
        module.warn_on_half = True
    else:
        module.warn_on_half = (dist._backend == dist.dist_backend.GLOO)

    # Broadcast the parameters from rank 0 so every worker starts identical.
    for p in module.state_dict().values():
        if not torch.is_tensor(p):
            continue
        dist.broadcast(p, 0)

    def allreduce_params():
        if module.needs_reduction:
            module.needs_reduction = False
            # Bucket gradients by tensor type so each bucket can be coalesced
            # into one flat buffer and reduced with a single all_reduce call.
            buckets = {}
            for param in module.parameters():
                if param.requires_grad and param.grad is not None:
                    tp = type(param.data)
                    if tp not in buckets:
                        buckets[tp] = []
                    buckets[tp].append(param)
            if module.warn_on_half:
                if torch.cuda.HalfTensor in buckets:
                    print("WARNING: gloo dist backend for half parameters may be extremely slow."
                          " It is recommended to use the NCCL backend in this case. This currently"
                          " requires PyTorch built from top of tree master.")
                    module.warn_on_half = False

            for tp in buckets:
                bucket = buckets[tp]
                grads = [param.grad.data for param in bucket]
                coalesced = _flatten_dense_tensors(grads)
                dist.all_reduce(coalesced)
                coalesced /= dist.get_world_size()
                for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
                    buf.copy_(synced)

    for param in list(module.parameters()):
        def allreduce_hook(*unused):
            Variable._execution_engine.queue_callback(allreduce_params)
        if param.requires_grad:
            param.register_hook(allreduce_hook)
        dir(param)

    def set_needs_reduction(self, input, output):
        self.needs_reduction = True

    module.register_forward_hook(set_needs_reduction)
    return module

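
# Usage sketch (illustrative, not from the original code): wrap the model once
# per process after init_distributed(). The forward hook flags gradients as
# needing reduction and the per-parameter backward hooks queue
# allreduce_params(), so an ordinary training loop needs no further changes.
# `model`, `loss` and `optimizer` are assumed to exist in the caller.
#
#     model = apply_gradient_allreduce(model)
#     ...
#     model.zero_grad()
#     loss.backward()    # gradients are all-reduced at the end of backward
#     optimizer.step()
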
def main(config, stdout_dir, args_str):
    args_list = ['-u']
    args_list.append('train.py')
    args_list += args_str.split(' ') if len(args_str) > 0 else []

    args_list.append('--config={}'.format(config))

    num_gpus = torch.cuda.device_count()
    args_list.append('--num_gpus={}'.format(num_gpus))
    args_list.append("--group_name=group_{}".format(time.strftime("%Y_%m_%d-%H%M%S")))

    if not os.path.isdir(stdout_dir):
        os.makedirs(stdout_dir)
        os.chmod(stdout_dir, 0o775)

    workers = []

    for i in range(num_gpus):
        # Append this worker's rank (and drop it again after spawning) so the
        # '--num_gpus' and '--group_name' arguments above are left untouched.
        args_list.append('--rank={}'.format(i))
        stdout = None if i == 0 else open(
            os.path.join(stdout_dir, "GPU_{}.log".format(i)), "w")
        print(args_list)
        p = subprocess.Popen([str(sys.executable)] + args_list, stdout=stdout)
        workers.append(p)
        args_list = args_list[:-1]

    for p in workers:
        p.wait()

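
# Example invocation (sketch; the script name and flag values are illustrative):
#
#     python distributed.py -c config.json -s stdout_logs -a "--key=value"
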
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', type=str, required=True,
                        help='JSON file for configuration')
    parser.add_argument('-s', '--stdout_dir', type=str, default=".",
                        help='directory to save stdout logs')
    parser.add_argument(
        '-a', '--args_str', type=str, default='',
        help='double quoted string with space separated key value pairs')

    args = parser.parse_args()
    main(args.config, args.stdout_dir, args.args_str)