Single-node ImageNet DDP implementation #38

Draft: wants to merge 3 commits into base: main
363 changes: 363 additions & 0 deletions examples/imagenet.py
@@ -0,0 +1,363 @@
import os
import random
import shutil
import time
import warnings

import hydra
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torchvision.models as models
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.optim import Adam

best_acc1 = 0


@hydra.main(config_name="imagenetconf")
Contributor

Did you forget to include the config?

Author

I wasn't sure how to use configen and I was mostly looking at mnist_00.py, but I couldn't find the corresponding configuration files for it. Should I just handcraft one and put it in the examples folder?

Contributor

Yes. configen is used to generate configs for libraries. For examples and user code, people should write their own configs.

Contributor

@briankosw

In mnist_00.py, I included the config at the top of the file.

@dataclass
class MNISTConf:
    batch_size: int = 64
    test_batch_size: int = 1000
    epochs: int = 14
    no_cuda: bool = False
    dry_run: bool = False
    seed: int = 1
    log_interval: int = 10
    save_model: bool = False
    checkpoint_name: str = "unnamed.pt"
    adadelta: AdadeltaConf = AdadeltaConf()
    steplr: StepLRConf = StepLRConf(
        step_size=1
    )  # we pass a default for step_size since it is required, but missing a default in PyTorch (and consequently in hydra-torch)

See the second code block of https://github.com/pytorch/hydra-torch/blob/master/examples/mnist_00.md#parting-with-argparse for the explanation!

Contributor

Generally, I think it is cleaner to make your own config python files and then import them. I simply included it in one flat file so that the tutorial reader can see it all in one place.

Contributor

This is the same thing in the draft mnist_01.py, which I have yet to finalize. It's introducing some more hierarchical composition, so you'll see multiple 'non-configen' configs defined. Here's the whole 'hydra block':
https://github.com/pytorch/hydra-torch/blob/examples/mnist_01/examples/mnist_01.py#L10-L56

Author

Thanks! I'll do as you suggested.
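For reference, a handcrafted config for this example could look roughly like the sketch below; the field names, defaults, and the AdamConf import are assumptions for illustration, not part of this diff.

# conf.py (hypothetical) -- registers the config that @hydra.main(config_name="imagenetconf") expects
from dataclasses import dataclass
from typing import Optional

from hydra.core.config_store import ConfigStore
from hydra_configs.torch.optim import AdamConf  # assumed to exist alongside AdadeltaConf
from omegaconf import MISSING


@dataclass
class ImageNetConf:
    data: str = MISSING  # ImageNet root containing train/ and val/
    arch: str = "resnet18"
    epochs: int = 90
    start_epoch: int = 0
    batch_size: int = 256
    workers: int = 4
    lr: float = 0.1
    print_freq: int = 10
    resume: str = ""
    evaluate: bool = False
    pretrained: bool = False
    seed: Optional[int] = None
    gpu: Optional[int] = None
    world_size: int = -1
    rank: int = -1
    dist_url: str = "env://"
    dist_backend: str = "nccl"
    distributed: bool = False
    multiprocessing_distributed: bool = False
    ngpus_per_node: int = 1
    adam: AdamConf = AdamConf()


cs = ConfigStore.instance()
cs.store(name="imagenetconf", node=ImageNetConf)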

def main(cfg):
    if cfg.seed is not None:
        random.seed(cfg.seed)
        torch.manual_seed(cfg.seed)
        cudnn.deterministic = True
        warnings.warn(
            "You have chosen to seed training. "
            "This will turn on the CUDNN deterministic setting, "
            "which can slow down your training considerably! "
            "You may see unexpected behavior when restarting "
            "from checkpoints."
        )
    if cfg.gpu is not None:
        warnings.warn(
            "You have chosen a specific GPU. This will completely "
            "disable data parallelism."
        )
    if cfg.dist_url == "env://" and cfg.world_size == -1:
        cfg.world_size = int(os.environ["WORLD_SIZE"])

    global best_acc1

    if cfg.gpu is not None:
        print("Use GPU: {} for training".format(cfg.gpu))

    if cfg.distributed:
        if cfg.dist_url == "env://" and cfg.rank == -1:
            cfg.rank = int(os.environ["RANK"])
        if cfg.multiprocessing_distributed:
            # compute the global rank from the node-level rank and this process's GPU index
            cfg.rank = cfg.rank * cfg.ngpus_per_node + cfg.gpu
        dist.init_process_group(
            backend=cfg.dist_backend,
            init_method=cfg.dist_url,
            world_size=cfg.world_size,
            rank=cfg.rank,
        )
    if cfg.pretrained:
Contributor
@romesco, Dec 1, 2020

Not that we can handle this now, but just putting this out there:

Statements conditioned solely on the config often show up in main files and IMO reduce the readability of the code.

My wish for the future (once we have the hammers) would be to push this logic into the config. There are many pros:

  1. main.py is shorter and cleaner with less nesting or extraneous code.
  2. main.py can be more general. This implies greater extensibility for users and less duplicate code.
  3. As the config is resolved in real time through interpolation / logic resolution, you get to see the 'final' config all in one structure before it is run. This is very powerful in the sense that we now see the outcome of all the intermediate logic that happens in main before training begins.

Obviously this won't work for every case, but there are many (like this one) where I think it would be a trivial change on the user's end.

Author
@briankosw, Dec 3, 2020

I completely agree with you, and I think it'd be really nice to get rid of as many conditional statements as possible. I think a combination of using configs and refactoring should get the job done.
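As a rough illustration (not part of this diff), once the model choice lives in the config, the pretrained/arch branching below could collapse into a single hydra.utils.instantiate call; the model config group shown here is hypothetical:

# Hypothetical sketch: describe the model entirely in the config, e.g.
#   model:
#     _target_: torchvision.models.resnet50
#     pretrained: true
from hydra.utils import instantiate


def build_model(cfg):
    # instantiate() calls the _target_ callable with the remaining fields as kwargs,
    # so the if/else on cfg.pretrained and cfg.arch in main() is no longer needed
    return instantiate(cfg.model)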

        print("=> using pre-trained model '{}'".format(cfg.arch))
        model = models.__dict__[cfg.arch](pretrained=True)
    else:
        print("=> creating model '{}'".format(cfg.arch))
        model = models.__dict__[cfg.arch]()

    if not torch.cuda.is_available():
        print("using CPU, this will be slow")
    elif cfg.distributed:
        if cfg.gpu is not None:
            torch.cuda.set_device(cfg.gpu)
            model.cuda(cfg.gpu)
            # divide the per-node batch size across the GPUs on this node
            cfg.batch_size = int(cfg.batch_size / cfg.ngpus_per_node)
            model = torch.nn.parallel.DistributedDataParallel(
                model, device_ids=[cfg.gpu]
            )
        else:
            model.cuda()
            model = torch.nn.parallel.DistributedDataParallel(model)
    elif cfg.gpu is not None:
        torch.cuda.set_device(cfg.gpu)
        model = model.cuda(cfg.gpu)
    else:
        if cfg.arch.startswith("alexnet") or cfg.arch.startswith("vgg"):
            model.features = torch.nn.DataParallel(model.features)
            model.cuda()
        else:
            model = torch.nn.DataParallel(model).cuda()

    criterion = nn.CrossEntropyLoss().cuda(cfg.gpu)
    optimizer = Adam(
        lr=cfg.adam.lr,
        betas=cfg.adam.betas,
        eps=cfg.adam.eps,
        weight_decay=cfg.adam.weight_decay,
        params=model.parameters(),
    )

    if cfg.resume:
        if os.path.isfile(cfg.resume):
            print("=> loading checkpoint '{}'".format(cfg.resume))
            if cfg.gpu is None:
                checkpoint = torch.load(cfg.resume)
            else:
                loc = "cuda:{}".format(cfg.gpu)
                checkpoint = torch.load(cfg.resume, map_location=loc)
            cfg.start_epoch = checkpoint["epoch"]
            best_acc1 = checkpoint["best_acc1"]
            if cfg.gpu is not None:
                best_acc1 = best_acc1.to(cfg.gpu)
            model.load_state_dict(checkpoint["state_dict"])
            optimizer.load_state_dict(checkpoint["optimizer"])
            print(
                "=> loaded checkpoint '{}' (epoch {})".format(
                    cfg.resume, checkpoint["epoch"]
                )
            )
        else:
            print("=> no checkpoint found at '{}'".format(cfg.resume))

    cudnn.benchmark = True

    traindir = os.path.join(cfg.data, "train")
    valdir = os.path.join(cfg.data, "val")
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )

    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose(
            [
                transforms.RandomResizedCrop(224),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                normalize,
            ]
        ),
    )

    if cfg.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    else:
        train_sampler = None

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=cfg.batch_size,
        shuffle=(train_sampler is None),
        num_workers=cfg.workers,
        pin_memory=True,
        sampler=train_sampler,
    )

    val_loader = torch.utils.data.DataLoader(
        datasets.ImageFolder(
            valdir,
            transforms.Compose(
                [
                    transforms.Resize(256),
                    transforms.CenterCrop(224),
                    transforms.ToTensor(),
                    normalize,
                ]
            ),
        ),
        batch_size=cfg.batch_size,
        shuffle=False,
        num_workers=cfg.workers,
        pin_memory=True,
    )

    if cfg.evaluate:
        validate(val_loader, model, criterion, cfg)
        return

    for epoch in range(cfg.start_epoch, cfg.epochs):
        if cfg.distributed:
            train_sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, cfg)

        train(train_loader, model, criterion, optimizer, epoch, cfg)
        acc1 = validate(val_loader, model, criterion, cfg)

        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)

        # only the first process on each node saves checkpoints
        if not cfg.multiprocessing_distributed or (
            cfg.multiprocessing_distributed and cfg.rank % cfg.ngpus_per_node == 0
        ):
            save_checkpoint(
                {
                    "epoch": epoch + 1,
                    "arch": cfg.arch,
                    "state_dict": model.state_dict(),
                    "best_acc1": best_acc1,
                    "optimizer": optimizer.state_dict(),
                },
                is_best,
            )


def train(train_loader, model, criterion, optimizer, epoch, cfg):
    batch_time = AverageMeter("Time", ":6.3f")
    data_time = AverageMeter("Data", ":6.3f")
    losses = AverageMeter("Loss", ":.4e")
    top1 = AverageMeter("Acc@1", ":6.2f")
    top5 = AverageMeter("Acc@5", ":6.2f")
    progress = ProgressMeter(
        len(train_loader),
        [batch_time, data_time, losses, top1, top5],
        prefix="Epoch: [{}]".format(epoch),
    )

    # switch to train mode
    model.train()

    end = time.time()
    for i, (images, target) in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)

        if cfg.gpu is not None:
            images = images.cuda(cfg.gpu, non_blocking=True)
        if torch.cuda.is_available():
            target = target.cuda(cfg.gpu, non_blocking=True)

        # compute output
        output = model(images)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), images.size(0))
        top1.update(acc1[0], images.size(0))
        top5.update(acc5[0], images.size(0))

        # compute gradient and do an optimizer step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % cfg.print_freq == 0:
            progress.display(i)


def validate(val_loader, model, criterion, cfg):
    batch_time = AverageMeter("Time", ":6.3f")
    losses = AverageMeter("Loss", ":.4e")
    top1 = AverageMeter("Acc@1", ":6.2f")
    top5 = AverageMeter("Acc@5", ":6.2f")
    progress = ProgressMeter(
        len(val_loader), [batch_time, losses, top1, top5], prefix="Test: "
    )

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (images, target) in enumerate(val_loader):
            if cfg.gpu is not None:
                images = images.cuda(cfg.gpu, non_blocking=True)
            if torch.cuda.is_available():
                target = target.cuda(cfg.gpu, non_blocking=True)

            # compute output
            output = model(images)
            loss = criterion(output, target)

            # measure accuracy and record loss
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), images.size(0))
            top1.update(acc1[0], images.size(0))
            top5.update(acc5[0], images.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            if i % cfg.print_freq == 0:
                progress.display(i)

    # TODO: this should also be done with the ProgressMeter
    print(
        " * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}".format(top1=top1, top5=top5)
    )

    return top1.avg


def save_checkpoint(state, is_best, filename="checkpoint.pth.tar"):
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, "model_best.pth.tar")


class AverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self, name, fmt=":f"):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
        return fmtstr.format(**self.__dict__)


class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print("\t".join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = "{:" + str(num_digits) + "d}"
        return "[" + fmt + "/" + fmt.format(num_batches) + "]"


def adjust_learning_rate(optimizer, epoch, cfg):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = cfg.lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr


def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res


if __name__ == "__main__":
    main()
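
Side note, not part of this diff: the rank computation cfg.rank * cfg.ngpus_per_node + cfg.gpu above mirrors the upstream PyTorch ImageNet example, where a single-node run spawns one worker process per GPU with torch.multiprocessing.spawn. A rough sketch of such a launcher (main_worker and the exact wiring are illustrative assumptions) could look like:

import torch
import torch.multiprocessing as mp


def launch(cfg):
    cfg.ngpus_per_node = torch.cuda.device_count()
    if cfg.multiprocessing_distributed:
        # world_size counts processes, not nodes, once each GPU gets its own process
        cfg.world_size = cfg.ngpus_per_node * cfg.world_size
        # mp.spawn calls main_worker(i, cfg) for i = 0 .. ngpus_per_node - 1
        mp.spawn(main_worker, nprocs=cfg.ngpus_per_node, args=(cfg,))
    else:
        main_worker(cfg.gpu, cfg)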