[SPARK-47594] Connector module: Migrate logInfo with variables to structured logging framework #46022
Conversation
s"Spark Connect server started at: " + | ||
s"${isa.getAddress.getHostAddress}:${isa.getPort}") | ||
log"Spark Connect server started at: " + | ||
log"${MDC(RPC_ADDRESS, isa.getAddress.getHostAddress)}:${MDC(PORT, isa.getPort)}") |
I'm not sure if calling it `HOST_PORT` would be more appropriate.
Either way seems fine. Or, we can consider unifying them.
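For background on this thread: the pattern under discussion concatenates `log"..."` interpolated parts, with each variable wrapped in an `MDC(key, value)` tag. The sketch below is a simplified, self-contained model of how such an interpolator can carry both the rendered text and the key/value context; the key names mirror the diff above, but none of this is Spark's actual implementation.

```scala
// Simplified sketch (not Spark's real structured-logging framework) of a
// `log` interpolator that pairs each interpolated value with an MDC key.
object StructuredLoggingSketch {
  // Hypothetical log keys, mirroring RPC_ADDRESS and PORT from the diff.
  sealed trait LogKey
  case object RPC_ADDRESS extends LogKey
  case object PORT extends LogKey

  // A value tagged with its structured-logging key.
  case class MDC(key: LogKey, value: Any)

  // The rendered message plus the key/value context it carries.
  case class MessageWithContext(message: String, context: Map[LogKey, Any]) {
    def +(other: MessageWithContext): MessageWithContext =
      MessageWithContext(message + other.message, context ++ other.context)
  }

  implicit class LogInterpolator(val sc: StringContext) extends AnyVal {
    // `log"... ${MDC(KEY, value)} ..."` builds text and context together.
    def log(args: MDC*): MessageWithContext = {
      val message = sc.parts.zipAll(args.map(_.value.toString), "", "")
        .map { case (part, arg) => part + arg }
        .mkString
      MessageWithContext(message, args.map(a => a.key -> a.value).toMap)
    }
  }

  def logInfo(entry: MessageWithContext): Unit =
    println(s"INFO ${entry.message} context=${entry.context}")

  def main(args: Array[String]): Unit = {
    val (host, port) = ("10.0.0.1", 15002)
    logInfo(log"Spark Connect server started at: " +
      log"${MDC(RPC_ADDRESS, host)}:${MDC(PORT, port)}")
  }
}
```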
```diff
@@ -70,7 +70,7 @@ class KafkaTestUtils(
   private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"

   private val localHostNameForURI = Utils.localHostNameForURI()
-  logInfo(s"Local host name is $localHostNameForURI")
+  logInfo(log"Local host name is ${MDC(LogKey.URI, localHostNameForURI)}")
```
Because the LogKey `URI` is duplicated with the Java class, we will write it directly as `LogKey.XXX`. At the same time, we will delete the import `import org.apache.spark.internal.LogKey.ERROR`.
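A small illustration of the name clash being avoided (simplified stand-ins, not Spark's real `LogKey` type): importing the key by name would shadow `java.net.URI`, so the key is referenced fully qualified.

```scala
// Simplified stand-in for Spark's LogKey object; illustration only.
object LogKey { case object URI }

object AvoidImportClash {
  import java.net.URI // the Java class keeps its short name

  def main(args: Array[String]): Unit = {
    val endpoint = new URI("sc://localhost:15002")
    // Fully qualified reference: no `import LogKey.URI`, so no shadowing.
    println(s"key=${LogKey.URI} value=$endpoint")
  }
}
```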
```diff
@@ -95,7 +95,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     sessionHolder.addExecuteHolder(executeHolder)
     executions.put(executeHolder.key, executeHolder)
     lastExecutionTimeMs = None
-    logInfo(s"ExecuteHolder ${executeHolder.key} is created.")
+    logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, executeHolder.key)} is created.")
```
logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, executeHolder.key)} is created.") | |
logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, executeHolder.key)} is created.") |
```diff
@@ -122,7 +122,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     if (executions.isEmpty) {
       lastExecutionTimeMs = Some(System.currentTimeMillis())
     }
-    logInfo(s"ExecuteHolder $key is removed.")
+    logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, key)} is removed.")
```
logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_HOLDER_KEY, key)} is removed.") | |
logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is removed.") |
logInfo(s"Adding new query to the cache. Query Id ${query.id}, value $value.") | ||
logInfo( | ||
log"Adding new query to the cache. Query Id ${MDC(QUERY_ID, query.id)}, " + | ||
log"value ${MDC(QUERY_CACHE, value)}.") |
log"value ${MDC(QUERY_CACHE, value)}.") | |
log"value ${MDC(QUERY_CACHE_VALUE, value)}.") |
s"$walTime nanos." | ||
logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " + | ||
log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " + | ||
log"${MDC(COUNT_POLL, numPolls)} polls " + |
`KAFKA_PULLS_COUNT`?
logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " + | ||
log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " + | ||
log"${MDC(COUNT_POLL, numPolls)} polls " + | ||
log"(polled out ${MDC(COUNT_RECORDS_POLL, numRecordsPolled)} records), " + |
`KAFKA_RECORDS_PULLED_COUNT`
```diff
@@ -325,7 +327,8 @@ private[spark] class DirectKafkaInputDStream[K, V](

   override def restore(): Unit = {
     batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-      logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+      logInfo(log"Restoring KafkaRDD for time ${MDC(TIME, t)} " +
```
QQ: is the time here using ms?
Yeah, here `t` is an instance of `Time`, and `Time` defaults to outputting the time in ms, as follows:
```scala
override def toString: String = (millis.toString + " ms")
```
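For reference, a minimal stand-in for the `Time` class (assuming it simply wraps epoch milliseconds, as the quoted `toString` suggests) shows why the logged value renders with an `ms` suffix:

```scala
// Minimal stand-in for Spark Streaming's Time; assumes it wraps epoch millis.
case class Time(millis: Long) {
  override def toString: String = millis.toString + " ms"
}

object TimeDemo {
  def main(args: Array[String]): Unit = {
    val t = Time(1714000000000L)
    // Prints: Restoring KafkaRDD for time 1714000000000 ms
    println(s"Restoring KafkaRDD for time $t")
  }
}
```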
@panbingkun Thanks for the work. LGTM except for some minor comments.
@gengliangwang |
Thanks, merging to master
```diff
@@ -25,7 +25,8 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKey.PATH
+import org.apache.spark.internal.{Logging, MDC}
```
Unfortunately, this import-ordering issue was missed because `dev/scalastyle` didn't include this module. Here is the fix for `dev/scalastyle` and for this.
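For context, Spark's import-ordering convention, which `dev/scalastyle` checks, groups imports with a blank line between groups — `java`/`javax`, `scala`, third-party, then `org.apache.spark` — each group sorted alphabetically. A sketch of a correctly ordered header (the specific imports are illustrative and assume Spark's build classpath):

```scala
// Group 1: java / javax
import java.io.File
import java.util.Locale

// Group 2: scala
import scala.collection.mutable
import scala.util.Random

// Group 3: third-party libraries
import org.apache.hadoop.fs.Path

// Group 4: org.apache.spark, as in the diffs above
import org.apache.spark.SparkConf
import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.internal.{Logging, MDC}
```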
```diff
@@ -23,7 +23,8 @@ import scala.util.Random

 import org.apache.spark.SparkConf
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+import org.apache.spark.internal.{Logging, MDC}
```
ditto. Import ordering issue.
Thank you very much for fixing it.
… `jvm-profiler` modules

### What changes were proposed in this pull request?
This PR aims to fix `dev/scalastyle` to check the `hadoop-cloud` and `jvm-profiler` modules. Also, the detected scalastyle issues are fixed.

### Why are the changes needed?
To prevent future scalastyle issues. A Scala style violation was introduced here, but we missed it because we didn't check all optional modules.
- #46022

The `jvm-profiler` module was newly added in Apache Spark 4.0.0, but we missed adding it to `dev/scalastyle`. Note that there were no Scala style issues in that module at that time.
- #44021

The `hadoop-cloud` module was added in Apache Spark 2.3.0.
- #17834

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs with the newly revised `dev/scalastyle`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46376 from dongjoon-hyun/SPARK-48127.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
This PR aims to migrate `logInfo` with variables in the `Connector` module to the structured logging framework (a minimal before/after sketch of the pattern appears below).

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
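As a hedged illustration of the migration pattern, mirroring this PR's diffs: the key name `EXECUTE_KEY` follows the reviewer's suggestion above, and the snippet is written as it would appear inside Spark's own modules rather than as external API usage.

```scala
import org.apache.spark.internal.{Logging, LogKey, MDC}

class ExampleService extends Logging {
  def onCreated(key: String): Unit = {
    // Before: plain interpolation flattens the variable into the message text.
    logInfo(s"ExecuteHolder $key is created.")

    // After: the variable is emitted as a named MDC field that downstream
    // log pipelines can query, in addition to the rendered text.
    logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is created.")
  }
}
```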
### Does this PR introduce any user-facing change?
No.

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
No.