From fd43c853b06d35f6d4c5fad7c119f1532dfc1d0c Mon Sep 17 00:00:00 2001 From: Matthew Stump Date: Thu, 29 Jun 2017 21:45:30 -0700 Subject: [PATCH] added support for Kafka in metrics3 and associated sample config --- .gitignore | 1 + pom.xml | 6 +- .../AbstractHostPortReporterConfig.java | 2 +- .../config/AbstractKafkaReporterConfig.java | 128 ++++++++++++++++++ .../metrics/reporter/config/HostPort.java | 7 +- .../src/test/resources/sample/kafka.yaml | 30 ++++ reporter-config3/pom.xml | 22 ++- .../reporter/config/KafkaReporterConfig.java | 111 +++++++++++++++ .../reporter/config/ReporterConfig.java | 28 ++++ .../reporter/config/sample/SampleTest.java | 11 +- .../reporter/config/sample/ValidateTest.java | 1 + samples/kafka.yaml | 30 ++++ 12 files changed, 371 insertions(+), 6 deletions(-) create mode 100644 reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractKafkaReporterConfig.java create mode 100644 reporter-config-base/src/test/resources/sample/kafka.yaml create mode 100644 reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/KafkaReporterConfig.java create mode 100644 samples/kafka.yaml diff --git a/.gitignore b/.gitignore index 67beeec..c49c09f 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ target/ pom.xml.releaseBackup release.properties TODO +.DS_Store \ No newline at end of file diff --git a/pom.xml b/pom.xml index 45ecfbf..a9d62f9 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,6 @@ 3.1 - @@ -127,6 +126,11 @@ false + + + jitpack.io + https://jitpack.io + clojars.org diff --git a/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractHostPortReporterConfig.java b/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractHostPortReporterConfig.java index a7d2998..649fa74 100644 --- a/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractHostPortReporterConfig.java +++ b/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractHostPortReporterConfig.java @@ -172,7 +172,7 @@ private String sanitizeName(String name) { return name.replaceAll("[^a-zA-Z0-9_-]", "_"); } - String resolvePrefix(String prefixTemplate) { + public String resolvePrefix(String prefixTemplate) { Map valueMap = new HashMap(); if (localhost != null) { String hostname = localhost.getHostName(); diff --git a/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractKafkaReporterConfig.java b/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractKafkaReporterConfig.java new file mode 100644 index 0000000..17e9cb4 --- /dev/null +++ b/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/AbstractKafkaReporterConfig.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.addthis.metrics.reporter.config; + +import javax.validation.constraints.NotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class AbstractKafkaReporterConfig extends AbstractHostPortReporterConfig { + + protected String name = "kafka"; + + protected String hostname = null; + + protected String ip = null; + + @NotNull + protected String serializer = "kafka.serializer.StringEncoder"; + + @NotNull + protected String partitioner = "kafka.producer.DefaultPartitioner"; + + @NotNull + protected String requiredAcks = "1"; + + @NotNull + protected String topic; + + @NotNull + private Map labels; + + private Map resolvedLabels; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getTopic() { + return this.topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getHostname() { + return this.hostname; + } + + public void setHostname(String hostname) { + if (hostname != null) { + this.hostname = resolvePrefix(hostname); + } + this.hostname = null; + } + + public String getIp() { + return this.ip; + } + + public void setIp(String ip) { + if (ip != null) { + this.ip = resolvePrefix(ip); + } + this.ip = null; + } + + public String getSerializer() { + return this.serializer; + } + + public void setSerializer(String serializer) { + this.serializer = serializer; + } + + public String getPartitioner() { + return this.partitioner; + } + + public void setPartitioner(String partitioner) { + this.partitioner = partitioner; + } + + public String getRequiredAcks() { + return this.requiredAcks; + } + + public void setRequiredAcks(String requiredAcks) { + this.requiredAcks = requiredAcks; + } + + public void setLabels(Map labels) { + this.labels = labels; + this.resolvedLabels = new HashMap(labels.size()); + for (Map.Entry entry : labels.entrySet()) + { + this.resolvedLabels.put(entry.getKey(), resolvePrefix(entry.getValue())); + } + } + + public Map getResolvedLabels() { + return resolvedLabels; + } + + @Override + public List getFullHostList() + { + return getHostListAndStringList(); + } +} diff --git a/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/HostPort.java b/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/HostPort.java index 39b4758..edb2bf1 100644 --- a/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/HostPort.java +++ b/reporter-config-base/src/main/java/com/addthis/metrics/reporter/config/HostPort.java @@ -54,6 +54,9 @@ public void setPort(int port) { this.port = port; } -} - + public String toString() + { + return String.format("%s:%d", this.host, this.port); + } +} diff --git a/reporter-config-base/src/test/resources/sample/kafka.yaml b/reporter-config-base/src/test/resources/sample/kafka.yaml new file mode 100644 index 0000000..e5a560d --- /dev/null +++ b/reporter-config-base/src/test/resources/sample/kafka.yaml @@ -0,0 +1,30 @@ +kafka: + - + requiredAcks: '1' + topic: 'cassandra' + period: 10 + timeunit: 'SECONDS' + hostname: '${host.fqdn}' + ip: '${host.name}' + hosts: + - host: 'localhost' + port: 9092 + + labels: + 'TEST LABEL' : 'TEST VALUE' + + predicate: + color: 'white' + useQualifiedName: true + patterns: + - '^org\.apache\.cassandra\.metrics\.Cache.+' + - '^org\.apache\.cassandra\.metrics\.ClientRequest.+' + - '^org\.apache\.cassandra\.metrics\.CommitLog.+' + - '^org\.apache\.cassandra\.metrics\.Compaction.+' + - '^org\.apache\.cassandra\.metrics\.DroppedMetrics.+' + - '^org\.apache\.cassandra\.metrics\.ReadRepair.+' + - '^org\.apache\.cassandra\.metrics\.Storage.+' + - '^org\.apache\.cassandra\.metrics\.ThreadPools.+' + - '^org\.apache\.cassandra\.metrics\.CQL.+' + - '^org\.apache\.cassandra\.metrics\.Client.+' + - '^org\.apache\.cassandra\.metrics\.Table\.[a-zA-Z]+\.all' diff --git a/reporter-config3/pom.xml b/reporter-config3/pom.xml index a13a7ea..34ef59d 100644 --- a/reporter-config3/pom.xml +++ b/reporter-config3/pom.xml @@ -23,13 +23,25 @@ reporter-config3 metrics reporter config 3.x - com.addthis.metrics reporter-config-base 3.0.3 + + org.apache.kafka + kafka_2.10 + 0.8.2.2 + + + org.apache.zookeeper + zookeeper + + + test + true + io.dropwizard.metrics metrics-core @@ -93,6 +105,14 @@ simpleclient_servlet true + + + com.github.mstump + metrics-kafka + c804bf1874 + true + compile + diff --git a/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/KafkaReporterConfig.java b/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/KafkaReporterConfig.java new file mode 100644 index 0000000..3dbb18d --- /dev/null +++ b/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/KafkaReporterConfig.java @@ -0,0 +1,111 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.addthis.metrics3.reporter.config; + +import java.util.List; +import java.util.LinkedList; +import java.util.Properties; +import java.util.StringJoiner; + +import com.codahale.metrics.MetricRegistry; +import com.addthis.metrics.reporter.config.HostPort; +import com.addthis.metrics.reporter.config.AbstractKafkaReporterConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.github.hengyunabc.metrics.KafkaReporter; +import kafka.producer.ProducerConfig; + + +public class KafkaReporterConfig extends AbstractKafkaReporterConfig implements MetricsReporterConfigThree +{ + private static final Logger log = LoggerFactory.getLogger(KafkaReporterConfig.class); + + private MetricRegistry registry; + + private KafkaReporter reporter; + + private boolean checkClass(String className) { + if (!isClassAvailable(className)) + { + log.error("Tried to enable InfluxDBReporter, but class {} was not found", className); + return false; + } else + { + return true; + } + } + + @Override + public boolean enable(MetricRegistry registry) { + this.registry = registry; + + boolean success = checkClass("com.addthis.metrics.reporter.config.AbstractKafkaReporterConfig"); + if (!success) + { + return false; + } + + List hosts = getFullHostList(); + if (hosts == null || hosts.isEmpty()) + { + log.error("No hosts specified, cannot enable KafkaReporter"); + return false; + } + + log.info("Enabling KafkaReporter to {}", ""); + try + { + StringJoiner brokerList = new StringJoiner(","); + for (HostPort host : getFullHostList()) { + brokerList.add(host.toString()); + } + + Properties props = new Properties(); + props.put("metadata.broker.list", brokerList.toString()); + props.put("serializer.class", getSerializer()); + props.put("partitioner.class", getPartitioner()); + props.put("request.required.acks", getRequiredAcks()); + ProducerConfig config = new ProducerConfig(props); + + reporter = KafkaReporter.forRegistry(registry) + .config(config) + .topic(getTopic()) + .hostName(getHostname()) + .ip(getIp()) + .labels(getResolvedLabels()) + .prefix(getResolvedPrefix()) + .filter(MetricFilterTransformer.generateFilter(getPredicate())) + .build(); + + reporter.start(getPeriod(), getRealTimeunit()); + } + catch (Exception e) + { + log.error("Failure while Enabling KafkaReporter", e); + return false; + } + return true; + } + + @Override + public void report() { + if (reporter != null) { + reporter.report(); + } + } + +} diff --git a/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/ReporterConfig.java b/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/ReporterConfig.java index 4ed36ea..ef72bec 100644 --- a/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/ReporterConfig.java +++ b/reporter-config3/src/main/java/com/addthis/metrics3/reporter/config/ReporterConfig.java @@ -40,6 +40,8 @@ public class ReporterConfig extends AbstractReporterConfig { @Valid private List csv; @Valid + private List kafka; + @Valid private List ganglia; @Valid private List graphite; @@ -70,6 +72,14 @@ public void setCsv(List csv) { this.csv = csv; } + public List getKafka() { + return kafka; + } + + public void setKafka(List kafka) { + this.kafka = kafka; + } + public List getGanglia() { return ganglia; } @@ -154,6 +164,20 @@ public boolean enableCsv(MetricRegistry registry) { return !failures; } + public boolean enableKafka(MetricRegistry registry) { + boolean failures = false; + if (kafka == null) { + log.debug("Asked to enable kafka, but it was not configured"); + return false; + } + for (KafkaReporterConfig kafkaConfig : kafka) { + if (!kafkaConfig.enable(registry)) { + failures = true; + } + } + return !failures; + } + public boolean enableGanglia(MetricRegistry registry) { boolean failures = false; if (ganglia == null) { @@ -263,6 +287,9 @@ public boolean enableAll(MetricRegistry registry) { if (csv != null && enableCsv(registry)) { enabled = true; } + if (kafka != null && enableKafka(registry)) { + enabled = true; + } if (ganglia != null && enableGanglia(registry)) { enabled = true; } @@ -302,6 +329,7 @@ private void report(List reporters) { public void report() { report(console); report(csv); + report(kafka); report(ganglia); report(graphite); report(influxdb); diff --git a/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/SampleTest.java b/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/SampleTest.java index ffaff95..82cf717 100644 --- a/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/SampleTest.java +++ b/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/SampleTest.java @@ -126,6 +126,16 @@ public void sampleStatsD() throws Exception { runLoop(config); } + @Test + public void sampleKafka() throws Exception { + ReporterConfig config = ReporterConfig.loadFromFile("src/test/resources/sample/kafka.yaml"); + System.out.println(yaml.dump(config)); + log.info("Sample Kafka"); + assertNotNull(config.getKafka()); + assertEquals(1, config.getKafka().size()); + runLoop(config); + } + @Test public void sampleStatsDMulti() throws Exception { ReporterConfig config = ReporterConfig.loadFromFile("src/test/resources/sample/statsd-multi.yaml"); @@ -153,5 +163,4 @@ public void sampleMulti() throws Exception { log.info("Multi Reporter"); runLoop(config); } - } diff --git a/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/ValidateTest.java b/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/ValidateTest.java index 4828753..516990f 100644 --- a/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/ValidateTest.java +++ b/reporter-config3/src/test/java/com/addthis/metrics3/reporter/config/sample/ValidateTest.java @@ -41,6 +41,7 @@ public void validateSamples() throws IOException ReporterConfig.loadFromFileAndValidate("src/test/resources/sample/ganglia-gmond-predicate-measurement.yaml"); ReporterConfig.loadFromFileAndValidate("src/test/resources/sample/graphite.yaml"); ReporterConfig.loadFromFileAndValidate("src/test/resources/sample/influxdb.yaml"); + ReporterConfig.loadFromFileAndValidate("src/test/resources/sample/kafka.yaml"); ReporterConfig.loadFromFileAndValidate("src/test/resources/sample/multi.yaml"); } diff --git a/samples/kafka.yaml b/samples/kafka.yaml new file mode 100644 index 0000000..e5a560d --- /dev/null +++ b/samples/kafka.yaml @@ -0,0 +1,30 @@ +kafka: + - + requiredAcks: '1' + topic: 'cassandra' + period: 10 + timeunit: 'SECONDS' + hostname: '${host.fqdn}' + ip: '${host.name}' + hosts: + - host: 'localhost' + port: 9092 + + labels: + 'TEST LABEL' : 'TEST VALUE' + + predicate: + color: 'white' + useQualifiedName: true + patterns: + - '^org\.apache\.cassandra\.metrics\.Cache.+' + - '^org\.apache\.cassandra\.metrics\.ClientRequest.+' + - '^org\.apache\.cassandra\.metrics\.CommitLog.+' + - '^org\.apache\.cassandra\.metrics\.Compaction.+' + - '^org\.apache\.cassandra\.metrics\.DroppedMetrics.+' + - '^org\.apache\.cassandra\.metrics\.ReadRepair.+' + - '^org\.apache\.cassandra\.metrics\.Storage.+' + - '^org\.apache\.cassandra\.metrics\.ThreadPools.+' + - '^org\.apache\.cassandra\.metrics\.CQL.+' + - '^org\.apache\.cassandra\.metrics\.Client.+' + - '^org\.apache\.cassandra\.metrics\.Table\.[a-zA-Z]+\.all'