Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single-node ImageNet DDP implementation #38

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions examples/imagenet_00.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
import logging
import os
import random
import shutil
import time
import warnings

import hydra
from omegaconf import DictConfig
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torchvision.models as models
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.optim import SGD

logger = logging.getLogger("ImageNet")


@hydra.main(config_name="imagenetconf")
def main(cfg: DictConfig):
    """Single-node ImageNet DDP training entry point.

    Hydra launches one process per GPU (joblib launcher); ``cfg.gpu`` doubles
    as both the CUDA device index and the process rank.
    """
    if cfg.seed is not None:
        # Seed both RNGs and pin cudnn to deterministic kernels so runs
        # are reproducible (at some speed cost).
        random.seed(cfg.seed)
        torch.manual_seed(cfg.seed)
        cudnn.benchmark = False
        cudnn.deterministic = True
    if cfg.gpu is not None:
        logger.info(f"Use GPU: {cfg.gpu} for training")

    # Single-node rendezvous via environment variables; cfg.gpu is the rank.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12345"
    dist.init_process_group(
        backend=cfg.dist_backend,
        # init_method=cfg.dist_url,
        world_size=cfg.world_size,
        rank=cfg.gpu,
    )
    # FIX: a stray `return` here made everything below unreachable; removed
    # so training actually runs.

    model = models.__dict__[cfg.arch]()
    torch.cuda.set_device(cfg.gpu)
    model.cuda(cfg.gpu)
    # When using a single GPU per process and per
    # DistributedDataParallel, we need to divide the batch size
    # ourselves based on the total number of GPUs we have
    cfg.batch_size = int(cfg.batch_size / cfg.world_size)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[cfg.gpu])

    criterion = nn.CrossEntropyLoss().cuda(cfg.gpu)
    # FIX: lambd/alpha/t0 are ASGD hyper-parameters and raise TypeError when
    # passed to torch.optim.SGD; the cfg.sgd keys match torch.optim.ASGD's
    # signature exactly, so that optimizer was clearly intended.
    optimizer = torch.optim.ASGD(
        params=model.parameters(),
        lr=cfg.sgd.lr,
        lambd=cfg.sgd.lambd,
        alpha=cfg.sgd.alpha,
        t0=cfg.sgd.t0,
        weight_decay=cfg.sgd.weight_decay,
    )

    traindir = os.path.join(cfg.data, "train")
    valdir = os.path.join(cfg.data, "val")
    # Standard ImageNet channel statistics.
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )

    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose(
            [
                transforms.RandomResizedCrop(224),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                normalize,
            ]
        ),
    )

    # DistributedSampler shards the dataset across ranks; shuffling then
    # happens in the sampler, not the DataLoader.
    if cfg.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    else:
        train_sampler = None

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=cfg.batch_size,
        shuffle=(train_sampler is None),
        num_workers=cfg.workers,
        pin_memory=True,
        sampler=train_sampler,
    )

    val_loader = torch.utils.data.DataLoader(
        datasets.ImageFolder(
            valdir,
            transforms.Compose(
                [
                    transforms.Resize(256),
                    transforms.CenterCrop(224),
                    transforms.ToTensor(),
                    normalize,
                ]
            ),
        ),
        batch_size=cfg.batch_size,
        shuffle=False,
        num_workers=cfg.workers,
        pin_memory=True,
    )

    if cfg.evaluate:
        validate(val_loader, model, criterion, cfg)
        return

    for epoch in range(cfg.start_epoch, cfg.epochs):
        if cfg.distributed:
            # Re-seed the sampler so each epoch sees a different shard order.
            train_sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, cfg)

        train(train_loader, model, criterion, optimizer, epoch, cfg)
        validate(val_loader, model, criterion, cfg)


def train(train_loader, model, criterion, optimizer, epoch, cfg):
    """Run one training epoch, logging progress every cfg.print_freq batches."""
    batch_time = AverageMeter("Time", ":6.3f")
    data_time = AverageMeter("Data", ":6.3f")
    losses = AverageMeter("Loss", ":.4e")
    top1 = AverageMeter("Acc@1", ":6.2f")
    top5 = AverageMeter("Acc@5", ":6.2f")
    progress = ProgressMeter(
        len(train_loader),
        [batch_time, data_time, losses, top1, top5],
        prefix="Epoch: [{}]".format(epoch),
    )

    model.train()  # enable dropout / batch-norm running-stat updates

    tick = time.time()
    for step, (images, target) in enumerate(train_loader):
        # Time spent waiting on the data loader for this batch.
        data_time.update(time.time() - tick)

        if cfg.gpu is not None:
            images = images.cuda(cfg.gpu, non_blocking=True)
        if torch.cuda.is_available():
            target = target.cuda(cfg.gpu, non_blocking=True)

        # Forward pass and loss.
        output = model(images)
        loss = criterion(output, target)

        # Book-keeping: accuracy and loss, weighted by batch size.
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        n = images.size(0)
        losses.update(loss.item(), n)
        top1.update(acc1[0], n)
        top5.update(acc5[0], n)

        # Backward pass and optimizer step.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Total wall time for this batch (data wait + compute).
        batch_time.update(time.time() - tick)
        tick = time.time()

        if step % cfg.print_freq == 0:
            progress.display(step)


