Skip to content

Commit

Permalink
add generic aggregation to ckan sink
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanHdzC committed Feb 17, 2020
1 parent a3abc27 commit d50030e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,24 +245,36 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public String getOrgName() {
return orgName;
public String getOrgName(boolean enableLowercase) {
if (enableLowercase) {
return orgName.toLowerCase();
} else {
return orgName;
}
}

public void setOrgName(String orgName) {
this.orgName = orgName;
}

public String getPkgName() {
return pkgName;
public String getPkgName(boolean enableLowercase) {
if (enableLowercase) {
return pkgName.toLowerCase();
} else {
return pkgName;
}
}

public void setPkgName(String pkgName) {
this.pkgName = pkgName;
}

public String getResName() {
return resName;
public String getResName(boolean enableLowercase) {
if (enableLowercase) {
return resName.toLowerCase();
} else {
return resName;
}
}

public void setResName(String resName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package com.telefonica.iot.cygnus.sinks;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.telefonica.iot.cygnus.aggregation.NGSIGenericAggregator;
import com.telefonica.iot.cygnus.aggregation.NGSIGenericColumnAggregator;
import com.telefonica.iot.cygnus.aggregation.NGSIGenericRowAggregator;
import com.telefonica.iot.cygnus.backends.ckan.CKANBackendImpl;
import com.telefonica.iot.cygnus.backends.ckan.CKANBackend;
import com.telefonica.iot.cygnus.containers.NotifyContextRequest;
Expand All @@ -35,6 +40,7 @@
import com.telefonica.iot.cygnus.utils.NGSIConstants;
import com.telefonica.iot.cygnus.utils.NGSIUtils;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Locale;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.flume.Context;
Expand Down Expand Up @@ -249,8 +255,18 @@ void persistBatch(NGSIBatch batch) throws CygnusBadConfiguration, CygnusRuntimeE
String servicePath = firstEvent.getServicePathForData();

// Get an aggregator for this entity and initialize it based on the first event
CKANAggregator aggregator = getAggregator(this.rowAttrPersistence);
aggregator.initialize(firstEvent);
NGSIGenericAggregator aggregator = getAggregator(rowAttrPersistence);
aggregator.setService(events.get(0).getServiceForNaming(enableNameMappings));
aggregator.setServicePathForData(events.get(0).getServicePathForData());
aggregator.setServicePathForNaming(events.get(0).getServicePathForNaming(enableGrouping, enableNameMappings));
aggregator.setEntityForNaming(events.get(0).getEntityForNaming(enableGrouping, enableNameMappings, enableEncoding));
aggregator.setEntityType(events.get(0).getEntityTypeForNaming(enableGrouping, enableNameMappings));
aggregator.setAttribute(events.get(0).getAttributeForNaming(enableNameMappings));
aggregator.setEnableUTCRecvTime(true);
aggregator.setOrgName(buildOrgName(service));
aggregator.setPkgName(buildPkgName(service, aggregator.getServicePathForNaming()));
aggregator.setResName(buildResName(aggregator.getEntityForNaming()));
aggregator.initialize(events.get(0));

for (NGSIEvent event : events) {
aggregator.aggregate(event);
Expand Down Expand Up @@ -314,202 +330,25 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError {
} // try catch
} // truncateByTime

/**
* Class for aggregating fieldValues.
*/
protected abstract class CKANAggregator {

// string containing the data records
protected String records;

protected String service;
protected String servicePathForData;
protected String servicePathForNaming;
protected String entityForNaming;
protected String orgName;
protected String pkgName;
protected String resName;
protected String resId;

public CKANAggregator() {
records = "";
} // CKANAggregator

public String getAggregation() {
return records;
} // getAggregation

public String getOrgName(boolean enableLowercase) {
if (enableLowercase) {
return orgName.toLowerCase();
} else {
return orgName;
} // if else
} // getOrgName

public String getPkgName(boolean enableLowercase) {
if (enableLowercase) {
return pkgName.toLowerCase();
} else {
return pkgName;
} // if else
} // getPkgName

public String getResName(boolean enableLowercase) {
if (enableLowercase) {
return resName.toLowerCase();
} else {
return resName;
} // if else
} // getResName

public void initialize(NGSIEvent event) throws CygnusBadConfiguration {
service = event.getServiceForNaming(enableNameMappings);
servicePathForData = event.getServicePathForData();
servicePathForNaming = event.getServicePathForNaming(enableGrouping, enableNameMappings);
entityForNaming = event.getEntityForNaming(enableGrouping, enableNameMappings, enableEncoding);
orgName = buildOrgName(service);
pkgName = buildPkgName(service, servicePathForNaming);
resName = buildResName(entityForNaming);
} // initialize

public abstract void aggregate(NGSIEvent cygnusEvent);

} // CKANAggregator

/**
* Class for aggregating batches in row mode.
*/
protected class RowAggregator extends CKANAggregator {

@Override
public void initialize(NGSIEvent event) throws CygnusBadConfiguration {
super.initialize(event);
} // initialize

@Override
public void aggregate(NGSIEvent event) {
// get the getRecvTimeTs headers
long recvTimeTs = event.getRecvTimeTs();
String recvTime = CommonUtils.getHumanReadable(recvTimeTs, true);

// get the getRecvTimeTs body
NotifyContextRequest.ContextElement contextElement = event.getContextElement();
String entityId = contextElement.getId();
String entityType = contextElement.getType();
LOGGER.debug("[" + getName() + "] Processing context element (id=" + entityId + ", type="
+ entityType + ")");

// iterate on all this context element attributes, if there are attributes
ArrayList<NotifyContextRequest.ContextAttribute> contextAttributes = contextElement.getAttributes();

if (contextAttributes == null || contextAttributes.isEmpty()) {
LOGGER.warn("No attributes within the notified entity, nothing is done (id=" + entityId
+ ", type=" + entityType + ")");
return;
} // if

for (NotifyContextRequest.ContextAttribute contextAttribute : contextAttributes) {
String attrName = contextAttribute.getName();
String attrType = contextAttribute.getType();
String attrValue = contextAttribute.getContextValue(true);
String attrMetadata = contextAttribute.getContextMetadata();
LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type="
+ attrType + ")");

// create a column and aggregate it
String record = "{\"" + NGSIConstants.RECV_TIME_TS + "\": \"" + recvTimeTs / 1000 + "\","
+ "\"" + NGSIConstants.RECV_TIME + "\": \"" + recvTime + "\","
+ "\"" + NGSIConstants.FIWARE_SERVICE_PATH + "\": \"" + servicePathForData + "\","
+ "\"" + NGSIConstants.ENTITY_ID + "\": \"" + entityId + "\","
+ "\"" + NGSIConstants.ENTITY_TYPE + "\": \"" + entityType + "\","
+ "\"" + NGSIConstants.ATTR_NAME + "\": \"" + attrName + "\","
+ "\"" + NGSIConstants.ATTR_TYPE + "\": \"" + attrType + "\""
+ (isSpecialValue(attrValue) ? "" : ",\"" + NGSIConstants.ATTR_VALUE + "\": " + attrValue)
+ (isSpecialMetadata(attrMetadata) ? "" : ",\"" + NGSIConstants.ATTR_MD + "\": " + attrMetadata)
+ "}";

if (records.isEmpty()) {
records += record;
} else {
records += "," + record;
} // if else
} // for
} // aggregate

} // RowAggregator

/**
* Class for aggregating batches in column mode.
*/
protected class ColumnAggregator extends CKANAggregator {

@Override
public void initialize(NGSIEvent event) throws CygnusBadConfiguration {
super.initialize(event);
} // initialize

@Override
public void aggregate(NGSIEvent event) {
// get the getRecvTimeTs headers
long recvTimeTs = event.getRecvTimeTs();
String recvTime = CommonUtils.getHumanReadable(recvTimeTs, true);

// get the getRecvTimeTs body
NotifyContextRequest.ContextElement contextElement = event.getContextElement();
String entityId = contextElement.getId();
String entityType = contextElement.getType();
LOGGER.debug("[" + getName() + "] Processing context element (id=" + entityId + ", type="
+ entityType + ")");

// iterate on all this context element attributes, if there are attributes
ArrayList<NotifyContextRequest.ContextAttribute> contextAttributes = contextElement.getAttributes();

if (contextAttributes == null || contextAttributes.isEmpty()) {
LOGGER.warn("No attributes within the notified entity, nothing is done (id=" + entityId
+ ", type=" + entityType + ")");
return;
} // if

String record = "{\"" + NGSIConstants.RECV_TIME + "\": \"" + recvTime + "\","
+ "\"" + NGSIConstants.FIWARE_SERVICE_PATH + "\": \"" + servicePathForData + "\","
+ "\"" + NGSIConstants.ENTITY_ID + "\": \"" + entityId + "\","
+ "\"" + NGSIConstants.ENTITY_TYPE + "\": \"" + entityType + "\"";

for (NotifyContextRequest.ContextAttribute contextAttribute : contextAttributes) {
String attrName = contextAttribute.getName();
String attrType = contextAttribute.getType();
String attrValue = contextAttribute.getContextValue(true);
String attrMetadata = contextAttribute.getContextMetadata();
LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type="
+ attrType + ")");

// create part of the column with the current attribute (a.k.a. a column)
record += (isSpecialValue(attrValue) ? "" : ",\"" + attrName + "\": " + attrValue)
+ (isSpecialMetadata(attrMetadata) ? "" : ",\"" + attrName + "_md\": " + attrMetadata);
} // for

// now, aggregate the column
if (records.isEmpty()) {
records += record + "}";
} else {
records += "," + record + "}";
} // if else
} // aggregate

} // ColumnAggregator

protected CKANAggregator getAggregator(boolean rowAttrPersistence) {
protected NGSIGenericAggregator getAggregator(boolean rowAttrPersistence) {
if (rowAttrPersistence) {
return new RowAggregator();
return new NGSIGenericRowAggregator();
} else {
return new ColumnAggregator();
return new NGSIGenericColumnAggregator();
} // if else
} // getAggregator

private void persistAggregation(CKANAggregator aggregator, String service, String servicePath)
private void persistAggregation(NGSIGenericAggregator aggregator, String service, String servicePath)
throws CygnusBadConfiguration, CygnusRuntimeError, CygnusPersistenceError {
String aggregation = aggregator.getAggregation();
ArrayList<JsonObject> jsonObjects = NGSIUtils.linkedHashMapToJsonListWithOutEmptyMD(aggregator.getAggregationToPersist());
String aggregation = "";
for (JsonObject jsonObject : jsonObjects) {
if (aggregation.isEmpty()) {
aggregation = jsonObject.toString();
} else {
aggregation += "," + jsonObject;
}
}
String orgName = aggregator.getOrgName(enableLowercase);
String pkgName = aggregator.getPkgName(enableLowercase);
String resName = aggregator.getResName(enableLowercase);
Expand All @@ -521,7 +360,7 @@ private void persistAggregation(CKANAggregator aggregator, String service, Strin

// Do try-catch only for metrics gathering purposes... after that, re-throw
try {
if (aggregator instanceof RowAggregator) {
if (aggregator instanceof NGSIGenericRowAggregator) {
persistenceBackend.persist(orgName, pkgName, resName, aggregation, true);
} else {
persistenceBackend.persist(orgName, pkgName, resName, aggregation, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,29 @@ public static ArrayList<JsonObject> linkedHashMapToJsonList(LinkedHashMap<String
return jsonStrings;
}

public static ArrayList<JsonObject> linkedHashMapToJsonListWithOutEmptyMD(LinkedHashMap<String, ArrayList<JsonElement>> aggregation) {
ArrayList<JsonObject> jsonStrings = new ArrayList<>();
int numEvents = collectionSizeOnLinkedHashMap(aggregation);
for (int i = 0; i < numEvents; i++) {
Iterator<String> it = aggregation.keySet().iterator();
JsonObject jsonObject = new JsonObject();
while (it.hasNext()) {
String entry = (String) it.next();
ArrayList<JsonElement> values = (ArrayList<JsonElement>) aggregation.get(entry);
if (values.get(i) != null) {
if (entry.contains("_md") || entry.contains("Md")) {
if (!values.get(i).toString().contains("[]"))
jsonObject.add(entry, values.get(i));
} else {
jsonObject.add(entry, values.get(i));
}
}
}
jsonStrings.add(jsonObject);
}
return jsonStrings;
}

public static LinkedHashMap<String, ArrayList<JsonElement>> cropLinkedHashMap(LinkedHashMap<String, ArrayList<JsonElement>> aggregation, ArrayList<String> keysToCrop) {
LinkedHashMap<String, ArrayList<JsonElement>> cropedLinkedHashMap = (LinkedHashMap<String, ArrayList<JsonElement>>) aggregation.clone();
for (String key : keysToCrop) {
Expand Down

0 comments on commit d50030e

Please sign in to comment.