forked from NOBLES5E/DoubleSqueeze
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathps.py
75 lines (61 loc) · 2.68 KB
/
ps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import torch
import torch.nn as nn
import torch.nn.functional as F
import argparse
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import time
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
import os
import argparse
import ray
from persia_pytorch_toolkit import model_utils
from importlib import reload
import model
from compression import *
@ray.remote
class GradientParameterServer:
    """Ray actor that sums gradients and optionally re-compresses the sum.

    Every ``get_*`` method takes one positional argument per contributor
    (presumably one per training worker -- confirm against the caller).
    Except for :meth:`get_ps_gradient` and
    :meth:`get_all_random_quantized_gradient`, each argument is an iterable
    that is star-unpacked into the matching ``decompress_*`` function from
    the ``compression`` module.

    NOTE(review): the two ``get_compensated_*`` methods share a single
    ``compensation_buffer``; calling both on the same actor mixes their
    carried-over state.
    """

    def __init__(self):
        # Last element returned by the most recent compensated compression
        # call; it is added to the next aggregate before compressing again.
        # Stays ``None`` until a ``get_compensated_*`` method is first called.
        self.compensation_buffer = None

    def get_ps_gradient(self, *gradients):
        """Return the element-wise sum of ``gradients``.

        The tensors are stacked along a new leading dimension and summed
        over it, so they must all have the same shape. At least one tensor
        is expected; ``torch.stack`` does not accept an empty sequence.
        """
        return torch.sum(torch.stack(gradients), dim=0)

    def _sum_decompressed(self, decompress, gradients_inputs):
        """Decompress each payload with ``decompress`` and sum the results."""
        return self.get_ps_gradient(
            *(decompress(*payload) for payload in gradients_inputs)
        )

    def _compress_with_compensation(self, compress, gradient):
        """Add the stored buffer to ``gradient``, compress, and update the buffer.

        ``compress`` must return a sequence whose last element becomes the
        new ``compensation_buffer`` (presumably the compression residual --
        confirm in the ``compression`` module). Everything before that last
        element is returned to the caller.
        """
        if self.compensation_buffer is None:
            # First call: start from a zero buffer shaped like the aggregate.
            self.compensation_buffer = torch.zeros_like(gradient)
        result = compress(gradient + self.compensation_buffer)
        self.compensation_buffer = result[-1]
        return result[:-1]

    def get_quantized_gradient(self, *gradients_inputs):
        """Sum the decompressed inputs and re-compress the sum.

        Returns the first four elements of ``tenary_compress_gradient``'s
        result.
        """
        aggregate = self._sum_decompressed(decompress_gradient, gradients_inputs)
        return tenary_compress_gradient(aggregate)[:4]

    def get_quantized_gradient_no_requant(self, *gradients_inputs):
        """Sum the decompressed inputs and return the sum uncompressed."""
        return self._sum_decompressed(decompress_gradient, gradients_inputs)

    def get_random_quantized_gradient(self, *gradients_inputs):
        """Sum the decompressed inputs and re-compress with the randomized scheme.

        Returns the first four elements of
        ``randomized_tenary_compress_gradient``'s result.
        """
        aggregate = self._sum_decompressed(decompress_gradient, gradients_inputs)
        return randomized_tenary_compress_gradient(aggregate)[:4]

    def get_all_random_quantized_gradient(self, *gradients_inputs):
        """Return the inputs unchanged as a tuple; nothing is summed here."""
        return gradients_inputs

    def get_compensated_quantized_gradient(self, *gradients_inputs):
        """Sum the decompressed inputs, add the buffer, and re-compress.

        Updates ``compensation_buffer`` with the last element of
        ``tenary_compress_gradient``'s result and returns the rest.
        """
        aggregate = self._sum_decompressed(decompress_gradient, gradients_inputs)
        return self._compress_with_compensation(tenary_compress_gradient, aggregate)

    def get_compensated_top_k_gradient(self, *gradients_inputs):
        """Top-k counterpart of :meth:`get_compensated_quantized_gradient`.

        Inputs are decoded with ``decompress_top_k_gradient`` and the
        compensated sum is compressed with ``top_k_compress_gradient``.
        """
        aggregate = self._sum_decompressed(
            decompress_top_k_gradient, gradients_inputs
        )
        return self._compress_with_compensation(top_k_compress_gradient, aggregate)

    def get_topk_gradient(self, *gradients_inputs):
        """Sum the top-k-decompressed inputs and return the sum uncompressed."""
        return self._sum_decompressed(decompress_top_k_gradient, gradients_inputs)