From 227ac2eac008aa633985c41e0021b4421690713d Mon Sep 17 00:00:00 2001
From: chengmengli06
Date: Tue, 22 Nov 2022 16:48:28 +0800
Subject: [PATCH 1/9] update docs

---
 docs/post_fix.py                        | 15 ++++++++
 docs/source/feature/data.md             | 47 ++++++++++++++++++++++---
 easy_rec/python/test/train_eval_test.py |  2 +-
 scripts/build_docs.sh                   |  4 +++
 setup.cfg                               |  2 +-
 5 files changed, 63 insertions(+), 7 deletions(-)
 create mode 100644 docs/post_fix.py

diff --git a/docs/post_fix.py b/docs/post_fix.py
new file mode 100644
index 000000000..ce279bed8
--- /dev/null
+++ b/docs/post_fix.py
@@ -0,0 +1,15 @@
+# -*- encoding:utf-8 -*-
+import sys
+
+lines = []
+with open(sys.argv[1], 'r') as fin:
+  for line_str in fin:
+    lines.append(line_str)
+
+with open(sys.argv[1], 'w') as fout:
+  for line_str in lines:
+    if '_static/searchtools.js' in line_str:
+      fout.write(
+          ' \n'
+      )
+    fout.write(line_str)
diff --git a/docs/source/feature/data.md b/docs/source/feature/data.md
index 20143f1b1..ec62a5000 100644
--- a/docs/source/feature/data.md
+++ b/docs/source/feature/data.md
@@ -62,16 +62,53 @@ input_fields field:
 
 ### input_type:
 
-The following input_type values are currently supported:
+The following [input_type](../proto.html#protos.DatasetConfig.InputType) values are currently supported:
 
 - CSVInput, for data in CSV format; note that it must be used together with separator
+
+  - train_input_path and eval_input_path need to be specified
+
+  ```protobuf
+  train_input_path: "data/test/dwd_avazu_ctr_train.csv"
+  eval_input_path: "data/test/dwd_avazu_ctr_test.csv"
+  ```
+
 - OdpsInputV2, used when running EasyRec on MaxCompute
+
+  - train_input_path and eval_input_path need to be specified
+  - they can be passed in through the pai command, see the [reference](../train.md#on-pai)
+
-- OdpsInputV3, used when accessing MaxCompute tables locally or on EMR
+- OdpsInputV3, used when accessing MaxCompute tables locally or on [DataScience](https://help.aliyun.com/document_detail/170836.html)
+
+- HiveInput and HiveParquetInput, for accessing Hive tables on a Hadoop cluster
+
+  - hive_train_input and hive_eval_input need to be configured
+  - see [HiveConfig](../proto.html#protos.HiveConfig)
+
+  ```protobuf
+  hive_train_input {
+    host: "192.168.1"
+    username: "admin"
+    table_name: "census_income_train_simple"
+  }
+  hive_eval_input {
+    host: "192.168.1"
+    username: "admin"
+    table_name: "census_income_eval_simple"
+  }
+  ```
+
 - To use RTP FG:
-  - when running EasyRec on EMR or locally, use RTPInput;
+
+  - when running EasyRec on EMR or locally, use RTPInput or HiveRTPInput;
   - when running on Odps, use OdpsRTPInput
-- KafkaInput & DatahubInput
-  - input types required for online training
+
+- KafkaInput & DatahubInput: input types required for [online training](../online_train.md)
+
+  - KafkaInput requires kafka_train_input and kafka_eval_input to be configured
+    - see [KafkaServer](../proto.html#protos.KafkaServer)
+  - DatahubInput requires datahub_train_input and datahub_eval_input to be configured
+    - see [DataHubServer](../proto.html#protos.DatahubServer)
 
 ### separator:
diff --git a/easy_rec/python/test/train_eval_test.py b/easy_rec/python/test/train_eval_test.py
index acfe81b5a..2c9fc2d14 100644
--- a/easy_rec/python/test/train_eval_test.py
+++ b/easy_rec/python/test/train_eval_test.py
@@ -7,11 +7,11 @@
 import threading
 import time
 import unittest
+from distutils.version import LooseVersion
 
 import numpy as np
 import six
 import tensorflow as tf
-from distutils.version import LooseVersion
 from tensorflow.python.platform import gfile
 
 from easy_rec.python.main import predict
diff --git a/scripts/build_docs.sh b/scripts/build_docs.sh
index 929df6044..7d880270c 100644
--- a/scripts/build_docs.sh
+++ b/scripts/build_docs.sh
@@ -15,3 +15,7 @@ cd docs
 rm -rf build
 make html
 rm -rf build/html/_modules
+
+python post_fix.py build/html/search.html
+
+echo "view docs: python -m http.server --directory=docs/build/html/ 8081"
diff --git a/setup.cfg b/setup.cfg
index b5b966faa..b180b9fb1 100644
--- a/setup.cfg +++ b/setup.cfg @@ -10,7 +10,7 @@ multi_line_output = 7 force_single_line = true known_standard_library = setuptools known_first_party = easy_rec -known_third_party = absl,common_io,distutils,docutils,future,google,graphlearn,kafka,matplotlib,numpy,oss2,pai,pandas,psutil,six,sklearn,sphinx_markdown_tables,sphinx_rtd_theme,tensorflow,yaml +known_third_party = absl,common_io,docutils,future,google,graphlearn,kafka,matplotlib,numpy,oss2,pai,pandas,psutil,six,sklearn,sphinx_markdown_tables,sphinx_rtd_theme,tensorflow,yaml no_lines_before = LOCALFOLDER default_section = THIRDPARTY skip = easy_rec/python/protos From 330e1f175c0bacd4c64d59678071b18d4f376cf1 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Wed, 23 Nov 2022 00:21:29 +0800 Subject: [PATCH 2/9] fix share input bug --- .../feature_column/feature_column_v2.py | 165 ++-- .../feature_column/sequence_feature_column.py | 38 +- .../python/feature_column/feature_column.py | 121 ++- easy_rec/python/input/input.py | 739 +++++++++--------- easy_rec/python/model/easy_rec_estimator.py | 1 - .../deepfm_combo_on_avazu_feature_name.config | 396 ++++++++++ 6 files changed, 924 insertions(+), 536 deletions(-) create mode 100644 samples/model_config/deepfm_combo_on_avazu_feature_name.config diff --git a/easy_rec/python/compat/feature_column/feature_column_v2.py b/easy_rec/python/compat/feature_column/feature_column_v2.py index 23757669c..6703738c5 100644 --- a/easy_rec/python/compat/feature_column/feature_column_v2.py +++ b/easy_rec/python/compat/feature_column/feature_column_v2.py @@ -1411,7 +1411,8 @@ def bucketized_column(source_column, boundaries): def categorical_column_with_hash_bucket(key, hash_bucket_size, - dtype=dtypes.string): + dtype=dtypes.string, + feature_name=None): """Represents sparse feature where ids are set by hashing. Use this when your sparse features are in string or integer format, and you @@ -1464,97 +1465,7 @@ def categorical_column_with_hash_bucket(key, fc_utils.assert_key_is_string(key) fc_utils.assert_string_or_int(dtype, prefix='column_name: {}'.format(key)) - return HashedCategoricalColumn(key, hash_bucket_size, dtype) - - -def categorical_column_with_vocabulary_file(key, - vocabulary_file, - vocabulary_size=None, - num_oov_buckets=0, - default_value=None, - dtype=dtypes.string): - """A `CategoricalColumn` with a vocabulary file. - - Use this when your inputs are in string or integer format, and you have a - vocabulary file that maps each value to an integer ID. By default, - out-of-vocabulary values are ignored. Use either (but not both) of - `num_oov_buckets` and `default_value` to specify how to include - out-of-vocabulary values. - - For input dictionary `features`, `features[key]` is either `Tensor` or - `SparseTensor`. If `Tensor`, missing values can be represented by `-1` for int - and `''` for string, which will be dropped by this feature column. - - Example with `num_oov_buckets`: - File '/us/states.txt' contains 50 lines, each with a 2-character U.S. state - abbreviation. All inputs with values in that file are assigned an ID 0-49, - corresponding to its line number. All other values are hashed and assigned an - ID 50-54. - - ```python - states = categorical_column_with_vocabulary_file( - key='states', vocabulary_file='/us/states.txt', vocabulary_size=50, - num_oov_buckets=5) - columns = [states, ...] 
- features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) - linear_prediction = linear_model(features, columns) - ``` - - Example with `default_value`: - File '/us/states.txt' contains 51 lines - the first line is 'XX', and the - other 50 each have a 2-character U.S. state abbreviation. Both a literal 'XX' - in input, and other values missing from the file, will be assigned ID 0. All - others are assigned the corresponding line number 1-50. - - ```python - states = categorical_column_with_vocabulary_file( - key='states', vocabulary_file='/us/states.txt', vocabulary_size=51, - default_value=0) - columns = [states, ...] - features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) - linear_prediction, _, _ = linear_model(features, columns) - ``` - - And to make an embedding with either: - - ```python - columns = [embedding_column(states, 3),...] - features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) - dense_tensor = input_layer(features, columns) - ``` - - Args: - key: A unique string identifying the input feature. It is used as the - column name and the dictionary key for feature parsing configs, feature - `Tensor` objects, and feature columns. - vocabulary_file: The vocabulary file name. - vocabulary_size: Number of the elements in the vocabulary. This must be no - greater than length of `vocabulary_file`, if less than length, later - values are ignored. If None, it is set to the length of `vocabulary_file`. - num_oov_buckets: Non-negative integer, the number of out-of-vocabulary - buckets. All out-of-vocabulary inputs will be assigned IDs in the range - `[vocabulary_size, vocabulary_size+num_oov_buckets)` based on a hash of - the input value. A positive `num_oov_buckets` can not be specified with - `default_value`. - default_value: The integer ID value to return for out-of-vocabulary feature - values, defaults to `-1`. This can not be specified with a positive - `num_oov_buckets`. - dtype: The type of features. Only string and integer types are supported. - - Returns: - A `CategoricalColumn` with a vocabulary file. - - Raises: - ValueError: `vocabulary_file` is missing or cannot be opened. - ValueError: `vocabulary_size` is missing or < 1. - ValueError: `num_oov_buckets` is a negative integer. - ValueError: `num_oov_buckets` and `default_value` are both specified. - ValueError: `dtype` is neither string nor integer. - """ - return categorical_column_with_vocabulary_file_v2(key, vocabulary_file, - vocabulary_size, dtype, - default_value, - num_oov_buckets) + return HashedCategoricalColumn(feature_name, key, hash_bucket_size, dtype) def categorical_column_with_vocabulary_file_v2(key, @@ -1562,7 +1473,8 @@ def categorical_column_with_vocabulary_file_v2(key, vocabulary_size=None, dtype=dtypes.string, default_value=None, - num_oov_buckets=0): + num_oov_buckets=0, + feature_name=None): """A `CategoricalColumn` with a vocabulary file. 
Use this when your inputs are in string or integer format, and you have a @@ -1668,6 +1580,7 @@ def categorical_column_with_vocabulary_file_v2(key, fc_utils.assert_string_or_int(dtype, prefix='column_name: {}'.format(key)) fc_utils.assert_key_is_string(key) return VocabularyFileCategoricalColumn( + feature_name=feature_name, key=key, vocabulary_file=vocabulary_file, vocabulary_size=vocabulary_size, @@ -1680,7 +1593,8 @@ def categorical_column_with_vocabulary_list(key, vocabulary_list, dtype=None, default_value=-1, - num_oov_buckets=0): + num_oov_buckets=0, + feature_name=None): """A `CategoricalColumn` with in-memory vocabulary. Use this when your inputs are in string or integer format, and you have an @@ -1785,6 +1699,7 @@ def categorical_column_with_vocabulary_list(key, fc_utils.assert_key_is_string(key) return VocabularyListCategoricalColumn( + feature_name=feature_name, key=key, vocabulary_list=tuple(vocabulary_list), dtype=dtype, @@ -1792,7 +1707,10 @@ def categorical_column_with_vocabulary_list(key, num_oov_buckets=num_oov_buckets) -def categorical_column_with_identity(key, num_buckets, default_value=None): +def categorical_column_with_identity(key, + num_buckets, + default_value=None, + feature_name=None): """A `CategoricalColumn` that returns identity values. Use this when your inputs are integers in the range `[0, num_buckets)`, and @@ -1856,7 +1774,10 @@ def categorical_column_with_identity(key, num_buckets, default_value=None): default_value, num_buckets, key)) fc_utils.assert_key_is_string(key) return IdentityCategoricalColumn( - key=key, number_buckets=num_buckets, default_value=default_value) + feature_name=feature_name, + key=key, + number_buckets=num_buckets, + default_value=default_value) def indicator_column(categorical_column): @@ -1968,7 +1889,7 @@ def weighted_categorical_column(categorical_column, dtype=dtype) -def crossed_column(keys, hash_bucket_size, hash_key=None): +def crossed_column(keys, hash_bucket_size, hash_key=None, feature_name=None): """Returns a column for performing crosses of categorical features. Crossed features will be hashed according to `hash_bucket_size`. Conceptually, @@ -2092,7 +2013,10 @@ def crossed_column(keys, hash_bucket_size, hash_key=None): 'Hashing before crossing will increase probability of collision. ' 'Instead, use the feature name as a string. 
Given: {}'.format(key)) return CrossedColumn( - keys=tuple(keys), hash_bucket_size=hash_bucket_size, hash_key=hash_key) + feature_name=feature_name, + keys=tuple(keys), + hash_bucket_size=hash_bucket_size, + hash_key=hash_key) @six.add_metaclass(abc.ABCMeta) @@ -2696,9 +2620,9 @@ def _normalize_feature_columns(feature_columns): class NumericColumn( DenseColumn, fc_old._DenseColumn, # pylint: disable=protected-access - collections.namedtuple( - 'NumericColumn', - ('key', 'shape', 'default_value', 'dtype', 'normalizer_fn'))): + collections.namedtuple('NumericColumn', + ('feature_name', 'key', 'shape', 'default_value', + 'dtype', 'normalizer_fn'))): """see `numeric_column`.""" @property @@ -2708,7 +2632,7 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" - return self.key + return self.feature_name if self.feature_name else self.key @property def raw_name(self): @@ -3927,7 +3851,8 @@ class HashedCategoricalColumn( CategoricalColumn, fc_old._CategoricalColumn, # pylint: disable=protected-access collections.namedtuple('HashedCategoricalColumn', - ('key', 'hash_bucket_size', 'dtype'))): + ('feature_name', 'key', 'hash_bucket_size', 'dtype')) +): """see `categorical_column_with_hash_bucket`.""" @property @@ -3937,7 +3862,7 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" - return self.key + return self.feature_name if self.feature_name else self.key @property def raw_name(self): @@ -4042,9 +3967,10 @@ def _from_config(cls, config, custom_objects=None, columns_by_name=None): class VocabularyFileCategoricalColumn( CategoricalColumn, fc_old._CategoricalColumn, # pylint: disable=protected-access - collections.namedtuple('VocabularyFileCategoricalColumn', - ('key', 'vocabulary_file', 'vocabulary_size', - 'num_oov_buckets', 'dtype', 'default_value'))): + collections.namedtuple( + 'VocabularyFileCategoricalColumn', + ('feature_name', 'key', 'vocabulary_file', 'vocabulary_size', + 'num_oov_buckets', 'dtype', 'default_value'))): """See `categorical_column_with_vocabulary_file`.""" @property @@ -4054,7 +3980,7 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" - return self.key + return self.feature_name if self.feature_name else self.key @property def raw_name(self): @@ -4160,10 +4086,9 @@ def _from_config(cls, config, custom_objects=None, columns_by_name=None): class VocabularyListCategoricalColumn( CategoricalColumn, fc_old._CategoricalColumn, # pylint: disable=protected-access - collections.namedtuple( - 'VocabularyListCategoricalColumn', - ('key', 'vocabulary_list', 'dtype', 'default_value', 'num_oov_buckets')) -): + collections.namedtuple('VocabularyListCategoricalColumn', + ('feature_name', 'key', 'vocabulary_list', 'dtype', + 'default_value', 'num_oov_buckets'))): """See `categorical_column_with_vocabulary_list`.""" @property @@ -4173,7 +4098,7 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" - return self.key + return self.feature_name if self.feature_name else self.key @property def raw_name(self): @@ -4278,8 +4203,9 @@ def _from_config(cls, config, custom_objects=None, columns_by_name=None): class IdentityCategoricalColumn( CategoricalColumn, fc_old._CategoricalColumn, # pylint: disable=protected-access - collections.namedtuple('IdentityCategoricalColumn', - ('key', 'number_buckets', 'default_value'))): + collections.namedtuple( + 'IdentityCategoricalColumn', + ('feature_name', 'key', 'number_buckets', 'default_value'))): """See 
`categorical_column_with_identity`.""" @property @@ -4289,7 +4215,7 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" - return self.key + return self.feature_name if self.feature_name else self.key @property def raw_name(self): @@ -4528,8 +4454,9 @@ def _from_config(cls, config, custom_objects=None, columns_by_name=None): class CrossedColumn( CategoricalColumn, fc_old._CategoricalColumn, # pylint: disable=protected-access - collections.namedtuple('CrossedColumn', - ('keys', 'hash_bucket_size', 'hash_key'))): + collections.namedtuple( + 'CrossedColumn', + ('feature_name', 'keys', 'hash_bucket_size', 'hash_key'))): """See `crossed_column`.""" @property @@ -4546,6 +4473,8 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" + if self.feature_name: + return self.feature_name feature_names = [] for key in _collect_leaf_level_keys(self): if isinstance(key, (FeatureColumn, fc_old._FeatureColumn)): # pylint: disable=protected-access diff --git a/easy_rec/python/compat/feature_column/sequence_feature_column.py b/easy_rec/python/compat/feature_column/sequence_feature_column.py index f4994103c..b0fcdc9f7 100644 --- a/easy_rec/python/compat/feature_column/sequence_feature_column.py +++ b/easy_rec/python/compat/feature_column/sequence_feature_column.py @@ -193,7 +193,8 @@ def concatenate_context_input(context_input, sequence_input): def sequence_categorical_column_with_identity(key, num_buckets, - default_value=None): + default_value=None, + feature_name=None): """Returns a feature column that represents sequences of integers. Pass this to `embedding_column` or `indicator_column` to convert sequence @@ -235,7 +236,10 @@ def sequence_categorical_column_with_identity(key, """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_identity( - key=key, num_buckets=num_buckets, default_value=default_value)) + feature_name=feature_name, + key=key, + num_buckets=num_buckets, + default_value=default_value)) def sequence_numeric_column_with_bucketized_column(source_column, boundaries): @@ -281,7 +285,8 @@ def sequence_weighted_categorical_column(categorical_column, def sequence_categorical_column_with_hash_bucket(key, hash_bucket_size, - dtype=dtypes.string): + dtype=dtypes.string, + feature_name=None): """A sequence of categorical terms where ids are set by hashing. Pass this to `embedding_column` or `indicator_column` to convert sequence @@ -320,7 +325,10 @@ def sequence_categorical_column_with_hash_bucket(key, """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_hash_bucket( - key=key, hash_bucket_size=hash_bucket_size, dtype=dtype)) + feature_name=feature_name, + key=key, + hash_bucket_size=hash_bucket_size, + dtype=dtype)) def sequence_categorical_column_with_vocabulary_file(key, @@ -328,7 +336,8 @@ def sequence_categorical_column_with_vocabulary_file(key, vocabulary_size=None, num_oov_buckets=0, default_value=None, - dtype=dtypes.string): + dtype=dtypes.string, + feature_name=None): """A sequence of categorical terms where ids use a vocabulary file. 
Pass this to `embedding_column` or `indicator_column` to convert sequence @@ -382,6 +391,7 @@ def sequence_categorical_column_with_vocabulary_file(key, """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_vocabulary_file( + feature_name=feature_name, key=key, vocabulary_file=vocabulary_file, vocabulary_size=vocabulary_size, @@ -394,7 +404,8 @@ def sequence_categorical_column_with_vocabulary_list(key, vocabulary_list, dtype=None, default_value=-1, - num_oov_buckets=0): + num_oov_buckets=0, + feature_name=None): """A sequence of categorical terms where ids use an in-memory list. Pass this to `embedding_column` or `indicator_column` to convert sequence @@ -447,6 +458,7 @@ def sequence_categorical_column_with_vocabulary_list(key, """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_vocabulary_list( + feature_name=feature_name, key=key, vocabulary_list=vocabulary_list, dtype=dtype, @@ -458,7 +470,8 @@ def sequence_numeric_column(key, shape=(1,), default_value=0., dtype=dtypes.float32, - normalizer_fn=None): + normalizer_fn=None, + feature_name=None): """Returns a feature column that represents sequences of numeric data. Example: @@ -508,7 +521,8 @@ def sequence_numeric_column(key, 'normalizer_fn must be a callable. Given: {}'.format(normalizer_fn)) return SequenceNumericColumn( - key, + feature_name=feature_name, + key=key, shape=shape, default_value=default_value, dtype=dtype, @@ -529,9 +543,9 @@ def _assert_all_equal_and_return(tensors, name=None): class SequenceNumericColumn( fc.SequenceDenseColumn, fc_v1._FeatureColumn, - collections.namedtuple( - 'SequenceNumericColumn', - ('key', 'shape', 'default_value', 'dtype', 'normalizer_fn'))): + collections.namedtuple('SequenceNumericColumn', + ('feature_name', 'key', 'shape', 'default_value', + 'dtype', 'normalizer_fn'))): """Represents sequences of numeric data.""" @property @@ -541,7 +555,7 @@ def _is_v2_column(self): @property def name(self): """See `FeatureColumn` base class.""" - return self.key + return self.feature_name if self.feature_name else self.key @property def raw_name(self): diff --git a/easy_rec/python/feature_column/feature_column.py b/easy_rec/python/feature_column/feature_column.py index 94c6e9c52..3f63a944a 100644 --- a/easy_rec/python/feature_column/feature_column.py +++ b/easy_rec/python/feature_column/feature_column.py @@ -4,6 +4,8 @@ import logging import tensorflow as tf +from tensorflow.python.ops import partitioned_variables +from tensorflow.python.platform import gfile from easy_rec.python.builders import hyperparams_builder from easy_rec.python.compat.feature_column import sequence_feature_column @@ -13,12 +15,6 @@ from easy_rec.python.compat.feature_column import feature_column_v2 as feature_column # NOQA -if tf.__version__ >= '2.0': - min_max_variable_partitioner = tf.compat.v1.min_max_variable_partitioner - tf = tf.compat.v1 -else: - min_max_variable_partitioner = tf.min_max_variable_partitioner - MAX_HASH_BUCKET_SIZE = 9223372036854775807 @@ -228,7 +224,7 @@ def is_deep(self, config): def _get_vocab_size(self, vocab_path): if vocab_path in self._vocab_size: return self._vocab_size[vocab_path] - with tf.gfile.GFile(vocab_path, 'r') as fin: + with gfile.GFile(vocab_path, 'r') as fin: vocabulary_size = sum(1 for _ in fin) self._vocab_size[vocab_path] = vocabulary_size return vocabulary_size @@ -251,24 +247,33 @@ def parse_id_feature(self, config): Args: config: instance of easy_rec.python.protos.feature_config_pb2.FeatureConfig """ + feature_name = config.feature_name if 
config.HasField('feature_name') \
+        else config.input_names[0]
     hash_bucket_size = self._get_hash_bucket_size(config)
     if hash_bucket_size > 0:
       fc = feature_column.categorical_column_with_hash_bucket(
-          config.input_names[0], hash_bucket_size=hash_bucket_size)
+          feature_name,
+          hash_bucket_size=hash_bucket_size,
+          feature_name=feature_name)
     elif config.vocab_list:
       fc = feature_column.categorical_column_with_vocabulary_list(
-          config.input_names[0],
+          feature_name,
           default_value=0,
-          vocabulary_list=config.vocab_list)
+          vocabulary_list=config.vocab_list,
+          feature_name=feature_name)
     elif config.vocab_file:
       fc = feature_column.categorical_column_with_vocabulary_file(
-          config.input_names[0],
+          feature_name,
           default_value=0,
           vocabulary_file=config.vocab_file,
-          vocabulary_size=self._get_vocab_size(config.vocab_file))
+          vocabulary_size=self._get_vocab_size(config.vocab_file),
+          feature_name=feature_name)
     else:
       fc = feature_column.categorical_column_with_identity(
-          config.input_names[0], config.num_buckets, default_value=0)
+          feature_name,
+          config.num_buckets,
+          default_value=0,
+          feature_name=feature_name)
 
     if self.is_wide(config):
       self._add_wide_embedding_column(fc, config)
@@ -285,32 +290,41 @@ def parse_tag_feature(self, config):
     Args:
       config: instance of easy_rec.python.protos.feature_config_pb2.FeatureConfig
     """
+    feature_name = config.feature_name if config.HasField('feature_name') \
+        else config.input_names[0]
     hash_bucket_size = self._get_hash_bucket_size(config)
     if hash_bucket_size > 0:
       tag_fc = feature_column.categorical_column_with_hash_bucket(
-          config.input_names[0], hash_bucket_size, dtype=tf.string)
+          feature_name,
+          hash_bucket_size,
+          dtype=tf.string,
+          feature_name=feature_name)
     elif config.vocab_list:
       tag_fc = feature_column.categorical_column_with_vocabulary_list(
-          config.input_names[0],
+          feature_name,
           default_value=0,
-          vocabulary_list=config.vocab_list)
+          vocabulary_list=config.vocab_list,
+          feature_name=feature_name)
     elif config.vocab_file:
       tag_fc = feature_column.categorical_column_with_vocabulary_file(
-          config.input_names[0],
+          feature_name,
           default_value=0,
           vocabulary_file=config.vocab_file,
-          vocabulary_size=self._get_vocab_size(config.vocab_file))
+          vocabulary_size=self._get_vocab_size(config.vocab_file),
+          feature_name=feature_name)
     else:
       tag_fc = feature_column.categorical_column_with_identity(
-          config.input_names[0], config.num_buckets, default_value=0)
+          feature_name,
+          config.num_buckets,
+          default_value=0,
+          feature_name=feature_name)
 
     if len(config.input_names) > 1:
       tag_fc = feature_column.weighted_categorical_column(
-          tag_fc, weight_feature_key=config.input_names[1], dtype=tf.float32)
+          tag_fc, weight_feature_key=feature_name + ':1', dtype=tf.float32)
     elif config.HasField('kv_separator'):
-      wgt_name = config.input_names[0] + '_WEIGHT'
       tag_fc = feature_column.weighted_categorical_column(
-          tag_fc, weight_feature_key=wgt_name, dtype=tf.float32)
+          tag_fc, weight_feature_key=feature_name + ':1', dtype=tf.float32)
 
     if self.is_wide(config):
       self._add_wide_embedding_column(tag_fc, config)
@@ -328,7 +342,7 @@ def parse_raw_feature(self, config):
     feature_name = config.feature_name if config.HasField('feature_name') \
        else config.input_names[0]
     fc = feature_column.numeric_column(
-        config.input_names[0], shape=(config.raw_input_dim,))
+        feature_name, shape=(config.raw_input_dim,), feature_name=feature_name)
 
     bounds = None
     if config.boundaries:
@@ -346,8 +360,8 @@
     try:
       fc = feature_column.bucketized_column(fc, bounds)
     except Exception as
e: - tf.logging.error('bucketized_column [%s] with bounds %s error' % - (fc.name, str(bounds))) + logging.error('bucketized_column [%s] with bounds %s error' % + (fc.name, str(bounds))) raise e if self.is_wide(config): self._add_wide_embedding_column(fc, config) @@ -355,13 +369,9 @@ def parse_raw_feature(self, config): self._add_deep_embedding_column(fc, config) else: tmp_id_col = feature_column.categorical_column_with_identity( - config.input_names[0] + '_raw_proj_id', - config.raw_input_dim, - default_value=0) + feature_name, config.raw_input_dim, default_value=0) wgt_fc = feature_column.weighted_categorical_column( - tmp_id_col, - weight_feature_key=config.input_names[0] + '_raw_proj_val', - dtype=tf.float32) + tmp_id_col, weight_feature_key=feature_name + ':1', dtype=tf.float32) if self.is_wide(config): self._add_wide_embedding_column(wgt_fc, config) if self.is_deep(config): @@ -380,7 +390,8 @@ def parse_expr_feature(self, config): """ feature_name = config.feature_name if config.HasField('feature_name') \ else config.input_names[0] - fc = feature_column.numeric_column(feature_name, shape=(1,)) + fc = feature_column.numeric_column( + feature_name, shape=(1,), feature_name=feature_name) if self.is_wide(config): self._add_wide_embedding_column(fc, config) if self.is_deep(config): @@ -392,9 +403,20 @@ def parse_combo_feature(self, config): Args: config: instance of easy_rec.python.protos.feature_config_pb2.FeatureConfig """ + feature_name = config.feature_name if config.HasField('feature_name') \ + else None assert len(config.input_names) >= 2 + input_names = [] + for input_id in range(len(config.input_names)): + if input_id == 0: + input_names.append(feature_name) + else: + input_names.append(feature_name + ':' + str(input_id)) fc = feature_column.crossed_column( - config.input_names, self._get_hash_bucket_size(config), hash_key=None) + input_names, + self._get_hash_bucket_size(config), + hash_key=None, + feature_name=feature_name) if self.is_wide(config): self._add_wide_embedding_column(fc, config) @@ -412,7 +434,10 @@ def parse_lookup_feature(self, config): assert config.HasField('hash_bucket_size') hash_bucket_size = self._get_hash_bucket_size(config) fc = feature_column.categorical_column_with_hash_bucket( - feature_name, hash_bucket_size, dtype=tf.string) + feature_name, + hash_bucket_size, + dtype=tf.string, + feature_name=feature_name) if self.is_wide(config): self._add_wide_embedding_column(fc, config) @@ -434,25 +459,33 @@ def parse_sequence_feature(self, config): if config.HasField('hash_bucket_size'): hash_bucket_size = self._get_hash_bucket_size(config) fc = sequence_feature_column.sequence_categorical_column_with_hash_bucket( - config.input_names[0], hash_bucket_size, dtype=tf.string) + config.input_names[0], + hash_bucket_size, + dtype=tf.string, + feature_name=feature_name) elif config.vocab_list: fc = sequence_feature_column.sequence_categorical_column_with_vocabulary_list( config.input_names[0], default_value=0, - vocabulary_list=config.vocab_list) + vocabulary_list=config.vocab_list, + feature_name=feature_name) elif config.vocab_file: fc = sequence_feature_column.sequence_categorical_column_with_vocabulary_file( config.input_names[0], default_value=0, vocabulary_file=config.vocab_file, - vocabulary_size=self._get_vocab_size(config.vocab_file)) + vocabulary_size=self._get_vocab_size(config.vocab_file), + feature_name=feature_name) else: fc = sequence_feature_column.sequence_categorical_column_with_identity( - config.input_names[0], config.num_buckets, default_value=0) + 
config.input_names[0], + config.num_buckets, + default_value=0, + feature_name=feature_name) else: bounds = None fc = sequence_feature_column.sequence_numeric_column( - config.input_names[0], shape=(1,)) + config.input_names[0], shape=(1,), feature_name=feature_name) if config.hash_bucket_size > 0: hash_bucket_size = self._get_hash_bucket_size(config) assert sub_feature_type == config.IdFeature, \ @@ -472,7 +505,7 @@ def parse_sequence_feature(self, config): fc = sequence_feature_column.sequence_numeric_column_with_bucketized_column( fc, bounds) except Exception as e: - tf.logging.error( + logging.error( 'sequence features bucketized_column [%s] with bounds %s error' % (config.input_names[0], str(bounds))) raise e @@ -481,7 +514,8 @@ def parse_sequence_feature(self, config): tmp_id_col = sequence_feature_column.sequence_categorical_column_with_identity( config.input_names[0] + '_raw_proj_id', config.raw_input_dim, - default_value=0) + default_value=0, + feature_name=feature_name) wgt_fc = sequence_feature_column.sequence_weighted_categorical_column( tmp_id_col, weight_feature_key=config.input_names[0] + '_raw_proj_val', @@ -500,9 +534,10 @@ def _build_partitioner(self, config): if config.max_partitions > 1: if self._global_ev_params is not None or config.HasField('ev_params'): # pai embedding_variable should use fixed_size_partitioner - return tf.fixed_size_partitioner(num_shards=config.max_partitions) + return partitioned_variables.fixed_size_partitioner( + num_shards=config.max_partitions) else: - return min_max_variable_partitioner( + return partitioned_variables.min_max_variable_partitioner( max_partitions=config.max_partitions) else: return None diff --git a/easy_rec/python/input/input.py b/easy_rec/python/input/input.py index 966ec6cf5..6859ca5dd 100644 --- a/easy_rec/python/input/input.py +++ b/easy_rec/python/input/input.py @@ -245,13 +245,14 @@ def create_placeholders(self, export_config): return {'features': inputs_placeholder}, features def _get_features(self, fields): - field_dict = {x: fields[x] for x in self._effective_fields if x in fields} - for k in self._appended_fields: - field_dict[k] = fields[k] - if constant.SAMPLE_WEIGHT in fields: - logging.info('will use field %s as sample weight' % - self._data_config.sample_weight) - field_dict[constant.SAMPLE_WEIGHT] = fields[constant.SAMPLE_WEIGHT] + # field_dict = {x: fields[x] for x in self._effective_fields if x in fields} + # for k in self._appended_fields: + # field_dict[k] = fields[k] + # if constant.SAMPLE_WEIGHT in fields: + # logging.info('will use field %s as sample weight' % + # self._data_config.sample_weight) + # field_dict[constant.SAMPLE_WEIGHT] = fields[constant.SAMPLE_WEIGHT] + field_dict = {x: fields[x] for x in fields if x not in self._label_fields} return field_dict def _get_labels(self, fields): @@ -261,6 +262,362 @@ def _get_labels(self, fields): for x in self._label_fields ]) + def _parse_tag_feature(self, fc, parsed_dict, field_dict): + input_0 = fc.input_names[0] + feature_name = fc.feature_name if fc.HasField('feature_name') else input_0 + field = field_dict[input_0] + # Construct the output of TagFeature according to the dimension of field_dict. + # When the input field exceeds 2 dimensions, convert TagFeature to 2D output. 
+    if len(field.get_shape()) < 2 or field.get_shape()[-1] == 1:
+      if len(field.get_shape()) == 0:
+        field = tf.expand_dims(field, axis=0)
+      elif len(field.get_shape()) == 2:
+        field = tf.squeeze(field, axis=-1)
+      if fc.HasField('kv_separator') and len(fc.input_names) > 1:
+        assert False, 'Tag Feature Error, ' \
+            'Cannot set kv_separator and multi input_names in one feature config. Feature: %s.' % input_0
+      parsed_dict[feature_name] = tf.string_split(field, fc.separator)
+      if fc.HasField('kv_separator'):
+        indices = parsed_dict[feature_name].indices
+        tmp_kvs = parsed_dict[feature_name].values
+        tmp_kvs = tf.string_split(tmp_kvs, fc.kv_separator, skip_empty=False)
+        tmp_kvs = tf.reshape(tmp_kvs.values, [-1, 2])
+        tmp_ks, tmp_vs = tmp_kvs[:, 0], tmp_kvs[:, 1]
+
+        check_list = [
+            tf.py_func(check_string_to_number, [tmp_vs, input_0], Tout=tf.bool)
+        ] if self._check_mode else []
+        with tf.control_dependencies(check_list):
+          tmp_vs = tf.string_to_number(
+              tmp_vs, tf.float32, name='kv_tag_wgt_str_2_flt_%s' % input_0)
+        parsed_dict[feature_name] = tf.sparse.SparseTensor(
+            indices, tmp_ks, parsed_dict[feature_name].dense_shape)
+        parsed_dict[feature_name + ':1'] = tf.sparse.SparseTensor(
+            indices, tmp_vs, parsed_dict[feature_name].dense_shape)
+      if not fc.HasField('hash_bucket_size'):
+        check_list = [
+            tf.py_func(
+                check_string_to_number,
+                [parsed_dict[feature_name].values, input_0],
+                Tout=tf.bool)
+        ] if self._check_mode else []
+        with tf.control_dependencies(check_list):
+          vals = tf.string_to_number(
+              parsed_dict[feature_name].values,
+              tf.int32,
+              name='tag_fea_%s' % input_0)
+        parsed_dict[feature_name] = tf.sparse.SparseTensor(
+            parsed_dict[feature_name].indices, vals,
+            parsed_dict[feature_name].dense_shape)
+      if len(fc.input_names) > 1:
+        input_1 = fc.input_names[1]
+        field = field_dict[input_1]
+        if len(field.get_shape()) == 0:
+          field = tf.expand_dims(field, axis=0)
+        field = tf.string_split(field, fc.separator)
+        check_list = [
+            tf.py_func(
+                check_string_to_number, [field.values, input_1], Tout=tf.bool)
+        ] if self._check_mode else []
+        with tf.control_dependencies(check_list):
+          field_vals = tf.string_to_number(
+              field.values, tf.float32, name='tag_wgt_str_2_flt_%s' % input_1)
+        assert_op = tf.assert_equal(
+            tf.shape(field_vals)[0],
+            tf.shape(parsed_dict[feature_name].values)[0],
+            message='TagFeature Error: The size of %s is not equal to the size of %s. Please check input: %s and %s.'
+ % (input_0, input_1, input_0, input_1)) + with tf.control_dependencies([assert_op]): + field = tf.sparse.SparseTensor(field.indices, tf.identity(field_vals), + field.dense_shape) + parsed_dict[feature_name + ':1'] = field + else: + parsed_dict[feature_name] = field_dict[input_0] + if len(fc.input_names) > 1: + input_1 = fc.input_names[1] + parsed_dict[feature_name + ':1'] = field_dict[input_1] + + def _parse_expr_feature(self, fc, parsed_dict, field_dict): + fea_name = fc.feature_name + prefix = 'expr_' + for input_name in fc.input_names: + new_input_name = prefix + input_name + if field_dict[input_name].dtype == tf.string: + check_list = [ + tf.py_func( + check_string_to_number, [field_dict[input_name], input_name], + Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + parsed_dict[new_input_name] = tf.string_to_number( + field_dict[input_name], + tf.float64, + name='%s_str_2_int_for_expr' % new_input_name) + elif field_dict[input_name].dtype in [ + tf.int32, tf.int64, tf.double, tf.float32 + ]: + parsed_dict[new_input_name] = tf.cast(field_dict[input_name], + tf.float64) + else: + assert False, 'invalid input dtype[%s] for expr feature' % str( + field_dict[input_name].dtype) + + expression = get_expression(fc.expression, fc.input_names, prefix=prefix) + logging.info('expression: %s' % expression) + parsed_dict[fea_name] = eval(expression) + self._appended_fields.append(fea_name) + + def _parse_id_feature(self, fc, parsed_dict, field_dict): + input_0 = fc.input_names[0] + feature_name = fc.feature_name if fc.HasField('feature_name') else input_0 + parsed_dict[feature_name] = field_dict[input_0] + if fc.HasField('hash_bucket_size'): + if field_dict[input_0].dtype != tf.string: + if field_dict[input_0].dtype in [tf.float32, tf.double]: + assert fc.precision > 0, 'it is dangerous to convert float or double to string due to ' \ + 'precision problem, it is suggested to convert them into string ' \ + 'format during feature generalization before using EasyRec; ' \ + 'if you really need to do so, please set precision (the number of ' \ + 'decimal digits) carefully.' 
+ precision = None + if field_dict[input_0].dtype in [tf.float32, tf.double]: + if fc.precision > 0: + precision = fc.precision + # convert to string + + if 'as_string' in dir(tf.strings): + parsed_dict[feature_name] = tf.strings.as_string( + field_dict[input_0], precision=precision) + else: + parsed_dict[feature_name] = tf.as_string( + field_dict[input_0], precision=precision) + elif fc.num_buckets > 0: + if parsed_dict[feature_name].dtype == tf.string: + check_list = [ + tf.py_func( + check_string_to_number, [parsed_dict[feature_name], input_0], + Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + parsed_dict[feature_name] = tf.string_to_number( + parsed_dict[feature_name], + tf.int32, + name='%s_str_2_int' % input_0) + + def _parse_raw_feature(self, fc, parsed_dict, field_dict): + input_0 = fc.input_names[0] + feature_name = fc.feature_name if fc.HasField('feature_name') else input_0 + if field_dict[input_0].dtype == tf.string: + if fc.raw_input_dim > 1: + check_list = [ + tf.py_func( + check_split, + [field_dict[input_0], fc.separator, fc.raw_input_dim, input_0], + Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + tmp_fea = tf.string_split(field_dict[input_0], fc.separator) + check_list = [ + tf.py_func( + check_string_to_number, [tmp_fea.values, input_0], Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + tmp_vals = tf.string_to_number( + tmp_fea.values, + tf.float32, + name='multi_raw_fea_to_flt_%s' % input_0) + parsed_dict[feature_name] = tf.sparse_to_dense( + tmp_fea.indices, + [tf.shape(field_dict[input_0])[0], fc.raw_input_dim], + tmp_vals, + default_value=0) + else: + check_list = [ + tf.py_func( + check_string_to_number, [field_dict[input_0], input_0], + Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + parsed_dict[feature_name] = tf.string_to_number( + field_dict[input_0], tf.float32) + elif field_dict[input_0].dtype in [ + tf.int32, tf.int64, tf.double, tf.float32 + ]: + parsed_dict[feature_name] = tf.to_float(field_dict[input_0]) + else: + assert False, 'invalid dtype[%s] for raw feature' % str( + field_dict[input_0].dtype) + if fc.max_val > fc.min_val: + parsed_dict[feature_name] = (parsed_dict[feature_name] - fc.min_val) / ( + fc.max_val - fc.min_val) + + if fc.HasField('normalizer_fn'): + logging.info('apply normalizer_fn %s' % fc.normalizer_fn) + parsed_dict[feature_name] = load_by_path(fc.normalizer_fn)( + parsed_dict[feature_name]) + + if not fc.boundaries and fc.num_buckets <= 1 and \ + self._data_config.sample_weight != input_0: + # may need by wide model and deep model to project + # raw values to a vector, it maybe better implemented + # by a ProjectionColumn later + sample_num = tf.to_int64(tf.shape(parsed_dict[feature_name])[0]) + indices_0 = tf.range(sample_num, dtype=tf.int64) + indices_1 = tf.range(fc.raw_input_dim, dtype=tf.int64) + indices_0 = indices_0[:, None] + indices_1 = indices_1[None, :] + indices_0 = tf.tile(indices_0, [1, fc.raw_input_dim]) + indices_1 = tf.tile(indices_1, [sample_num, 1]) + indices_0 = tf.reshape(indices_0, [-1, 1]) + indices_1 = tf.reshape(indices_1, [-1, 1]) + indices = tf.concat([indices_0, indices_1], axis=1) + + tmp_parsed = parsed_dict[feature_name] + parsed_dict[feature_name] = tf.SparseTensor( + indices=indices, + values=indices_1[:, 0], + dense_shape=[sample_num, fc.raw_input_dim]) + parsed_dict[feature_name + ':1'] = tf.SparseTensor( + indices=indices, + 
values=tf.reshape(tmp_parsed, [-1]), + dense_shape=[sample_num, fc.raw_input_dim]) + # self._appended_fields.append(input_0 + '_raw_proj_id') + # self._appended_fields.append(input_0 + '_raw_proj_val') + + def _parse_seq_feature(self, fc, parsed_dict, field_dict): + input_0 = fc.input_names[0] + feature_name = fc.feature_name if fc.HasField('feature_name') else input_0 + field = field_dict[input_0] + sub_feature_type = fc.sub_feature_type + # Construct the output of SeqFeature according to the dimension of field_dict. + # When the input field exceeds 2 dimensions, convert SeqFeature to 2D output. + if len(field.get_shape()) < 2: + parsed_dict[feature_name] = tf.strings.split(field, fc.separator) + if fc.HasField('seq_multi_sep'): + indices = parsed_dict[feature_name].indices + values = parsed_dict[feature_name].values + multi_vals = tf.string_split(values, fc.seq_multi_sep) + indices_1 = multi_vals.indices + indices = tf.gather(indices, indices_1[:, 0]) + out_indices = tf.concat([indices, indices_1[:, 1:]], axis=1) + # 3 dimensional sparse tensor + out_shape = tf.concat( + [parsed_dict[feature_name].dense_shape, multi_vals.dense_shape[1:]], + axis=0) + parsed_dict[feature_name] = tf.sparse.SparseTensor( + out_indices, multi_vals.values, out_shape) + if (fc.num_buckets > 1 and fc.max_val == fc.min_val): + check_list = [ + tf.py_func( + check_string_to_number, + [parsed_dict[feature_name].values, input_0], + Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + parsed_dict[feature_name] = tf.sparse.SparseTensor( + parsed_dict[feature_name].indices, + tf.string_to_number( + parsed_dict[feature_name].values, + tf.int64, + name='sequence_str_2_int_%s' % input_0), + parsed_dict[feature_name].dense_shape) + elif sub_feature_type == fc.RawFeature: + check_list = [ + tf.py_func( + check_string_to_number, + [parsed_dict[feature_name].values, input_0], + Tout=tf.bool) + ] if self._check_mode else [] + with tf.control_dependencies(check_list): + parsed_dict[feature_name] = tf.sparse.SparseTensor( + parsed_dict[feature_name].indices, + tf.string_to_number( + parsed_dict[feature_name].values, + tf.float32, + name='sequence_str_2_float_%s' % input_0), + parsed_dict[feature_name].dense_shape) + if fc.num_buckets > 1 and fc.max_val > fc.min_val: + normalized_values = (parsed_dict[feature_name].values - fc.min_val) / ( + fc.max_val - fc.min_val) + parsed_dict[feature_name] = tf.sparse.SparseTensor( + parsed_dict[feature_name].indices, normalized_values, + parsed_dict[feature_name].dense_shape) + else: + parsed_dict[feature_name] = field + if not fc.boundaries and fc.num_buckets <= 1 and fc.hash_bucket_size <= 0 and \ + self._data_config.sample_weight != input_0 and sub_feature_type == fc.RawFeature and \ + fc.raw_input_dim == 1: + # may need by wide model and deep model to project + # raw values to a vector, it maybe better implemented + # by a ProjectionColumn later + logging.info( + 'Not set boundaries or num_buckets or hash_bucket_size, %s will process as two dimension raw feature' + % feature_name) + parsed_dict[feature_name] = tf.sparse_to_dense( + parsed_dict[feature_name].indices, + [tf.shape(parsed_dict[feature_name])[0], fc.sequence_length], + parsed_dict[feature_name].values) + sample_num = tf.to_int64(tf.shape(parsed_dict[feature_name])[0]) + indices_0 = tf.range(sample_num, dtype=tf.int64) + indices_1 = tf.range(fc.sequence_length, dtype=tf.int64) + indices_0 = indices_0[:, None] + indices_1 = indices_1[None, :] + indices_0 = tf.tile(indices_0, [1, 
fc.sequence_length])
+      indices_1 = tf.tile(indices_1, [sample_num, 1])
+      indices_0 = tf.reshape(indices_0, [-1, 1])
+      indices_1 = tf.reshape(indices_1, [-1, 1])
+      indices = tf.concat([indices_0, indices_1], axis=1)
+      tmp_parsed = parsed_dict[feature_name]
+      parsed_dict[feature_name] = tf.SparseTensor(
+          indices=indices,
+          values=indices_1[:, 0],
+          dense_shape=[sample_num, fc.sequence_length])
+      parsed_dict[feature_name + ':1'] = tf.SparseTensor(
+          indices=indices,
+          values=tf.reshape(tmp_parsed, [-1]),
+          dense_shape=[sample_num, fc.sequence_length])
+    elif not fc.boundaries and fc.num_buckets <= 1 and fc.hash_bucket_size <= 0 and \
+        self._data_config.sample_weight != input_0 and sub_feature_type == fc.RawFeature and \
+        fc.raw_input_dim > 1:
+      # for 3 dimensional sequence feature input.
+      # may be needed by the wide and deep models to project
+      # raw values to a vector; it might be better implemented
+      # by a ProjectionColumn later
+      logging.info(
+          'Not set boundaries or num_buckets or hash_bucket_size, %s will process as three dimension raw feature'
+          % feature_name)
+      parsed_dict[feature_name] = tf.sparse_to_dense(
+          parsed_dict[feature_name].indices, [
+              tf.shape(parsed_dict[feature_name])[0], fc.sequence_length,
+              fc.raw_input_dim
+          ], parsed_dict[feature_name].values)
+      sample_num = tf.to_int64(tf.shape(parsed_dict[feature_name])[0])
+      indices_0 = tf.range(sample_num, dtype=tf.int64)
+      indices_1 = tf.range(fc.sequence_length, dtype=tf.int64)
+      indices_2 = tf.range(fc.raw_input_dim, dtype=tf.int64)
+      indices_0 = indices_0[:, None, None]
+      indices_1 = indices_1[None, :, None]
+      indices_2 = indices_2[None, None, :]
+      indices_0 = tf.tile(indices_0, [1, fc.sequence_length, fc.raw_input_dim])
+      indices_1 = tf.tile(indices_1, [sample_num, 1, fc.raw_input_dim])
+      indices_2 = tf.tile(indices_2, [sample_num, fc.sequence_length, 1])
+      indices_0 = tf.reshape(indices_0, [-1, 1])
+      indices_1 = tf.reshape(indices_1, [-1, 1])
+      indices_2 = tf.reshape(indices_2, [-1, 1])
+      indices = tf.concat([indices_0, indices_1, indices_2], axis=1)
+
+      tmp_parsed = parsed_dict[feature_name]
+      parsed_dict[feature_name] = tf.SparseTensor(
+          indices=indices,
+          values=indices_1[:, 0],
+          dense_shape=[sample_num, fc.sequence_length, fc.raw_input_dim])
+      parsed_dict[feature_name + ':1'] = tf.SparseTensor(
+          indices=indices,
+          values=tf.reshape(tmp_parsed, [-1]),
+          dense_shape=[sample_num, fc.sequence_length, fc.raw_input_dim])
+      # self._appended_fields.append(input_0 + '_raw_proj_id')
+      # self._appended_fields.append(input_0 + '_raw_proj_val')
+
   def _preprocess(self, field_dict):
     """Preprocess the feature columns.
@@ -305,371 +662,29 @@ def _preprocess(self, field_dict):
     for fc in self._feature_configs:
       feature_name = fc.feature_name
       feature_type = fc.feature_type
-      input_0 = fc.input_names[0]
       if feature_type == fc.TagFeature:
-        input_0 = fc.input_names[0]
-        field = field_dict[input_0]
-        # Construct the output of TagFeature according to the dimension of field_dict.
-        # When the input field exceeds 2 dimensions, convert TagFeature to 2D output.
-        if len(field.get_shape()) < 2 or field.get_shape()[-1] == 1:
-          if len(field.get_shape()) == 0:
-            field = tf.expand_dims(field, axis=0)
-          elif len(field.get_shape()) == 2:
-            field = tf.squeeze(field, axis=-1)
-          if fc.HasField('kv_separator') and len(fc.input_names) > 1:
-            assert False, 'Tag Feature Error, ' \
-                'Cannot set kv_separator and multi input_names in one feature config. Feature: %s.'
% input_0 - parsed_dict[input_0] = tf.string_split(field, fc.separator) - if fc.HasField('kv_separator'): - indices = parsed_dict[input_0].indices - tmp_kvs = parsed_dict[input_0].values - tmp_kvs = tf.string_split( - tmp_kvs, fc.kv_separator, skip_empty=False) - tmp_kvs = tf.reshape(tmp_kvs.values, [-1, 2]) - tmp_ks, tmp_vs = tmp_kvs[:, 0], tmp_kvs[:, 1] - - check_list = [ - tf.py_func( - check_string_to_number, [tmp_vs, input_0], Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - tmp_vs = tf.string_to_number( - tmp_vs, tf.float32, name='kv_tag_wgt_str_2_flt_%s' % input_0) - parsed_dict[input_0] = tf.sparse.SparseTensor( - indices, tmp_ks, parsed_dict[input_0].dense_shape) - input_wgt = input_0 + '_WEIGHT' - parsed_dict[input_wgt] = tf.sparse.SparseTensor( - indices, tmp_vs, parsed_dict[input_0].dense_shape) - self._appended_fields.append(input_wgt) - if not fc.HasField('hash_bucket_size'): - check_list = [ - tf.py_func( - check_string_to_number, - [parsed_dict[input_0].values, input_0], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - vals = tf.string_to_number( - parsed_dict[input_0].values, - tf.int32, - name='tag_fea_%s' % input_0) - parsed_dict[input_0] = tf.sparse.SparseTensor( - parsed_dict[input_0].indices, vals, - parsed_dict[input_0].dense_shape) - if len(fc.input_names) > 1: - input_1 = fc.input_names[1] - field = field_dict[input_1] - if len(field.get_shape()) == 0: - field = tf.expand_dims(field, axis=0) - field = tf.string_split(field, fc.separator) - check_list = [ - tf.py_func( - check_string_to_number, [field.values, input_1], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - field_vals = tf.string_to_number( - field.values, - tf.float32, - name='tag_wgt_str_2_flt_%s' % input_1) - assert_op = tf.assert_equal( - tf.shape(field_vals)[0], - tf.shape(parsed_dict[input_0].values)[0], - message='TagFeature Error: The size of %s not equal to the size of %s. Please check input: %s and %s.' - % (input_0, input_1, input_0, input_1)) - with tf.control_dependencies([assert_op]): - field = tf.sparse.SparseTensor(field.indices, - tf.identity(field_vals), - field.dense_shape) - parsed_dict[input_1] = field - else: - parsed_dict[input_0] = field_dict[input_0] - if len(fc.input_names) > 1: - input_1 = fc.input_names[1] - parsed_dict[input_1] = field_dict[input_1] + self._parse_tag_feature(fc, parsed_dict, field_dict) elif feature_type == fc.LookupFeature: assert feature_name is not None and feature_name != '' assert len(fc.input_names) == 2 parsed_dict[feature_name] = self._lookup_preprocess(fc, field_dict) elif feature_type == fc.SequenceFeature: - input_0 = fc.input_names[0] - field = field_dict[input_0] - sub_feature_type = fc.sub_feature_type - # Construct the output of SeqFeature according to the dimension of field_dict. - # When the input field exceeds 2 dimensions, convert SeqFeature to 2D output. 
- if len(field.get_shape()) < 2: - parsed_dict[input_0] = tf.strings.split(field, fc.separator) - if fc.HasField('seq_multi_sep'): - indices = parsed_dict[input_0].indices - values = parsed_dict[input_0].values - multi_vals = tf.string_split(values, fc.seq_multi_sep) - indices_1 = multi_vals.indices - indices = tf.gather(indices, indices_1[:, 0]) - out_indices = tf.concat([indices, indices_1[:, 1:]], axis=1) - # 3 dimensional sparse tensor - out_shape = tf.concat( - [parsed_dict[input_0].dense_shape, multi_vals.dense_shape[1:]], - axis=0) - parsed_dict[input_0] = tf.sparse.SparseTensor( - out_indices, multi_vals.values, out_shape) - if (fc.num_buckets > 1 and fc.max_val == fc.min_val): - check_list = [ - tf.py_func( - check_string_to_number, - [parsed_dict[input_0].values, input_0], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - parsed_dict[input_0] = tf.sparse.SparseTensor( - parsed_dict[input_0].indices, - tf.string_to_number( - parsed_dict[input_0].values, - tf.int64, - name='sequence_str_2_int_%s' % input_0), - parsed_dict[input_0].dense_shape) - elif sub_feature_type == fc.RawFeature: - check_list = [ - tf.py_func( - check_string_to_number, - [parsed_dict[input_0].values, input_0], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - parsed_dict[input_0] = tf.sparse.SparseTensor( - parsed_dict[input_0].indices, - tf.string_to_number( - parsed_dict[input_0].values, - tf.float32, - name='sequence_str_2_float_%s' % input_0), - parsed_dict[input_0].dense_shape) - if fc.num_buckets > 1 and fc.max_val > fc.min_val: - normalized_values = (parsed_dict[input_0].values - fc.min_val) / ( - fc.max_val - fc.min_val) - parsed_dict[input_0] = tf.sparse.SparseTensor( - parsed_dict[input_0].indices, normalized_values, - parsed_dict[input_0].dense_shape) - else: - parsed_dict[input_0] = field - if not fc.boundaries and fc.num_buckets <= 1 and fc.hash_bucket_size <= 0 and \ - self._data_config.sample_weight != input_0 and sub_feature_type == fc.RawFeature and \ - fc.raw_input_dim == 1: - # may need by wide model and deep model to project - # raw values to a vector, it maybe better implemented - # by a ProjectionColumn later - logging.info( - 'Not set boundaries or num_buckets or hash_bucket_size, %s will process as two dimension raw feature' - % input_0) - parsed_dict[input_0] = tf.sparse_to_dense( - parsed_dict[input_0].indices, - [tf.shape(parsed_dict[input_0])[0], fc.sequence_length], - parsed_dict[input_0].values) - sample_num = tf.to_int64(tf.shape(parsed_dict[input_0])[0]) - indices_0 = tf.range(sample_num, dtype=tf.int64) - indices_1 = tf.range(fc.sequence_length, dtype=tf.int64) - indices_0 = indices_0[:, None] - indices_1 = indices_1[None, :] - indices_0 = tf.tile(indices_0, [1, fc.sequence_length]) - indices_1 = tf.tile(indices_1, [sample_num, 1]) - indices_0 = tf.reshape(indices_0, [-1, 1]) - indices_1 = tf.reshape(indices_1, [-1, 1]) - indices = tf.concat([indices_0, indices_1], axis=1) - parsed_dict[input_0 + '_raw_proj_id'] = tf.SparseTensor( - indices=indices, - values=indices_1[:, 0], - dense_shape=[sample_num, fc.sequence_length]) - parsed_dict[input_0 + '_raw_proj_val'] = tf.SparseTensor( - indices=indices, - values=tf.reshape(parsed_dict[input_0], [-1]), - dense_shape=[sample_num, fc.sequence_length]) - self._appended_fields.append(input_0 + '_raw_proj_id') - self._appended_fields.append(input_0 + '_raw_proj_val') - elif not fc.boundaries and fc.num_buckets <= 1 and fc.hash_bucket_size <= 0 and \ - 
self._data_config.sample_weight != input_0 and sub_feature_type == fc.RawFeature and \ - fc.raw_input_dim > 1: - # for 3 dimension sequence feature input. - # may need by wide model and deep model to project - # raw values to a vector, it maybe better implemented - # by a ProjectionColumn later - logging.info( - 'Not set boundaries or num_buckets or hash_bucket_size, %s will process as three dimension raw feature' - % input_0) - parsed_dict[input_0] = tf.sparse_to_dense( - parsed_dict[input_0].indices, [ - tf.shape(parsed_dict[input_0])[0], fc.sequence_length, - fc.raw_input_dim - ], parsed_dict[input_0].values) - sample_num = tf.to_int64(tf.shape(parsed_dict[input_0])[0]) - indices_0 = tf.range(sample_num, dtype=tf.int64) - indices_1 = tf.range(fc.sequence_length, dtype=tf.int64) - indices_2 = tf.range(fc.raw_input_dim, dtype=tf.int64) - indices_0 = indices_0[:, None, None] - indices_1 = indices_1[None, :, None] - indices_2 = indices_2[None, None, :] - indices_0 = tf.tile(indices_0, - [1, fc.sequence_length, fc.raw_input_dim]) - indices_1 = tf.tile(indices_1, [sample_num, 1, fc.raw_input_dim]) - indices_2 = tf.tile(indices_2, [sample_num, fc.sequence_length, 1]) - indices_0 = tf.reshape(indices_0, [-1, 1]) - indices_1 = tf.reshape(indices_1, [-1, 1]) - indices_2 = tf.reshape(indices_2, [-1, 1]) - indices = tf.concat([indices_0, indices_1, indices_2], axis=1) - - parsed_dict[input_0 + '_raw_proj_id'] = tf.SparseTensor( - indices=indices, - values=indices_1[:, 0], - dense_shape=[sample_num, fc.sequence_length, fc.raw_input_dim]) - parsed_dict[input_0 + '_raw_proj_val'] = tf.SparseTensor( - indices=indices, - values=tf.reshape(parsed_dict[input_0], [-1]), - dense_shape=[sample_num, fc.sequence_length, fc.raw_input_dim]) - self._appended_fields.append(input_0 + '_raw_proj_id') - self._appended_fields.append(input_0 + '_raw_proj_val') + self._parse_seq_feature(fc, parsed_dict, field_dict) elif feature_type == fc.RawFeature: - input_0 = fc.input_names[0] - if field_dict[input_0].dtype == tf.string: - if fc.raw_input_dim > 1: - check_list = [ - tf.py_func( - check_split, [ - field_dict[input_0], fc.separator, fc.raw_input_dim, - input_0 - ], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - tmp_fea = tf.string_split(field_dict[input_0], fc.separator) - check_list = [ - tf.py_func( - check_string_to_number, [tmp_fea.values, input_0], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - tmp_vals = tf.string_to_number( - tmp_fea.values, - tf.float32, - name='multi_raw_fea_to_flt_%s' % input_0) - parsed_dict[input_0] = tf.sparse_to_dense( - tmp_fea.indices, - [tf.shape(field_dict[input_0])[0], fc.raw_input_dim], - tmp_vals, - default_value=0) - else: - check_list = [ - tf.py_func( - check_string_to_number, [field_dict[input_0], input_0], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - parsed_dict[input_0] = tf.string_to_number( - field_dict[input_0], tf.float32) - elif field_dict[input_0].dtype in [ - tf.int32, tf.int64, tf.double, tf.float32 - ]: - parsed_dict[input_0] = tf.to_float(field_dict[input_0]) - else: - assert False, 'invalid dtype[%s] for raw feature' % str( - field_dict[input_0].dtype) - if fc.max_val > fc.min_val: - parsed_dict[input_0] = (parsed_dict[input_0] - fc.min_val) /\ - (fc.max_val - fc.min_val) - - if fc.HasField('normalizer_fn'): - logging.info('apply normalizer_fn %s' % fc.normalizer_fn) - parsed_dict[input_0] = load_by_path(fc.normalizer_fn)( - 
parsed_dict[input_0]) - - if not fc.boundaries and fc.num_buckets <= 1 and \ - self._data_config.sample_weight != input_0: - # may need by wide model and deep model to project - # raw values to a vector, it maybe better implemented - # by a ProjectionColumn later - sample_num = tf.to_int64(tf.shape(parsed_dict[input_0])[0]) - indices_0 = tf.range(sample_num, dtype=tf.int64) - indices_1 = tf.range(fc.raw_input_dim, dtype=tf.int64) - indices_0 = indices_0[:, None] - indices_1 = indices_1[None, :] - indices_0 = tf.tile(indices_0, [1, fc.raw_input_dim]) - indices_1 = tf.tile(indices_1, [sample_num, 1]) - indices_0 = tf.reshape(indices_0, [-1, 1]) - indices_1 = tf.reshape(indices_1, [-1, 1]) - indices = tf.concat([indices_0, indices_1], axis=1) - - parsed_dict[input_0 + '_raw_proj_id'] = tf.SparseTensor( - indices=indices, - values=indices_1[:, 0], - dense_shape=[sample_num, fc.raw_input_dim]) - parsed_dict[input_0 + '_raw_proj_val'] = tf.SparseTensor( - indices=indices, - values=tf.reshape(parsed_dict[input_0], [-1]), - dense_shape=[sample_num, fc.raw_input_dim]) - self._appended_fields.append(input_0 + '_raw_proj_id') - self._appended_fields.append(input_0 + '_raw_proj_val') + self._parse_raw_feature(fc, parsed_dict, field_dict) elif feature_type == fc.IdFeature: - input_0 = fc.input_names[0] - parsed_dict[input_0] = field_dict[input_0] - if fc.HasField('hash_bucket_size'): - if field_dict[input_0].dtype != tf.string: - if field_dict[input_0].dtype in [tf.float32, tf.double]: - assert fc.precision > 0, 'it is dangerous to convert float or double to string due to ' \ - 'precision problem, it is suggested to convert them into string ' \ - 'format during feature generalization before using EasyRec; ' \ - 'if you really need to do so, please set precision (the number of ' \ - 'decimal digits) carefully.' 
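        # note: fc.precision fixes the number of post-decimal digits used below,
        # so equal float values always yield the same string and hence the same
        # hash bucket (e.g. precision=6 renders 0.1 as '0.100000'); an unpinned
        # float-to-string conversion is not a stable hash input.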
- precision = None - if field_dict[input_0].dtype in [tf.float32, tf.double]: - if fc.precision > 0: - precision = fc.precision - # convert to string - if 'as_string' in dir(tf.strings): - parsed_dict[input_0] = tf.strings.as_string( - field_dict[input_0], precision=precision) - else: - parsed_dict[input_0] = tf.as_string( - field_dict[input_0], precision=precision) - elif fc.num_buckets > 0: - if parsed_dict[input_0].dtype == tf.string: - check_list = [ - tf.py_func( - check_string_to_number, [parsed_dict[input_0], input_0], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - parsed_dict[input_0] = tf.string_to_number( - parsed_dict[input_0], tf.int32, name='%s_str_2_int' % input_0) + self._parse_id_feature(fc, parsed_dict, field_dict) elif feature_type == fc.ExprFeature: - fea_name = fc.feature_name - prefix = 'expr_' - for input_name in fc.input_names: - new_input_name = prefix + input_name - if field_dict[input_name].dtype == tf.string: - check_list = [ - tf.py_func( - check_string_to_number, - [field_dict[input_name], input_name], - Tout=tf.bool) - ] if self._check_mode else [] - with tf.control_dependencies(check_list): - parsed_dict[new_input_name] = tf.string_to_number( - field_dict[input_name], - tf.float64, - name='%s_str_2_int_for_expr' % new_input_name) - elif field_dict[input_name].dtype in [ - tf.int32, tf.int64, tf.double, tf.float32 - ]: - parsed_dict[new_input_name] = tf.cast(field_dict[input_name], - tf.float64) - else: - assert False, 'invalid input dtype[%s] for expr feature' % str( - field_dict[input_name].dtype) - - expression = get_expression( - fc.expression, fc.input_names, prefix=prefix) - logging.info('expression: %s' % expression) - parsed_dict[fea_name] = eval(expression) - self._appended_fields.append(fea_name) + self._parse_expr_feature(fc, parsed_dict, field_dict) else: - for input_name in fc.input_names: - parsed_dict[input_name] = field_dict[input_name] + feature_name = fc.feature_name if fc.HasField( + 'feature_name') else fc.input_names[0] + for input_id, input_name in enumerate(fc.input_names): + if input_id > 0: + key = feature_name + ':' + str(input_id) + else: + key = feature_name + parsed_dict[key] = field_dict[input_name] for input_id, input_name in enumerate(self._label_fields): if input_name not in field_dict: diff --git a/easy_rec/python/model/easy_rec_estimator.py b/easy_rec/python/model/easy_rec_estimator.py index 2772c9ed6..23bcb0a70 100644 --- a/easy_rec/python/model/easy_rec_estimator.py +++ b/easy_rec/python/model/easy_rec_estimator.py @@ -174,7 +174,6 @@ def _train_model_fn(self, features, labels, run_config): global_vars = {x.name: x for x in tf.global_variables()} for x in update_ops: if isinstance(x, ops.Operation) and x.inputs[0].name in global_vars: - logging.info('add dense update %s' % x.inputs[0].name) ops.add_to_collection(constant.DENSE_UPDATE_VARIABLES, global_vars[x.inputs[0].name]) update_op = tf.group(*update_ops, name='update_barrier') diff --git a/samples/model_config/deepfm_combo_on_avazu_feature_name.config b/samples/model_config/deepfm_combo_on_avazu_feature_name.config new file mode 100644 index 000000000..65aff377d --- /dev/null +++ b/samples/model_config/deepfm_combo_on_avazu_feature_name.config @@ -0,0 +1,396 @@ +train_input_path: "data/test/dwd_avazu_ctr_deepmodel_10w.csv" +eval_input_path: "data/test/dwd_avazu_ctr_deepmodel_10w.csv" +model_dir: "experiments/dwd_avazu_out_test_combo_feature_name" + +train_config { + log_step_count_steps: 200 + # fine_tune_checkpoint: "" + 
optimizer_config: { + adam_optimizer: { + learning_rate: { + exponential_decay_learning_rate { + initial_learning_rate: 0.0001 + decay_steps: 10000 + decay_factor: 0.5 + min_learning_rate: 0.0000001 + } + } + } + use_moving_average: false + } + + sync_replicas: true + save_checkpoints_steps: 500 + num_steps: 1000 +} + +eval_config { + metrics_set: { + auc {} + } +} + +data_config { + separator: "," + input_fields: { + input_name: "label" + input_type: INT64 + default_val:"0" + } + input_fields: { + input_name: "hour" + input_type: INT64 + default_val:"0" + } + input_fields: { + input_name: "c1" + input_type: INT64 + default_val:"0" + } + input_fields: { + input_name: "banner_pos" + input_type: INT64 + default_val:"0" + } + input_fields: { + input_name: "site_id" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "site_domain" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "site_category" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "app_id" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "app_domain" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "app_category" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "device_id" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "device_ip" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "device_model" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "device_type" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "device_conn_type" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "c14" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "c15" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "c16" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "c17" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "c18" + input_type: STRING + default_val:"0" + } + input_fields: { + input_name: "c19" + input_type: INT64 + default_val:"0" + } + input_fields: { + input_name: "c20" + input_type: INT64 + default_val:"0" + } + input_fields: { + input_name: "c21" + input_type: INT64 + default_val:"0" + } + label_fields: "label" + + batch_size: 1024 + prefetch_size: 32 + input_type: CSVInput +} + +feature_config: { + features: { + input_names: "hour" + feature_type: IdFeature + num_buckets: 24 + embedding_dim: 16 + } + features: { + input_names: "c1" + feature_type: RawFeature + boundaries: [1000.0,1001.0,1002.0,1003.0,1004.0,1005.0,1006.0,1007.0,1008.0,1009.0,1010.0,1011.0,1012.0,1013.0,1014.0,1015.0] + embedding_dim: 16 + } + features: { + input_names: "banner_pos" + feature_type: RawFeature + boundaries: [1,2,3,4,5,6] + embedding_dim: 16 + } + features: { + feature_name: "banner_pos_v2" + input_names: "banner_pos" + feature_type: RawFeature + boundaries: [1,3,6] + embedding_dim: 16 + } + features: { + input_names: "site_id" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10000 + } + features: { + input_names: "site_domain" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: "site_category" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: "app_id" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10000 + } + features: { + input_names: "app_domain" 
+ feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 1000 + } + features: { + input_names: "app_category" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: "device_id" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: "device_ip" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: "device_model" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10000 + } + features: { + input_names: "device_type" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: "device_conn_type" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: "c14" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 500 + } + features: { + input_names: "c15" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 500 + } + features: { + input_names: "c16" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 500 + } + features: { + input_names: "c17" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 500 + } + features: { + input_names: "c18" + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 500 + } + features: { + input_names: "c19" + feature_type: RawFeature + boundaries: [10,20,30,40,50,60,70,80,90,100,110,120,130,140,150,160,170,180,190] + embedding_dim: 16 + } + features: { + input_names: "c20" + feature_type: RawFeature + boundaries: [100.0,200.0,300.0,400.0,500.0,600.0,700.0,800.0, 900.0, 1000.0,1100.0,1200.0, 1300.0,1400.0] + embedding_dim: 16 + } + features: { + input_names: "c21" + feature_type: RawFeature + boundaries: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25] + embedding_dim: 16 + } + features: { + input_names: ["site_id", "app_id"] + feature_name: "site_id_app_id" + feature_type: ComboFeature + hash_bucket_size: 1000, + embedding_dim: 16 + } + features: { + input_names: ["site_id", "c19"] + feature_name: "site_id_c19" + feature_type: ComboFeature + hash_bucket_size: 1000 + embedding_dim: 16 + } + features: { + input_names: ["c21", "c19"] + feature_name: "c19_c21" + feature_type: ComboFeature + hash_bucket_size: 1000 + embedding_dim: 16 + } + +} +model_config:{ + model_class: "DeepFM" + feature_groups: { + group_name: "deep" + feature_names: "hour" + feature_names: "c1" + feature_names: "banner_pos" + feature_names: "banner_pos_v2" + feature_names: "site_id" + feature_names: "site_domain" + feature_names: "site_category" + feature_names: "app_id" + feature_names: "app_domain" + feature_names: "app_category" + feature_names: "device_id" + feature_names: "device_ip" + feature_names: "device_model" + feature_names: "device_type" + feature_names: "device_conn_type" + feature_names: "c14" + feature_names: "c15" + feature_names: "c16" + feature_names: "c17" + feature_names: "c18" + feature_names: "c19" + feature_names: "c20" + feature_names: "c21" + feature_names: "site_id_app_id" + feature_names: "site_id_c19" + feature_names: "c19_c21" + wide_deep:DEEP + } + feature_groups: { + group_name: "wide" + feature_names: "hour" + feature_names: "c1" + feature_names: "banner_pos" + feature_names: "site_id" + feature_names: "site_domain" + feature_names: "site_category" + feature_names: "app_id" + feature_names: "app_domain" + feature_names: "app_category" + feature_names: "device_id" + feature_names: "device_ip" + feature_names: "device_model" + 
feature_names: "device_type" + feature_names: "device_conn_type" + feature_names: "c14" + feature_names: "c15" + feature_names: "c16" + feature_names: "c17" + feature_names: "c18" + feature_names: "c19" + feature_names: "c20" + feature_names: "c21" + wide_deep:WIDE + } + + deepfm { + wide_output_dim: 16 + + dnn { + hidden_units: [128, 64, 32] + } + + final_dnn { + hidden_units: [128, 64] + } + l2_regularization: 1e-5 + } + # embedding_regularization: 1e-7 +} + +export_config { + multi_placeholder: false +} From c82ba52bd797c3efcfc3740c966320115245992d Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Wed, 23 Nov 2022 20:46:24 +0800 Subject: [PATCH 3/9] fix bug --- .../feature_column/feature_column_v2.py | 6 ++- .../python/feature_column/feature_column.py | 35 ++++++++----- easy_rec/python/input/input.py | 52 +++++++++---------- 3 files changed, 51 insertions(+), 42 deletions(-) diff --git a/easy_rec/python/compat/feature_column/feature_column_v2.py b/easy_rec/python/compat/feature_column/feature_column_v2.py index 6703738c5..e1e4d9304 100644 --- a/easy_rec/python/compat/feature_column/feature_column_v2.py +++ b/easy_rec/python/compat/feature_column/feature_column_v2.py @@ -1255,7 +1255,8 @@ def numeric_column(key, shape=(1,), default_value=None, dtype=dtypes.float32, - normalizer_fn=None): + normalizer_fn=None, + feature_name=None): """Represents real valued or numerical features. Example: @@ -1319,7 +1320,8 @@ def numeric_column(key, fc_utils.assert_key_is_string(key) return NumericColumn( - key, + feature_name=feature_name, + key=key, shape=shape, default_value=default_value, dtype=dtype, diff --git a/easy_rec/python/feature_column/feature_column.py b/easy_rec/python/feature_column/feature_column.py index 3f63a944a..57f77b97f 100644 --- a/easy_rec/python/feature_column/feature_column.py +++ b/easy_rec/python/feature_column/feature_column.py @@ -321,10 +321,10 @@ def parse_tag_feature(self, config): if len(config.input_names) > 1: tag_fc = feature_column.weighted_categorical_column( - tag_fc, weight_feature_key=feature_name + ':1', dtype=tf.float32) + tag_fc, weight_feature_key=feature_name + '_w', dtype=tf.float32) elif config.HasField('kv_separator'): tag_fc = feature_column.weighted_categorical_column( - tag_fc, weight_feature_key=feature_name + ':1', dtype=tf.float32) + tag_fc, weight_feature_key=feature_name + '_w', dtype=tf.float32) if self.is_wide(config): self._add_wide_embedding_column(tag_fc, config) @@ -342,7 +342,9 @@ def parse_raw_feature(self, config): feature_name = config.feature_name if config.HasField('feature_name') \ else config.input_names[0] fc = feature_column.numeric_column( - feature_name, shape=(config.raw_input_dim,), name=feature_name) + key=feature_name, + shape=(config.raw_input_dim,), + feature_name=feature_name) bounds = None if config.boundaries: @@ -369,9 +371,14 @@ def parse_raw_feature(self, config): self._add_deep_embedding_column(fc, config) else: tmp_id_col = feature_column.categorical_column_with_identity( - feature_name, config.raw_input_dim, default_value=0) + feature_name + '_raw_proj_id', + config.raw_input_dim, + default_value=0, + feature_name=feature_name) wgt_fc = feature_column.weighted_categorical_column( - tmp_id_col, weight_feature_key=feature_name + ':1', dtype=tf.float32) + tmp_id_col, + weight_feature_key=feature_name + '_raw_proj_val', + dtype=tf.float32) if self.is_wide(config): self._add_wide_embedding_column(wgt_fc, config) if self.is_deep(config): @@ -459,33 +466,33 @@ def parse_sequence_feature(self, config): if 
config.HasField('hash_bucket_size'): hash_bucket_size = self._get_hash_bucket_size(config) fc = sequence_feature_column.sequence_categorical_column_with_hash_bucket( - config.input_names[0], + feature_name, hash_bucket_size, dtype=tf.string, feature_name=feature_name) elif config.vocab_list: fc = sequence_feature_column.sequence_categorical_column_with_vocabulary_list( - config.input_names[0], + feature_name, default_value=0, vocabulary_list=config.vocab_list, feature_name=feature_name) elif config.vocab_file: fc = sequence_feature_column.sequence_categorical_column_with_vocabulary_file( - config.input_names[0], + feature_name, default_value=0, vocabulary_file=config.vocab_file, vocabulary_size=self._get_vocab_size(config.vocab_file), feature_name=feature_name) else: fc = sequence_feature_column.sequence_categorical_column_with_identity( - config.input_names[0], + feature_name, config.num_buckets, default_value=0, feature_name=feature_name) - else: + else: # raw feature bounds = None fc = sequence_feature_column.sequence_numeric_column( - config.input_names[0], shape=(1,), feature_name=feature_name) + feature_name, shape=(1,), feature_name=feature_name) if config.hash_bucket_size > 0: hash_bucket_size = self._get_hash_bucket_size(config) assert sub_feature_type == config.IdFeature, \ @@ -507,18 +514,18 @@ def parse_sequence_feature(self, config): except Exception as e: logging.error( 'sequence features bucketized_column [%s] with bounds %s error' % - (config.input_names[0], str(bounds))) + (feature_name, str(bounds))) raise e elif config.hash_bucket_size <= 0: if config.embedding_dim > 0: tmp_id_col = sequence_feature_column.sequence_categorical_column_with_identity( - config.input_names[0] + '_raw_proj_id', + feature_name + '_raw_proj_id', config.raw_input_dim, default_value=0, feature_name=feature_name) wgt_fc = sequence_feature_column.sequence_weighted_categorical_column( tmp_id_col, - weight_feature_key=config.input_names[0] + '_raw_proj_val', + weight_feature_key=feature_name + '_raw_proj_val', dtype=tf.float32) fc = wgt_fc else: diff --git a/easy_rec/python/input/input.py b/easy_rec/python/input/input.py index 6859ca5dd..ae52d4660 100644 --- a/easy_rec/python/input/input.py +++ b/easy_rec/python/input/input.py @@ -292,8 +292,8 @@ def _parse_tag_feature(self, fc, parsed_dict, field_dict): tmp_vs, tf.float32, name='kv_tag_wgt_str_2_flt_%s' % input_0) parsed_dict[feature_name] = tf.sparse.SparseTensor( indices, tmp_ks, parsed_dict[feature_name].dense_shape) - parsed_dict[feature_name + ':1'] = tf.sparse.SparseTensor( - indices, tmp_vs, parsed_dict[feature_name + ':1'].dense_shape) + parsed_dict[feature_name + '_w'] = tf.sparse.SparseTensor( + indices, tmp_vs, parsed_dict[feature_name + '_w'].dense_shape) if not fc.HasField('hash_bucket_size'): check_list = [ tf.py_func( @@ -330,12 +330,12 @@ def _parse_tag_feature(self, fc, parsed_dict, field_dict): with tf.control_dependencies([assert_op]): field = tf.sparse.SparseTensor(field.indices, tf.identity(field_vals), field.dense_shape) - parsed_dict[feature_name + ':1'] = field + parsed_dict[feature_name + '_w'] = field else: parsed_dict[feature_name] = field_dict[input_0] if len(fc.input_names) > 1: input_1 = fc.input_names[1] - parsed_dict[feature_name + ':1'] = field_dict[input_1] + parsed_dict[feature_name + '_w'] = field_dict[input_1] def _parse_expr_feature(self, fc, parsed_dict, field_dict): fea_name = fc.feature_name @@ -457,6 +457,7 @@ def _parse_raw_feature(self, fc, parsed_dict, field_dict): parsed_dict[feature_name]) if not 
fc.boundaries and fc.num_buckets <= 1 and \ + fc.embedding_dim > 0 and \ self._data_config.sample_weight != input_0: # may need by wide model and deep model to project # raw values to a vector, it maybe better implemented @@ -473,11 +474,11 @@ def _parse_raw_feature(self, fc, parsed_dict, field_dict): indices = tf.concat([indices_0, indices_1], axis=1) tmp_parsed = parsed_dict[feature_name] - parsed_dict[feature_name] = tf.SparseTensor( + parsed_dict[feature_name + '_raw_proj_id'] = tf.SparseTensor( indices=indices, values=indices_1[:, 0], dense_shape=[sample_num, fc.raw_input_dim]) - parsed_dict[feature_name + ':1'] = tf.SparseTensor( + parsed_dict[feature_name + '_raw_proj_val'] = tf.SparseTensor( indices=indices, values=tf.reshape(tmp_parsed, [-1]), dense_shape=[sample_num, fc.raw_input_dim]) @@ -544,14 +545,12 @@ def _parse_seq_feature(self, fc, parsed_dict, field_dict): parsed_dict[feature_name].dense_shape) else: parsed_dict[feature_name] = field - if not fc.boundaries and fc.num_buckets <= 1 and fc.hash_bucket_size <= 0 and \ - self._data_config.sample_weight != input_0 and sub_feature_type == fc.RawFeature and \ - fc.raw_input_dim == 1: - # may need by wide model and deep model to project - # raw values to a vector, it maybe better implemented - # by a ProjectionColumn later + if not fc.boundaries and fc.num_buckets <= 1 and\ + self._data_config.sample_weight != input_0 and\ + sub_feature_type == fc.RawFeature and\ + fc.raw_input_dim == 1: logging.info( - 'Not set boundaries or num_buckets or hash_bucket_size, %s will process as two dimension raw feature' + 'Not set boundaries or num_buckets or hash_bucket_size, %s will process as two dimension sequence raw feature' % feature_name) parsed_dict[feature_name] = tf.sparse_to_dense( parsed_dict[feature_name].indices, @@ -568,24 +567,21 @@ def _parse_seq_feature(self, fc, parsed_dict, field_dict): indices_1 = tf.reshape(indices_1, [-1, 1]) indices = tf.concat([indices_0, indices_1], axis=1) tmp_parsed = parsed_dict[feature_name] - parsed_dict[feature_name] = tf.SparseTensor( + parsed_dict[feature_name + '_raw_proj_id'] = tf.SparseTensor( indices=indices, values=indices_1[:, 0], dense_shape=[sample_num, fc.sequence_length]) - parsed_dict[feature_name + ':1'] = tf.SparseTensor( + parsed_dict[feature_name + '_raw_proj_val'] = tf.SparseTensor( indices=indices, values=tf.reshape(tmp_parsed, [-1]), dense_shape=[sample_num, fc.sequence_length]) - elif not fc.boundaries and fc.num_buckets <= 1 and fc.hash_bucket_size <= 0 and \ - self._data_config.sample_weight != input_0 and sub_feature_type == fc.RawFeature and \ - fc.raw_input_dim > 1: + elif (not fc.boundaries and fc.num_buckets <= 1 and + self._data_config.sample_weight != input_0 and + sub_feature_type == fc.RawFeature and fc.raw_input_dim > 1): # for 3 dimension sequence feature input. 
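          # the block below densifies the sequence values to
          # [batch, sequence_length, raw_input_dim], enumerates the full 3-d
          # index grid with broadcasted tf.range + tf.tile, and emits the
          # aligned *_raw_proj_id / *_raw_proj_val SparseTensor pair consumed
          # by the sequence weighted categorical column.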
- # may need by wide model and deep model to project - # raw values to a vector, it maybe better implemented - # by a ProjectionColumn later - logging.info( - 'Not set boundaries or num_buckets or hash_bucket_size, %s will process as three dimension raw feature' - % feature_name) + logging.info('Not set boundaries or num_buckets or hash_bucket_size,' + ' %s will process as three dimension sequence raw feature' % + feature_name) parsed_dict[feature_name] = tf.sparse_to_dense( parsed_dict[feature_name].indices, [ tf.shape(parsed_dict[feature_name])[0], fc.sequence_length, @@ -607,11 +603,11 @@ def _parse_seq_feature(self, fc, parsed_dict, field_dict): indices = tf.concat([indices_0, indices_1, indices_2], axis=1) tmp_parsed = parsed_dict[feature_name] - parsed_dict[feature_name] = tf.SparseTensor( + parsed_dict[feature_name + '_raw_proj_id'] = tf.SparseTensor( indices=indices, values=indices_1[:, 0], dense_shape=[sample_num, fc.sequence_length, fc.raw_input_dim]) - parsed_dict[feature_name + ':1'] = tf.SparseTensor( + parsed_dict[feature_name + '_raw_proj_val'] = tf.SparseTensor( indices=indices, values=tf.reshape(parsed_dict[feature_name], [-1]), dense_shape=[sample_num, fc.sequence_length, fc.raw_input_dim]) @@ -685,6 +681,10 @@ def _preprocess(self, field_dict): else: key = feature_name parsed_dict[key] = field_dict[input_name] + if 'price' in fc.input_names: + print(fc) + print(parsed_dict[fc.feature_name if fc.feature_name else fc + .input_names[0]]) for input_id, input_name in enumerate(self._label_fields): if input_name not in field_dict: From 0b8835148e54dfe0b349585ee38cc1531a6ea6f9 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Thu, 24 Nov 2022 10:32:47 +0800 Subject: [PATCH 4/9] fix bug --- easy_rec/python/input/input.py | 2 +- .../deepfm_distribute_eval_combo_on_avazu_ctr.config | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/easy_rec/python/input/input.py b/easy_rec/python/input/input.py index ae52d4660..9e0b54bb4 100644 --- a/easy_rec/python/input/input.py +++ b/easy_rec/python/input/input.py @@ -293,7 +293,7 @@ def _parse_tag_feature(self, fc, parsed_dict, field_dict): parsed_dict[feature_name] = tf.sparse.SparseTensor( indices, tmp_ks, parsed_dict[feature_name].dense_shape) parsed_dict[feature_name + '_w'] = tf.sparse.SparseTensor( - indices, tmp_vs, parsed_dict[feature_name + '_w'].dense_shape) + indices, tmp_vs, parsed_dict[feature_name].dense_shape) if not fc.HasField('hash_bucket_size'): check_list = [ tf.py_func( diff --git a/samples/model_config/deepfm_distribute_eval_combo_on_avazu_ctr.config b/samples/model_config/deepfm_distribute_eval_combo_on_avazu_ctr.config index 530855a62..eaf1f6e3f 100644 --- a/samples/model_config/deepfm_distribute_eval_combo_on_avazu_ctr.config +++ b/samples/model_config/deepfm_distribute_eval_combo_on_avazu_ctr.config @@ -302,7 +302,7 @@ feature_config: { } features: { input_names: ["site_id", "app_id"] - feature_name: "site_id_app_id" + feature_name: "app_id_X_site_id" feature_type: ComboFeature hash_bucket_size: 1000, embedding_dim: 16 @@ -335,7 +335,7 @@ model_config:{ feature_names: "c19" feature_names: "c20" feature_names: "c21" - feature_names: "site_id_app_id" + feature_names: "app_id_X_site_id" wide_deep:DEEP } feature_groups: { From cb0885a20e8816611101ceef85de80ad364938b6 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Thu, 24 Nov 2022 16:42:27 +0800 Subject: [PATCH 5/9] fix bug --- .../python/feature_column/feature_column.py | 2 +- easy_rec/python/input/input.py | 43 +++++++------------ 2 files 
changed, 17 insertions(+), 28 deletions(-) diff --git a/easy_rec/python/feature_column/feature_column.py b/easy_rec/python/feature_column/feature_column.py index 57f77b97f..94a9cd132 100644 --- a/easy_rec/python/feature_column/feature_column.py +++ b/easy_rec/python/feature_column/feature_column.py @@ -418,7 +418,7 @@ def parse_combo_feature(self, config): if input_id == 0: input_names.append(feature_name) else: - input_names.append(feature_name + ':' + str(input_id)) + input_names.append(feature_name + '_' + str(input_id)) fc = feature_column.crossed_column( input_names, self._get_hash_bucket_size(config), diff --git a/easy_rec/python/input/input.py b/easy_rec/python/input/input.py index 9e0b54bb4..e080a6d9d 100644 --- a/easy_rec/python/input/input.py +++ b/easy_rec/python/input/input.py @@ -242,24 +242,16 @@ def create_placeholders(self, export_config): (ftype, tf_type)) features[input_name] = input_vals[:, tmp_id] features = self._preprocess(features) - return {'features': inputs_placeholder}, features + return {'features': inputs_placeholder}, features['feature'] def _get_features(self, fields): - # field_dict = {x: fields[x] for x in self._effective_fields if x in fields} - # for k in self._appended_fields: - # field_dict[k] = fields[k] - # if constant.SAMPLE_WEIGHT in fields: - # logging.info('will use field %s as sample weight' % - # self._data_config.sample_weight) - # field_dict[constant.SAMPLE_WEIGHT] = fields[constant.SAMPLE_WEIGHT] - field_dict = {x: fields[x] for x in fields if x not in self._label_fields} - return field_dict + return fields['feature'] def _get_labels(self, fields): + labels = fields['label'] return OrderedDict([ - (x, tf.squeeze(fields[x], axis=1) if len(fields[x].get_shape()) == 2 and - fields[x].get_shape()[1] == 1 else fields[x]) - for x in self._label_fields + (x, tf.squeeze(labels[x], axis=1) if len(labels[x].get_shape()) == 2 and + labels[x].get_shape()[1] == 1 else labels[x]) for x in labels ]) def _parse_tag_feature(self, fc, parsed_dict, field_dict): @@ -677,15 +669,12 @@ def _preprocess(self, field_dict): 'feature_name') else fc.input_names[0] for input_id, input_name in enumerate(fc.input_names): if input_id > 0: - key = feature_name + ':' + str(input_id) + key = feature_name + '_' + str(input_id) else: key = feature_name parsed_dict[key] = field_dict[input_name] - if 'price' in fc.input_names: - print(fc) - print(parsed_dict[fc.feature_name if fc.feature_name else fc - .input_names[0]]) + label_dict = {} for input_id, input_name in enumerate(self._label_fields): if input_name not in field_dict: continue @@ -701,31 +690,31 @@ def _preprocess(self, field_dict): Tout=tf.bool) ] if self._check_mode else [] with tf.control_dependencies(check_list): - parsed_dict[input_name] = tf.string_split( + label_dict[input_name] = tf.string_split( field_dict[input_name], self._label_sep[input_id]).values - parsed_dict[input_name] = tf.reshape( - parsed_dict[input_name], [-1, self._label_dim[input_id]]) + label_dict[input_name] = tf.reshape(label_dict[input_name], + [-1, self._label_dim[input_id]]) else: - parsed_dict[input_name] = field_dict[input_name] + label_dict[input_name] = field_dict[input_name] check_list = [ tf.py_func( - check_string_to_number, [parsed_dict[input_name], input_name], + check_string_to_number, [label_dict[input_name], input_name], Tout=tf.bool) ] if self._check_mode else [] with tf.control_dependencies(check_list): - parsed_dict[input_name] = tf.string_to_number( - parsed_dict[input_name], tf.float32, name=input_name) + 
label_dict[input_name] = tf.string_to_number( + label_dict[input_name], tf.float32, name=input_name) else: assert field_dict[input_name].dtype in [ tf.float32, tf.double, tf.int32, tf.int64 ], 'invalid label dtype: %s' % str(field_dict[input_name].dtype) - parsed_dict[input_name] = field_dict[input_name] + label_dict[input_name] = field_dict[input_name] if self._data_config.HasField('sample_weight'): if self._mode != tf.estimator.ModeKeys.PREDICT: parsed_dict[constant.SAMPLE_WEIGHT] = field_dict[ self._data_config.sample_weight] - return parsed_dict + return {'feature': parsed_dict, 'label': label_dict} def _lookup_preprocess(self, fc, field_dict): """Preprocess function for lookup features. From 1b7b95526562ef1f550200ec63ba13ed9cd1c11e Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Thu, 24 Nov 2022 17:34:07 +0800 Subject: [PATCH 6/9] fix bug --- easy_rec/python/input/input.py | 2 +- easy_rec/python/input/odps_rtp_input_v2.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/easy_rec/python/input/input.py b/easy_rec/python/input/input.py index e080a6d9d..38c195a82 100644 --- a/easy_rec/python/input/input.py +++ b/easy_rec/python/input/input.py @@ -198,7 +198,7 @@ def create_multi_placeholders(self, export_config): inputs[input_name] = finput features = {x: inputs[x] for x in inputs} features = self._preprocess(features) - return inputs, features + return inputs, features['feature'] def create_placeholders(self, export_config): self._mode = tf.estimator.ModeKeys.PREDICT diff --git a/easy_rec/python/input/odps_rtp_input_v2.py b/easy_rec/python/input/odps_rtp_input_v2.py index c5a0e8079..c74357c27 100644 --- a/easy_rec/python/input/odps_rtp_input_v2.py +++ b/easy_rec/python/input/odps_rtp_input_v2.py @@ -77,7 +77,7 @@ def create_placeholders(self, *args, **kwargs): print('[OdpsRTPInputV2] built features: {}'.format(features.keys())) features = self._preprocess(features) print('[OdpsRTPInputV2] processed features: {}'.format(features.keys())) - return {'features': inputs_placeholder}, features + return {'features': inputs_placeholder}, features['feature'] def create_multi_placeholders(self, *args, **kwargs): """Create serving multi-placeholders with rtp_fg.""" From 28f4fba0f53a8ba5bae42de75f543d70c8677b45 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Thu, 24 Nov 2022 20:50:51 +0800 Subject: [PATCH 7/9] fix big model export bug --- easy_rec/python/layers/seq_input_layer.py | 3 +-- easy_rec/python/utils/proto_util.py | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/easy_rec/python/layers/seq_input_layer.py b/easy_rec/python/layers/seq_input_layer.py index 9a23f387d..9f761aad3 100644 --- a/easy_rec/python/layers/seq_input_layer.py +++ b/easy_rec/python/layers/seq_input_layer.py @@ -63,8 +63,7 @@ def _seq_embed_summary_name(input_name): builder) regularizers.apply_regularization( self._embedding_regularizer, weights_list=[tmp_key_tensor]) - key_tensors.append( - feature_column_dict[key]._get_dense_tensor(builder)) + key_tensors.append(tmp_key_tensor) elif feature_name_to_output_tensors[key] is None: assert feature_name_to_output_tensors[ key] is not None, 'When allow_key_search is False, key: %s should defined in same feature group.' 
% key diff --git a/easy_rec/python/utils/proto_util.py b/easy_rec/python/utils/proto_util.py index 2f8255858..d0a98543f 100644 --- a/easy_rec/python/utils/proto_util.py +++ b/easy_rec/python/utils/proto_util.py @@ -51,7 +51,8 @@ def get_norm_embed_name(name, verbose=False): # input_layer/app_category_embedding/app_category_embedding_weights/SparseReshape # => input_layer/app_category_embedding for i in range(0, len(name_toks) - 1): - if name_toks[i + 1].endswith('_embedding_weights'): + if name_toks[i + 1].endswith('_embedding_weights') or \ + '_embedding_weights_' in name_toks[i+1]: tmp_name = '/'.join(name_toks[:i + 1]) if verbose: logging.info('norm %s to %s' % (name, tmp_name)) From 55fbf0a780ce67d7228447370b0dd5f573cbd847 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Thu, 24 Nov 2022 21:06:50 +0800 Subject: [PATCH 8/9] fix big model export bug --- processor/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/test.py b/processor/test.py index f3a81d402..0423e7996 100644 --- a/processor/test.py +++ b/processor/test.py @@ -19,7 +19,7 @@ logging.basicConfig( level=logging.INFO, format='[%(asctime)s][%(levelname)s] %(message)s') -PROCESSOR_VERSION = 'LaRec-0.9.5b-b890f69-TF-2.5.0-Linux' +PROCESSOR_VERSION = 'LaRec-0.9.5d-b1b1604-TF-2.5.0-Linux' PROCESSOR_FILE = PROCESSOR_VERSION + '.tar.gz' PROCESSOR_URL = 'http://easyrec.oss-cn-beijing.aliyuncs.com/processor/' + PROCESSOR_FILE PROCESSOR_ENTRY_LIB = 'processor/' + PROCESSOR_VERSION + '/larec/libtf_predictor.so' From 58d84d430e81e37140efd07be336d80bc12fedfd Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Thu, 24 Nov 2022 21:08:02 +0800 Subject: [PATCH 9/9] fix big model export bug --- easy_rec/python/utils/proto_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/easy_rec/python/utils/proto_util.py b/easy_rec/python/utils/proto_util.py index d0a98543f..c96d41a78 100644 --- a/easy_rec/python/utils/proto_util.py +++ b/easy_rec/python/utils/proto_util.py @@ -52,7 +52,7 @@ def get_norm_embed_name(name, verbose=False): # => input_layer/app_category_embedding for i in range(0, len(name_toks) - 1): if name_toks[i + 1].endswith('_embedding_weights') or \ - '_embedding_weights_' in name_toks[i+1]: + '_embedding_weights_' in name_toks[i + 1]: tmp_name = '/'.join(name_toks[:i + 1]) if verbose: logging.info('norm %s to %s' % (name, tmp_name))
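Two reference sketches follow; both are illustrative readings of the hunks above, not part of the patch.

First, the raw-value projection that PATCH 3/9 renames to `_raw_proj_id` / `_raw_proj_val`: a dense `[batch, dim]` float tensor is re-expressed as two aligned SparseTensors, one carrying the slot index as an id and one carrying the value as a weight, so a weighted categorical column can embed raw values. A minimal sketch assuming TF 1.x APIs (the free-function packaging and names are illustrative; in the patch this logic lives inline in `Input._parse_raw_feature`):

```python
import tensorflow as tf


def raw_projection(dense_vals, dim):
  """Re-express dense_vals ([batch, dim] float) as aligned (id, val) SparseTensors.

  dim must be a static python int (fc.raw_input_dim in the patch).
  """
  sample_num = tf.to_int64(tf.shape(dense_vals)[0])
  # full [batch, dim] index grid, flattened to [batch * dim, 2]
  indices_0 = tf.tile(tf.range(sample_num, dtype=tf.int64)[:, None], [1, dim])
  indices_1 = tf.tile(tf.range(dim, dtype=tf.int64)[None, :], [sample_num, 1])
  indices = tf.concat(
      [tf.reshape(indices_0, [-1, 1]),
       tf.reshape(indices_1, [-1, 1])], axis=1)
  # ids enumerate the raw slot, vals carry the actual float values
  id_sp = tf.SparseTensor(
      indices=indices,
      values=tf.reshape(indices_1, [-1]),
      dense_shape=[sample_num, dim])
  val_sp = tf.SparseTensor(
      indices=indices,
      values=tf.reshape(dense_vals, [-1]),
      dense_shape=[sample_num, dim])
  return id_sp, val_sp
```

Second, the variable-name normalization that PATCH 7/9 and 9/9 adjust for sharded big-model export, where an embedding-weights scope may be uniquified with a numeric suffix (e.g. `..._embedding_weights_1`). A standalone reading of the matching rule (the real `proto_util.get_norm_embed_name` handles more patterns; the fallback return here is an assumption):

```python
import logging


def norm_embed_name(name, verbose=False):
  """input_layer/app_category_embedding/app_category_embedding_weights/SparseReshape
  => input_layer/app_category_embedding (likewise for ..._embedding_weights_1)."""
  name_toks = name.split('/')
  for i in range(0, len(name_toks) - 1):
    if name_toks[i + 1].endswith('_embedding_weights') or \
        '_embedding_weights_' in name_toks[i + 1]:
      tmp_name = '/'.join(name_toks[:i + 1])
      if verbose:
        logging.info('norm %s to %s' % (name, tmp_name))
      return tmp_name
  return name  # assumed fallback when no embedding-weights scope is found
```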