[SPARK-47594] Connector module: Migrate logInfo with variables to structured logging framework

### What changes were proposed in this pull request?
This PR aims to migrate `logInfo` calls with variables in the `Connector` module to the structured logging framework.
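
As an illustration, the migration follows the before/after pattern sketched below. This is a minimal, hypothetical sketch modeled on the `AvroUtils` change in this diff; the wrapper object and method names are invented for the example, while the `logInfo` lines mirror the actual change.

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.CODEC_NAME

// Hypothetical wrapper; only the logInfo calls mirror the changes in this PR.
object MigrationExample extends Logging {
  def logCodec(codecName: String): Unit = {
    // Before: plain string interpolation, so the value is only embedded in the message text.
    // logInfo(s"Compressing Avro output using the $codecName codec")

    // After: the value is tagged with a LogKey via MDC, so the structured logging
    // backend can emit it as a separate, queryable field.
    logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec")
  }
}
```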

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Passed GitHub Actions (GA).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46022 from panbingkun/SPARK-47594.

Lead-authored-by: panbingkun <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
panbingkun and gengliangwang committed Apr 16, 2024
Commit 6919feb (parent: 9a1fc11)
Showing 37 changed files with 257 additions and 138 deletions.

@@ -26,15 +26,21 @@ object LogKey extends Enumeration {
val APP_DESC = Value
val APP_ID = Value
val APP_STATE = Value
val BATCH_ID = Value
val BLOCK_ID = Value
val BLOCK_MANAGER_ID = Value
val BROADCAST_ID = Value
val BUCKET = Value
val BYTECODE_SIZE = Value
val CACHE_AUTO_REMOVED_SIZE = Value
val CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value
val CACHE_UNTIL_LAST_PRODUCED_SIZE = Value
val CATEGORICAL_FEATURES = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
val CLUSTER_ID = Value
val CODEC_LEVEL = Value
val CODEC_NAME = Value
val COLUMN_DATA_TYPE_SOURCE = Value
val COLUMN_DATA_TYPE_TARGET = Value
val COLUMN_DEFAULT_VALUE = Value
@@ -47,6 +53,7 @@ object LogKey extends Enumeration {
val CONFIG3 = Value
val CONFIG4 = Value
val CONFIG5 = Value
val CONSUMER = Value
val CONTAINER = Value
val CONTAINER_ID = Value
val COUNT = Value
@@ -59,13 +66,18 @@ object LogKey extends Enumeration {
val DATA = Value
val DATABASE_NAME = Value
val DATAFRAME_CACHE_ENTRY = Value
val DATAFRAME_ID = Value
val DESCRIPTION = Value
val DRIVER_ID = Value
val DROPPED_PARTITIONS = Value
val DURATION = Value
val END_POINT = Value
val ENGINE = Value
val ERROR = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXECUTE_INFO = Value
val EXECUTE_KEY = Value
val EXECUTOR_ENV_REGEX = Value
val EXECUTOR_ID = Value
val EXECUTOR_IDS = Value
@@ -77,32 +89,42 @@ object LogKey extends Enumeration {
val FIELD_NAME = Value
val FILE_FORMAT = Value
val FILE_FORMAT2 = Value
val FROM_OFFSET = Value
val FUNCTION_NAME = Value
val FUNCTION_PARAMETER = Value
val GROUP_ID = Value
val HADOOP_VERSION = Value
val HIVE_OPERATION_STATE = Value
val HIVE_OPERATION_TYPE = Value
val HOST = Value
val HOST_PORT = Value
val INDEX = Value
val INFERENCE_MODE = Value
val INITIAL_CAPACITY = Value
val INTERVAL = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
val JOIN_CONDITION_SUB_EXPRESSION = Value
val KAFKA_PULLS_COUNT = Value
val KAFKA_RECORDS_PULLED_COUNT = Value
val KEY = Value
val LAST_ACCESS_TIME = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
val LISTENER = Value
val LOAD_FACTOR = Value
val LOG_TYPE = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
val MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE = Value
val MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE = Value
val MAX_CAPACITY = Value
val MAX_CATEGORIES = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MERGE_DIR_NAME = Value
val MESSAGE = Value
val METHOD_NAME = Value
val MIN_SIZE = Value
val NEW_VALUE = Value
@@ -129,6 +151,7 @@ object LogKey extends Enumeration {
val POLICY = Value
val PORT = Value
val PRODUCER_ID = Value
val QUERY_CACHE_VALUE = Value
val QUERY_HINT = Value
val QUERY_ID = Value
val QUERY_PLAN = Value
@@ -138,21 +161,24 @@ object LogKey extends Enumeration {
val RANGE = Value
val RDD_ID = Value
val REASON = Value
val REATTACHABLE = Value
val RECEIVED_BLOCK_INFO = Value
val REDUCE_ID = Value
val RELATION_NAME = Value
val REMAINING_PARTITIONS = Value
val RESOURCE_NAME = Value
val RETRY_COUNT = Value
val RETRY_INTERVAL = Value
val RPC_ADDRESS = Value
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
val RULE_NUMBER_OF_RUNS = Value
val RUN_ID = Value
val SCHEMA = Value
val SCHEMA2 = Value
val SERVICE_NAME = Value
val SESSION_HOLD_INFO = Value
val SESSION_ID = Value
val SESSION_KEY = Value
val SHARD_ID = Value
val SHUFFLE_BLOCK_INFO = Value
val SHUFFLE_ID = Value
@@ -176,12 +202,20 @@ object LogKey extends Enumeration {
val THREAD = Value
val THREAD_NAME = Value
val TID = Value
val TIME = Value
val TIMEOUT = Value
val TIME_UNITS = Value
val TIP = Value
val TOPIC = Value
val TOPIC_PARTITION = Value
val TOPIC_PARTITIONS = Value
val TOPIC_PARTITION_OFFSET = Value
val TOPIC_PARTITION_OFFSET_RANGE = Value
val TOTAL_EFFECTIVE_TIME = Value
val TOTAL_RECORDS_READ = Value
val TOTAL_SIZE = Value
val TOTAL_TIME = Value
val TOTAL_TIME_READ = Value
val UNSUPPORTED_EXPRESSION = Value
val UNSUPPORTED_HINT_REASON = Value
val UNTIL_OFFSET = Value

@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Job

import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, PATH}
import org.apache.spark.internal.LogKey.{CODEC_LEVEL, CODEC_NAME, CONFIG, PATH}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroCompressionCodec._
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
@@ -118,7 +118,8 @@ private[sql] object AvroUtils extends Logging {
if (compressed.getSupportCompressionLevel) {
val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
compressed.getDefaultCompressionLevel.toString)
logInfo(s"Compressing Avro output using the $codecName codec at level $level")
logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec " +
log"at level ${MDC(CODEC_LEVEL, level)}")
val s = if (compressed == ZSTANDARD) {
val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
@@ -128,7 +129,7 @@ private[sql] object AvroUtils extends Logging {
}
jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
} else {
logInfo(s"Compressing Avro output using the $codecName codec")
logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec")
}
}
case unknown =>

@@ -25,7 +25,7 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{INDEX, OP_ID, TOTAL_TIME, WAIT_RESULT_TIME, WAIT_SEND_TIME}
import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL}
@@ -183,9 +183,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
*/
def execute(lastConsumedStreamIndex: Long): Unit = {
logInfo(
s"Starting for opId=${executeHolder.operationId}, " +
s"reattachable=${executeHolder.reattachable}, " +
s"lastConsumedStreamIndex=$lastConsumedStreamIndex")
log"Starting for opId=${MDC(OP_ID, executeHolder.operationId)}, " +
log"reattachable=${MDC(REATTACHABLE, executeHolder.reattachable)}, " +
log"lastConsumedStreamIndex=${MDC(STREAM_ID, lastConsumedStreamIndex)}")
val startTime = System.nanoTime()

var nextIndex = lastConsumedStreamIndex + 1
@@ -294,11 +294,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
}
} else if (streamFinished) {
// Stream is finished and all responses have been sent
logInfo(
s"Stream finished for opId=${executeHolder.operationId}, " +
s"sent all responses up to last index ${nextIndex - 1}. " +
s"totalTime=${System.nanoTime - startTime}ns " +
s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns")
// scalastyle:off line.size.limit
logInfo(log"Stream finished for opId=${MDC(OP_ID, executeHolder.operationId)}, " +
log"sent all responses up to last index ${MDC(STREAM_ID, nextIndex - 1)}. " +
log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms")
// scalastyle:on line.size.limit
executionObserver.getError() match {
case Some(t) => grpcObserver.onError(t)
case None => grpcObserver.onCompleted()
@@ -307,11 +309,14 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
} else if (deadlineLimitReached) {
// The stream is not complete, but should be finished now.
// The client needs to reattach with ReattachExecute.
logInfo(
s"Deadline reached, shutting down stream for opId=${executeHolder.operationId} " +
s"after index ${nextIndex - 1}. " +
s"totalTime=${System.nanoTime - startTime}ns " +
s"waitingForResults=${consumeSleep}ns waitingForSend=${sendSleep}ns")
// scalastyle:off line.size.limit
logInfo(log"Deadline reached, shutting down stream for " +
log"opId=${MDC(OP_ID, executeHolder.operationId)} " +
log"after index ${MDC(STREAM_ID, nextIndex - 1)}. " +
log"totalTime=${MDC(TOTAL_TIME, (System.nanoTime - startTime) / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForResults=${MDC(WAIT_RESULT_TIME, consumeSleep / NANOS_PER_MILLIS.toDouble)} ms " +
log"waitingForSend=${MDC(WAIT_SEND_TIME, sendSleep / NANOS_PER_MILLIS.toDouble)} ms")
// scalastyle:on line.size.limit
grpcObserver.onCompleted()
finished = true
}

@@ -26,7 +26,8 @@ import io.grpc.stub.StreamObserver

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey
import org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE
import org.apache.spark.sql.connect.service.ExecuteHolder

@@ -242,14 +243,16 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
/** Remove all cached responses */
def removeAll(): Unit = responseLock.synchronized {
removeResponsesUntilIndex(lastProducedIndex)
// scalastyle:off line.size.limit
logInfo(
s"Release all for opId=${executeHolder.operationId}. Execution stats: " +
s"total=${totalSize} " +
s"autoRemoved=${autoRemovedSize} " +
s"cachedUntilConsumed=$cachedSizeUntilHighestConsumed " +
s"cachedUntilProduced=$cachedSizeUntilLastProduced " +
s"maxCachedUntilConsumed=${cachedSizeUntilHighestConsumed.max} " +
s"maxCachedUntilProduced=${cachedSizeUntilLastProduced.max}")
log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. Execution stats: " +
log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " +
log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " +
log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " +
log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " +
log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " +
log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}")
// scalastyle:on line.size.limit
}

/** Returns if the stream is finished. */

@@ -38,7 +38,8 @@ import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.SESSION_ID
import org.apache.spark.ml.{functions => MLFunctions}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest}
import org.apache.spark.sql.{Column, Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, SparkSession}
@@ -3131,7 +3132,9 @@ class SparkConnectPlanner(
}
} catch {
case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.
logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.")
logInfo(
log"Removing foreachBatch worker, query failed to start " +
log"for session ${MDC(SESSION_ID, sessionId)}.")
foreachBatchRunnerCleaner.foreach(_.close())
throw ex
}

@@ -26,7 +26,8 @@ import scala.util.control.NonFatal

import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DATAFRAME_ID, QUERY_ID, RUN_ID, SESSION_ID}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.connect.service.SparkConnectService
@@ -63,7 +64,8 @@ object StreamingForeachBatchHelper extends Logging {
sessionHolder: SessionHolder): ForeachBatchFnType = { (df: DataFrame, batchId: Long) =>
{
val dfId = UUID.randomUUID().toString
logInfo(s"Caching DataFrame with id $dfId") // TODO: Add query id to the log.
// TODO: Add query id to the log.
logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}")

// TODO(SPARK-44462): Sanity check there is no other active DataFrame for this query.
// The query id needs to be saved in the cache for this check.
@@ -72,7 +74,7 @@
try {
fn(FnArgsWithId(dfId, df, batchId))
} finally {
logInfo(s"Removing DataFrame with id $dfId from the cache")
logInfo(log"Removing DataFrame with id ${MDC(DATAFRAME_ID, dfId)} from the cache")
sessionHolder.removeCachedDataFrame(dfId)
}
}
@@ -133,7 +135,9 @@ object StreamingForeachBatchHelper extends Logging {
try {
dataIn.readInt() match {
case 0 =>
logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: 0)")
logInfo(
log"Python foreach batch for dfId ${MDC(DATAFRAME_ID, args.dfId)} " +
log"completed (ret: 0)")
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
val msg = PythonWorkerUtils.readUTF(dataIn)
throw new PythonException(
@@ -169,7 +173,9 @@ object StreamingForeachBatchHelper extends Logging {
private lazy val streamingListener = { // Initialized on first registered query
val listener = new StreamingRunnerCleanerListener
sessionHolder.session.streams.addListener(listener)
logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
logInfo(
log"Registered runner clean up listener for " +
log"session ${MDC(SESSION_ID, sessionHolder.sessionId)}")
listener
}

@@ -195,7 +201,9 @@

private def cleanupStreamingRunner(key: CacheKey): Unit = {
Option(cleanerCache.remove(key)).foreach { cleaner =>
logInfo(s"Cleaning up runner for queryId ${key.queryId} runId ${key.runId}.")
logInfo(
log"Cleaning up runner for queryId ${MDC(QUERY_ID, key.queryId)} " +
log"runId ${MDC(RUN_ID, key.runId)}.")
cleaner.close()
}
}

@@ -21,7 +21,8 @@ import java.io.EOFException

import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.FUNCTION_NAME
import org.apache.spark.sql.connect.service.{SessionHolder, SparkConnectService}
import org.apache.spark.sql.streaming.StreamingQueryListener

@@ -82,7 +83,9 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
try {
dataIn.readInt() match {
case 0 =>
logInfo(s"Streaming query listener function $functionName completed (ret: 0)")
logInfo(
log"Streaming query listener function ${MDC(FUNCTION_NAME, functionName)} " +
log"completed (ret: 0)")
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
val msg = PythonWorkerUtils.readUTF(dataIn)
throw new PythonException(

@@ -28,7 +28,8 @@ import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DESCRIPTION, MESSAGE}

/**
* A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON. Useful for
@@ -42,9 +43,11 @@ class LoggingInterceptor extends ServerInterceptor with Logging {
private def logProto[T](description: String, message: T): Unit = {
message match {
case m: Message =>
logInfo(s"$description:\n${jsonPrinter.print(m)}")
logInfo(log"${MDC(DESCRIPTION, description)}:\n${MDC(MESSAGE, jsonPrinter.print(m))}")
case other =>
logInfo(s"$description: (Unknown message type) $other")
logInfo(
log"${MDC(DESCRIPTION, description)}: " +
log"(Unknown message type) ${MDC(MESSAGE, other)}")
}
}
