Skip to content

Commit

Permalink
Allow to remove JMX Reporting through configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Jan 17, 2020
1 parent da70746 commit c0c90aa
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object DefaultConnectionFactory extends CassandraConnectionFactory {
.setConnectTimeoutMillis(conf.connectTimeoutMillis)
.setReadTimeoutMillis(conf.readTimeoutMillis)

val builder = Cluster.builder()
var builder = Cluster.builder()
.addContactPoints(conf.hosts.toSeq: _*)
.withPort(conf.port)
.withRetryPolicy(
Expand All @@ -58,6 +58,8 @@ object DefaultConnectionFactory extends CassandraConnectionFactory {
.setRefreshNodeListIntervalMillis(0)
.setRefreshSchemaIntervalMillis(0))

if (!conf.jmxEnabled) builder = builder.withoutJMXReporting()

if (conf.cassandraSSLConf.enabled) {
maybeCreateSSLOptions(conf.cassandraSSLConf) match {
case Some(sslOptions) builder.withSSL(sslOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ case class CassandraConnectorConf(
connectTimeoutMillis: Int = CassandraConnectorConf.ConnectionTimeoutParam.default,
readTimeoutMillis: Int = CassandraConnectorConf.ReadTimeoutParam.default,
connectionFactory: CassandraConnectionFactory = DefaultConnectionFactory,
cassandraSSLConf: CassandraConnectorConf.CassandraSSLConf = CassandraConnectorConf.DefaultCassandraSSLConf
cassandraSSLConf: CassandraConnectorConf.CassandraSSLConf = CassandraConnectorConf.DefaultCassandraSSLConf,
jmxEnabled: Boolean = true
) {

@transient
Expand Down Expand Up @@ -87,6 +88,12 @@ object CassandraConnectorConf extends Logging {
default = 9042,
description = """Cassandra native connection port""")

val JmxEnabledParam = ConfigParameter[Boolean](
name = "spark.cassandra.connection.jmxEnabled",
section = ReferenceSection,
default = true,
description = """Cassandra JMX Reporting""")

val LocalDCParam = ConfigParameter[Option[String]](
name = "spark.cassandra.connection.local_dc",
section = ReferenceSection,
Expand Down Expand Up @@ -219,6 +226,7 @@ object CassandraConnectorConf extends Logging {
ConnectionPortParam,
LocalDCParam,
ConnectionTimeoutParam,
JmxEnabledParam,
KeepAliveMillisParam,
MinReconnectionDelayParam,
MaxReconnectionDelayParam,
Expand Down Expand Up @@ -254,10 +262,11 @@ object CassandraConnectorConf extends Logging {
hostName <- hostsStr.split(",").toSet[String]
hostAddress <- resolveHost(hostName.trim)
} yield hostAddress

val port = conf.getInt(ConnectionPortParam.name, ConnectionPortParam.default)

val authConf = AuthConf.fromSparkConf(conf)
val jmxEnabled = conf.getOption(JmxEnabledParam.name).map(_.toBoolean).getOrElse(JmxEnabledParam.default)
val keepAlive = conf.getInt(KeepAliveMillisParam.name, KeepAliveMillisParam.default)

val localDC = conf.getOption(LocalDCParam.name)
Expand Down Expand Up @@ -312,7 +321,8 @@ object CassandraConnectorConf extends Logging {
connectTimeoutMillis = connectTimeout,
readTimeoutMillis = readTimeout,
connectionFactory = connectionFactory,
cassandraSSLConf = cassandraSSLConf
cassandraSSLConf = cassandraSSLConf,
jmxEnabled = jmxEnabled
)
}
}

0 comments on commit c0c90aa

Please sign in to comment.