Merge pull request NVIDIA#23 from wjxiz1992/encoding-tool

Add encoding tool

GaryShen2008 authored Aug 31, 2020
2 parents 1fb9a28 + 99c9926 commit 5d3a49e

Showing 18 changed files with 442 additions and 0 deletions.
46 changes: 46 additions & 0 deletions tools/README.md
@@ -0,0 +1,46 @@
### Encoding Tool
This tool converts the values in selected columns from categorical type to numerical type. Currently we support `mean encoding` and `one-hot encoding`.

### Main Procedure
1. First, use the tool to profile the raw data source and produce a "dictionary" (we call this dictionary the `model`) that maps categorical values to numerical values. We call this step `train`. Each column gets its own `model`.
2. Then, use the `model` from step 1 to replace the categorical values with numerical values (the `transform` step), as sketched below.
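
As a rough sketch of the idea behind mean encoding (illustrative PySpark only; the toy column names below are not part of the tool):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('a', 1), ('a', 0), ('b', 1)], ['category', 'label'])

# `train`: the model is simply a per-category mean of the label column.
model = df.groupBy('category').agg(F.mean('label').alias('mean'))

# `transform`: replace each categorical value with its learned mean.
encoded = df.join(model, on='category', how='left').drop('category')
encoded.show()  # 'a' -> 0.5, 'b' -> 1.0
```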

### Usage
1. `cd encoding/python`
2. Run `zip -r sample.zip com` to package the Python encoding tool as a library.
3. Submit the encoding job to your Spark cluster.

You can find full use cases in `encoding-sample/run.sh`.

### Application Parameters
- mainClass:
    - `com.nvidia.spark.encoding.criteo.one_hot_cpu_main`: one-hot encoding
    - `com.nvidia.spark.encoding.criteo.target_cpu_main`: target (mean) encoding
- mode:
    - `train`: use raw data to build an encoding model
    - `transform`: use an encoding model to convert raw data
- format:
    - `csv`: only csv is supported
- columns:
    - the target columns to convert, e.g. `_c34,_c35` means the user wants a dictionary for both the `_c34` and `_c35` columns
- modelPaths:
    - for `train` mode, the path where the encoding model will be saved
    - for `transform` mode, the model that the encoding conversion reads
    - maps 1-1 to `columns`: to encode 2 columns, the user must provide 2 `modelPaths`, e.g. `model_34,model_35` (consumed pairwise, as sketched after this list)
- inputPaths:
    - the raw data to build the encoding model from, or to convert
- outputPaths:
    - only used in `transform` mode
- overwrite:
    - whether to overwrite an existing model or output data
- numRows:
    - optional; show this many rows on the command line when encoding finishes
- labelColumn:
    - required for `target encoding`; sets the label column of the raw data
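
For illustration, `columns` and `modelPaths` are consumed pairwise, along these lines (a sketch, not the tool's actual code):

```python
columns = ['_c34', '_c35']              # --columns=_c34,_c35
model_paths = ['model_34', 'model_35']  # --modelPaths=model_34,model_35

# Each column is paired with exactly one model path.
for column, path in zip(columns, model_paths):
    print(column, '->', path)
```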

### Optimization
1. Due to the default behavior of some Spark methods, some values may carry useless extra precision, which inflates the size of the `model`. E.g., 0.000000 and 1.000000 are identical to 0 and 1 in value, but a CSV model file containing the longer forms costs more disk space. We provide `truncate-model.py` in `encoding-sample` to remove the useless precision.
2. We provide a repartition kit, `repartition.py`, to repartition your output data.

The usage can also be found in `encoding-sample/run.sh`.
17 changes: 17 additions & 0 deletions tools/encoding-sample/repartition.py
@@ -0,0 +1,17 @@
# Note: Please modify the data source options for your case.
# Usage: spark-submit repartition.py <input path> <output path> <number of partitions>

import sys

from pyspark.sql import SparkSession

# Read the tab-separated input, repartition it, and write it back out.
(SparkSession
    .builder
    .getOrCreate()
    .read
    .option('sep', '\t')
    .csv(sys.argv[1])
    .repartition(int(sys.argv[3]))  # shuffle into the requested number of partitions
    .write
    .option('sep', '\t')
    .option('nullValue', None)
    .csv(sys.argv[2]))
57 changes: 57 additions & 0 deletions tools/encoding-sample/run.sh
@@ -0,0 +1,57 @@
# clear
rm -f encoding.zip main.py
rm -f raw-*.csv
rm -rf model target-* onehot-* final-*

# prepare data
head -n 500 ../../datasets/clicklog.csv > raw-1.csv
head -n 750 ../../datasets/clicklog.csv | tail -n 250 > raw-2.csv
tail -n 250 ../../datasets/clicklog.csv > raw-3.csv

# assemble python libs
pushd ../encoding/python/
zip -r ../../encoding-sample/encoding.zip com/
cp main.py ../../encoding-sample/
popd

# train target models/dicts
spark-submit --py-files encoding.zip main.py \
    --mainClass=com.nvidia.spark.encoding.criteo.target_cpu_main --mode=train \
    --format=csv --inputPaths=raw-1.csv,raw-2.csv \
    --labelColumn=_c0 --columns=_c34,_c35 --modelPaths=model/c34.dict,model/c35.dict
spark-submit truncate-model.py model/c34.dict model/c34_truncated.dict
spark-submit truncate-model.py model/c35.dict model/c35_truncated.dict

# train onehot models/indexers
spark-submit --py-files encoding.zip main.py \
    --mainClass=com.nvidia.spark.encoding.criteo.one_hot_cpu_main --mode=train \
    --format=csv --inputPaths=raw-1.csv,raw-2.csv \
    --columns=_c19,_c26 --modelPaths=model/_c19,model/_c26

# target encoding
spark-submit --py-files encoding.zip main.py \
    --mainClass=com.nvidia.spark.encoding.criteo.target_cpu_main --mode=transform \
    --columns=_c34,_c35 --modelPaths=model/c34_truncated.dict,model/c35_truncated.dict \
    --format=csv --inputPaths=raw-1.csv,raw-2.csv,raw-3.csv --outputPaths=target-1,target-2,target-3

# onehot encoding
# NOTE: If the column index changed after target encoding, you should change the metadata of all
# models accordingly. E.g., change "outputCol":"_c26_index","inputCol":"_c26" to
# "outputCol":"_c25_index","inputCol":"_c25" for file model/_c26/metadata/part-00000.
# This is verified on Spark 2.x.
spark-submit --py-files encoding.zip main.py \
    --mainClass=com.nvidia.spark.encoding.criteo.one_hot_cpu_main --mode=transform \
    --columns=_c19,_c26 --modelPaths=model/_c19,model/_c26 \
    --format=csv --inputPaths=target-1,target-2,target-3 --outputPaths=onehot-1,onehot-2,onehot-3

# NOTE: As an example, not all categorical columns are encoded here,
# but please encode all categorical columns in a production environment.

# repartition <input path> <output path> <number of partitions>
spark-submit repartition.py onehot-1 final-1 5
spark-submit repartition.py onehot-2 final-2 5
spark-submit repartition.py onehot-3 final-3 5

# known issues:
# - Issue: "org.apache.spark.shuffle.FetchFailedException: Too large frame: ...":
# Solution: Add "--conf spark.maxRemoteBlockSizeFetchToMem=1G"
17 changes: 17 additions & 0 deletions tools/encoding-sample/truncate-model.py
@@ -0,0 +1,17 @@
# Usage: spark-submit truncate-model.py <model path> <truncated model path>
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, format_string, lit, when

# Normalize the model's value column (_c1) to 6-digit precision, then shorten
# the common values 0.000000 and 1.000000 to 0.0 and 1.0 to save disk space.
(SparkSession
    .builder
    .getOrCreate()
    .read
    .csv(sys.argv[1])
    .withColumn('_c1', format_string('%.6f', col('_c1').cast('float')))
    .withColumn('_c1', when(col('_c1') == '0.000000', lit('0.0')).otherwise(col('_c1')))
    .withColumn('_c1', when(col('_c1') == '1.000000', lit('1.0')).otherwise(col('_c1')))
    .repartition(1)  # the model is small; write it as a single file
    .write
    .option('nullValue', None)
    .csv(sys.argv[2]))
1 change: 1 addition & 0 deletions tools/encoding/python/.gitignore
@@ -0,0 +1 @@
.idea
Empty file. (×5)
24 changes: 24 additions & 0 deletions tools/encoding/python/com/nvidia/spark/encoding/criteo/common.py
@@ -0,0 +1,24 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

def customize_reader(reader):
    # Criteo raw data is tab-separated.
    (reader
        .option('sep', '\t'))

def customize_writer(writer):
    (writer
        .option('sep', '\t')
        .option('nullValue', None))
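
These helpers return nothing: `DataFrameReader.option` and `DataFrameWriter.option` return the object itself, so mutating in place is enough. A sketch of how `load_data` presumably applies them (the body below is an assumption for illustration; `utils.py` is not shown in this diff):

```python
def load_data(spark, paths, args, customize=None):
    # Hypothetical reconstruction of utils.load_data, for illustration only.
    reader = spark.read.format(args.format)
    if customize:
        customize(reader)  # e.g. customize_reader: sets sep='\t' in place
    return reader.load(paths)
```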
58 changes: 58 additions & 0 deletions tools/encoding/python/com/nvidia/spark/encoding/criteo/one_hot_cpu_main.py
@@ -0,0 +1,58 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from com.nvidia.spark.encoding.criteo.common import *
from com.nvidia.spark.encoding.utility.utils import *
from pyspark.ml.feature import StringIndexer, StringIndexerModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def index(df, column):
    # Fit a string indexer that maps each distinct value of `column` to an int;
    # 'keep' reserves an extra index for values unseen at training time.
    column_index = column + '_index'
    return (StringIndexer(inputCol=column, outputCol=column_index)
        .setHandleInvalid('keep')
        .fit(df))

def expand(indexer, df, column):
    # Replace `column` with one 0/1 indicator column per training label.
    column_index = column + '_index'
    df = (indexer
        .transform(df)
        .withColumn(column_index, col(column_index).cast('int')))
    for i in range(0, len(indexer.labels)):
        df = df.withColumn(column + '_' + str(i), (col(column_index) == i).cast('int'))
    return df.drop(column, column_index)

def main(args):
    spark = (SparkSession
        .builder
        .appName(args.mainClass)
        .getOrCreate())

    if args.mode == 'train':
        df = load_data(spark, args.inputPaths, args, customize_reader).cache()
        for column, path in zip(args.columns, args.modelPaths):
            indexer = index(df, column)
            save_model(indexer, path, args)

    if args.mode == 'transform':
        indexers = list(zip(args.columns, load_models(StringIndexerModel, args.modelPaths)))
        for input_path, output_path in zip(args.inputPaths, args.outputPaths):
            df = load_data(spark, input_path, args, customize_reader)
            for column, indexer in indexers:
                df = expand(indexer, df, column)
            args.numRows and df.show(args.numRows)  # optional preview
            save_data(df, output_path, args, customize_writer)

    spark.stop()
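
As a quick sketch of what `index` and `expand` above produce on a toy DataFrame (illustrative; assumes this module is on the Python path, e.g. in a pyspark shell):

```python
from pyspark.sql import SparkSession
from com.nvidia.spark.encoding.criteo.one_hot_cpu_main import index, expand

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('x',), ('y',), ('x',)], ['_c19'])

indexer = index(df, '_c19')  # fitted labels: ['x', 'y']
print(expand(indexer, df, '_c19').columns)
# ['_c19_0', '_c19_1'] -- one 0/1 indicator column per training label;
# the original column and the intermediate index column are dropped.
```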
59 changes: 59 additions & 0 deletions tools/encoding/python/com/nvidia/spark/encoding/criteo/target_cpu_main.py
@@ -0,0 +1,59 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from com.nvidia.spark.encoding.criteo.common import *
from com.nvidia.spark.encoding.utility.utils import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType


def get_dict_df(train_df, target_col, label_col):
    '''Build the dict (model) DataFrame for one column: category -> mean(label).'''
    return train_df.groupBy(target_col).agg(F.mean(label_col))

def encode_df(original_df, dict_df, col_name):
    # Replace the categorical column with the learned mean via a left join;
    # categories unseen at training time get -1.
    dict_df_rename = dict_df.withColumnRenamed('_c0', 'hash').withColumnRenamed('_c1', col_name + '_mean')
    df_mean = (original_df
        .join(dict_df_rename, original_df[col_name] == dict_df_rename['hash'], how='left')
        .drop('hash')
        .drop(col_name)
        .na.fill(-1, [col_name + '_mean']))
    return df_mean


def main(args):
    spark = (SparkSession
        .builder
        .appName(args.mainClass)
        .getOrCreate())
    if args.mode == 'train':
        # Load the raw data once and cache it; reused to build each column's dict.
        df = load_data(spark, args.inputPaths, args, customize_reader).cache()
        for col_name, model_path in zip(args.columns, args.modelPaths):
            dict_df = get_dict_df(df, col_name, args.labelColumn)
            dict_df.repartition(1).write.csv(model_path)

    if args.mode == 'transform':
        dict_dfs = [
            load_dict_df(spark, path).withColumn('_c1', F.col('_c1').cast(DoubleType())).cache()
            for path in args.modelPaths
        ]
        for input_path, output_path in zip(args.inputPaths, args.outputPaths):
            df = load_data(spark, input_path, args, customize_reader)
            for col_name, dict_df in zip(args.columns, dict_dfs):
                df = encode_df(df, dict_df, col_name)
            save_data(df, output_path, args, customize_writer)
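
And a sketch of the `encode_df` join semantics above (toy data; assumes this module is importable): categories unseen during training fall out of the left join as null and are filled with -1.

```python
from pyspark.sql import SparkSession
from com.nvidia.spark.encoding.criteo.target_cpu_main import encode_df

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame([('a',), ('z',)], ['_c34'])       # 'z' was not seen in training
dict_df = spark.createDataFrame([('a', 0.5)], ['_c0', '_c1'])  # model rows: category, mean

encode_df(data, dict_df, '_c34').show()
# _c34_mean: 0.5 for 'a'; -1.0 for the unseen 'z'
```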
21 changes: 21 additions & 0 deletions tools/encoding/python/com/nvidia/spark/encoding/main.py
@@ -0,0 +1,21 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from com.nvidia.spark.encoding.utility.args import parse_arguments
from importlib import import_module

def main():
    args = parse_arguments()
    # Dispatch to main() of the module named by --mainClass, e.g.
    # com.nvidia.spark.encoding.criteo.one_hot_cpu_main.
    getattr(import_module(args.mainClass), 'main')(args)

if __name__ == '__main__':
    main()
Empty file.
66 changes: 66 additions & 0 deletions tools/encoding/python/com/nvidia/spark/encoding/utility/args.py
@@ -0,0 +1,66 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys

from argparse import ArgumentParser
from distutils.util import strtobool

def _to_bool(literal):
    return bool(strtobool(literal))

def _to_str_list(literal):
    # '_c34,_c35' -> ['_c34', '_c35'], dropping empty items
    return [x for x in literal.split(',') if x]

_examples = [
    'com.nvidia.spark.encoding.criteo.one_hot_cpu_main',
    'com.nvidia.spark.encoding.criteo.target_cpu_main'
]

def _validate_args(args):
    usage = ''
    if args.mode == 'transform' and not args.outputPaths:
        usage += '  --outputPaths required for transform.\n'
    # for production:
    #   validate that --columns and --inputPaths exist
    #   validate that --inputPaths and --outputPaths match for transform
    if (args.mainClass == 'com.nvidia.spark.encoding.criteo.target_cpu_main'
            and args.mode == 'train'
            and not args.labelColumn):
        usage += '  --labelColumn required for target encoding.\n'
    if usage:
        print('-' * 80)
        print('Usage:\n' + usage)
        sys.exit(1)

def parse_arguments():
    parser = ArgumentParser()

    # application arguments
    parser.add_argument('--mainClass', required=True, choices=_examples)
    parser.add_argument('--mode', choices=['train', 'transform'], required=True)
    parser.add_argument('--format', choices=['csv'], default='csv')
    parser.add_argument('--columns', type=_to_str_list, required=True)
    parser.add_argument('--modelPaths', type=_to_str_list, required=True)
    parser.add_argument('--inputPaths', type=_to_str_list, required=True)
    parser.add_argument('--outputPaths', type=_to_str_list)  # for transform, required
    parser.add_argument('--overwrite', type=_to_bool, default=False)
    parser.add_argument('--numRows', type=int)  # for transform, optional
    parser.add_argument('--labelColumn', help='name of the label column')  # for target encoding, required

    parsed = parser.parse_args()
    _validate_args(parsed)

    return parsed
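
A quick sketch of how these arguments parse (hypothetical argv, assuming the package is importable; `parse_arguments` reads `sys.argv[1:]`):

```python
import sys
from com.nvidia.spark.encoding.utility.args import parse_arguments

sys.argv = ['main.py',
            '--mainClass=com.nvidia.spark.encoding.criteo.target_cpu_main',
            '--mode=train',
            '--columns=_c34,_c35',
            '--modelPaths=model/c34.dict,model/c35.dict',
            '--inputPaths=raw-1.csv,raw-2.csv',
            '--labelColumn=_c0']
args = parse_arguments()
print(args.columns)    # ['_c34', '_c35'] -- comma lists become Python lists
print(args.overwrite)  # False (default)
```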
(diffs for the remaining changed files not loaded)
