This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

added support for Kafka in metrics3 and associated sample config
mstump committed Jul 7, 2017
1 parent 479358a commit fd43c85
Showing 12 changed files with 371 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -10,3 +10,4 @@ target/
pom.xml.releaseBackup
release.properties
TODO
.DS_Store
6 changes: 5 additions & 1 deletion pom.xml
@@ -115,7 +115,6 @@
<version>3.1</version>
</dependency>
</dependencies>

<repositories>
<!-- re-specify central repo to ensure it's the default -->
<repository>
@@ -127,6 +126,11 @@
<enabled>false</enabled>
</snapshots>
</repository>
<!-- kafka reporter -->
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
<!-- riemann reporter -->
<repository>
<id>clojars.org</id>
@@ -172,7 +172,7 @@ private String sanitizeName(String name) {
return name.replaceAll("[^a-zA-Z0-9_-]", "_");
}

String resolvePrefix(String prefixTemplate) {
public String resolvePrefix(String prefixTemplate) {
Map<String, String> valueMap = new HashMap<String, String>();
if (localhost != null) {
String hostname = localhost.getHostName();
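resolvePrefix() is widened to public here; the new Kafka config below relies on it to resolve the ${host.*} templates supplied for hostname, ip, and labels in YAML. A minimal sketch of that substitution, assuming the ${host.fqdn} and ${host.name} variables used by the sample kafka.yaml in this commit (the demo class itself is illustrative only):

import com.addthis.metrics3.reporter.config.KafkaReporterConfig;

public class ResolvePrefixDemo {
    public static void main(String[] args) {
        // KafkaReporterConfig (added below) inherits the now-public resolvePrefix().
        KafkaReporterConfig config = new KafkaReporterConfig();
        // Expected output: the local FQDN and short host name,
        // e.g. node1.example.com and node1.
        System.out.println(config.resolvePrefix("${host.fqdn}"));
        System.out.println(config.resolvePrefix("${host.name}"));
    }
}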
AbstractKafkaReporterConfig.java (new file)
@@ -0,0 +1,128 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.addthis.metrics.reporter.config;

import javax.validation.constraints.NotNull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractKafkaReporterConfig extends AbstractHostPortReporterConfig {

protected String name = "kafka";

protected String hostname = null;

protected String ip = null;

@NotNull
protected String serializer = "kafka.serializer.StringEncoder";

@NotNull
protected String partitioner = "kafka.producer.DefaultPartitioner";

@NotNull
protected String requiredAcks = "1";

@NotNull
protected String topic;

@NotNull
private Map<String, String> labels;

private Map<String, String> resolvedLabels;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getTopic() {
return this.topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getHostname() {
return this.hostname;
}

public void setHostname(String hostname) {
if (hostname != null) {
this.hostname = resolvePrefix(hostname);
} else {
this.hostname = null;
}
}

public String getIp() {
return this.ip;
}

public void setIp(String ip) {
if (ip != null) {
this.ip = resolvePrefix(ip);
} else {
this.ip = null;
}
}

public String getSerializer() {
return this.serializer;
}

public void setSerializer(String serializer) {
this.serializer = serializer;
}

public String getPartitioner() {
return this.partitioner;
}

public void setPartitioner(String partitioner) {
this.partitioner = partitioner;
}

public String getRequiredAcks() {
return this.requiredAcks;
}

public void setRequiredAcks(String requiredAcks) {
this.requiredAcks = requiredAcks;
}

public void setLabels(Map<String, String> labels) {
this.labels = labels;
this.resolvedLabels = new HashMap<String, String>(labels.size());
for (Map.Entry<String, String> entry : labels.entrySet())
{
this.resolvedLabels.put(entry.getKey(), resolvePrefix(entry.getValue()));
}
}

public Map<String, String> getResolvedLabels() {
return resolvedLabels;
}

@Override
public List<HostPort> getFullHostList()
{
return getHostListAndStringList();
}
}
HostPort.java
@@ -54,6 +54,9 @@ public void setPort(int port)
{
this.port = port;
}


public String toString()
{
return String.format("%s:%d", this.host, this.port);
}
}
30 changes: 30 additions & 0 deletions reporter-config-base/src/test/resources/sample/kafka.yaml
@@ -0,0 +1,30 @@
kafka:
  -
    requiredAcks: '1'
    topic: 'cassandra'
    period: 10
    timeunit: 'SECONDS'
    hostname: '${host.fqdn}'
    ip: '${host.name}'
    hosts:
      - host: 'localhost'
        port: 9092

    labels:
      'TEST LABEL': 'TEST VALUE'

    predicate:
      color: 'white'
      useQualifiedName: true
      patterns:
        - '^org\.apache\.cassandra\.metrics\.Cache.+'
        - '^org\.apache\.cassandra\.metrics\.ClientRequest.+'
        - '^org\.apache\.cassandra\.metrics\.CommitLog.+'
        - '^org\.apache\.cassandra\.metrics\.Compaction.+'
        - '^org\.apache\.cassandra\.metrics\.DroppedMetrics.+'
        - '^org\.apache\.cassandra\.metrics\.ReadRepair.+'
        - '^org\.apache\.cassandra\.metrics\.Storage.+'
        - '^org\.apache\.cassandra\.metrics\.ThreadPools.+'
        - '^org\.apache\.cassandra\.metrics\.CQL.+'
        - '^org\.apache\.cassandra\.metrics\.Client.+'
        - '^org\.apache\.cassandra\.metrics\.Table\.[a-zA-Z]+\.all'
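For orientation, here is how a sample like this is typically consumed. The loader entry point (ReporterConfig3.loadFromFile / enableAll) is assumed from the module's existing API and is not part of this diff; the path and metric name are illustrative:

import com.addthis.metrics3.reporter.config.ReporterConfig3;
import com.codahale.metrics.MetricRegistry;

public class KafkaYamlDemo {
    public static void main(String[] args) throws Exception {
        MetricRegistry registry = new MetricRegistry();
        // Matches the ClientRequest pattern in the predicate above.
        registry.counter("org.apache.cassandra.metrics.ClientRequest.Latency.Read").inc();

        // Parse the YAML and start every configured reporter, including the
        // new Kafka reporter, against this registry.
        ReporterConfig3.loadFromFile("reporter-config-base/src/test/resources/sample/kafka.yaml")
                       .enableAll(registry);
    }
}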
22 changes: 21 additions & 1 deletion reporter-config3/pom.xml
@@ -23,13 +23,25 @@
<artifactId>reporter-config3</artifactId>
<name>metrics reporter config 3.x</name>
<description />

<dependencies>
<dependency>
<groupId>com.addthis.metrics</groupId>
<artifactId>reporter-config-base</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
@@ -93,6 +105,14 @@
<artifactId>simpleclient_servlet</artifactId>
<optional>true</optional>
</dependency>
<!-- kafka -->
<dependency>
<groupId>com.github.mstump</groupId>
<artifactId>metrics-kafka</artifactId>
<version>c804bf1874</version>
<optional>true</optional>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
KafkaReporterConfig.java (new file)
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.addthis.metrics3.reporter.config;

import java.util.List;
import java.util.LinkedList;
import java.util.Properties;
import java.util.StringJoiner;

import com.codahale.metrics.MetricRegistry;
import com.addthis.metrics.reporter.config.HostPort;
import com.addthis.metrics.reporter.config.AbstractKafkaReporterConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.github.hengyunabc.metrics.KafkaReporter;
import kafka.producer.ProducerConfig;


public class KafkaReporterConfig extends AbstractKafkaReporterConfig implements MetricsReporterConfigThree
{
private static final Logger log = LoggerFactory.getLogger(KafkaReporterConfig.class);

private MetricRegistry registry;

private KafkaReporter reporter;

private boolean checkClass(String className) {
if (!isClassAvailable(className))
{
log.error("Tried to enable InfluxDBReporter, but class {} was not found", className);
return false;
} else
{
return true;
}
}

@Override
public boolean enable(MetricRegistry registry) {
this.registry = registry;

boolean success = checkClass("io.github.hengyunabc.metrics.KafkaReporter");
if (!success)
{
return false;
}

List<HostPort> hosts = getFullHostList();
if (hosts == null || hosts.isEmpty())
{
log.error("No hosts specified, cannot enable KafkaReporter");
return false;
}

log.info("Enabling KafkaReporter to {}", "");
try
{
StringJoiner brokerList = new StringJoiner(",");
for (HostPort host : getFullHostList()) {
brokerList.add(host.toString());
}

Properties props = new Properties();
props.put("metadata.broker.list", brokerList.toString());
props.put("serializer.class", getSerializer());
props.put("partitioner.class", getPartitioner());
props.put("request.required.acks", getRequiredAcks());
ProducerConfig config = new ProducerConfig(props);

reporter = KafkaReporter.forRegistry(registry)
.config(config)
.topic(getTopic())
.hostName(getHostname())
.ip(getIp())
.labels(getResolvedLabels())
.prefix(getResolvedPrefix())
.filter(MetricFilterTransformer.generateFilter(getPredicate()))
.build();

reporter.start(getPeriod(), getRealTimeunit());
}
catch (Exception e)
{
log.error("Failure while Enabling KafkaReporter", e);
return false;
}
return true;
}

@Override
public void report() {
if (reporter != null) {
reporter.report();
}
}

}
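The sample kafka.yaml above drives this class through the YAML loader; for completeness, a programmatic sketch of the same configuration. The setters inherited from the base classes (setHosts, setPeriod, setTimeunit) and HostPort's bean-style setters are assumed from the YAML binding and are not shown in this diff; the predicate is omitted for brevity:

import java.util.Collections;

import com.addthis.metrics.reporter.config.HostPort;
import com.addthis.metrics3.reporter.config.KafkaReporterConfig;
import com.codahale.metrics.MetricRegistry;

public class KafkaReporterDemo {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        registry.meter("org.apache.cassandra.metrics.Storage.Load").mark();

        // Broker, topic and labels mirror the sample kafka.yaml.
        HostPort broker = new HostPort();
        broker.setHost("localhost");
        broker.setPort(9092);

        KafkaReporterConfig config = new KafkaReporterConfig();
        config.setHosts(Collections.singletonList(broker));
        config.setTopic("cassandra");
        config.setHostname("${host.fqdn}");
        config.setIp("${host.name}");
        config.setLabels(Collections.singletonMap("TEST LABEL", "TEST VALUE"));
        config.setPeriod(10);
        config.setTimeunit("SECONDS");

        // Builds the Kafka producer and schedules reporting every 10 seconds.
        if (!config.enable(registry)) {
            System.err.println("KafkaReporter could not be enabled");
        }
    }
}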