diff --git a/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala b/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala index d659a67..431d170 100644 --- a/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala +++ b/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala @@ -42,6 +42,7 @@ class KafkaServer extends BrokerServer { BrokerServer.Distro.configureLog4j(broker.log4jOptions) val options = broker.options(defaults) + BrokerServer.Distro.startReporters(options) logger.info("Starting KafkaServer") server = BrokerServer.Distro.newServer(options) @@ -80,12 +81,19 @@ object BrokerServer { val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] val server: Object = serverClass.getConstructor(configClass).newInstance(config).asInstanceOf[Object] + server + } + + def startReporters(options: util.Map[String, String]): Object = { + val configClass = loader.loadClass("kafka.server.KafkaConfig") + + val props: Properties = this.props(options, "server.properties") + val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] + val metricsReporter = loader.loadClass("kafka.metrics.KafkaMetricsReporter$").getField("MODULE$").get(null) val metricsReporterClass = metricsReporter.getClass val verifiableProps = config.getClass.getMethod("props").invoke(config) metricsReporterClass.getMethod("startReporters", verifiableProps.getClass).invoke(metricsReporter, verifiableProps) - - server } def configureLog4j(options: util.Map[String, String]): Unit = {