From 3ae78c4c39a7084a321f2e01b4745cb6c442b7a5 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 14 May 2024 17:23:44 -0700 Subject: [PATCH] [SPARK-47599][MLLIB] MLLib: Migrate logWarn with variables to structured logging framework ### What changes were proposed in this pull request? The pr aims to migrate `logWarn` in module `MLLib` with variables to `structured logging framework`. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46527 from panbingkun/SPARK-47599. Authored-by: panbingkun Signed-off-by: Gengliang Wang --- .../org/apache/spark/internal/LogKey.scala | 18 +++++++++++++ .../scala/org/apache/spark/ml/Predictor.scala | 5 ++-- .../spark/ml/classification/Classifier.scala | 5 ++-- .../spark/ml/classification/LinearSVC.scala | 4 +-- .../classification/LogisticRegression.scala | 10 ++++--- .../spark/ml/classification/OneVsRest.scala | 8 +++--- .../ProbabilisticClassifier.scala | 5 ++-- .../spark/ml/clustering/GaussianMixture.scala | 5 ++-- .../apache/spark/ml/clustering/KMeans.scala | 4 +-- .../apache/spark/ml/feature/Binarizer.scala | 6 +++-- .../spark/ml/feature/StopWordsRemover.scala | 7 ++--- .../spark/ml/feature/StringIndexer.scala | 5 ++-- .../spark/ml/optim/WeightedLeastSquares.scala | 16 ++++++------ .../apache/spark/ml/recommendation/ALS.scala | 5 ++-- .../ml/regression/AFTSurvivalRegression.scala | 10 +++---- .../ml/regression/DecisionTreeRegressor.scala | 5 ++-- .../spark/ml/regression/GBTRegressor.scala | 6 ++--- .../GeneralizedLinearRegression.scala | 6 ++--- .../ml/regression/LinearRegression.scala | 18 ++++++------- .../ml/regression/RandomForestRegressor.scala | 5 ++-- .../ml/tree/impl/DecisionTreeMetadata.scala | 8 +++--- .../spark/ml/tree/impl/RandomForest.scala | 9 ++++--- .../spark/mllib/clustering/LocalKMeans.scala | 6 ++--- .../linalg/distributed/BlockMatrix.scala | 7 +++-- .../mllib/linalg/distributed/RowMatrix.scala | 26 +++++++++++-------- .../mllib/optimization/GradientDescent.scala | 11 +++++--- .../MatrixFactorizationModel.scala | 9 ++++--- .../spark/mllib/stat/test/ChiSqTest.scala | 7 ++--- .../mllib/tree/model/DecisionTreeModel.scala | 18 ++++++++----- .../mllib/tree/model/treeEnsembleModels.scala | 18 ++++++++----- 30 files changed, 164 insertions(+), 108 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index bf5b7daab705b..e039879333066 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -82,6 +82,7 @@ object LogKeys { case object CHECKPOINT_TIME extends LogKey case object CHECKSUM_FILE_NUM extends LogKey case object CHOSEN_WATERMARK extends LogKey + case object CLASSIFIER extends LogKey case object CLASS_LOADER extends LogKey case object CLASS_NAME extends LogKey case object CLASS_PATH extends LogKey @@ -157,12 +158,14 @@ object LogKeys { case object DEPRECATED_KEY extends LogKey case object DESCRIPTION extends LogKey case object DESIRED_NUM_PARTITIONS extends LogKey + case object DESIRED_TREE_DEPTH extends LogKey case object DESTINATION_PATH extends LogKey case object DFS_FILE extends LogKey case object DIFF_DELTA extends LogKey case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey case 
object DRIVER_ID extends LogKey case object DRIVER_LIBRARY_PATH_KEY extends LogKey + case object DRIVER_MEMORY_SIZE extends LogKey case object DRIVER_STATE extends LogKey case object DROPPED_PARTITIONS extends LogKey case object DURATION extends LogKey @@ -196,6 +199,7 @@ object LogKeys { case object EXECUTOR_IDS extends LogKey case object EXECUTOR_LAUNCH_COMMANDS extends LogKey case object EXECUTOR_LAUNCH_COUNT extends LogKey + case object EXECUTOR_MEMORY_SIZE extends LogKey case object EXECUTOR_RESOURCES extends LogKey case object EXECUTOR_SHUFFLE_INFO extends LogKey case object EXECUTOR_STATE extends LogKey @@ -217,6 +221,7 @@ object LogKeys { case object FALLBACK_VERSION extends LogKey case object FEATURE_COLUMN extends LogKey case object FEATURE_DIMENSION extends LogKey + case object FEATURE_NAME extends LogKey case object FETCH_SIZE extends LogKey case object FIELD_NAME extends LogKey case object FILE_ABSOLUTE_PATH extends LogKey @@ -309,6 +314,7 @@ object LogKeys { case object LOADED_VERSION extends LogKey case object LOAD_FACTOR extends LogKey case object LOAD_TIME extends LogKey + case object LOCALE extends LogKey case object LOCAL_SCRATCH_DIR extends LogKey case object LOCATION extends LogKey case object LOGICAL_PLAN_COLUMNS extends LogKey @@ -332,8 +338,10 @@ object LogKeys { case object MAX_LOG_NUM_POLICY extends LogKey case object MAX_MEMORY_SIZE extends LogKey case object MAX_METHOD_CODE_SIZE extends LogKey + case object MAX_NUM_BINS extends LogKey case object MAX_NUM_CHUNKS extends LogKey case object MAX_NUM_PARTITIONS extends LogKey + case object MAX_NUM_POSSIBLE_BINS extends LogKey case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey case object MAX_SIZE extends LogKey case object MAX_SLOTS extends LogKey @@ -342,6 +350,7 @@ object LogKeys { case object MEMORY_CONSUMER extends LogKey case object MEMORY_POOL_NAME extends LogKey case object MEMORY_SIZE extends LogKey + case object MEMORY_THRESHOLD_SIZE extends LogKey case object MERGE_DIR_NAME extends LogKey case object MESSAGE extends LogKey case object METADATA_DIRECTORY extends LogKey @@ -351,6 +360,7 @@ object LogKeys { case object METHOD_PARAMETER_TYPES extends LogKey case object METRICS_JSON extends LogKey case object METRIC_NAME extends LogKey + case object MINI_BATCH_FRACTION extends LogKey case object MIN_COMPACTION_BATCH_ID extends LogKey case object MIN_FREQUENT_PATTERN_COUNT extends LogKey case object MIN_POINT_PER_CLUSTER extends LogKey @@ -387,8 +397,10 @@ object LogKeys { case object NUM_BYTES_TO_FREE extends LogKey case object NUM_BYTES_TO_WARN extends LogKey case object NUM_BYTES_USED extends LogKey + case object NUM_CATEGORIES extends LogKey case object NUM_CHUNKS extends LogKey case object NUM_CLASSES extends LogKey + case object NUM_COEFFICIENTS extends LogKey case object NUM_COLUMNS extends LogKey case object NUM_CONCURRENT_WRITER extends LogKey case object NUM_CORES extends LogKey @@ -410,12 +422,14 @@ object LogKeys { case object NUM_FREQUENT_ITEMS extends LogKey case object NUM_INDEX_FILES extends LogKey case object NUM_ITERATIONS extends LogKey + case object NUM_LEADING_SINGULAR_VALUES extends LogKey case object NUM_LEFT_PARTITION_VALUES extends LogKey case object NUM_LOADED_ENTRIES extends LogKey case object NUM_LOCAL_DIRS extends LogKey case object NUM_LOCAL_FREQUENT_PATTERN extends LogKey case object NUM_MERGER_LOCATIONS extends LogKey case object NUM_META_FILES extends LogKey + case object NUM_NODES extends LogKey case object NUM_PARTITIONS extends LogKey case object NUM_PARTITIONS2 extends 
LogKey case object NUM_PARTITION_VALUES extends LogKey @@ -434,6 +448,7 @@ object LogKeys { case object NUM_RESOURCE_SLOTS extends LogKey case object NUM_RETRIES extends LogKey case object NUM_RIGHT_PARTITION_VALUES extends LogKey + case object NUM_ROWS extends LogKey case object NUM_SEQUENCES extends LogKey case object NUM_SLOTS extends LogKey case object NUM_SPILL_INFOS extends LogKey @@ -487,6 +502,7 @@ object LogKeys { case object POD_SHARED_SLOT_COUNT extends LogKey case object POD_STATE extends LogKey case object POD_TARGET_COUNT extends LogKey + case object POINT_OF_CENTER extends LogKey case object POLICY extends LogKey case object POOL_NAME extends LogKey case object PORT extends LogKey @@ -594,6 +610,7 @@ object LogKeys { case object SHUFFLE_SERVICE_CONF_OVERLAY_URL extends LogKey case object SHUFFLE_SERVICE_METRICS_NAMESPACE extends LogKey case object SHUFFLE_SERVICE_NAME extends LogKey + case object SIGMAS_LENGTH extends LogKey case object SIGNAL extends LogKey case object SIZE extends LogKey case object SLEEP_TIME extends LogKey @@ -665,6 +682,7 @@ object LogKeys { case object THREAD_POOL_KEEPALIVE_TIME extends LogKey case object THREAD_POOL_SIZE extends LogKey case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey + case object THRESHOLD extends LogKey case object TID extends LogKey case object TIME extends LogKey case object TIMEOUT extends LogKey diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index 41f39461f71a6..83b77510602b2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.linalg.VectorUDT import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -192,8 +193,8 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, if ($(predictionCol).nonEmpty) { transformImpl(dataset) } else { - this.logWarning(s"$uid: Predictor.transform() does nothing" + - " because no output columns were set.") + logWarning(log"${MDC(LogKeys.UUID, uid)}: Predictor.transform() does nothing because " + + log"no output columns were set.") dataset.toDF() } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index e12c68f31099e..7883a0dea54f1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.classification import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param.ParamMap @@ -149,8 +150,8 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur } if (numColsOutput == 0) { - logWarning(s"$uid: ClassificationModel.transform() does nothing" + - " because no output columns were set.") + logWarning(log"${MDC(LogKeys.UUID, uid)}: ClassificationModel.transform() does nothing " + + log"because no output columns were set.") } outputData.toDF() } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 297b28456fd6b..4bcc7877658d1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -179,8 +179,8 @@ class LinearSVC @Since("2.2.0") ( maxBlockSizeInMB) if (dataset.storageLevel != StorageLevel.NONE) { - instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + - s"then cached during training. Be careful of double caching!") + instr.logWarning("Input instances will be standardized, blockified to blocks, and " + + "then cached during training. Be careful of double caching!") } val instances = dataset.select( diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 03c3b247e9ee8..b523bd7508366 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys.{COUNT, RANGE} import org.apache.spark.ml.feature._ import org.apache.spark.ml.impl.Utils @@ -847,9 +847,11 @@ class LogisticRegression @Since("1.2.0") ( (_initialModel.interceptVector.size == numCoefficientSets) && (_initialModel.getFitIntercept == $(fitIntercept)) if (!modelIsValid) { - instr.logWarning(s"Initial coefficients will be ignored! Its dimensions " + - s"(${providedCoefs.numRows}, ${providedCoefs.numCols}) did not match the " + - s"expected size ($numCoefficientSets, $numFeatures)") + instr.logWarning(log"Initial coefficients will be ignored! 
Its dimensions " + + log"(${MDC(LogKeys.NUM_ROWS, providedCoefs.numRows)}}, " + + log"${MDC(LogKeys.NUM_COLUMNS, providedCoefs.numCols)}) did not match the " + + log"expected size (${MDC(LogKeys.NUM_COEFFICIENTS, numCoefficientSets)}, " + + log"${MDC(LogKeys.NUM_FEATURES, numFeatures)})") } modelIsValid case None => false diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index b70f3ddd4c14d..18643f74b700f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.{Vector, Vectors} @@ -180,8 +181,8 @@ final class OneVsRestModel private[ml] ( val outputSchema = transformSchema(dataset.schema, logging = true) if (getPredictionCol.isEmpty && getRawPredictionCol.isEmpty) { - logWarning(s"$uid: OneVsRestModel.transform() does nothing" + - " because no output columns were set.") + logWarning(log"${MDC(LogKeys.UUID, uid)}: OneVsRestModel.transform() does nothing " + + log"because no output columns were set.") return dataset.toDF() } @@ -400,7 +401,8 @@ final class OneVsRest @Since("1.4.0") ( getClassifier match { case _: HasWeightCol => true case c => - instr.logWarning(s"weightCol is ignored, as it is not supported by $c now.") + instr.logWarning(log"weightCol is ignored, as it is not supported by " + + log"${MDC(LogKeys.CLASSIFIER, c)} now.") false } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala index 460f2398a4628..61fab02cb4518 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.classification import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.linalg.{DenseVector, Vector, VectorUDT} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared._ @@ -154,8 +155,8 @@ abstract class ProbabilisticClassificationModel[ } if (numColsOutput == 0) { - this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: ProbabilisticClassificationModel.transform()" + + log" does nothing because no output columns were set.") } outputData.toDF() } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 19ae8359b9a37..a68b2fc0dec83 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Since import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.impl.Utils.{unpackUpperTriangular, EPSILON} 
import org.apache.spark.ml.linalg._ @@ -142,8 +143,8 @@ class GaussianMixtureModel private[ml] ( } if (numColsOutput == 0) { - this.logWarning(s"$uid: GaussianMixtureModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: GaussianMixtureModel.transform() does " + + log"nothing because no output columns were set.") } outputData.toDF() } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 120c7c100dbc2..04f76660aee6a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -451,8 +451,8 @@ class KMeans @Since("1.5.0") ( private def trainWithBlock(dataset: Dataset[_], instr: Instrumentation) = { if (dataset.storageLevel != StorageLevel.NONE) { - instr.logWarning(s"Input vectors will be blockified to blocks, and " + - s"then cached during training. Be careful of double caching!") + instr.logWarning("Input vectors will be blockified to blocks, and " + + "then cached during training. Be careful of double caching!") } val initStartTime = System.currentTimeMillis diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala index 3727eb17dcd0a..c726aed14ee51 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.feature import scala.collection.mutable.ArrayBuilder import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg._ @@ -139,8 +140,9 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String) }.apply(col(colName)) case _: VectorUDT if td < 0 => - this.logWarning(s"Binarization operations on sparse dataset with negative threshold " + - s"$td will build a dense output, so take care when applying to sparse input.") + logWarning(log"Binarization operations on sparse dataset with negative threshold " + + log"${MDC(LogKeys.THRESHOLD, td)} will build a dense output, so take care when " + + log"applying to sparse input.") udf { vector: Vector => val values = Array.fill(vector.size)(1.0) var nnz = vector.size diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala index 5862a60a407d4..93956fc1811ef 100755 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.feature import java.util.Locale import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.Transformer import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasInputCols, HasOutputCol, HasOutputCols} @@ -129,9 +130,9 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String if (Locale.getAvailableLocales.contains(Locale.getDefault)) { Locale.getDefault } else { - logWarning(s"Default locale set was [${Locale.getDefault.toString}]; however, it was " + - "not found in available locales in JVM, falling back to en_US locale. 
Set param `locale` " + - "in order to respect another locale.") + logWarning(log"Default locale set was [${MDC(LogKeys.LOCALE, Locale.getDefault)}]; " + + log"however, it was not found in available locales in JVM, falling back to en_US locale. " + + log"Set param `locale` in order to respect another locale.") Locale.US } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index f52f56174ed23..60dc4d0240716 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.{Estimator, Model, Transformer} import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} import org.apache.spark.ml.param._ @@ -431,8 +432,8 @@ class StringIndexerModel ( val labels = labelsArray(i) if (!dataset.schema.fieldNames.contains(inputColName)) { - logWarning(s"Input column ${inputColName} does not exist during transformation. " + - "Skip StringIndexerModel for this column.") + logWarning(log"Input column ${MDC(LogKeys.COLUMN_NAME, inputColName)} does not exist " + + log"during transformation. Skip StringIndexerModel for this column.") outputColNames(i) = null } else { val filteredLabels = getHandleInvalid match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 757630b0ad16c..eff100cc3ae3a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -124,13 +124,13 @@ private[ml] class WeightedLeastSquares( if (rawBStd == 0) { if (fitIntercept || rawBBar == 0.0) { if (rawBBar == 0.0) { - instr.logWarning(s"Mean and standard deviation of the label are zero, so the " + - s"coefficients and the intercept will all be zero; as a result, training is not " + - s"needed.") + instr.logWarning("Mean and standard deviation of the label are zero, so the " + + "coefficients and the intercept will all be zero; as a result, training is not " + + "needed.") } else { - instr.logWarning(s"The standard deviation of the label is zero, so the coefficients " + - s"will be zeros and the intercept will be the mean of the label; as a result, " + - s"training is not needed.") + instr.logWarning("The standard deviation of the label is zero, so the coefficients " + + "will be zeros and the intercept will be the mean of the label; as a result, " + + "training is not needed.") } val coefficients = new DenseVector(Array.ofDim(numFeatures)) val intercept = rawBBar @@ -139,8 +139,8 @@ private[ml] class WeightedLeastSquares( } else { require(!(regParam > 0.0 && standardizeLabel), "The standard deviation of the label is " + "zero. Model cannot be regularized when labels are standardized.") - instr.logWarning(s"The standard deviation of the label is zero. Consider setting " + - s"fitIntercept=true.") + instr.logWarning("The standard deviation of the label is zero. 
Consider setting " + + "fitIntercept=true.") } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 1e6be16ef62b7..50f94a5799444 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -33,7 +33,8 @@ import org.json4s.JsonDSL._ import org.apache.spark.{Partitioner, SparkException} import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.PATH import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.linalg.BLAS import org.apache.spark.ml.param._ @@ -1027,7 +1028,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true) } catch { case e: IOException => - logWarning(s"Cannot delete checkpoint file $file:", e) + logWarning(log"Cannot delete checkpoint file ${MDC(PATH, file)}:", e) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 57d20bcd6f49d..788ad65497dfc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -24,7 +24,7 @@ import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS} import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ @@ -206,8 +206,8 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length) if (dataset.storageLevel != StorageLevel.NONE) { - instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + - s"then cached during training. Be careful of double caching!") + instr.logWarning("Input instances will be standardized, blockified to blocks, and " + + "then cached during training. 
Be careful of double caching!") } val validatedCensorCol = { @@ -441,8 +441,8 @@ class AFTSurvivalRegressionModel private[ml] ( if (predictionColNames.nonEmpty) { dataset.withColumns(predictionColNames, predictionColumns) } else { - this.logWarning(s"$uid: AFTSurvivalRegressionModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: AFTSurvivalRegressionModel.transform() " + + log"does nothing because no output columns were set.") dataset.toDF() } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala index 6c0089b689499..481e8c8357f16 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala @@ -22,6 +22,7 @@ import org.json4s.{DefaultFormats, JObject} import org.json4s.JsonDSL._ import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.ParamMap @@ -238,8 +239,8 @@ class DecisionTreeRegressionModel private[ml] ( if (predictionColNames.nonEmpty) { dataset.withColumns(predictionColNames, predictionColumns) } else { - this.logWarning(s"$uid: DecisionTreeRegressionModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: DecisionTreeRegressionModel.transform() " + + log"does nothing because no output columns were set.") dataset.toDF() } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala index 0c58cc2449b99..732bfcbd671ed 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala @@ -21,7 +21,7 @@ import org.json4s.{DefaultFormats, JObject} import org.json4s.JsonDSL._ import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.ml.linalg.{BLAS, Vector} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tree._ @@ -288,8 +288,8 @@ class GBTRegressionModel private[ml]( if (predictionColNames.nonEmpty) { dataset.withColumns(predictionColNames, predictionColumns) } else { - this.logWarning(s"$uid: GBTRegressionModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: GBTRegressionModel.transform() " + + log"does nothing because no output columns were set.") dataset.toDF() } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index aa39a3e177eeb..4ded2f8d7bf5c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.attribute._ 
import org.apache.spark.ml.feature.{Instance, OffsetInstance} @@ -1074,8 +1074,8 @@ class GeneralizedLinearRegressionModel private[ml] ( } if (numColsOutput == 0) { - this.logWarning(s"$uid: GeneralizedLinearRegressionModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: GeneralizedLinearRegressionModel.transform()" + + log" does nothing because no output columns were set.") } outputData.toDF() } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index d53b8b270f2d6..23e536ce45eb5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -333,8 +333,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String epsilon, maxBlockSizeInMB) if (dataset.storageLevel != StorageLevel.NONE) { - instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + - s"then cached during training. Be careful of double caching!") + instr.logWarning("Input instances will be standardized, blockified to blocks, and " + + "then cached during training. Be careful of double caching!") } // Extract the number of features before deciding optimization solver. @@ -377,7 +377,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String } else { require($(regParam) == 0.0, "The standard deviation of the label is zero. " + "Model cannot be regularized.") - instr.logWarning(s"The standard deviation of the label is zero. " + + instr.logWarning("The standard deviation of the label is zero. " + "Consider setting fitIntercept=true.") } } @@ -472,13 +472,13 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String // Also, if rawYStd==0 and yMean==0, all the coefficients are zero regardless of // the fitIntercept. 
if (yMean == 0.0) { - instr.logWarning(s"Mean and standard deviation of the label are zero, so the " + - s"coefficients and the intercept will all be zero; as a result, training is not " + - s"needed.") + instr.logWarning("Mean and standard deviation of the label are zero, so the " + + "coefficients and the intercept will all be zero; as a result, training is not " + + "needed.") } else { - instr.logWarning(s"The standard deviation of the label is zero, so the coefficients " + - s"will be zeros and the intercept will be the mean of the label; as a result, " + - s"training is not needed.") + instr.logWarning("The standard deviation of the label is zero, so the coefficients " + + "will be zeros and the intercept will be the mean of the label; as a result, " + + "training is not needed.") } val coefficients = Vectors.sparse(numFeatures, Seq.empty) val intercept = yMean diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala index 9eb90f790e284..4135afb5ed0b2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala @@ -21,6 +21,7 @@ import org.json4s.{DefaultFormats, JObject} import org.json4s.JsonDSL._ import org.apache.spark.annotation.Since +import org.apache.spark.internal.{LogKeys, MDC} import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.ParamMap @@ -254,8 +255,8 @@ class RandomForestRegressionModel private[ml] ( if (predictionColNames.nonEmpty) { dataset.withColumns(predictionColNames, predictionColumns) } else { - this.logWarning(s"$uid: RandomForestRegressionModel.transform() does nothing" + - " because no output columns were set.") + this.logWarning(log"${MDC(LogKeys.UUID, uid)}: RandomForestRegressionModel.transform() " + + log"does nothing because no output columns were set.") dataset.toDF() } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala index a9c2941ef3a53..2f63f4ae073e5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml.tree.impl import scala.collection.mutable import scala.util.Try -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.tree.TreeEnsembleParams import org.apache.spark.mllib.tree.configuration.Algo._ @@ -134,8 +134,10 @@ private[spark] object DecisionTreeMetadata extends Logging { val maxPossibleBins = math.min(strategy.maxBins, numExamples).toInt if (maxPossibleBins < strategy.maxBins) { - logWarning(s"DecisionTree reducing maxBins from ${strategy.maxBins} to $maxPossibleBins" + - s" (= number of training instances)") + logWarning(log"DecisionTree reducing maxBins from " + + log"${MDC(LogKeys.MAX_NUM_BINS, strategy.maxBins)} to " + + log"${MDC(LogKeys.MAX_NUM_POSSIBLE_BINS, maxPossibleBins)} " + + log"(= number of training instances)") } // We check the number of bins here against maxPossibleBins. 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index c6783f755604b..3461c5218f390 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{NUM_CLASSES, NUM_EXAMPLES, NUM_FEATURES, TIMER, WEIGHTED_NUM} +import org.apache.spark.internal.LogKeys.{MAX_MEMORY_SIZE, MEMORY_SIZE, NUM_CLASSES, NUM_EXAMPLES, NUM_FEATURES, NUM_NODES, TIMER, WEIGHTED_NUM} import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.impl.Utils @@ -1288,9 +1288,10 @@ private[spark] object RandomForest extends Logging with Serializable { } if (memUsage > maxMemoryUsage) { // If maxMemoryUsage is 0, we should still allow splitting 1 node. - logWarning(s"Tree learning is using approximately $memUsage bytes per iteration, which" + - s" exceeds requested limit maxMemoryUsage=$maxMemoryUsage. This allows splitting" + - s" $numNodesInGroup nodes in this iteration.") + logWarning(log"Tree learning is using approximately ${MDC(MEMORY_SIZE, memUsage)} " + + log"bytes per iteration, which exceeds requested limit " + + log"maxMemoryUsage=${MDC(MAX_MEMORY_SIZE, maxMemoryUsage)}. This allows splitting " + + log"${MDC(NUM_NODES, numNodesInGroup)} nodes in this iteration.") } // Convert mutable maps to immutable ones. val nodesForGroup: Map[Int, Array[LearningNode]] = diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala index 11af5a3368fa8..ea83be1237298 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering import scala.util.Random import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.NUM_ITERATIONS +import org.apache.spark.internal.LogKeys.{NUM_ITERATIONS, POINT_OF_CENTER} import org.apache.spark.mllib.linalg.BLAS.{axpy, scal} import org.apache.spark.mllib.linalg.Vectors @@ -59,8 +59,8 @@ private[mllib] object LocalKMeans extends Logging { j += 1 } if (j == 0) { - logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." + - s" Using duplicate point for center k = $i.") + logWarning(log"kMeansPlusPlus initialization ran out of distinct points for centers." 
+ + log" Using duplicate point for center k = ${MDC(POINT_OF_CENTER, i)}.") centers(i) = points(0).toDense } else { centers(i) = points(j - 1).toDense diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 4e9952e6d768f..3329682d3b550 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -23,7 +23,7 @@ import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM} import org.apache.spark.{Partitioner, PartitionIdPassthrough, SparkException} import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.mllib.linalg._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -322,7 +322,10 @@ class BlockMatrix @Since("1.3.0") ( val m = numRows().toInt val n = numCols().toInt val mem = m * n / 125000 - if (mem > 500) logWarning(s"Storing this matrix will require $mem MiB of memory!") + if (mem > 500) { + logWarning(log"Storing this matrix will require ${MDC(LogKeys.MEMORY_SIZE, mem)} " + + log"MiB of memory!") + } val localBlocks = blocks.collect() val values = new Array[Double](m * n) localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 882872709ac35..63cd41439054e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -26,7 +26,7 @@ import breeze.linalg.{axpy => brzAxpy, inv, svd => brzSvd, DenseMatrix => BDM, D import breeze.numerics.{sqrt => brzSqrt} import org.apache.spark.annotation.Since -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.config.MAX_RESULT_SIZE import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat._ @@ -251,7 +251,8 @@ class RowMatrix @Since("1.0.0") ( } if (cols > 10000) { val memMB = (cols.toLong * cols) / 125000 - logWarning(s"$cols columns will require at least $memMB megabytes of memory!") + logWarning(log"${MDC(LogKeys.NUM_COLUMNS, cols)} columns will require at least " + + log"${MDC(LogKeys.MEMORY_SIZE, memMB)} megabytes of memory!") } } @@ -342,7 +343,8 @@ class RowMatrix @Since("1.0.0") ( val computeMode = mode match { case "auto" => if (k > 5000) { - logWarning(s"computing svd with k=$k and n=$n, please check necessity") + logWarning(log"computing svd with k=${MDC(LogKeys.NUM_LEADING_SINGULAR_VALUES, k)} and " + + log"n=${MDC(LogKeys.NUM_COLUMNS, n)}, please check necessity") } // TODO: The conditions below are not fully tested. @@ -395,7 +397,8 @@ class RowMatrix @Since("1.0.0") ( // criterion specified by tol after max number of iterations. // Thus use i < min(k, sigmas.length) instead of i < k. 
if (sigmas.length < k) { - logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.") + logWarning(log"Requested ${MDC(LogKeys.NUM_LEADING_SINGULAR_VALUES, k)} singular " + + log"values but only found ${MDC(LogKeys.SIGMAS_LENGTH, sigmas.length)} converged.") } while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) { i += 1 @@ -403,7 +406,8 @@ class RowMatrix @Since("1.0.0") ( val sk = i if (sk < k) { - logWarning(s"Requested $k singular values but only found $sk nonzeros.") + logWarning(log"Requested ${MDC(LogKeys.NUM_LEADING_SINGULAR_VALUES, k)} singular " + + log"values but only found ${MDC(LogKeys.COUNT, sk)} nonzeros.") } // Warn at the end of the run as well, for increased visibility. @@ -625,9 +629,9 @@ class RowMatrix @Since("1.0.0") ( require(threshold >= 0, s"Threshold cannot be negative: $threshold") if (threshold > 1) { - logWarning(s"Threshold is greater than 1: $threshold " + - "Computation will be more efficient with promoted sparsity, " + - " however there is no correctness guarantee.") + logWarning(log"Threshold is greater than 1: ${MDC(LogKeys.THRESHOLD, threshold)} " + + log"Computation will be more efficient with promoted sparsity, " + + log"however there is no correctness guarantee.") } val gamma = if (threshold < 1e-6) { @@ -828,9 +832,9 @@ class RowMatrix @Since("1.0.0") ( val desiredTreeDepth = math.ceil(numerator / denominator) if (desiredTreeDepth > 4) { - logWarning( - s"Desired tree depth for treeAggregation is big ($desiredTreeDepth)." - + "Consider increasing driver max result size or reducing number of partitions") + logWarning(log"Desired tree depth for treeAggregation is big " + + log"(${MDC(LogKeys.DESIRED_TREE_DEPTH, desiredTreeDepth)}). " + + log"Consider increasing driver max result size or reducing number of partitions") } math.min(math.max(1, desiredTreeDepth), 10).toInt diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index d40e640a33d6f..a288d13e57f7b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import breeze.linalg.{norm, DenseVector => BDV} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD @@ -203,8 +203,9 @@ object GradientDescent extends Logging { } if (numIterations * miniBatchFraction < 1.0) { - logWarning("Not all examples will be used if numIterations * miniBatchFraction < 1.0: " + - s"numIterations=$numIterations and miniBatchFraction=$miniBatchFraction") + logWarning(log"Not all examples will be used if numIterations * miniBatchFraction < 1.0: " + + log"numIterations=${MDC(LogKeys.NUM_ITERATIONS, numIterations)} and " + + log"miniBatchFraction=${MDC(LogKeys.MINI_BATCH_FRACTION, miniBatchFraction)}") } val stochasticLossHistory = new ArrayBuffer[Double](numIterations + 1) @@ -291,7 +292,9 @@ object GradientDescent extends Logging { } } } else { - logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero") + logWarning(log"Iteration " + + log"(${MDC(LogKeys.INDEX, i)}/${MDC(LogKeys.NUM_ITERATIONS, numIterations)}). 
" + + log"The size of sampled batch is zero") } i += 1 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 9ffee8832db93..bc888aecec0ab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.mllib.linalg.BLAS import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ import org.apache.spark.mllib.util.{Loader, Saveable} @@ -66,11 +66,12 @@ class MatrixFactorizationModel @Since("0.8.0") ( require(features.first()._2.length == rank, s"$name feature dimension does not match the rank $rank.") if (features.partitioner.isEmpty) { - logWarning(s"$name factor does not have a partitioner. " - + "Prediction on individual records could be slow.") + logWarning(log"${MDC(LogKeys.FEATURE_NAME, name)} factor does not have a partitioner. " + + log"Prediction on individual records could be slow.") } if (features.getStorageLevel == StorageLevel.NONE) { - logWarning(s"$name factor is not cached. Prediction could be slow.") + logWarning(log"${MDC(LogKeys.FEATURE_NAME, name)} factor is not cached. " + + log"Prediction could be slow.") } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index 9aeab65e25de4..2059a9f785381 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat.test import org.apache.commons.math3.distribution.ChiSquaredDistribution import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD @@ -221,8 +221,9 @@ private[spark] object ChiSqTest extends Logging { } val size = observed.size if (size > 1000) { - logWarning("Chi-squared approximation may not be accurate due to low expected frequencies " - + s" as a result of a large number of categories: $size.") + logWarning(log"Chi-squared approximation may not be accurate due to low expected " + + log"frequencies as a result of a large number of categories: " + + log"${MDC(LogKeys.NUM_CATEGORIES, size)}.") } val obsArr = observed.toArray val expArr = if (expected.size == 0) Array.tabulate(size)(_ => 1.0 / size) else expected.toArray diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index c282dc59fa8d3..2f65dea0c4a89 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -26,7 +26,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD -import 
org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} import org.apache.spark.mllib.tree.configuration.Algo._ @@ -209,15 +209,19 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { .map(Utils.memoryStringToMb) .getOrElse(Utils.DEFAULT_DRIVER_MEM_MB) if (driverMemory <= memThreshold) { - logWarning(s"$thisClassName.save() was called, but it may fail because of too little" + - s" driver memory (${driverMemory}m)." + - s" If failure occurs, try setting driver-memory ${memThreshold}m (or larger).") + logWarning(log"${MDC(LogKeys.CLASS_NAME, thisClassName)}.save() was called, " + + log"but it may fail because of too little driver memory " + + log"(${MDC(LogKeys.DRIVER_MEMORY_SIZE, driverMemory)}m). If failure occurs, " + + log"try setting driver-memory ${MDC(LogKeys.MEMORY_THRESHOLD_SIZE, memThreshold)}m " + + log"(or larger).") } } else { if (sc.executorMemory <= memThreshold) { - logWarning(s"$thisClassName.save() was called, but it may fail because of too little" + - s" executor memory (${sc.executorMemory}m)." + - s" If failure occurs try setting executor-memory ${memThreshold}m (or larger).") + logWarning(log"${MDC(LogKeys.CLASS_NAME, thisClassName)}.save() was called, " + + log"but it may fail because of too little executor memory " + + log"(${MDC(LogKeys.EXECUTOR_MEMORY_SIZE, sc.executorMemory)}m). If failure occurs, " + + log"try setting executor-memory ${MDC(LogKeys.MEMORY_THRESHOLD_SIZE, memThreshold)}m " + + log"(or larger).") } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index 579d6b77f62c3..aa2287f3af896 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -26,7 +26,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.ml.linalg.BLAS import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.regression.LabeledPoint @@ -407,15 +407,19 @@ private[tree] object TreeEnsembleModel extends Logging { .map(Utils.memoryStringToMb) .getOrElse(Utils.DEFAULT_DRIVER_MEM_MB) if (driverMemory <= memThreshold) { - logWarning(s"$className.save() was called, but it may fail because of too little" + - s" driver memory (${driverMemory}m)." + - s" If failure occurs, try setting driver-memory ${memThreshold}m (or larger).") + logWarning(log"${MDC(LogKeys.CLASS_NAME, className)}.save() was called, " + + log"but it may fail because of too little driver memory " + + log"(${MDC(LogKeys.DRIVER_MEMORY_SIZE, driverMemory)}m). If failure occurs, " + + log"try setting driver-memory ${MDC(LogKeys.MEMORY_THRESHOLD_SIZE, memThreshold)}m " + + log"(or larger).") } } else { if (sc.executorMemory <= memThreshold) { - logWarning(s"$className.save() was called, but it may fail because of too little" + - s" executor memory (${sc.executorMemory}m)." 
+ - s" If failure occurs try setting executor-memory ${memThreshold}m (or larger).") + logWarning(log"${MDC(LogKeys.CLASS_NAME, className)}.save() was called, " + + log"but it may fail because of too little executor memory " + + log"(${MDC(LogKeys.EXECUTOR_MEMORY_SIZE, sc.executorMemory)}m). If failure occurs, " + + log"try setting executor-memory ${MDC(LogKeys.MEMORY_THRESHOLD_SIZE, memThreshold)}m " + + log"(or larger).") } }