From c0c90aa328a7242c60fd099d8515a9635a96d907 Mon Sep 17 00:00:00 2001 From: Christophe Oudar Date: Fri, 17 Jan 2020 01:07:10 +0100 Subject: [PATCH] Allow to remove JMX Reporting through configuration --- .../cql/CassandraConnectionFactory.scala | 4 +++- .../connector/cql/CassandraConnectorConf.scala | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala index ef0aa32bf..1ec5de57e 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala @@ -40,7 +40,7 @@ object DefaultConnectionFactory extends CassandraConnectionFactory { .setConnectTimeoutMillis(conf.connectTimeoutMillis) .setReadTimeoutMillis(conf.readTimeoutMillis) - val builder = Cluster.builder() + var builder = Cluster.builder() .addContactPoints(conf.hosts.toSeq: _*) .withPort(conf.port) .withRetryPolicy( @@ -58,6 +58,8 @@ object DefaultConnectionFactory extends CassandraConnectionFactory { .setRefreshNodeListIntervalMillis(0) .setRefreshSchemaIntervalMillis(0)) + if (!conf.jmxEnabled) builder = builder.withoutJMXReporting() + if (conf.cassandraSSLConf.enabled) { maybeCreateSSLOptions(conf.cassandraSSLConf) match { case Some(sslOptions) ⇒ builder.withSSL(sslOptions) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala index eda69d5e2..1896e3863 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala @@ -27,7 +27,8 @@ case class CassandraConnectorConf( connectTimeoutMillis: Int = CassandraConnectorConf.ConnectionTimeoutParam.default, readTimeoutMillis: Int = CassandraConnectorConf.ReadTimeoutParam.default, connectionFactory: CassandraConnectionFactory = DefaultConnectionFactory, - cassandraSSLConf: CassandraConnectorConf.CassandraSSLConf = CassandraConnectorConf.DefaultCassandraSSLConf + cassandraSSLConf: CassandraConnectorConf.CassandraSSLConf = CassandraConnectorConf.DefaultCassandraSSLConf, + jmxEnabled: Boolean = true ) { @transient @@ -87,6 +88,12 @@ object CassandraConnectorConf extends Logging { default = 9042, description = """Cassandra native connection port""") + val JmxEnabledParam = ConfigParameter[Boolean]( + name = "spark.cassandra.connection.jmxEnabled", + section = ReferenceSection, + default = true, + description = """Cassandra JMX Reporting""") + val LocalDCParam = ConfigParameter[Option[String]]( name = "spark.cassandra.connection.local_dc", section = ReferenceSection, @@ -219,6 +226,7 @@ object CassandraConnectorConf extends Logging { ConnectionPortParam, LocalDCParam, ConnectionTimeoutParam, + JmxEnabledParam, KeepAliveMillisParam, MinReconnectionDelayParam, MaxReconnectionDelayParam, @@ -254,10 +262,11 @@ object CassandraConnectorConf extends Logging { hostName <- hostsStr.split(",").toSet[String] hostAddress <- resolveHost(hostName.trim) } yield hostAddress - + val port = conf.getInt(ConnectionPortParam.name, ConnectionPortParam.default) val authConf = AuthConf.fromSparkConf(conf) + val jmxEnabled = conf.getOption(JmxEnabledParam.name).map(_.toBoolean).getOrElse(JmxEnabledParam.default) val keepAlive = conf.getInt(KeepAliveMillisParam.name, KeepAliveMillisParam.default) val localDC = conf.getOption(LocalDCParam.name) @@ -312,7 +321,8 @@ object CassandraConnectorConf extends Logging { connectTimeoutMillis = connectTimeout, readTimeoutMillis = readTimeout, connectionFactory = connectionFactory, - cassandraSSLConf = cassandraSSLConf + cassandraSSLConf = cassandraSSLConf, + jmxEnabled = jmxEnabled ) } }