diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala index ed401decbef1b..7da8a9c52a6de 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala @@ -88,7 +88,7 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def registerBean(bean: MetricMBean) { if (!server.isRegistered(bean.objectName)) { debug("Registering MBean for %s." format bean.objectName) - server.registerMBean(bean, bean.objectName); + server.registerMBean(bean, bean.objectName) } } } diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala index 431e14c101325..9453524cdd8dd 100644 --- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala +++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala @@ -19,65 +19,49 @@ package org.apache.samza.metrics.reporter -import org.junit.Assert._ -import org.junit.AfterClass -import org.junit.BeforeClass -import org.junit.Test -import org.apache.samza.task.TaskContext -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.config.MapConfig -import org.apache.samza.metrics.JvmMetrics - -import java.lang.management.ManagementFactory -import java.rmi.registry.LocateRegistry - +import javax.management.MBeanServer import javax.management.ObjectName -import javax.management.remote.JMXServiceURL -import javax.management.remote.JMXConnectorServerFactory -import javax.management.remote.JMXConnectorServer -import javax.management.remote.JMXConnectorFactory +import junit.framework.Assert.assertEquals +import org.apache.samza.metrics.JmxUtil +import org.apache.samza.metrics.MetricsRegistryMap +import org.junit.Test +import org.mockito.Matchers +import org.mockito.Mockito.mock +import org.mockito.Mockito.times +import org.mockito.Mockito.verify +import org.mockito.Mockito.when +import org.mockito.Mockito.reset -import scala.collection.JavaConverters._ +class TestJmxReporter { -object TestJmxReporter { - val port = 4500 - val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port) - var cs: JMXConnectorServer = null + val REPORTER_SOURCE = "test" - @BeforeClass - def beforeSetupServers { - LocateRegistry.createRegistry(port) - val mbs = ManagementFactory.getPlatformMBeanServer() - cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs) - cs.start - } + @Test + def testJmxReporter { + val metricGroup = "org.apache.samza.metrics.JvmMetrics" + val metricName = "mem-non-heap-used-mb" + val objectName: ObjectName = JmxUtil.getObjectName(metricGroup, metricName, REPORTER_SOURCE) - @AfterClass - def afterCleanLogDirs { - if (cs != null) { - cs.stop - } - } -} + val registry: MetricsRegistryMap = new MetricsRegistryMap + val mBeanServerMock: MBeanServer = mock(classOf[MBeanServer]) -class TestJmxReporter { - import TestJmxReporter.url + // Create dummy test metric. + registry.newCounter(metricGroup, metricName) - // TODO: Fix in SAMZA-1196 - //@Test - def testJmxReporter { - val registry = new MetricsRegistryMap - val jvm = new JvmMetrics(registry) - val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]().asJava)) + when(mBeanServerMock.isRegistered(objectName)).thenReturn(false) - reporter.register("test", registry) + val reporter = new JmxReporter(mBeanServerMock) + reporter.register(REPORTER_SOURCE, registry) reporter.start - jvm.run - val mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection - val stateViaJMX = mbserver.getAttribute(new ObjectName("org.apache.samza.metrics.JvmMetrics:type=test,name=mem-non-heap-used-mb"), "Value").asInstanceOf[Float] + verify(mBeanServerMock, times(1)).registerMBean(Matchers.anyObject(), Matchers.eq(objectName)) + + reset(mBeanServerMock) + // Create dummy counter to test metrics reporting through listener. + registry.newCounter(metricGroup, metricName) - assertTrue(stateViaJMX > 0) + assertEquals(1, registry.listeners.size) + verify(mBeanServerMock, times(1)).registerMBean(Matchers.anyObject(), Matchers.eq(objectName)) reporter.stop }