Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/attribute metadata #1923

Merged
merged 7 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[cygnus-ngsi][RowAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902)
[cygnus-ngsi][ColumnAggregator] Fix attribute type key to make it unique. (#1904)
[cygnus-ngsi][PostgisSink, PostgreSQLSink] Implement records expiration for Postgis and PostgreSQL sinks (#1915)
[cygnus-ngsi][ColumnAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void aggregate(NGSIEvent event) {
String attrType = contextAttribute.getType();
JsonElement attrValue = contextAttribute.getValue();
String attrMetadata = contextAttribute.getContextMetadata();
JsonArray jsonAttrMetadata = new Gson().fromJson(attrMetadata, JsonArray.class);
JsonParser jsonParser = new JsonParser();
JsonArray jsonAttrMetadata = (JsonArray) jsonParser.parse(attrMetadata);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some test should be added covering the failing case (now solved by the PR)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previous PR #1919 were modifying existing tests about this feature

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I understand new tests are not needed.

In that case, NTC.

LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type=" + attrType + ")");
if (isEnableGeoParse() && (attrType.equals("geo:json") || attrType.equals("geo:point"))) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package com.telefonica.iot.cygnus.aggregation;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.telefonica.iot.cygnus.containers.NotifyContextRequest;
import com.telefonica.iot.cygnus.interceptors.NGSIEvent;
Expand Down Expand Up @@ -81,6 +83,8 @@ public void aggregate(NGSIEvent event) {
String attrType = contextAttribute.getType();
JsonElement attrValue = contextAttribute.getValue();
String attrMetadata = contextAttribute.getContextMetadata();
JsonParser jsonParser = new JsonParser();
JsonArray jsonAttrMetadata = (JsonArray) jsonParser.parse(attrMetadata);
LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type="
+ attrType + ")");
// aggregate the attribute information
Expand All @@ -92,7 +96,7 @@ public void aggregate(NGSIEvent event) {
aggregation.get(NGSIConstants.ATTR_NAME).add(new JsonPrimitive(attrName));
aggregation.get(NGSIConstants.ATTR_TYPE).add(new JsonPrimitive(attrType));
aggregation.get(NGSIConstants.ATTR_VALUE).add(attrValue);
aggregation.get(NGSIConstants.ATTR_MD).add(new JsonPrimitive(attrMetadata));
aggregation.get(NGSIConstants.ATTR_MD).add(jsonAttrMetadata);
} // for
setAggregation(aggregation);
} // aggregate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,11 @@ protected NGSIGenericAggregator processCSVFields (NGSIGenericAggregator genericA
genericAggregator.getAggregation().get(NGSIConstants.ATTR_TYPE).get(i).toString());
String printableAttrMdFileName = "hdfs:///user/" + username + "/" + attrMdFileName;
line += csvSeparator + printableAttrMdFileName;
genericAggregator.setMdAggregations(persistMetadata(attrMdFileName, genericAggregator.getMdAggregations(),metadata.getAsString(), recvTimeTs));
if (metadata.isJsonPrimitive()) {
genericAggregator.setMdAggregations(persistMetadata(attrMdFileName, genericAggregator.getMdAggregations(), metadata.getAsString(), recvTimeTs));
} else {
genericAggregator.setMdAggregations(persistMetadata(attrMdFileName, genericAggregator.getMdAggregations(), metadata.toString(), recvTimeTs));
}
}else {
if (genericAggregator.isAttrMetadataStore())
line += csvSeparator + "NULL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,12 @@ private void persistAggregation(NGSIGenericAggregator aggregator) throws CygnusP
BasicDBObject basicDBObject = BasicDBObject.parse(jsonObjects.get(i).toString());
aggregation.add(new Document(basicDBObject.toMap()));
if (rowAttrPersistence) {
Long timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString());
Long timeInstant;
if (aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).isJsonPrimitive()) {
timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString());
} else {
timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).toString());
}
if (timeInstant != null) {
aggregation.get(i).append(NGSIConstants.RECV_TIME, new Date(timeInstant));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,8 @@ public void testNativeTypeColumnBatch() throws CygnusBadConfiguration, CygnusRun
}
}
System.out.println(aggregation);
String correctBatch = "{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someNumber\":2,\"somneBoolean\":true,\"someDate\":\"2016-09-21T01:23:00.00Z\",\"someGeoJson\":\"{\\\"type\\\": \\\"Point\\\",\\\"coordinates\\\": [-0.036177,39.986159]}\",\"someJson\":\"{\\\"String\\\": \\\"string\\\"}\",\"someString\":\"foo\",\"someString2\":\"\"},{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someName1\":\"-3.7167, 40.3833\",\"someName1_md\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}],\"someName2\":\"someValue2\"}";
assertEquals(aggregation, correctBatch);
} catch (Exception e) {
fail();
}
Expand Down Expand Up @@ -1462,6 +1464,8 @@ public void testNativeTypeRowBatch() throws CygnusBadConfiguration, CygnusRuntim
}
}
System.out.println(aggregation);
String correctBatch = "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someNumber\",\"attrType\":\"number\",\"attrValue\":2},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"somneBoolean\",\"attrType\":\"Boolean\",\"attrValue\":true},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someDate\",\"attrType\":\"DateTime\",\"attrValue\":\"2016-09-21T01:23:00.00Z\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someGeoJson\",\"attrType\":\"geo:json\",\"attrValue\":\"{\\\"type\\\": \\\"Point\\\",\\\"coordinates\\\": [-0.036177,39.986159]}\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someJson\",\"attrType\":\"json\",\"attrValue\":\"{\\\"String\\\": \\\"string\\\"}\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString\",\"attrType\":\"string\",\"attrValue\":\"foo\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString2\",\"attrType\":\"string\",\"attrValue\":\"\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName1\",\"attrType\":\"someType1\",\"attrValue\":\"-3.7167, 40.3833\",\"attrMd\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName2\",\"attrType\":\"someType2\",\"attrValue\":\"someValue2\"}";
assertEquals(correctBatch, aggregation);
} catch (Exception e) {
fail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,7 @@ public void testNativeTypeRowBatchJson() throws CygnusBadConfiguration, CygnusRu
"{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someJson\",\"attrType\":\"json\",\"attrValue\":\"{\"String\": \"string\"}\",\"attrMd\":[]}\n" +
"{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString\",\"attrType\":\"string\",\"attrValue\":\"foo\",\"attrMd\":[]}\n" +
"{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString2\",\"attrType\":\"string\",\"attrValue\":\"\",\"attrMd\":[]}\n" +
"{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName1\",\"attrType\":\"someType1\",\"attrValue\":\"-3.7167, 40.3833\",\"attrMd\":\"[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]\"}\n" +
"{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName1\",\"attrType\":\"someType1\",\"attrValue\":\"-3.7167, 40.3833\",\"attrMd\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]}\n" +
"{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName2\",\"attrType\":\"someType2\",\"attrValue\":\"someValue2\",\"attrMd\":[]}";
if (ngsihdfsSink.jsonToPersist(aggregator.getAggregationToPersist()).equals(correctBatch)) {
assertTrue(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ public void testNativeTypeColumnBatch() throws CygnusBadConfiguration, CygnusRun
BasicDBObject basicDBObject = BasicDBObject.parse(jsonObjects.get(i).toString());
aggregation.add(new Document(basicDBObject.toMap()));
if (aggregator instanceof NGSIGenericRowAggregator) {
Long timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString());
Long timeInstant;
if (aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).isJsonPrimitive()) {
timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString());
} else {
timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).toString());
}
if (timeInstant != null) {
aggregation.get(i).append(NGSIConstants.RECV_TIME, new Date(timeInstant));
} else {
Expand Down Expand Up @@ -286,7 +291,12 @@ public void testNativeTypeRowBatch() throws CygnusBadConfiguration, CygnusRuntim
for (int i = 0 ; i < jsonObjects.size() ; i++) {
aggregation.add(Document.parse(jsonObjects.get(i).toString()));
if (aggregator instanceof NGSIGenericRowAggregator) {
Long timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString());
Long timeInstant;
if (aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).isJsonPrimitive()) {
timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString());
} else {
timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).toString());
}
if (timeInstant != null) {
aggregation.get(i).append(NGSIConstants.RECV_TIME, new Date(timeInstant));
} else {
Expand Down