diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 77317782068bd..41289c6414242 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -26,15 +26,21 @@ object LogKey extends Enumeration { val APP_DESC = Value val APP_ID = Value val APP_STATE = Value + val BATCH_ID = Value val BLOCK_ID = Value val BLOCK_MANAGER_ID = Value val BROADCAST_ID = Value val BUCKET = Value val BYTECODE_SIZE = Value + val CACHE_AUTO_REMOVED_SIZE = Value + val CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value + val CACHE_UNTIL_LAST_PRODUCED_SIZE = Value val CATEGORICAL_FEATURES = Value val CLASS_LOADER = Value val CLASS_NAME = Value val CLUSTER_ID = Value + val CODEC_LEVEL = Value + val CODEC_NAME = Value val COLUMN_DATA_TYPE_SOURCE = Value val COLUMN_DATA_TYPE_TARGET = Value val COLUMN_DEFAULT_VALUE = Value @@ -47,6 +53,7 @@ object LogKey extends Enumeration { val CONFIG3 = Value val CONFIG4 = Value val CONFIG5 = Value + val CONSUMER = Value val CONTAINER = Value val CONTAINER_ID = Value val COUNT = Value @@ -59,13 +66,18 @@ object LogKey extends Enumeration { val DATA = Value val DATABASE_NAME = Value val DATAFRAME_CACHE_ENTRY = Value + val DATAFRAME_ID = Value + val DESCRIPTION = Value val DRIVER_ID = Value val DROPPED_PARTITIONS = Value + val DURATION = Value val END_POINT = Value val ENGINE = Value val ERROR = Value val EVENT_LOOP = Value val EVENT_QUEUE = Value + val EXECUTE_INFO = Value + val EXECUTE_KEY = Value val EXECUTOR_ENV_REGEX = Value val EXECUTOR_ID = Value val EXECUTOR_IDS = Value @@ -77,6 +89,7 @@ object LogKey extends Enumeration { val FIELD_NAME = Value val FILE_FORMAT = Value val FILE_FORMAT2 = Value + val FROM_OFFSET = Value val FUNCTION_NAME = Value val FUNCTION_PARAMETER = Value val GROUP_ID = Value @@ -84,25 +97,34 @@ object LogKey extends Enumeration { val HIVE_OPERATION_STATE = Value val HIVE_OPERATION_TYPE = Value val HOST = Value + val HOST_PORT = Value val INDEX = Value val INFERENCE_MODE = Value + val INITIAL_CAPACITY = Value val INTERVAL = Value val JOB_ID = Value val JOIN_CONDITION = Value val JOIN_CONDITION_SUB_EXPRESSION = Value + val KAFKA_PULLS_COUNT = Value + val KAFKA_RECORDS_PULLED_COUNT = Value val KEY = Value + val LAST_ACCESS_TIME = Value val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value val LISTENER = Value + val LOAD_FACTOR = Value val LOG_TYPE = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value + val MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value + val MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE = Value val MAX_CAPACITY = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value val MERGE_DIR_NAME = Value + val MESSAGE = Value val METHOD_NAME = Value val MIN_SIZE = Value val NEW_VALUE = Value @@ -129,6 +151,7 @@ object LogKey extends Enumeration { val POLICY = Value val PORT = Value val PRODUCER_ID = Value + val QUERY_CACHE_VALUE = Value val QUERY_HINT = Value val QUERY_ID = Value val QUERY_PLAN = Value @@ -138,6 +161,7 @@ object LogKey extends Enumeration { val RANGE = Value val RDD_ID = Value val REASON = Value + val REATTACHABLE = Value val RECEIVED_BLOCK_INFO = Value val REDUCE_ID = Value val RELATION_NAME = Value @@ -145,14 +169,16 @@ object LogKey extends Enumeration { val RESOURCE_NAME = Value val RETRY_COUNT = Value val RETRY_INTERVAL = Value - val RPC_ADDRESS = Value val RULE_BATCH_NAME = Value val RULE_NAME = Value val 
RULE_NUMBER_OF_RUNS = Value + val RUN_ID = Value val SCHEMA = Value val SCHEMA2 = Value val SERVICE_NAME = Value + val SESSION_HOLD_INFO = Value val SESSION_ID = Value + val SESSION_KEY = Value val SHARD_ID = Value val SHUFFLE_BLOCK_INFO = Value val SHUFFLE_ID = Value @@ -176,12 +202,20 @@ object LogKey extends Enumeration { val THREAD = Value val THREAD_NAME = Value val TID = Value + val TIME = Value val TIMEOUT = Value val TIME_UNITS = Value val TIP = Value + val TOPIC = Value val TOPIC_PARTITION = Value + val TOPIC_PARTITIONS = Value + val TOPIC_PARTITION_OFFSET = Value + val TOPIC_PARTITION_OFFSET_RANGE = Value val TOTAL_EFFECTIVE_TIME = Value + val TOTAL_RECORDS_READ = Value + val TOTAL_SIZE = Value val TOTAL_TIME = Value + val TOTAL_TIME_READ = Value val UNSUPPORTED_EXPRESSION = Value val UNSUPPORTED_HINT_REASON = Value val UNTIL_OFFSET = Value diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index e8be11f48a2b3..4bedd625e6091 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkException, SparkIllegalArgumentException} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{CONFIG, PATH} +import org.apache.spark.internal.LogKey.{CODEC_LEVEL, CODEC_NAME, CONFIG, PATH} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroCompressionCodec._ import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION @@ -118,7 +118,8 @@ private[sql] object AvroUtils extends Logging { if (compressed.getSupportCompressionLevel) { val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level", compressed.getDefaultCompressionLevel.toString) - logInfo(s"Compressing Avro output using the $codecName codec at level $level") + logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec " + + log"at level ${MDC(CODEC_LEVEL, level)}") val s = if (compressed == ZSTANDARD) { val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED) jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled) @@ -128,7 +129,7 @@ private[sql] object AvroUtils extends Logging { } jobConf.setInt(s"avro.mapred.$s.level", level.toInt) } else { - logInfo(s"Compressing Avro output using the $codecName codec") + logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec") } } case unknown => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index f2879ec806452..d4709db081fc8 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -25,7 +25,7 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver} import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME, WAIT_RESULT_TIME, WAIT_SEND_TIME} +import org.apache.spark.internal.LogKey._ import 
org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL} @@ -183,9 +183,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( */ def execute(lastConsumedStreamIndex: Long): Unit = { logInfo( - s"Starting for opId=${executeHolder.operationId}, " + - s"reattachable=${executeHolder.reattachable}, " + - s"lastConsumedStreamIndex=$lastConsumedStreamIndex") + log"Starting for opId=${MDC(OP_ID, executeHolder.operationId)}, " + + log"reattachable=${MDC(REATTACHABLE, executeHolder.reattachable)}, " + + log"lastConsumedStreamIndex=${MDC(STREAM_ID, lastConsumedStreamIndex)}") val startTime = System.nanoTime() var nextIndex = lastConsumedStreamIndex + 1 @@ -294,11 +294,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } } else if (streamFinished) { // Stream is finished and all responses have been sent - logInfo( - s"Stream finished for opId=${executeHolder.operationId}, " + - s"sent all responses up to last index ${nextIndex - 1}. " + - s"totalTime=${System.nanoTime - startTime}ns " + - s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns") + // scalastyle:off line.size.limit + logInfo(log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " + + log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " + + log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") + // scalastyle:on line.size.limit executionObserver.getError() match { case Some(t) => grpcObserver.onError(t) case None => grpcObserver.onCompleted() @@ -307,11 +309,14 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } else if (deadlineLimitReached) { // The stream is not complete, but should be finished now. // The client needs to reattach with ReattachExecute. - logInfo( - s"Deadline reached, shutting down stream for opId=${executeHolder.operationId} " + - s"after index ${nextIndex - 1}. " + - s"totalTime=${System.nanoTime - startTime}ns " + - s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns") + // scalastyle:off line.size.limit + logInfo(log"Deadline reached, shutting down stream for " + + log"opId=${MDC(OP_ID, executeHolder.operationId)} " + + log"after index ${MDC(STREAM_ID, nextIndex - 1)}. 
" + + log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " + + log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms") + // scalastyle:on line.size.limit grpcObserver.onCompleted() finished = true } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index df1bbbf182a2e..30a899a2ac136 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -26,7 +26,8 @@ import io.grpc.stub.StreamObserver import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey import org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE import org.apache.spark.sql.connect.service.ExecuteHolder @@ -242,14 +243,16 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: /** Remove all cached responses */ def removeAll(): Unit = responseLock.synchronized { removeResponsesUntilIndex(lastProducedIndex) + // scalastyle:off line.size.limit logInfo( - s"Release all for opId=${executeHolder.operationId}. Execution stats: " + - s"total=${totalSize} " + - s"autoRemoved=${autoRemovedSize} " + - s"cachedUntilConsumed=$cachedSizeUntilHighestConsumed " + - s"cachedUntilProduced=$cachedSizeUntilLastProduced " + - s"maxCachedUntilConsumed=${cachedSizeUntilHighestConsumed.max} " + - s"maxCachedUntilProduced=${cachedSizeUntilLastProduced.max}") + log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. Execution stats: " + + log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " + + log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " + + log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " + + log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " + + log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " + + log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}") + // scalastyle:on line.size.limit } /** Returns if the stream is finished. 
*/ diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index d8eb044e4f942..1ef4bbec3e039 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -38,7 +38,8 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult import org.apache.spark.connect.proto.Parse.ParseFormat import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.SESSION_ID import org.apache.spark.ml.{functions => MLFunctions} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest} import org.apache.spark.sql.{Column, Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, SparkSession} @@ -3131,7 +3132,9 @@ class SparkConnectPlanner( } } catch { case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any. - logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.") + logInfo( + log"Removing foreachBatch worker, query failed to start " + + log"for session ${MDC(SESSION_ID, sessionId)}.") foreachBatchRunnerCleaner.foreach(_.close()) throw ex } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index ce75ba3eb5986..ef5faac77e3e0 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import org.apache.spark.SparkException import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DATAFRAME_ID, QUERY_ID, RUN_ID, SESSION_ID} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.connect.service.SessionHolder import org.apache.spark.sql.connect.service.SparkConnectService @@ -63,7 +64,8 @@ object StreamingForeachBatchHelper extends Logging { sessionHolder: SessionHolder): ForeachBatchFnType = { (df: DataFrame, batchId: Long) => { val dfId = UUID.randomUUID().toString - logInfo(s"Caching DataFrame with id $dfId") // TODO: Add query id to the log. + // TODO: Add query id to the log. + logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}") // TODO(SPARK-44462): Sanity check there is no other active DataFrame for this query. // The query id needs to be saved in the cache for this check. 
@@ -72,7 +74,7 @@ object StreamingForeachBatchHelper extends Logging { try { fn(FnArgsWithId(dfId, df, batchId)) } finally { - logInfo(s"Removing DataFrame with id $dfId from the cache") + logInfo(log"Removing DataFrame with id ${MDC(DATAFRAME_ID, dfId)} from the cache") sessionHolder.removeCachedDataFrame(dfId) } } @@ -133,7 +135,9 @@ object StreamingForeachBatchHelper extends Logging { try { dataIn.readInt() match { case 0 => - logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: 0)") + logInfo( + log"Python foreach batch for dfId ${MDC(DATAFRAME_ID, args.dfId)} " + + log"completed (ret: 0)") case SpecialLengths.PYTHON_EXCEPTION_THROWN => val msg = PythonWorkerUtils.readUTF(dataIn) throw new PythonException( @@ -169,7 +173,9 @@ object StreamingForeachBatchHelper extends Logging { private lazy val streamingListener = { // Initialized on first registered query val listener = new StreamingRunnerCleanerListener sessionHolder.session.streams.addListener(listener) - logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}") + logInfo( + log"Registered runner clean up listener for " + + log"session ${MDC(SESSION_ID, sessionHolder.sessionId)}") listener } @@ -195,7 +201,9 @@ object StreamingForeachBatchHelper extends Logging { private def cleanupStreamingRunner(key: CacheKey): Unit = { Option(cleanerCache.remove(key)).foreach { cleaner => - logInfo(s"Cleaning up runner for queryId ${key.queryId} runId ${key.runId}.") + logInfo( + log"Cleaning up runner for queryId ${MDC(QUERY_ID, key.queryId)} " + + log"runId ${MDC(RUN_ID, key.runId)}.") cleaner.close() } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala index 685991dbed876..74e9e32f208df 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala @@ -21,7 +21,8 @@ import java.io.EOFException import org.apache.spark.SparkException import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.FUNCTION_NAME import org.apache.spark.sql.connect.service.{SessionHolder, SparkConnectService} import org.apache.spark.sql.streaming.StreamingQueryListener @@ -82,7 +83,9 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder try { dataIn.readInt() match { case 0 => - logInfo(s"Streaming query listener function $functionName completed (ret: 0)") + logInfo( + log"Streaming query listener function ${MDC(FUNCTION_NAME, functionName)} " + + log"completed (ret: 0)") case SpecialLengths.PYTHON_EXCEPTION_THROWN => val msg = PythonWorkerUtils.readUTF(dataIn) throw new PythonException( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala index 2d848d3c8400a..c82cadbd5f7ab 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala @@ -28,7 +28,8 @@ import io.grpc.ServerCall import io.grpc.ServerCallHandler import io.grpc.ServerInterceptor -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{DESCRIPTION, MESSAGE} /** * A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON. Useful for @@ -42,9 +43,11 @@ class LoggingInterceptor extends ServerInterceptor with Logging { private def logProto[T](description: String, message: T): Unit = { message match { case m: Message => - logInfo(s"$description:\n${jsonPrinter.print(m)}") + logInfo(log"${MDC(DESCRIPTION, description)}:\n${MDC(MESSAGE, jsonPrinter.print(m))}") case other => - logInfo(s"$description: (Unknown message type) $other") + logInfo( + log"${MDC(DESCRIPTION, description)}: " + + log"(Unknown message type) ${MDC(MESSAGE, other)}") } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 3dad57209982d..bb32ac1275fbe 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -32,7 +32,8 @@ import com.google.common.cache.{Cache, CacheBuilder} import org.apache.spark.{SparkEnv, SparkException, SparkSQLException} import org.apache.spark.api.python.PythonFunction.PythonAccumulator import org.apache.spark.connect.proto -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -239,12 +240,16 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private[connect] def updateAccessTime(): Unit = { lastAccessTimeMs = System.currentTimeMillis() - logInfo(s"Session $key accessed, time $lastAccessTimeMs.") + logInfo( + log"Session ${MDC(SESSION_KEY, key)} accessed, " + + log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.") } private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = { customInactiveTimeoutMs = newInactiveTimeoutMs - logInfo(s"Session $key inactive timout set to $customInactiveTimeoutMs ms.") + logInfo( + log"Session ${MDC(SESSION_KEY, key)} " + + log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.") } /** @@ -269,7 +274,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio if (closedTimeMs.isDefined) { throw new IllegalStateException(s"Session $key is already closed.") } - logInfo(s"Closing session with userId: $userId and sessionId: $sessionId") + logInfo( + log"Closing session with userId: ${MDC(USER_ID, userId)} and " + + log"sessionId: ${MDC(SESSION_ID, sessionId)}") closedTimeMs = Some(System.currentTimeMillis()) if (Utils.isTesting && eventManager.status == SessionStatus.Pending) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index e52cfe64a090e..4fe7f3eceb81a 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -29,7 +29,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.connect.proto -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL} import org.apache.spark.util.ThreadUtils @@ -95,7 +95,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { sessionHolder.addExecuteHolder(executeHolder) executions.put(executeHolder.key, executeHolder) lastExecutionTimeMs = None - logInfo(s"ExecuteHolder ${executeHolder.key} is created.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, executeHolder.key)} is created.") } schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started. @@ -122,7 +122,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging { if (executions.isEmpty) { lastExecutionTimeMs = Some(System.currentTimeMillis()) } - logInfo(s"ExecuteHolder $key is removed.") + logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is removed.") } // close the execution outside the lock executeHolder.foreach { e => @@ -146,7 +146,8 @@ private[connect] class SparkConnectExecutionManager() extends Logging { } sessionExecutionHolders.foreach { case (_, executeHolder) => val info = executeHolder.getExecuteInfo - logInfo(s"Execution $info removed in removeSessionExecutions.") + logInfo( + log"Execution ${MDC(LogKey.EXECUTE_INFO, info)} removed in removeSessionExecutions.") removeExecuteHolder(executeHolder.key, abandoned = true) } } @@ -199,7 +200,9 @@ private[connect] class SparkConnectExecutionManager() extends Logging { case Some(_) => // Already running. case None => val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL) - logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms") + logInfo( + log"Starting thread for cleanup of abandoned executions every " + + log"${MDC(LogKey.INTERVAL, interval)} ms") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -238,7 +241,9 @@ private[connect] class SparkConnectExecutionManager() extends Logging { // .. and remove them. 
toRemove.foreach { executeHolder => val info = executeHolder.getExecuteInfo - logInfo(s"Found execution $info that was abandoned and expired and will be removed.") + logInfo( + log"Found execution ${MDC(LogKey.EXECUTE_INFO, info)} that was abandoned " + + log"and expired and will be removed.") removeExecuteHolder(executeHolder.key, abandoned = true) } logInfo("Finished periodic run of SparkConnectExecutionManager maintenance.") diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index 3b42b58ae2afb..c55600886a393 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -21,7 +21,8 @@ import java.net.InetSocketAddress import scala.jdk.CollectionConverters._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HOST, PORT} import org.apache.spark.sql.SparkSession /** @@ -38,8 +39,8 @@ object SparkConnectServer extends Logging { SparkConnectService.server.getListenSockets.asScala.foreach { sa => val isa = sa.asInstanceOf[InetSocketAddress] logInfo( - s"Spark Connect server started at: " + - s"${isa.getAddress.getHostAddress}:${isa.getPort}") + log"Spark Connect server started at: " + + log"${MDC(HOST, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") } } catch { case e: Exception => diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 476254bc6e394..4b35971286ddf 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -35,7 +35,8 @@ import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, SparkConnectServiceGrpc} import org.apache.spark.connect.proto.SparkConnectServiceGrpc.AsyncService -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.HOST import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE} import org.apache.spark.sql.connect.execution.ConnectProgressExecutionListener @@ -346,7 +347,7 @@ object SparkConnectService extends Logging { val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT) val sb = bindAddress match { case Some(hostname) => - logInfo(s"start GRPC service at: $hostname") + logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}") NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port)) case _ => NettyServerBuilder.forPort(port) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala index f8febbccfa6f3..1a34964932ef2 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala @@ -29,7 +29,8 @@ import scala.util.control.NonFatal import com.google.common.cache.CacheBuilder import org.apache.spark.{SparkEnv, SparkSQLException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{INTERVAL, SESSION_HOLD_INFO} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.config.Connect.{CONNECT_SESSION_MANAGER_CLOSED_SESSIONS_TOMBSTONES_SIZE, CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT, CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL} import org.apache.spark.util.ThreadUtils @@ -203,7 +204,9 @@ class SparkConnectSessionManager extends Logging { case Some(_) => // Already running. case None => val interval = SparkEnv.get.conf.get(CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL) - logInfo(s"Starting thread for cleanup of expired sessions every $interval ms") + logInfo( + log"Starting thread for cleanup of expired sessions every " + + log"${MDC(INTERVAL, interval)} ms") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -258,7 +261,9 @@ class SparkConnectSessionManager extends Logging { // Last chance - check expiration time and remove under lock if expired. val info = sessionHolder.getSessionHolderInfo if (shouldExpire(info, System.currentTimeMillis())) { - logInfo(s"Found session $info that expired and will be closed.") + logInfo( + log"Found session ${MDC(SESSION_HOLD_INFO, info)} that expired " + + log"and will be closed.") removeSessionHolder(info.key) } else { None diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala index 9690d10eba1a5..4c9b3baa689b3 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala @@ -27,7 +27,7 @@ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.util.control.NonFatal import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{NEW_VALUE, OLD_VALUE, QUERY_ID} +import org.apache.spark.internal.LogKey.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} @@ -70,7 +70,9 @@ private[connect] class SparkConnectStreamingQueryCache( log"Query Id: ${MDC(QUERY_ID, query.id)}.Existing value ${MDC(OLD_VALUE, existing)}, " + log"new value ${MDC(NEW_VALUE, value)}.") case None => - logInfo(s"Adding new query to the cache. Query Id ${query.id}, value $value.") + logInfo( + log"Adding new query to the cache. Query Id ${MDC(QUERY_ID, query.id)}, " + + log"value ${MDC(QUERY_CACHE_VALUE, value)}.") } schedulePeriodicChecks() // Starts the scheduler thread if it hasn't started. 
@@ -111,7 +113,9 @@ private[connect] class SparkConnectStreamingQueryCache( for ((k, v) <- queryCache) { if (v.userId.equals(sessionHolder.userId) && v.sessionId.equals(sessionHolder.sessionId)) { if (v.query.isActive && Option(v.session.streams.get(k.queryId)).nonEmpty) { - logInfo(s"Stopping the query with id ${k.queryId} since the session has timed out") + logInfo( + log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " + + log"since the session has timed out") try { v.query.stop() } catch { @@ -150,7 +154,9 @@ private[connect] class SparkConnectStreamingQueryCache( scheduledExecutor match { case Some(_) => // Already running. case None => - logInfo(s"Starting thread for polling streaming sessions every $sessionPollingPeriod") + logInfo( + log"Starting thread for polling streaming sessions " + + log"every ${MDC(DURATION, sessionPollingPeriod.toMillis)}") scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) scheduledExecutor.get.scheduleAtFixedRate( () => { @@ -180,17 +186,23 @@ private[connect] class SparkConnectStreamingQueryCache( v.expiresAtMs match { case Some(ts) if nowMs >= ts => // Expired. Drop references. - logInfo(s"Removing references for $id in session ${v.sessionId} after expiry period") + logInfo( + log"Removing references for ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)} after expiry period") queryCache.remove(k) case Some(_) => // Inactive query waiting for expiration. Do nothing. - logInfo(s"Waiting for the expiration for $id in session ${v.sessionId}") + logInfo( + log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)}") case None => // Active query, check if it is stopped. Enable timeout if it is stopped. val isActive = v.query.isActive && Option(v.session.streams.get(id)).nonEmpty if (!isActive) { - logInfo(s"Marking query $id in session ${v.sessionId} inactive.") + logInfo( + log"Marking query ${MDC(QUERY_ID, id)} in " + + log"session ${MDC(SESSION_ID, v.sessionId)} inactive.") val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs))) // To consider: Clean up any runner registered for this query with the session holder diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index a53061bc1563f..b1bfe71930fb1 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -290,8 +290,8 @@ private[connect] object ErrorUtils extends Logging { if (events.isDefined) { // Errors thrown inside execution are user query errors, return then as INFO. logInfo( - s"Spark Connect error " + - s"during: $opType. UserId: $userId. SessionId: $sessionId.", + log"Spark Connect error during: ${MDC(OP_TYPE, opType)}. " + + log"UserId: ${MDC(USER_ID, userId)}. SessionId: ${MDC(SESSION_ID, sessionId)}.", original) } else { // Other errors are server RPC errors, return them as ERROR. 
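The connector/connect hunks above and the Kafka and Kinesis hunks below all apply the same mechanical migration: an s"..." interpolated log message becomes a log"..." message, and every interpolated value is wrapped in MDC(<LogKey>, value) so that structured logging backends can emit it as a named field in addition to rendering it in the message text. A minimal sketch of the pattern, reusing the Logging, MDC and LogKey.SESSION_ID imports that appear in this patch; the surrounding class and method are hypothetical and only illustrate the before/after shape:

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.SESSION_ID

// Hypothetical component, not part of this patch.
class ExampleSessionTracker extends Logging {
  def onSessionClosed(sessionId: String): Unit = {
    // Before the migration this would have been: logInfo(s"Closed session $sessionId.")
    // After the migration the value carries a LogKey, so it still appears in the
    // rendered message and can also be exported as a separate MDC field.
    logInfo(log"Closed session ${MDC(SESSION_ID, sessionId)}.")
  }
}

New keys such as BATCH_ID, CODEC_NAME, FROM_OFFSET and SESSION_KEY are added to LogKey.scala at the top of this patch precisely so that these MDC calls have an enumeration value to reference.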
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala index 97c8592d1da6d..d3fe3264afe14 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} import org.apache.spark.TaskContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.metric.CustomTaskMetric @@ -47,10 +48,13 @@ private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory val taskCtx = TaskContext.get() val queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY) val batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) - logInfo(s"Creating Kafka reader topicPartition=${p.offsetRange.topicPartition} " + - s"fromOffset=${p.offsetRange.fromOffset} untilOffset=${p.offsetRange.untilOffset}, " + - s"for query queryId=$queryId batchId=$batchId taskId=${TaskContext.get().taskAttemptId()} " + - s"partitionId=${TaskContext.get().partitionId()}") + logInfo(log"Creating Kafka reader " + + log"topicPartition=${MDC(TOPIC_PARTITION, p.offsetRange.topicPartition)} " + + log"fromOffset=${MDC(FROM_OFFSET, p.offsetRange.fromOffset)} " + + log"untilOffset=${MDC(UNTIL_OFFSET, p.offsetRange.untilOffset)}, " + + log"for query queryId=${MDC(QUERY_ID, queryId)} batchId=${MDC(BATCH_ID, batchId)} " + + log"taskId=${MDC(TASK_ATTEMPT_ID, TaskContext.get().taskAttemptId())} " + + log"partitionId=${MDC(PARTITION_ID, TaskContext.get().partitionId())}") KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs, p.failOnDataLoss, p.includeHeaders) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 9bf0a2e9e5133..e5e22243a5826 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.TaskContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{ERROR, TIP} +import org.apache.spark.internal.LogKey.{ERROR, OFFSETS, TIP} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.read.InputPartition @@ -76,7 +76,7 @@ class KafkaContinuousStream( case GlobalTimestampRangeLimit(ts, strategy) => offsetReader.fetchGlobalTimestampBasedOffsets(ts, isStartingOffsets = true, strategy) } - logInfo(s"Initial offsets: $offsets") + logInfo(log"Initial offsets: ${MDC(OFFSETS, offsets)}") offsets } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index be838ddc3c804..3313d42d1a30e 100644 ---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{ERROR, TIP} +import org.apache.spark.internal.LogKey.{ERROR, OFFSETS, TIP} import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} @@ -256,7 +256,7 @@ private[kafka010] class KafkaMicroBatchStream( isStartingOffsets = true, strategy) } metadataLog.add(0, offsets) - logInfo(s"Initial offsets: $offsets") + logInfo(log"Initial offsets: ${MDC(OFFSETS, offsets)}") offsets }.partitionToOffsets } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala index 433da08176e74..5ed8576e88888 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT} +import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT, TOPIC_PARTITION_OFFSET} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -459,7 +459,7 @@ private[kafka010] class KafkaOffsetReaderAdmin( () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions)) } - logInfo(s"Partitions added: $newPartitionInitialOffsets") + logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}") newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => reportDataLoss( s"Added partition $p starts from $o instead of 0. 
Some data may have been missed", diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala index 2ba4a9a563dfd..34d44fdf10591 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT} +import org.apache.spark.internal.LogKey.{OFFSETS, RETRY_COUNT, TOPIC_PARTITION_OFFSET} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -508,7 +508,7 @@ private[kafka010] class KafkaOffsetReaderConsumer( () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions)) } - logInfo(s"Partitions added: $newPartitionInitialOffsets") + logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}") newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => reportDataLoss( s"Added partition $p starts from $o instead of 0. Some data may have been missed", diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index ed3407c822b96..97b866067ea88 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.kafka010 -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.TOPIC_PARTITIONS import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} @@ -69,8 +70,8 @@ private[kafka010] class KafkaRelation( kafkaOffsetReader.close() } - logInfo("GetBatch generating RDD of offset range: " + - offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) + logInfo(log"GetBatch generating RDD of offset range: " + + log"${MDC(TOPIC_PARTITIONS, offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))}") // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays. 
val executorKafkaParams = diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala index 6ab4e91c53b37..5a75682f54f9a 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.BATCH_ID import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.streaming.Sink @@ -32,7 +33,7 @@ private[kafka010] class KafkaSink( override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= latestBatchId) { - logInfo(s"Skipping already committed batch $batchId") + logInfo(log"Skipping already committed batch ${MDC(BATCH_ID, batchId)}") } else { KafkaWriter.write(data.queryExecution, executorKafkaParams, topic) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 426672d2e4585..b0ab469690e2a 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{ERROR, TIP} +import org.apache.spark.internal.LogKey.{ERROR, FROM_OFFSET, OFFSETS, TIP, TOPIC_PARTITIONS, UNTIL_OFFSET} import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql._ @@ -128,7 +128,7 @@ private[kafka010] class KafkaSource( kafkaReader.fetchGlobalTimestampBasedOffsets(ts, isStartingOffsets = true, strategy) } metadataLog.add(0, offsets) - logInfo(s"Initial offsets: $offsets") + logInfo(log"Initial offsets: ${MDC(OFFSETS, offsets)}") offsets }.partitionToOffsets } @@ -293,7 +293,8 @@ private[kafka010] class KafkaSource( // Make sure initialPartitionOffsets is initialized initialPartitionOffsets - logInfo(s"GetBatch called with start = $start, end = $end") + logInfo(log"GetBatch called with start = ${MDC(FROM_OFFSET, start)}, " + + log"end = ${MDC(UNTIL_OFFSET, end)}") val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end) if (allDataForTriggerAvailableNow != null) { @@ -331,8 +332,8 @@ private[kafka010] class KafkaSource( .map(converter.toInternalRowWithoutHeaders) } - logInfo("GetBatch generating RDD of offset range: " + - offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) + logInfo(log"GetBatch generating RDD of offset range: " + + log"${MDC(TOPIC_PARTITIONS, offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))}") sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema, isStreaming = true) } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala index 5475864500941..4eb73e6d39f02 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala +++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala @@ -22,6 +22,8 @@ import java.{util => ju} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.LogKey.{FROM_OFFSET, PARTITION_ID, TOPIC} +import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer import org.apache.spark.storage.StorageLevel @@ -79,8 +81,8 @@ private[kafka010] class KafkaSourceRDD( s"for topic ${range.topic} partition ${range.partition}. " + "You either provided an invalid fromOffset, or the Kafka topic has been damaged") if (range.fromOffset == range.untilOffset) { - logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset " + - s"skipping ${range.topic} ${range.partition}") + logInfo(log"Beginning offset ${MDC(FROM_OFFSET, range.fromOffset)} is the same as ending " + + log"offset skipping ${MDC(TOPIC, range.topic)} ${MDC(PARTITION_ID, range.partition)}") consumer.release() Iterator.empty } else { diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala index 3ea7d967744cf..72ceebb700d69 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala @@ -31,8 +31,9 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{Logging, MDC, MessageWithContext} -import org.apache.spark.internal.LogKey.{ERROR, GROUP_ID, OFFSET, RANGE, TIP, TOPIC_PARTITION, UNTIL_OFFSET} +import org.apache.spark.internal.LogKey._ import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil} +import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS import org.apache.spark.sql.kafka010.KafkaExceptions import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET} @@ -391,10 +392,12 @@ private[kafka010] class KafkaDataConsumer( .getOrElse("") val walTime = System.nanoTime() - startTimestampNano - logInfo( - s"From Kafka $kafkaMeta read $totalRecordsRead records through $numPolls polls (polled " + - s" out $numRecordsPolled records), taking $totalTimeReadNanos nanos, during time span of " + - s"$walTime nanos." + logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " + + log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " + + log"${MDC(KAFKA_PULLS_COUNT, numPolls)} polls " + + log"(polled out ${MDC(KAFKA_RECORDS_PULLED_COUNT, numRecordsPolled)} records), " + + log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / NANOS_PER_MILLIS.toDouble)} ms, " + + log"during time span of ${MDC(TIME, walTime / NANOS_PER_MILLIS.toDouble)} ms." 
) releaseConsumer() diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala index 83519de0d3b1e..afd426694d7b0 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala @@ -23,7 +23,8 @@ import scala.util.control.NonFatal import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PRODUCER_ID private[kafka010] class CachedKafkaProducer( val cacheKey: Seq[(String, Object)], @@ -32,7 +33,7 @@ private[kafka010] class CachedKafkaProducer( private[producer] def close(): Unit = { try { - logInfo(s"Closing the KafkaProducer with id: $id.") + logInfo(log"Closing the KafkaProducer with id: ${MDC(PRODUCER_ID, id)}.") producer.close() } catch { case NonFatal(e) => logWarning("Error while closing kafka producer.", e) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 8ec8f2556b9b9..068e3423cd26c 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -52,7 +52,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.ERROR +import org.apache.spark.internal.LogKey import org.apache.spark.kafka010.KafkaTokenUtil import org.apache.spark.util.{SecurityUtils, ShutdownHookManager, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -70,7 +70,7 @@ class KafkaTestUtils( private val JAVA_AUTH_CONFIG = "java.security.auth.login.config" private val localHostNameForURI = Utils.localHostNameForURI() - logInfo(s"Local host name is $localHostNameForURI") + logInfo(log"Local host name is ${MDC(LogKey.URI, localHostNameForURI)}") // MiniKDC uses canonical host name on host part, hence we need to provide canonical host name // on the 'host' part of the principal. 
@@ -333,7 +333,7 @@ class KafkaTestUtils( Utils.deleteRecursively(new File(f)) } catch { case e: IOException if Utils.isWindows => - logWarning(log"${MDC(ERROR, e.getMessage)}") + logWarning(log"${MDC(LogKey.ERROR, e.getMessage)}") } } @@ -654,13 +654,13 @@ class KafkaTestUtils( Utils.deleteRecursively(snapshotDir) } catch { case e: IOException if Utils.isWindows => - logWarning(log"${MDC(ERROR, e.getMessage)}") + logWarning(log"${MDC(LogKey.ERROR, e.getMessage)}") } try { Utils.deleteRecursively(logDir) } catch { case e: IOException if Utils.isWindows => - logWarning(log"${MDC(ERROR, e.getMessage)}") + logWarning(log"${MDC(LogKey.ERROR, e.getMessage)}") } System.clearProperty(ZOOKEEPER_AUTH_PROVIDER) } diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index d3795a194dd47..86ee208496263 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -27,7 +27,8 @@ import scala.jdk.CollectionConverters._ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{OFFSET, TIME, TOPIC_PARTITION, TOPIC_PARTITION_OFFSET_RANGE} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream._ @@ -177,7 +178,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset) acc + (tp -> off) }.foreach { case (tp, off) => - logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate") + logInfo(log"poll(0) returned messages, seeking ${MDC(TOPIC_PARTITION, tp)} to " + + log"${MDC(OFFSET, off)} to compensate") c.seek(tp, off) } } @@ -325,7 +327,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( override def restore(): Unit = { batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => - logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}") + logInfo(log"Restoring KafkaRDD for time ${MDC(TIME, t)} " + + log"${MDC(TOPIC_PARTITION_OFFSET_RANGE, b.mkString("[", ", ", "]"))}") generatedRDDs += t -> new KafkaRDD[K, V]( context.sparkContext, executorKafkaParams, diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 6b47e9d72f4b7..91df53c9e06bb 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.spark.TaskContext import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{KEY, MAX_CAPACITY} +import org.apache.spark.internal.LogKey._ import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] sealed trait KafkaDataConsumer[K, V] { @@ -132,7 +132,8 @@ private[kafka010] class InternalKafkaConsumer[K, V]( def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { logDebug(s"Get 
$groupId $topicPartition nextOffset $nextOffset requested $offset") if (offset != nextOffset) { - logInfo(s"Initial fetch for $groupId $topicPartition $offset") + logInfo(log"Initial fetch for ${MDC(GROUP_ID, groupId)} " + + log"${MDC(TOPIC_PARTITION, topicPartition)} ${MDC(OFFSET, offset)}") seek(offset) poll(timeout) } @@ -145,7 +146,8 @@ private[kafka010] class InternalKafkaConsumer[K, V]( var record = buffer.next() if (record.offset != offset) { - logInfo(s"Buffer miss for $groupId $topicPartition $offset") + logInfo(log"Buffer miss for ${MDC(GROUP_ID, groupId)} " + + log"${MDC(TOPIC_PARTITION, topicPartition)} ${MDC(OFFSET, offset)}") seek(offset) poll(timeout) require(buffer.hasNext(), @@ -169,7 +171,8 @@ private[kafka010] class InternalKafkaConsumer[K, V]( logDebug(s"compacted start $groupId $topicPartition starting $offset") // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics if (offset != nextOffset) { - logInfo(s"Initial fetch for compacted $groupId $topicPartition $offset") + logInfo(log"Initial fetch for compacted ${MDC(GROUP_ID, groupId)} " + + log"${MDC(TOPIC_PARTITION, topicPartition)} ${MDC(OFFSET, offset)}") seek(offset) poll(pollTimeoutMs) } @@ -240,7 +243,8 @@ private[kafka010] object KafkaDataConsumer extends Logging { maxCapacity: Int, loadFactor: Float): Unit = synchronized { if (null == cache) { - logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor") + logInfo(log"Initializing cache ${MDC(INITIAL_CAPACITY, initialCapacity)} " + + log"${MDC(MAX_CAPACITY, maxCapacity)} ${MDC(LOAD_FACTOR, loadFactor)}") cache = new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer[_, _]]( initialCapacity, loadFactor, true) { override def removeEldestEntry( @@ -356,8 +360,8 @@ private[kafka010] object KafkaDataConsumer extends Logging { // at all. This may happen if the cache was invalidate while this consumer was being used. // Just close this consumer. 
internalConsumer.close() - logInfo(s"Released a supposedly cached consumer that was not found in the cache " + - s"$internalConsumer") + logInfo(log"Released a supposedly cached consumer that was not found in the cache " + + log"${MDC(CONSUMER, internalConsumer)}") } } } diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 6c57091bc3c46..5bc89864cf0af 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -23,7 +23,8 @@ import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } import org.apache.kafka.common.TopicPartition import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{FROM_OFFSET, PARTITION_ID, TOPIC, UNTIL_OFFSET} import org.apache.spark.internal.config.Network._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD @@ -186,12 +187,13 @@ private[spark] class KafkaRDD[K, V]( val part = thePart.asInstanceOf[KafkaRDDPartition] require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { - logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + - s"skipping ${part.topic} ${part.partition}") + logInfo(log"Beginning offset ${MDC(FROM_OFFSET, part.fromOffset)} is the same as ending " + + log"offset skipping ${MDC(TOPIC, part.topic)} ${MDC(PARTITION_ID, part.partition)}") Iterator.empty } else { - logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") + logInfo(log"Computing topic ${MDC(TOPIC, part.topic)}, partition " + + log"${MDC(PARTITION_ID, part.partition)} offsets ${MDC(FROM_OFFSET, part.fromOffset)} " + + log"-> ${MDC(UNTIL_OFFSET, part.untilOffset)}") if (compacted) { new CompactedKafkaRDDIterator[K, V]( part, diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 4ddf1d9993ed6..47b03c2b75376 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -28,7 +28,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLib import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import com.amazonaws.services.kinesis.model.Record -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.WORKER_URL import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp @@ -209,7 +210,7 @@ private[kinesis] class KinesisReceiver[T]( workerThread.setDaemon(true) workerThread.start() - logInfo(s"Started receiver with workerId $workerId") + logInfo(log"Started receiver with workerId ${MDC(WORKER_URL, workerId)}") } /** @@ -225,7 +226,7 @@ private[kinesis] class KinesisReceiver[T]( } workerThread.join() workerThread = null - logInfo(s"Stopped receiver 
for workerId $workerId") + logInfo(log"Stopped receiver for workerId ${MDC(WORKER_URL, workerId)}") } workerId = null if (kinesisCheckpointer != null) { diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index 94e109680fbcc..8424dde7d9c40 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -27,7 +27,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason import com.amazonaws.services.kinesis.model.Record import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{RETRY_INTERVAL, SHARD_ID, WORKER_URL} +import org.apache.spark.internal.LogKey.{REASON, RETRY_INTERVAL, SHARD_ID, WORKER_URL} /** * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. @@ -54,7 +54,8 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w */ override def initialize(shardId: String): Unit = { this.shardId = shardId - logInfo(s"Initialized workerId $workerId with shardId $shardId") + logInfo(log"Initialized workerId ${MDC(WORKER_URL, workerId)} " + + log"with shardId ${MDC(SHARD_ID, shardId)}") } /** @@ -99,8 +100,8 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w } } else { /* RecordProcessor has been stopped. */ - logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" + - s" and shardId $shardId. No more records will be processed.") + logInfo(log"Stopped: KinesisReceiver has stopped for workerId ${MDC(WORKER_URL, workerId)}" + + log" and shardId ${MDC(SHARD_ID, shardId)}. 
No more records will be processed.") } } @@ -117,7 +118,8 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w override def shutdown( checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit = { - logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") + logInfo(log"Shutdown: Shutting down workerId ${MDC(WORKER_URL, workerId)} " + + log"with reason ${MDC(REASON, reason)}") // null if not initialized before shutdown: if (shardId == null) { logWarning(log"No shardId for workerId ${MDC(WORKER_URL, workerId)}?") diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala index 558f6e44a6384..15ffbbd9d730c 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala @@ -25,7 +25,8 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKey.PATH +import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.util.ThreadUtils @@ -108,7 +109,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex if (fs.exists(filenamePath)) { fs.delete(filenamePath, true) } - logInfo(s"Copying executor profiling file to $profileOutputFile") + logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}") inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr")) threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread") threadpool.scheduleWithFixedDelay( diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala index e144092cdecd2..fb9abfe59aa78 100644 --- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala @@ -23,7 +23,8 @@ import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKey.EXECUTOR_ID +import org.apache.spark.internal.{Logging, MDC} /** @@ -52,7 +53,8 @@ class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging { if (codeProfilingEnabled) { codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION) if (rand.nextInt(100) * 0.01 < codeProfilingFraction) { - logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling") + logInfo(log"Executor id ${MDC(EXECUTOR_ID, pluginCtx.executorID())} " + + log"selected for JVM code profiling") profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID()) profiler.start() } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 6cf240f12a1ca..d38f94fd1ac26 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -32,7 +32,7 @@ import 
org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, HOST_PORT} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} @@ -226,7 +226,7 @@ private class ClientEndpoint( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (!lostMasters.contains(remoteAddress)) { - logError(log"Error connecting to master ${MDC(RPC_ADDRESS, remoteAddress)}.") + logError(log"Error connecting to master ${MDC(HOST_PORT, remoteAddress)}.") lostMasters += remoteAddress // Note that this heuristic does not account for the fact that a Master can recover within // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This @@ -240,7 +240,7 @@ private class ClientEndpoint( override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { if (!lostMasters.contains(remoteAddress)) { - logError(log"Error connecting to master (${MDC(RPC_ADDRESS, remoteAddress)}).", cause) + logError(log"Error connecting to master (${MDC(HOST_PORT, remoteAddress)}).", cause) lostMasters += remoteAddress if (lostMasters.size >= masterEndpoints.size) { logError("No master is available, exiting.") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index bf7f91ea7ce31..eb944244fc9da 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -44,7 +44,7 @@ import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, HOST_PORT} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} @@ -858,7 +858,7 @@ private[spark] class ApplicationMaster( finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } else { logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " + - log"Shutting down. ${MDC(RPC_ADDRESS, remoteAddress)}") + log"Shutting down. 
${MDC(HOST_PORT, remoteAddress)}") finish(FinalApplicationStatus.FAILED, exitCode) } } else { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 8404785d8e0b0..d7f285aeb892b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{EXECUTOR_ID, REASON, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST_PORT, REASON} import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -307,7 +307,7 @@ private[spark] abstract class YarnSchedulerBackend( case NonFatal(e) => logWarning(log"Attempted to get executor loss reason for executor id " + log"${MDC(EXECUTOR_ID, executorId)} at RPC address " + - log"${MDC(RPC_ADDRESS, executorRpcAddress)}, but got no response. " + + log"${MDC(HOST_PORT, executorRpcAddress)}, but got no response. " + log"Marking as agent lost.", e) RemoveExecutor(executorId, ExecutorProcessLost()) }(ThreadUtils.sameThread) @@ -395,7 +395,7 @@ private[spark] abstract class YarnSchedulerBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (amEndpoint.exists(_.address == remoteAddress)) { - logWarning(log"ApplicationMaster has disassociated: ${MDC(RPC_ADDRESS, remoteAddress)}") + logWarning(log"ApplicationMaster has disassociated: ${MDC(HOST_PORT, remoteAddress)}") amEndpoint = None } }