Skip to content

Commit

Permalink
production version 1.4.0 for ES 2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
smorovic committed Mar 23, 2016
1 parent 9a10b95 commit 6f7f385
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 79 deletions.
2 changes: 1 addition & 1 deletion src/insertSubsys.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

conn = httplib.HTTPConnection(host='localhost',port=9200)

q = { "es_central_cluster":"es-cdaq","es_tribe_host" : "es-tribe", "es_tribe_cluster" : "es-tribe", "polling_interval" : 15, "fetching_interval" : 5, "runIndex_read" : "runindex_"+subsys+"_read", "runIndex_write" : "runindex_"+subsys+"_write", "boxinfo_read" : "boxinfo_"+subsys+"_read", "enable_stats" : False, "node":{"status":"created"},"subsystem":subsys, "instance_name":"river_"+subsys+"_main","close_indices": True }
q = { "es_central_cluster":"es-cdaq","es_tribe_host" : "es-tribe", "es_tribe_cluster" : "es-tribe", "polling_interval" : 15, "fetching_interval" : 5, "runIndex_read" : "runindex_"+subsys+"_read", "runIndex_write" : "runindex_"+subsys+"_write", "boxinfo_write" : "boxinfo_"+subsys+"_write", "enable_stats" : False, "node":{"status":"created"},"subsystem":subsys, "instance_name":"river_"+subsys+"_main","close_indices": True }

print json.dumps(q),'\n'

Expand Down
21 changes: 14 additions & 7 deletions src/main/java/org/fffriver/AbstractRunRiverThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.update.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static org.elasticsearch.common.xcontent.XContentFactory.*;