def validate(val_loader, model, criterion, cfg):
    """Evaluate the model on the validation set and log top-1/top-5 accuracy."""
    batch_time = AverageMeter("Time", ":6.3f")
    losses = AverageMeter("Loss", ":.4e")
    top1 = AverageMeter("Acc@1", ":6.2f")
    top5 = AverageMeter("Acc@5", ":6.2f")
    progress = ProgressMeter(
        len(val_loader), [batch_time, losses, top1, top5], prefix="Test: "
    )

    model.eval()  # freeze dropout / batch-norm statistics

    with torch.no_grad():
        tick = time.time()
        for step, (images, target) in enumerate(val_loader):
            if cfg.gpu is not None:
                images = images.cuda(cfg.gpu, non_blocking=True)
            if torch.cuda.is_available():
                target = target.cuda(cfg.gpu, non_blocking=True)

            # Forward pass only; gradients are disabled by no_grad above.
            output = model(images)
            loss = criterion(output, target)

            # Accumulate metrics weighted by batch size.
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            n = images.size(0)
            losses.update(loss.item(), n)
            top1.update(acc1[0], n)
            top5.update(acc5[0], n)

            batch_time.update(time.time() - tick)
            tick = time.time()

            if step % cfg.print_freq == 0:
                progress.display(step)

    logger.info(
        " * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}".format(top1=top1, top5=top5)
    )


class AverageMeter(object):
    """Tracks the most recent value and a running average of a metric."""

    def __init__(self, name, fmt=":f"):
        self.name = name  # display label, e.g. "Loss"
        self.fmt = fmt    # format spec used by __str__, e.g. ":6.2f"
        self.reset()

    def reset(self):
        """Zero out all statistics."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record ``val`` observed ``n`` times and refresh the running mean."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count

    def __str__(self):
        template = "".join(["{name} {val", self.fmt, "} ({avg", self.fmt, "})"])
        return template.format(**self.__dict__)


class ProgressMeter(object):
    """Formats and logs a batch-progress line from a list of AverageMeters."""

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        """Log prefix, ``[batch/total]``, and every meter, tab-separated."""
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        logger.info("\t".join(entries))

    def _get_batch_fmtstr(self, num_batches):
        # Build e.g. "[{:3d}/100]" so batch indices are right-aligned to the
        # width of the total. FIX: `num_batches // 1` was a no-op; use the
        # value directly.
        num_digits = len(str(num_batches))
        fmt = "{:" + str(num_digits) + "d}"
        return "[" + fmt + "/" + fmt.format(num_batches) + "]"


def adjust_learning_rate(optimizer, epoch, cfg):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    # FIX: the base learning rate lives at cfg.sgd.lr in imagenetconf.yaml;
    # `cfg.lr` does not exist in that config and raised at runtime.
    lr = cfg.sgd.lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr


def accuracy(output, target, topk=(1,)):
    """Return top-k accuracy (as percentages) for each k in ``topk``.

    ``output`` is (batch, classes) scores, ``target`` is (batch,) class ids;
    each result is a 1-element tensor.
    """
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        # Indices of the maxk highest-scoring classes, transposed to
        # shape (maxk, batch) so row k-1 holds each sample's k-th guess.
        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        return [
            correct[:k].reshape(-1).float().sum(0, keepdim=True).mul_(100.0 / batch_size)
            for k in topk
        ]


if __name__ == "__main__":
    # Hydra entry point: cfg is composed from imagenetconf.yaml (see
    # @hydra.main above) before main runs.
    main()
19 changes: 19 additions & 0 deletions examples/imagenetconf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
seed: ~
gpu: ~
world_size: 4
dist_backend: nccl
dist_url: "localhost:9999"
batch_size: 256
ngpus_per_node: 4
sgd:
lr: 0.01
lambd: 0.0001
alpha: 0.75
t0: 1000000.0
weight_decay: 0

hydra.launcher.joblib.backend: multiprocessing


defaults:
- hydra/launcher: joblib