
[pyspark] SparkXGBRanker: inaccurate eval metric output with different numbers of workers & cannot pass multiple eval metrics #8608

Open
tracykyle93 opened this issue Dec 16, 2022 · 1 comment
Labels
LTR Learning to rank

Comments


tracykyle93 commented Dec 16, 2022

Hi! Thanks a lot for the quick 1.7.2 release that hotfixed #8491; SparkXGBRanker now works much better. With 1.7.2, however, I have run into two issues with SparkXGBRanker's evaluation metrics:

A sample dataset for reproducing the results:
https://github.com/lezzhov/learning_to_rank/tree/main/learning_to_rank/data/train.txt

  1. Inaccurate eval metric output with different numbers of workers
    When I train the model with num_workers=1 and with num_workers=4 using the same parameter set, the values of model._xgb_sklearn_model.best_score are very different. I also wrote a manual evaluation function, and in neither case do its results agree with model._xgb_sklearn_model.best_score. The eval metric used for early stopping is "ndcg@10".

In the results below, "internal ndcg@10 on validation data" is model._xgb_sklearn_model.best_score.

num_workers = 1
manual train ndcg@10 -- 0.5931561486501443, internal ndcg@10 on validation data -- 0.46678797961901586, manual valid ndcg@10 -- 0.43402427434921265

num_workers = 4
manual train ndcg@10 -- 0.5499394648169217, internal ndcg@10 on validation data -- 0.6978460945534216, manual valid ndcg@10 -- 0.43996799528598785
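The gap between these two runs could have several causes, and this thread does not establish which one applies. One general property of the metric is worth keeping in mind: NDCG is computed per query group and then averaged over groups, so if the rows of one group were ever scored as separate fragments (for example, on different workers), the value would change even with identical predictions. Whether SparkXGBRanker 1.7.2 can split a group this way is an assumption here, not something confirmed. The minimal NumPy sketch below uses toy data and a hypothetical helper `ndcg_at_k` with the same 2**rel - 1 gain as the reproduction code's `formula='alternative'`.

```python
import numpy as np

# Hypothetical helper for illustration only; it is not part of the xgboost API.
def ndcg_at_k(labels, scores, k=10):
    """NDCG@k for a single query group, using exponential gain 2**rel - 1."""
    labels = np.asarray(labels, dtype=float)
    scores = np.asarray(scores, dtype=float)

    def dcg(order):
        top = labels[order][:k]                       # relevance of the top-k items in this order
        discounts = np.log2(np.arange(len(top)) + 2)  # log2(rank + 1) for ranks 1..len(top)
        return np.sum((2.0 ** top - 1.0) / discounts)

    ideal = dcg(np.argsort(-labels, kind="stable"))   # best possible ordering
    return dcg(np.argsort(-scores, kind="stable")) / ideal

# Toy data: one query group of four documents (not taken from the issue's dataset).
labels = [2, 0, 1, 0]
scores = [0.2, 0.9, 0.8, 0.1]

whole = ndcg_at_k(labels, scores)
# Same rows and predictions, scored as two fragments of two rows each, then averaged.
split = np.mean([ndcg_at_k(labels[:2], scores[:2]), ndcg_at_k(labels[2:], scores[2:])])

print(f"whole group: {whole:.4f}, two fragments: {split:.4f}")
# prints: whole group: 0.5869, two fragments: 0.8155
```

Nothing about the predictions changes between the two numbers; only the grouping does.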

Could you take a look at the eval metric logic and fix it? Here is the code to reproduce:

from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import SparseVector
from pyspark.sql.functions import monotonically_increasing_id 
import pyspark.sql.functions as F
from pyspark.ml.functions import vector_to_array
from xgboost.spark import SparkXGBRanker
import numpy as np
import random

def read_libsvm(filepath, spark_session):
    with open(filepath, 'r') as f:
        raw_data = [x.split() for x in f.readlines()]
    outcome = [int(x[0]) for x in raw_data]
    qid = [int(x[1].split(':')[1]) for x in raw_data]
    index_value_dict = list()
    for row in raw_data:
        index_value_dict.append(dict([(int(x.split(':')[0]), float(x.split(':')[1]))
                                       for x in row[2:]]))
    max_idx = max([max(x.keys()) for x in index_value_dict])
    rows = [
        Row(
            label=outcome[i],
            group_id = qid[i],
            feat_vector=SparseVector(max_idx + 1, index_value_dict[i])
        )
        for i in range(len(index_value_dict))
    ]
    df = spark_session.createDataFrame(rows)
    return df

def map_column_value_to_rn(data, input_column, rn_column):
    distinct_column_values = data.select(input_column).distinct()
    map_to_rn = F.udf(lambda x: random.random(), FloatType()).asNondeterministic()
    column_value_to_rn_mapping = distinct_column_values.withColumn(rn_column, map_to_rn(input_column))
    return data.join(column_value_to_rn_mapping, how='inner', on=input_column)

def split_data(data, split_by, valid_split=.2, test_split=.0):
    RN_COLUMN = 'rn'
    data = map_column_value_to_rn(data, split_by, RN_COLUMN)
    data.cache()
    train_data = data.filter(data[RN_COLUMN] >= test_split)
    train_data = train_data.withColumn('validationCol', F.when(F.col(RN_COLUMN) >= test_split + valid_split, F.lit(False)).otherwise(F.lit(True)))
    train_data = train_data.drop(RN_COLUMN)
    train_data_group = train_data.select('group_id').distinct()
    train_data_group = train_data_group.coalesce(1).withColumn("qid", monotonically_increasing_id())
    train_data = train_data.join(F.broadcast(train_data_group), ['group_id'])
    return train_data

def dcg(y_true, y_score, k, formula='traditional'):
    order = np.argsort(y_score)[::-1]
    y_true = np.take(y_true, order[:k])
    if formula=='alternative':
        gain = 2 ** y_true - 1
    else:
        gain = y_true 
    discounts = np.log2(np.arange(len(y_true)) + 2)
    return np.sum(gain / discounts)
    
@F.udf(returnType=FloatType())
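def ndcg(y_true, y_score, k):
    formula = 'alternative'  # exponential gain: 2 ** rel - 1
    if not k:
        k = len(y_true)
    # Groups whose labels are all 0 give 0 / 0 = NaN here; get_ndcg drops them before averaging.
    return float(dcg(y_true, y_score, k, formula) / dcg(y_true, y_true, k, formula))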

def get_ndcg(model, df):
    transformed_df = model.transform(df)
    transformed_df_group = transformed_df.groupby('group_id').agg(F.count('label').alias('group'), F.count('prediction').alias('group_pred'), F.collect_list('prediction').alias('pred'), F.collect_list('label').alias('label'))
    transformed_df_group = transformed_df_group.withColumn('ndcg@10', ndcg(F.col('label'), F.col('pred'), F.lit(10)))
    ndcg10_df = transformed_df_group.where(~F.isnan(F.col('ndcg@10'))).select(F.mean(F.col('ndcg@10')).alias('ndcg@10')).cache()
    ndcg10 = ndcg10_df.collect()[0][0]
    return ndcg10

sparkSession = (SparkSession
                .builder
                .config('spark.dynamicAllocation.enabled', 'false')
                .appName('clean-up')
                .enableHiveSupport()
                .getOrCreate())

sqlContext = SQLContext(sparkSession)

df = read_libsvm("train.txt", sparkSession)

df = df.withColumn('features', vector_to_array('feat_vector', dtype="float32"))

df_train = split_data(df, 'group_id', valid_split=.2, test_split=.0)

params = {"objective": "rank:ndcg",
        "eval_metric": "ndcg@10"}

ranker1 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=1)
ranker1.setParams(**params)
model1 = ranker1.fit(df_train)
ndcg10_train1 = get_ndcg(model1, df_train.filter(F.col("validationCol") == False))
ndcg10_valid1 = get_ndcg(model1, df_train.filter(F.col("validationCol") == True))
print('manual train ndcg@10 -- {}, internal ndcg@10 on validation data -- {}, manual valid ndcg@10 -- {}'.format(ndcg10_train1, model1._xgb_sklearn_model.best_score, ndcg10_valid1))
ranker2 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=4)
ranker2.setParams(**params)
model2 = ranker2.fit(df_train)
ndcg10_train2 = get_ndcg(model2, df_train.filter(F.col("validationCol") == False))
ndcg10_valid2 = get_ndcg(model2, df_train.filter(F.col("validationCol") == True))
print('manual train ndcg@10 -- {}, internal ndcg@10 on validation data -- {}, manual valid ndcg@10 -- {}'.format(ndcg10_train2, model2._xgb_sklearn_model.best_score, ndcg10_valid2))
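One difference between the manual check above and XGBoost's built-in metric may be worth ruling out. According to the XGBoost parameter documentation, `ndcg@n` scores a query group with no relevant documents (all labels 0) as 1, and scores it as 0 only when the metric name ends in `-` (for example `ndcg@10-`). The `ndcg` UDF above returns NaN for such groups and `get_ndcg` filters them out, so the two averages are taken over different sets of groups whenever the validation split contains all-zero groups. Whether this accounts for the gap reported here is an assumption, not something confirmed in this thread. The sketch below (plain NumPy, toy data, hypothetical helpers `ndcg_at_k` and `mean_ndcg`) computes the mean under each convention.

```python
import numpy as np

# Hypothetical helpers for illustration only; they are not part of the xgboost API.
def ndcg_at_k(labels, scores, k=10):
    """NDCG@k for one group (gain 2**rel - 1); NaN when the group has no relevant items."""
    labels = np.asarray(labels, dtype=float)
    scores = np.asarray(scores, dtype=float)

    def dcg(order):
        top = labels[order][:k]
        return np.sum((2.0 ** top - 1.0) / np.log2(np.arange(len(top)) + 2))

    ideal = dcg(np.argsort(-labels, kind="stable"))
    if ideal == 0.0:
        return float("nan")  # no relevant documents: the caller decides how to score it
    return float(dcg(np.argsort(-scores, kind="stable")) / ideal)

def mean_ndcg(groups, k=10, empty_group_score=None):
    """Average NDCG@k over (labels, scores) groups.

    empty_group_score=None drops all-zero groups (like the NaN filter in get_ndcg);
    1.0 or 0.0 mimics the documented ndcg@n / ndcg@n- conventions.
    """
    values = []
    for labels, scores in groups:
        v = ndcg_at_k(labels, scores, k)
        if np.isnan(v):
            if empty_group_score is None:
                continue
            v = empty_group_score
        values.append(v)
    return float(np.mean(values))

# Toy validation set: two groups with relevant documents, one group with none.
groups = [
    ([2, 0, 1, 0], [0.2, 0.9, 0.8, 0.1]),
    ([1, 0], [0.8, 0.1]),
    ([0, 0, 0], [0.5, 0.4, 0.3]),
]
print(mean_ndcg(groups))                         # all-zero group dropped
print(mean_ndcg(groups, empty_group_score=1.0))  # all-zero group scored as 1 (ndcg@10 convention)
print(mean_ndcg(groups, empty_group_score=0.0))  # all-zero group scored as 0 (ndcg@10- convention)
```

With a single all-zero group out of three, the three conventions already give noticeably different averages.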

###################################################

  2. SparkXGBRanker cannot pass multiple eval metrics
    SparkXGBRanker does not yet support passing multiple eval metrics. Could you add this feature to the roadmap?
params = {"objective": "rank:ndcg", "eval_metric": ["ndcg@5", "ndcg@10"]}

ranker3 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=1)

ranker3.setParams(**params)

model3 = ranker3.fit(df_train)

The error is:

ValueError                                Traceback (most recent call last)
Cell In [9], line 6
      4 ranker3 = SparkXGBRanker(qid_col="qid", features_col="features", validation_indicator_col="validationCol", early_stopping_rounds=5, num_workers=1)
      5 ranker3.setParams(**params)
----> 6 model3 = ranker3.fit(df_train)

File /usr/lib/spark/python/pyspark/ml/base.py:161, in Estimator.fit(self, dataset, params)
    159         return self.copy(params)._fit(dataset)
    160     else:
--> 161         return self._fit(dataset)
    162 else:
    163     raise ValueError("Params must be either a param map or a list/tuple of param maps, "
    164                      "but got %s." % type(params))

File /opt/conda/default/lib/python3.8/site-packages/xgboost/spark/core.py:647, in _SparkXGBEstimator._fit(self, dataset)
    645 def _fit(self, dataset):
    646     # pylint: disable=too-many-statements, too-many-locals
--> 647     self._validate_params()
    648     label_col = col(self.getOrDefault(self.labelCol)).alias(alias.label)
    650     select_cols = [label_col]

File /opt/conda/default/lib/python3.8/site-packages/xgboost/spark/estimator.py:358, in SparkXGBRanker._validate_params(self)
    357 def _validate_params(self):
--> 358     super()._validate_params()
...
    305         self.isDefined(self.validationIndicatorCol)
    306         and self.getOrDefault(self.validationIndicatorCol)
    307     ):

ValueError: Only string type 'eval_metric' param is allowed.
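As the traceback shows, SparkXGBRanker 1.7.2 rejects a list-valued `eval_metric`. One possible interim workaround, which is an assumption and not a documented SparkXGBRanker feature, is to early-stop on a single string metric such as "ndcg@10" and compute other cutoffs offline from the per-group `label` and `pred` lists that `get_ndcg` already collects. Sorting each group once and taking cumulative sums of the discounted gains yields every cutoff in one pass. The helper name `ndcg_at_cutoffs` is hypothetical, and the data is toy data.

```python
import numpy as np

# Hypothetical helper for illustration only; it is not part of the xgboost API.
def ndcg_at_cutoffs(labels, scores, cutoffs=(5, 10)):
    """Return {k: NDCG@k} for one query group, sorting only once per ranking.

    Uses gain 2**rel - 1; a group without relevant items gets NaN for every cutoff.
    """
    labels = np.asarray(labels, dtype=float)
    scores = np.asarray(scores, dtype=float)
    discounts = np.log2(np.arange(len(labels)) + 2)  # log2(rank + 1), ranks 1..n

    # Cumulative discounted gain for the predicted ordering and for the ideal ordering.
    dcg = np.cumsum((2.0 ** labels[np.argsort(-scores, kind="stable")] - 1.0) / discounts)
    idcg = np.cumsum((2.0 ** labels[np.argsort(-labels, kind="stable")] - 1.0) / discounts)

    result = {}
    for k in cutoffs:
        i = min(k, len(labels)) - 1  # a group shorter than k uses all of its items
        result[k] = float(dcg[i] / idcg[i]) if idcg[i] > 0 else float("nan")
    return result

# Toy group of six documents (not from the issue's dataset).
labels = [0, 3, 0, 1, 2, 0]
scores = [0.9, 0.8, 0.7, 0.3, 0.2, 0.1]
print(ndcg_at_cutoffs(labels, scores, cutoffs=(1, 3, 5, 10)))
```

In Spark, the same function could be wrapped in a UDF over the collected lists; only the metric used for early stopping has to stay a single string.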
trivialfis added the LTR Learning to rank label Dec 16, 2022
Member

trivialfis commented Dec 16, 2022

Let me take a deeper look into LTR in general after sorting out #8272.