//Remote query stuff
Expand Down Expand Up @@ -61,7 +62,7 @@ public class AbstractRunRiverThread extends Thread {
public String runNumber;
public String runIndex_read;
public String runIndex_write;
public String boxinfo_read;
public String boxinfo_write;
public Boolean statsEnabled;
public Boolean closeIndices;
public String subsystem;
Expand Down Expand Up @@ -99,7 +100,7 @@ public AbstractRunRiverThread(String riverName, Map<String, Object> rSettings,
runNumber = XContentMapValues.nodeStringValue(rSettings.get("runNumber"), "0");
runIndex_read = XContentMapValues.nodeStringValue(rSettings.get("runIndex_read"), "runindex_cdaq_read");
runIndex_write = XContentMapValues.nodeStringValue(rSettings.get("runIndex_write"), "runindex_cdaq_write");
boxinfo_read = XContentMapValues.nodeStringValue(rSettings.get("boxinfo_read"), "boxinfo_cdaq_read");
boxinfo_write = XContentMapValues.nodeStringValue(rSettings.get("boxinfo_write"), "boxinfo_cdaq_write");
statsEnabled = Boolean.valueOf(XContentMapValues.nodeStringValue(rSettings.get("enable_stats"), "false"));
closeIndices = Boolean.valueOf(XContentMapValues.nodeStringValue(rSettings.get("close_indices"), "true"));
river_esindex = XContentMapValues.nodeStringValue(rSettings.get("river_esindex"), "river");
Expand All @@ -121,12 +122,12 @@ public void run() {
} catch (IOException e) {
logger.error("beforeLoop IOEception: ", e);
inError = true;
System.exit(3);
System.exit(4);
return;
} catch (Exception e) {
logger.error("beforeLoop Exception: ", e);
inError = true;
System.exit(3);
System.exit(5);
return;
}
//main loop
Expand All @@ -136,13 +137,15 @@ public void run() {
mainLoop();
} catch (IOException e) {
logger.error("Mainloop IOEception: ", e);
selfDelete();
inError = true;
System.exit(3);
System.exit(6);
break;
} catch (Exception e) {
logger.error("Mainloop Exception: ", e);
System.exit(3);
logger.error("Mainloop General Exception: ", e);
selfDelete();
inError = true;
System.exit(7);
break;
}

Expand All @@ -154,6 +157,10 @@ public void run() {
//afterLoop();
}

public void selfDelete(){
isRunning = false;
}

public void mainLoop() throws Exception {
return;
}
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/org/fffriver/Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,16 +478,15 @@ public void checkBoxInfo(){
boxinfoQuery.getJSONObject("filter").getJSONObject("term")
.put("activeRuns",runNumber);

SearchResponse response = client.prepareSearch(boxinfo_read).setSource(boxinfoQuery)
SearchResponse response = client.prepareSearch(boxinfo_write).setSource(boxinfoQuery)
.execute().actionGet();

collectStats(riverName,"boxinfoQuery",boxinfo_read,response);
collectStats(riverName,"boxinfoQuery",boxinfo_write,response);

logger.info("Boxinfo: "+ String.valueOf(response.getHits().getTotalHits()));
if (response.getHits().getTotalHits() == 0 ) {
execRunClose();
setRunning(false);
//selfDelete();
selfDelete();
}
}

Expand Down
65 changes: 36 additions & 29 deletions src/main/java/org/fffriver/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@
import java.net.InetSocketAddress;

//ELASTICSEARCH

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import org.elasticsearch.common.transport.TransportAddress;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.logging.Loggers;

public class Main {
Expand All @@ -42,6 +46,7 @@ public static void main(String[] argv) {

//start transport client
try {

//ES 2.X API:
Settings settings = Settings.settingsBuilder().put("cluster.name", river_escluster).build();
client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(InetAddress.getByName(river_eshost), 9300)));
Expand All @@ -52,52 +57,54 @@ public static void main(String[] argv) {
return;
}

//get document id from river index
GetResponse response;
try {
//get document in river_id in river index
response = client.prepareGet(river_esindex,"instance",river_id).get();
catch (Exception e) {
logger.error("Main river exception (GET): ", e);
System.exit(3);
return;
}
//read params and push into settings Map
Map <String, Object> settings = new HashMap<String, Object>();
settings.put("role",role);
settings.put("subsystem",river_subsys);
settings.put("runNumber",river_runnumber);
settings.put("es_tribe_host", response.getSource().get("es_tribe_host"));
settings.put("es_tribe_cluster", response.getSource().get("es_tribe_cluster"));
settings.put("polling_interval",response.getSource().get("polling_interval"));
settings.put("fetching_interval",response.getSource().get("fetching_interval"));
settings.put("runIndex_read",response.getSource().get("runIndex_read"));
settings.put("runIndex_write",response.getSource().get("runIndex_write"));
try {
settings.put("boxinfo_read",response.getSource().get("boxinfo_read"));
catch (Exception e) {
//fallback to old name
settings.put("boxinfo_read",response.getSource().get("boxinfo_write"));
}
settings.put("enable_stats",response.getSource().get("enable_stats"));
settings.put("close_indices",response.getSource().get("close_indices"));
settings.put("river_esindex",river_esindex);
settings.put("es_central_cluster",river_escluster);

try {
//read params and push into settings Map
Map <String, Object> settings = new HashMap<String, Object>();
settings.put("role",role);
settings.put("subsystem",river_subsys);
settings.put("runNumber",river_runnumber);
settings.put("es_tribe_host", response.getSource().get("es_tribe_host"));
settings.put("es_tribe_cluster", response.getSource().get("es_tribe_cluster"));
settings.put("polling_interval",response.getSource().get("polling_interval"));
settings.put("fetching_interval",response.getSource().get("fetching_interval"));
settings.put("runIndex_read",response.getSource().get("runIndex_read"));
settings.put("runIndex_write",response.getSource().get("runIndex_write"));
settings.put("boxinfo_write",response.getSource().get("boxinfo_write"));
settings.put("enable_stats",response.getSource().get("enable_stats"));
settings.put("close_indices",response.getSource().get("close_indices"));
settings.put("river_esindex",river_esindex);
settings.put("es_central_cluster",river_escluster);

//super(riverName, settings);
//this.client = client;
if (role.equals("monitor")){
rm = new RunMonitor(river_id,settings,client);
rm.run();
} else if (role.equals("collector")) {
cd = new Collector(river_id,settings,client);
cd.run();
}
client.close();

//stop
/*
if (role.equals("monitor")){
rm.setRunning(false);
}else if (role.equals("collector")){
cd.setRunning(false);
}*/
//exit
}
catch (Exception e) {
logger.error("Main river exception: ", e);
System.exit(3);
return;
}
client.close();


}
}
43 changes: 5 additions & 38 deletions src/main/java/org/fffriver/RunMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@
public class RunMonitor extends AbstractRunRiverThread {

JSONObject streamHistMapping;
JSONObject streamHistMappingAlt;
JSONObject stateHistMapping;
JSONObject stateHistMappingAlt;
JSONObject stateHistSummaryMapping;
JSONObject stateHistSummaryMappingAlt;
JSONObject statsMapping;
JSONObject runQuery;

Expand Down Expand Up @@ -104,8 +101,7 @@ public void createRun (String runNumber) throws Exception {
.field("fetching_interval", fetching_interval)
.field("runIndex_read", runIndex_read)
.field("runIndex_write", runIndex_write)
.field("boxinfo_read", boxinfo_read)
.field("boxinfo_write", boxinfo_read)//fallback option
.field("boxinfo_write", boxinfo_write)
.field("enable_stats", statsEnabled)
.field("close_indices", closeIndices)
.field("es_central_cluster", es_central_cluster)
Expand Down Expand Up @@ -139,11 +135,8 @@ public void getQueries() {
try {
runQuery = getJson("runRanger");
stateHistMapping = getJson("stateHistMapping");
stateHistMappingAlt = getJson("stateHistMappingAlt");
stateHistSummaryMapping = getJson("stateHistSummaryMapping");
stateHistSummaryMappingAlt = getJson("stateHistSummaryMappingAlt");
streamHistMapping = getJson("streamHistMapping");
streamHistMappingAlt = getJson("streamHistMappingAlt");
statsMapping = getJson("statsMapping");
} catch (Exception e) {
logger.error("RunMonitor getQueries Exception: ", e);
Expand All @@ -165,20 +158,11 @@ public void createStateMapping(Client client, String runIndex){
.setTypes("state-hist").execute().actionGet();
//if (!response.mappings().isEmpty()){ logger.info("State Mapping already exists"); return; }
logger.info("create/update StateMapping");
try {
client.admin().indices().preparePutMapping()
.setIndices(runIndex_write)
.setType("state-hist")
.setSource(stateHistMappingAlt)
.execute().actionGet();
} catch (Exception e) {
client.admin().indices().preparePutMapping()
client.admin().indices().preparePutMapping()
.setIndices(runIndex_write)
.setType("state-hist")
.setSource(stateHistMapping)
.execute().actionGet();
{

}

public void createStateSummaryMapping(Client client, String runIndex){
Expand All @@ -187,20 +171,11 @@ public void createStateSummaryMapping(Client client, String runIndex){
.setTypes("state-hist-summary").execute().actionGet();
//if (!response.mappings().isEmpty()){ logger.info("State Summary Mapping already exists"); return; }
logger.info("create/update StateSummaryMapping");
try {
client.admin().indices().preparePutMapping()
.setIndices(runIndex_write)
.setType("state-hist-summary")
.setSource(stateHistSummaryMappingAlt)
.execute().actionGet();
} catch (Exception e) {
client.admin().indices().preparePutMapping()
client.admin().indices().preparePutMapping()
.setIndices(runIndex_write)
.setType("state-hist-summary")
.setSource(stateHistSummaryMapping)
.execute().actionGet();

}
}


Expand All @@ -209,20 +184,12 @@ public void createStreamMapping(Client client, String runIndex){
GetMappingsResponse response = client.admin().indices().prepareGetMappings(runIndex_write)
.setTypes("stream-hist").execute().actionGet();
//if (!response.mappings().isEmpty()){ logger.info("Stream Mapping already exists"); return; }
logger.info("create/update StreamMapping");
try {
client.admin().indices().preparePutMapping()
.setIndices(runIndex_write)
.setType("stream-hist")
.setSource(streamHistMappingAlt)
.execute().actionGet();
} catch (Exception e) {
client.admin().indices().preparePutMapping()
logger.info("create/update StreamMapping");
client.admin().indices().preparePutMapping()
.setIndices(runIndex_write)
.setType("stream-hist")
.setSource(streamHistMapping)
.execute().actionGet();
}
}

public void createStatIndex(Client client, String index){
Expand Down

0 comments on commit 6f7f385

Please sign in to comment.