Add out_span option for better GPU utilization (#90)
tushuhei authored Nov 11, 2022
1 parent ab7b522 commit babb428
Showing 3 changed files with 167 additions and 54 deletions.
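The change batches the per-iteration output: instead of appending one line to the weights file and printing metrics on every boosting step, results accumulate in an in-memory buffer that is flushed every out_span iterations. A minimal sketch of that pattern follows; the names (results, flush, OUT_SPAN) are illustrative, not the repository's identifiers:

results = []
OUT_SPAN = 100

def flush(path: str) -> None:
    with open(path, 'a') as f:
        f.write('\n'.join(results) + '\n')
    results.clear()

for t in range(1, 1001):
    results.append('feature_%d\t%.3f' % (t, 0.1))  # stand-in for one boosting step
    if t % OUT_SPAN == 0:
        flush('weights.txt')  # host-side I/O happens only every OUT_SPAN steps
if results:
    flush('weights.txt')  # final flush of any partial buffer

Fewer host-side synchronization points let the accelerator backend (JAX here) keep the device busy, which is presumably the GPU-utilization gain the commit title refers to.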
5 changes: 4 additions & 1 deletion scripts/build_model.py
@@ -34,7 +34,10 @@ def rollup(weights_filename: str,
"""
decision_trees: typing.Dict[str, float] = dict()
with open(weights_filename) as f:
for row in f:
for row in f.readlines():
row = row.strip()
if not row:
continue
feature = row.split('\t')[0]
score = float(row.split('\t')[1])
decision_trees.setdefault(feature, 0)
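rollup sums the score of every occurrence of a feature into a single weight per feature; the added strip/continue makes the reader robust to blank lines such as a trailing empty line. A self-contained sketch of that aggregation, assuming the tab-separated feature<TAB>score format seen above:

decision_trees = {}
for row in ['AB\t0.5', '', 'AB\t-0.25', 'CD\t1.0']:
    row = row.strip()
    if not row:
        continue  # skip blank lines, e.g. a trailing newline
    feature, score = row.split('\t')[:2]
    decision_trees[feature] = decision_trees.get(feature, 0.0) + float(score)
# decision_trees == {'AB': 0.25, 'CD': 1.0}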
142 changes: 93 additions & 49 deletions scripts/train.py
@@ -30,6 +30,12 @@
import numpy as jnp # type: ignore

EPS = np.finfo(float).eps # type: np.floating[typing.Any]
DEFAULT_OUTPUT_NAME = 'weights.txt'
DEFAULT_LOG_NAME = 'train.log'
DEFAULT_FEATURE_THRES = 10
DEFAULT_ITERATION = 10000
DEFAULT_OUT_SPAN = 100
ArgList = typing.Optional[typing.List[str]]


class Result(NamedTuple):
@@ -168,6 +174,7 @@ def fit(X_train: npt.NDArray[np.bool_],
iters: int,
weights_filename: str,
log_filename: str,
out_span: int,
chunk_size: typing.Optional[int] = None) -> typing.Dict[int, float]:
"""Trains an AdaBoost classifier.
@@ -180,20 +187,23 @@ def fit(X_train: npt.NDArray[np.bool_],
iters (int): A number of training iterations.
weights_filename (str): A file path to write the learned weights.
log_filename (str): A file path to log the accuracy along with training.
chunk_size (Optional[int]): A chunk size to split training entries into chunks for memory reduction
when calculating AdaBoost's weighted training error.
out_span (int): Iteration span to output metrics and weights.
chunk_size (Optional[int]): A chunk size to split training entries for
memory efficiency.
Returns:
phi (Dict[int, float]): Learned child classifiers.
"""
with open(weights_filename, 'w') as f:
f.write('')
with open(log_filename, 'w') as f:
f.write('train_accuracy\ttrain_precision\ttrain_recall\ttrain_fscore\t'
'test_accuracy\ttest_precision\ttest_recall\ttest_fscore\n')
f.write(
'iter\ttrain_accuracy\ttrain_precision\ttrain_recall\ttrain_fscore\t'
'test_accuracy\ttest_precision\ttest_recall\ttest_fscore\n')
print('Outputting learned weights to %s ...' % (weights_filename))

phis: typing.Dict[int, float] = dict()
phi_buffer: typing.List[typing.Tuple[str, float]] = []

assert (X_train.shape[1] == X_test.shape[1]
), 'Training and test entries should have the same number of features.'
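With the new iter column in the header, each log row is keyed by the iteration at which it was written. A minimal sketch of consuming that log downstream, assuming only the tab-separated format written here:

import csv

with open('train.log') as f:
    for row in csv.DictReader(f, delimiter='\t'):
        print(row['iter'], row['train_accuracy'], row['test_fscore'])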
@@ -212,38 +222,19 @@ def fit(X_train: npt.NDArray[np.bool_],
N_train, M_train = X_train.shape
w = jnp.ones(N_train) / N_train
YX_train = Y_train[:, None] ^ X_train
for t in range(iters):
print('=== %s ===' % (t))
if chunk_size is None:
res: npt.NDArray[np.float64] = w.dot(YX_train)
else:
res = np.zeros(M_train)
for i in range(0, N_train, chunk_size):
YX_train_chunk = YX_train[i:i + chunk_size]
w_chunk = w[i:i + chunk_size]
res += w_chunk.dot(YX_train_chunk)
err = 0.5 - jnp.abs(res - 0.5)
m_best = int(err.argmin())
pol_best = res[m_best] < 0.5
err_min = err[m_best]
print('min error:\t%.5f' % err_min)
print('best tree:\t%d' % m_best)
print()
alpha = jnp.log((1 - err_min) / (err_min + EPS))
phis.setdefault(m_best, 0)
phis[m_best] += alpha if pol_best else -alpha
miss = YX_train[:, m_best]
if not pol_best:
miss = ~(miss)
w = w * jnp.exp(alpha * miss)
w = w / w.sum()

def output_progress(t: int) -> None:
with open(weights_filename, 'a') as f:
feature = features[m_best] if m_best < len(features) else 'BIAS'
f.write('%s\t%.3f\n' % (feature, alpha if pol_best else -alpha))
f.write('\n'.join('%s\t%.3f' % p for p in phi_buffer) + '\n')
phi_buffer.clear()
pred_train = jit(pred)(phis, X_train) if jax_ready else pred(phis, X_train)
pred_test = jit(pred)(phis, X_test) if jax_ready else pred(phis, X_test)
metrics_train = get_metrics(pred_train, Y_train)
metrics_test = get_metrics(pred_test, Y_test)
print('=== %s ===' % t)
print('min error:\t%.5f' % err_min)
print('best tree:\t%d' % m_best)
print()
print('train accuracy:\t%.5f' % metrics_train.accuracy)
print('train prec.:\t%.5f' % metrics_train.precision)
print('train recall:\t%.5f' % metrics_train.recall)
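The output_progress closure flushes the buffered weights, recomputes predictions on both splits, and reports the usual binary-classification metrics. For reference, a hedged sketch of how such metrics are conventionally computed from boolean arrays; this mirrors the standard definitions, not necessarily the repository's get_metrics implementation:

import numpy as np

def metrics(pred: np.ndarray, truth: np.ndarray) -> tuple:
    tp = int(np.sum(pred & truth))    # true positives
    fp = int(np.sum(pred & ~truth))   # false positives
    fn = int(np.sum(~pred & truth))   # false negatives
    accuracy = float(np.mean(pred == truth))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    fscore = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
    return accuracy, precision, recall, fscore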
@@ -255,7 +246,8 @@ def fit(X_train: npt.NDArray[np.bool_],
print('test fscore:\t%.5f' % metrics_test.fscore)
print()
with open(log_filename, 'a') as f:
f.write('%.5f\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\n' % (
f.write('%d\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\t%.5f\n' % (
t,
metrics_train.accuracy,
metrics_train.precision,
metrics_train.recall,
@@ -265,51 +257,103 @@ def fit(X_train: npt.NDArray[np.bool_],
metrics_test.recall,
metrics_test.fscore,
))

for t in range(iters):
if chunk_size is None:
res: npt.NDArray[np.float64] = w.dot(YX_train)
else:
res = np.zeros(M_train)
for i in range(0, N_train, chunk_size):
YX_train_chunk = YX_train[i:i + chunk_size]
w_chunk = w[i:i + chunk_size]
res += w_chunk.dot(YX_train_chunk)
err = 0.5 - jnp.abs(res - 0.5)
m_best = int(err.argmin())
pol_best = res[m_best] < 0.5
err_min: float = err[m_best]

alpha: float = jnp.log((1 - err_min) / (err_min + EPS))
phis.setdefault(m_best, 0)
phis[m_best] += alpha if pol_best else -alpha
miss = YX_train[:, m_best]
if not pol_best:
miss = ~(miss)
w = w * jnp.exp(alpha * miss)
w = w / w.sum()
feature = features[m_best] if m_best < len(features) else 'BIAS'
phi_buffer.append((feature, alpha if pol_best else -alpha))
if (t + 1) % out_span == 0:
output_progress(t + 1)
if len(phi_buffer) > 0:
output_progress(t + 1)

return phis
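
For readers tracing the loop above, here is a compact, self-contained rendition of one boosting round under the same encoding (boolean features, YX = Y ^ X, so a column's weighted mean is that stump's error); a sketch for illustration, not the repository's code:

import numpy as np

rng = np.random.default_rng(0)
X = rng.random((1000, 50)) > 0.5          # boolean feature matrix
Y = X[:, 3] ^ (rng.random(1000) > 0.9)    # labels tied to feature 3, ~10% noise
w = np.ones(len(Y)) / len(Y)              # uniform AdaBoost weights
EPS = np.finfo(float).eps

YX = Y[:, None] ^ X                       # True where stump m misclassifies
res = w.dot(YX)                           # weighted error of each stump
# Chunked variant for memory efficiency:
# res = sum(w[i:i + c].dot(YX[i:i + c]) for i in range(0, len(Y), c))
err = 0.5 - np.abs(res - 0.5)             # error after choosing the better polarity
m_best = int(err.argmin())                # likely 3 for this data
pol_best = res[m_best] < 0.5
err_min = err[m_best]
alpha = np.log((1 - err_min) / (err_min + EPS))
miss = YX[:, m_best] if pol_best else ~YX[:, m_best]
w = w * np.exp(alpha * miss)              # upweight misclassified entries
w = w / w.sum()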


def parse_args() -> argparse.Namespace:
def parse_args(test: ArgList = None) -> argparse.Namespace:
"""Parses commandline arguments.
Args:
test (typing.Optional[typing.List[str]], optional): Commandline args for
testing. Defaults to None.
Returns:
argparse.Namespace: Parsed data of args.
"""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'encoded_train_data', help='File path for the encoded training data.')
parser.add_argument(
'-o',
'--output',
help='Output file path for the learned weights. (default: weights.txt)',
default='weights.txt')
help=f'Output file path for the learned weights. (default: {DEFAULT_OUTPUT_NAME})',
type=str,
default=DEFAULT_OUTPUT_NAME)
parser.add_argument(
'--log',
help='Output file path for the training log. (default: train.log)',
default='train.log')
help=f'Output file path for the training log. (default: {DEFAULT_LOG_NAME})',
type=str,
default=DEFAULT_LOG_NAME)
parser.add_argument(
'--feature-thres',
help='Threshold value of the minimum feature frequency. (default: 10)',
default=10)
help=f'Threshold value of the minimum feature frequency. (default: {DEFAULT_FEATURE_THRES})',
type=int,
default=DEFAULT_FEATURE_THRES)
parser.add_argument(
'--iter',
help='Number of iterations for training. (default: 10000)',
default=10000)
help=f'Number of iterations for training. (default: {DEFAULT_ITERATION})',
type=int,
default=DEFAULT_ITERATION)
parser.add_argument(
'--out-span',
help=f'Iteration span to output metrics and weights. (default: {DEFAULT_OUT_SPAN})',
type=int,
default=DEFAULT_OUT_SPAN)
parser.add_argument(
'--chunk-size',
help='A chunk size to split training entries into chunks for memory reduction when calculating AdaBoost\'s weighted training error.'
)

return parser.parse_args()
type=int,
help='A chunk size to split training entries for memory efficiency. (default: None)',
default=None)
if test is None:
return parser.parse_args()
else:
return parser.parse_args(test)


def main() -> None:
args = parse_args()
train_data_filename = args.encoded_train_data
weights_filename = args.output
log_filename = args.log
train_data_filename: str = args.encoded_train_data
weights_filename: str = args.output
log_filename: str = args.log
feature_thres = int(args.feature_thres)
iterations = int(args.iter)
out_span = int(args.out_span)
chunk_size = int(args.chunk_size) if args.chunk_size is not None else None

X, Y, features = preprocess(train_data_filename, feature_thres)
X_train, X_test, Y_train, Y_test = split_dataset(X, Y)
fit(X_train, Y_train, X_test, Y_test, features, iterations, weights_filename,
log_filename, chunk_size)
log_filename, out_span, chunk_size)

print('Training done. Export the model by passing %s to build_model.py' %
(weights_filename))
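Assembled with the existing flags, a typical invocation of the updated script might look like the following; file names and values are illustrative:

python scripts/train.py encoded_train.txt -o weights.txt --log train.log --feature-thres 10 --iter 10000 --out-span 100 --chunk-size 1000000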
74 changes: 70 additions & 4 deletions tests/test_train.py
@@ -13,6 +13,7 @@
# limitations under the License.
"""Tests the training script."""

import math
import os
import sys
import unittest
@@ -35,6 +36,51 @@
os.path.join(os.path.dirname(__file__), 'train_test.log'))


class TestArgParse(unittest.TestCase):

def test_cmdargs_invalid_option(self) -> None:
cmdargs = ['-v']
with self.assertRaises(SystemExit) as cm:
train.parse_args(cmdargs)
self.assertEqual(cm.exception.code, 2)

def test_cmdargs_help(self) -> None:
cmdargs = ['-h']
with self.assertRaises(SystemExit) as cm:
train.parse_args(cmdargs)
self.assertEqual(cm.exception.code, 0)

def test_cmdargs_no_data(self) -> None:
with self.assertRaises(SystemExit) as cm:
train.parse_args([])
self.assertEqual(cm.exception.code, 2)

def test_cmdargs_default(self) -> None:
cmdargs = ['encoded.txt']
output = train.parse_args(cmdargs)
self.assertEqual(output.encoded_train_data, 'encoded.txt')
self.assertEqual(output.output, train.DEFAULT_OUTPUT_NAME)
self.assertEqual(output.log, train.DEFAULT_LOG_NAME)
self.assertEqual(output.feature_thres, train.DEFAULT_FEATURE_THRES)
self.assertEqual(output.iter, train.DEFAULT_ITERATION)
self.assertEqual(output.out_span, train.DEFAULT_OUT_SPAN)
self.assertEqual(output.chunk_size, None)

def test_cmdargs_full(self) -> None:
cmdargs = [
'encoded.txt', '-o', 'out.txt', '--log', 'foo.log', '--feature-thres',
'100', '--iter', '10', '--chunk-size', '1000', '--out-span', '50'
]
output = train.parse_args(cmdargs)
self.assertEqual(output.encoded_train_data, 'encoded.txt')
self.assertEqual(output.output, 'out.txt')
self.assertEqual(output.log, 'foo.log')
self.assertEqual(output.feature_thres, 100)
self.assertEqual(output.iter, 10)
self.assertEqual(output.chunk_size, 1000)
self.assertEqual(output.out_span, 50)


class TestTrain(unittest.TestCase):

def setUp(self) -> None:
@@ -148,13 +194,33 @@ def test_fit(self) -> None:
True,
])
features = ['a', 'b', 'c']
iters = 1
train.fit(X, Y, X, Y, features, iters, WEIGHTS_FILE_PATH, LOG_FILE_PATH)
iters = 5
out_span = 2
train.fit(X, Y, X, Y, features, iters, WEIGHTS_FILE_PATH, LOG_FILE_PATH,
out_span)
with open(WEIGHTS_FILE_PATH) as f:
weights = f.read().splitlines()
top_feature = weights[0].split('\t')[0]
weights = [
line.split('\t') for line in f.read().splitlines() if line.strip()
]
top_feature = weights[0][0]
self.assertEqual(
top_feature, 'b', msg='The most effective feature should be selected.')
self.assertEqual(
len(weights),
iters,
msg='The number of lines should equal the iteration count.')

with open(LOG_FILE_PATH) as f:
log = [line.split('\t') for line in f.read().splitlines() if line.strip()]
self.assertEqual(
len(log),
math.ceil(iters / out_span) + 1,
msg='The number of lines should equal ceil(iterations / out_span) plus one for the header.'
)
self.assertEqual(
len(set(len(line) for line in log)),
1,
msg='The header and the body should have the same number of columns.')

def tearDown(self) -> None:
os.remove(WEIGHTS_FILE_PATH)
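The expected line counts in this test follow from the flush schedule: with iters = 5 and out_span = 2, output_progress fires after iterations 2 and 4, and the final flush of the remaining buffered entry fires at 5, so the log holds ceil(5 / 2) = 3 metric rows plus the header, 4 lines in total, while the weights file accumulates one line per iteration, 5 in all.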
