Skip to content

Commit

Permalink
[api] collect instance configurations via API (#156)
Browse files Browse the repository at this point in the history
* [json] adding json configuration parsing.

* [json] parser fixes.

* [json] move to jackson from gson -- too much drama

* [test] addressing configuration changes.

* [json] test json configuration parser - fix multiple issues.

* [auto_discovery] pull configurations from API, and apply configs.

[autodiscovery] check for json configs every iteration - improvments in pipeline.

[httpclient] url encoded content type with GETs

[httpclient] wrap responses.

[api] if response is 204 no updates + method refactor

* [ci] no need to actually test the build against 1.6 - we can just target it with 1.7/1.8

* [api] cleaner url structure
  • Loading branch information
truthbk authored Oct 9, 2017
1 parent d2d6b73 commit c36514e
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 172 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ jdk:
- oraclejdk8
- oraclejdk7
- openjdk7
- openjdk6

addons:
hostname: dd-jmxfetch-testhost
25 changes: 19 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<url>http://maven.apache.org</url>

<properties>
<maven.compiler.target>1.6</maven.compiler.target>
<commons-io.version>2.4</commons-io.version>
<commons-lang.version>2.6</commons-lang.version>
<apache-commons-lang3.version>3.5</apache-commons-lang3.version>
Expand All @@ -20,8 +21,8 @@
<jcommander.version>1.35</jcommander.version>
<junit.version>4.11</junit.version>
<log4j.version>1.2.17</log4j.version>
<gson.version>1.4</gson.version>
<mockito.version>2.2.27</mockito.version>
<jackson.version>2.9.0</jackson.version>
<mockito.version>2.2.27</mockito.version>
<maven-surefire-plugin.version>2.9</maven-surefire-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<snakeyaml.version>1.13</snakeyaml.version>
Expand Down Expand Up @@ -65,11 +66,23 @@
<groupId>com.datadoghq</groupId>
<artifactId>java-dogstatsd-client</artifactId>
<version>${java-dogstatsd-client.version}</version>
</dependency>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<!-- Note: core-annotations version x.y.0 is generally compatible with
(identical to) version x.y.1, x.y.2, etc. -->
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
Expand Down
164 changes: 123 additions & 41 deletions src/main/java/org/datadog/jmxfetch/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.io.IOUtils;
import org.datadog.jmxfetch.reporter.Reporter;
import org.datadog.jmxfetch.util.CustomLogger;
import org.datadog.jmxfetch.util.FileHelper;
Expand All @@ -38,6 +39,8 @@
import com.beust.jcommander.ParameterException;
import com.google.common.primitives.Bytes;

import com.fasterxml.jackson.core.JsonProcessingException;


@SuppressWarnings("unchecked")
public class App {
Expand All @@ -52,16 +55,24 @@ public class App {
private static final int AD_MAX_MAG_INSTANCES = 4; // 1000 instances ought to be enough for anyone

private static int loopCounter;
private AtomicBoolean reinit = new AtomicBoolean(false);
private int lastJSONConfigTS;
private HashMap<String, Object> adJSONConfigs;
private ConcurrentHashMap<String, YamlParser> configs;
private ConcurrentHashMap<String, YamlParser> adConfigs = new ConcurrentHashMap<String, YamlParser>();
private ConcurrentHashMap<String, YamlParser> adPipeConfigs = new ConcurrentHashMap<String, YamlParser>();
private ArrayList<Instance> instances = new ArrayList<Instance>();
private LinkedList<Instance> brokenInstances = new LinkedList<Instance>();
private AtomicBoolean reinit = new AtomicBoolean(false);

private AppConfig appConfig;
private HttpClient client;


public App(AppConfig appConfig) {
this.appConfig = appConfig;
// setup client
if (appConfig.remoteEnabled()) {
client = new HttpClient(appConfig.getIPCHost(), appConfig.getIPCPort(), false);
}
this.configs = getConfigs(appConfig);
}

Expand Down Expand Up @@ -120,6 +131,9 @@ public static void main(String[] args) {

LOGGER.info("JMX Fetch has started");

//set up the config status
config.updateStatus();

App app = new App(config);

// Initiate JMX Connections, get attributes that match the yaml configuration
Expand Down Expand Up @@ -243,7 +257,7 @@ void start() {
long delta_s = 0;
FileInputStream adPipe = null;

if(appConfig.getAutoDiscoveryEnabled()) {
if(appConfig.getAutoDiscoveryPipeEnabled()) {
LOGGER.info("Auto Discovery enabled");
adPipe = newAutoDiscoveryPipe();
try {
Expand All @@ -261,11 +275,11 @@ void start() {
System.exit(0);
}

// any SD configs waiting in pipe?
if(adPipe == null && appConfig.getAutoDiscoveryEnabled()) {
if(adPipe == null && appConfig.getAutoDiscoveryPipeEnabled()) {
// If SD is enabled and the pipe is not open, retry opening pipe
adPipe = newAutoDiscoveryPipe();
}
// any AutoDiscovery configs waiting?
try {
if(adPipe != null && adPipe.available() > 0) {
byte[] buffer = new byte[0];
Expand All @@ -291,10 +305,14 @@ void start() {
}
setReinit(processAutoDiscovery(buffer));
}

if(appConfig.remoteEnabled()) {
setReinit(getJSONConfigs());
}
} catch(IOException e) {
LOGGER.warn("Unable to read from pipe - Service Discovery configuration may have been skipped.");
} catch(Exception e) {
LOGGER.warn("Unknown problem parsing auto-discovery configuration: " + e);
LOGGER.warn("Problem parsing auto-discovery configuration: " + e);
}

long start = System.currentTimeMillis();
Expand All @@ -306,7 +324,7 @@ void start() {
doIteration();
} else {
LOGGER.warn("No instance could be initiated. Retrying initialization.");
appConfig.getStatus().flush(appConfig.getIPCPort());
appConfig.getStatus().flush();
configs = getConfigs(appConfig);
init(true);
}
Expand Down Expand Up @@ -425,7 +443,7 @@ public void doIteration() {
}

try {
appConfig.getStatus().flush(appConfig.getIPCPort());
appConfig.getStatus().flush();
} catch (Exception e) {
LOGGER.error("Unable to flush stats.", e);
}
Expand Down Expand Up @@ -460,12 +478,16 @@ public boolean addConfig(String name, YamlParser config) {
return false;
}

this.adConfigs.put(name, config);
this.adPipeConfigs.put(name, config);
this.setReinit(true);

return true;
}

public boolean addJsonConfig(String name, String json) {
return false;
}

private ConcurrentHashMap<String, YamlParser> getConfigs(AppConfig config) {
ConcurrentHashMap<String, YamlParser> configs = new ConcurrentHashMap<String, YamlParser>();
YamlParser fileConfig;
Expand Down Expand Up @@ -504,6 +526,47 @@ private ConcurrentHashMap<String, YamlParser> getConfigs(AppConfig config) {
return configs;
}

private boolean getJSONConfigs() {
HttpClient.HttpResponse response;
boolean update = false;

if (this.client == null) {
return update;
}

try {
String uripath = "agent/jmx/configs?timestamp="+lastJSONConfigTS;
response = client.request("GET", "", uripath);
if (!response.isResponse2xx()) {
LOGGER.warn("Failed collecting JSON configs: [" +
response.getResponseCode() +"] " +
response.getResponseBody());
return update;
} else if (response.getResponseCode() == 204) {
LOGGER.debug("No configuration changes...");
return update;
}

LOGGER.debug("Received the following JSON configs: " + response.getResponseBody());

InputStream jsonInputStream = IOUtils.toInputStream(response.getResponseBody(), "UTF-8");
JsonParser parser = new JsonParser(jsonInputStream);
int timestamp = ((Integer) parser.getJsonTimestamp()).intValue();
if (timestamp > lastJSONConfigTS) {
adJSONConfigs = (HashMap<String, Object>)parser.getJsonConfigs();
lastJSONConfigTS = timestamp;
update = true;
LOGGER.debug("update is in order - updating timestamp: " + lastJSONConfigTS);
}
} catch (JsonProcessingException e) {
LOGGER.error("error processing JSON response: " + e);
} catch (IOException e) {
LOGGER.error("unable to collect remote JMX configs: " + e);
}

return update;
}

private void reportStatus(AppConfig appConfig, Reporter reporter, Instance instance,
int metricCount, String message, String status) {
String checkName = instance.getCheckName();
Expand All @@ -521,17 +584,50 @@ private void sendServiceCheck(Reporter reporter, Instance instance, String messa
reporter.resetServiceCheckCount(checkName);
}

private void instantiate(LinkedHashMap<String, Object> instanceMap, LinkedHashMap<String, Object> initConfig,
String checkName, AppConfig appConfig, boolean forceNewConnection) {

Instance instance;
Reporter reporter = appConfig.getReporter();

try {
instance = new Instance(instanceMap, initConfig, checkName, appConfig);
} catch (Exception e) {
String warning = "Unable to create instance. Please check your yaml file";
appConfig.getStatus().addInitFailedCheck(checkName, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
return;
}

try {
// initiate the JMX Connection
instance.init(forceNewConnection);
instances.add(instance);
} catch (IOException e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = CANNOT_CONNECT_TO_INSTANCE + instance + ". " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
} catch (Exception e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = "Unexpected exception while initiating instance " + instance + " : " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
}
}

public void init(boolean forceNewConnection) {
clearInstances(instances);
clearInstances(brokenInstances);


Reporter reporter = appConfig.getReporter();


Iterator<Entry<String, YamlParser>> it = configs.entrySet().iterator();
// SD config cache doesn't remove configs - it just overwrites.
Iterator<Entry<String, YamlParser>> itSD = adConfigs.entrySet().iterator();
Iterator<Entry<String, YamlParser>> itSD = adPipeConfigs.entrySet().iterator();
while (it.hasNext() || itSD.hasNext()) {
Map.Entry<String, YamlParser> entry;
boolean sdIterator = false;
Expand All @@ -544,6 +640,7 @@ public void init(boolean forceNewConnection) {

String name = entry.getKey();
YamlParser yamlConfig = entry.getValue();
// AD config cache doesn't remove configs - it just overwrites.
if(!sdIterator) {
it.remove();
}
Expand All @@ -557,35 +654,20 @@ public void init(boolean forceNewConnection) {
}

for (LinkedHashMap<String, Object> configInstance : configInstances) {
Instance instance;
//Create a new Instance object
try {
instance = new Instance(configInstance, (LinkedHashMap<String, Object>) yamlConfig.getInitConfig(),
name, appConfig);
} catch (Exception e) {
String warning = "Unable to create instance. Please check your yaml file";
appConfig.getStatus().addInitFailedCheck(name, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
continue;
}
try {
// initiate the JMX Connection
instance.init(forceNewConnection);
instances.add(instance);
} catch (IOException e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = CANNOT_CONNECT_TO_INSTANCE + instance + ". " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
} catch (Exception e) {
instance.cleanUp();
brokenInstances.add(instance);
String warning = "Unexpected exception while initiating instance " + instance + " : " + e.getMessage();
this.reportStatus(appConfig, reporter, instance, 0, warning, Status.STATUS_ERROR);
this.sendServiceCheck(reporter, instance, warning, Status.STATUS_ERROR);
LOGGER.error(warning, e);
instantiate(configInstance, (LinkedHashMap<String, Object>) yamlConfig.getInitConfig(),
name, appConfig, forceNewConnection);
}
}

//Process JSON configurations
if (adJSONConfigs != null) {
for (String check : adJSONConfigs.keySet()) {
HashMap<String, Object> checkConfig = (HashMap<String, Object>) adJSONConfigs.get(check);
LinkedHashMap<String, Object> initConfig = (LinkedHashMap<String, Object>) checkConfig.get("init_config");
ArrayList<LinkedHashMap<String, Object>> configInstances = (ArrayList<LinkedHashMap<String, Object>>) checkConfig.get("instances");
for (LinkedHashMap<String, Object> configInstance : configInstances) {
instantiate(configInstance, initConfig, check, appConfig, forceNewConnection);
}
}
}
Expand Down
Loading

0 comments on commit c36514e

Please sign in to comment.