Skip to content

Commit

Permalink
extracted method startReporters
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrypekar committed Jul 29, 2015
1 parent 4ac4f67 commit def0611
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/scala/ly/stealth/mesos/kafka/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class KafkaServer extends BrokerServer {

BrokerServer.Distro.configureLog4j(broker.log4jOptions)
val options = broker.options(defaults)
BrokerServer.Distro.startReporters(options)

logger.info("Starting KafkaServer")
server = BrokerServer.Distro.newServer(options)
Expand Down Expand Up @@ -80,12 +81,19 @@ object BrokerServer {
val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object]
val server: Object = serverClass.getConstructor(configClass).newInstance(config).asInstanceOf[Object]

server
}

def startReporters(options: util.Map[String, String]): Object = {
val configClass = loader.loadClass("kafka.server.KafkaConfig")

val props: Properties = this.props(options, "server.properties")
val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object]

val metricsReporter = loader.loadClass("kafka.metrics.KafkaMetricsReporter$").getField("MODULE$").get(null)
val metricsReporterClass = metricsReporter.getClass
val verifiableProps = config.getClass.getMethod("props").invoke(config)
metricsReporterClass.getMethod("startReporters", verifiableProps.getClass).invoke(metricsReporter, verifiableProps)

server
}

def configureLog4j(options: util.Map[String, String]): Unit = {
Expand Down

0 comments on commit def0611

Please sign in to comment.