From 4ebcd71a2a0be84cfb20d6e042666e5066182dde Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Fri, 4 Sep 2020 09:10:01 +0200 Subject: [PATCH 1/6] fix metadata json Array parsing --- .../iot/cygnus/aggregation/NGSIGenericColumnAggregator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java index d286eff94..0c5390862 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericColumnAggregator.java @@ -108,7 +108,8 @@ public void aggregate(NGSIEvent event) { String attrType = contextAttribute.getType(); JsonElement attrValue = contextAttribute.getValue(); String attrMetadata = contextAttribute.getContextMetadata(); - JsonArray jsonAttrMetadata = new Gson().fromJson(attrMetadata, JsonArray.class); + JsonParser jsonParser = new JsonParser(); + JsonArray jsonAttrMetadata = (JsonArray) jsonParser.parse(attrMetadata); LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type=" + attrType + ")"); if (isEnableGeoParse() && (attrType.equals("geo:json") || attrType.equals("geo:point"))) { try { From a8e3ccd1b4439f9a78ef0755739be9b6a90b9541 Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Fri, 4 Sep 2020 10:30:55 +0200 Subject: [PATCH 2/6] fix metadata json Array parsing for row --- .../iot/cygnus/aggregation/NGSIGenericRowAggregator.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java index d5351274f..092d9e6f2 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java @@ -18,7 +18,9 @@ package com.telefonica.iot.cygnus.aggregation; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.telefonica.iot.cygnus.containers.NotifyContextRequest; import com.telefonica.iot.cygnus.interceptors.NGSIEvent; @@ -81,6 +83,8 @@ public void aggregate(NGSIEvent event) { String attrType = contextAttribute.getType(); JsonElement attrValue = contextAttribute.getValue(); String attrMetadata = contextAttribute.getContextMetadata(); + JsonParser jsonParser = new JsonParser(); + JsonArray jsonAttrMetadata = (JsonArray) jsonParser.parse(attrMetadata); LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type=" + attrType + ")"); // aggregate the attribute information @@ -92,7 +96,8 @@ public void aggregate(NGSIEvent event) { aggregation.get(NGSIConstants.ATTR_NAME).add(new JsonPrimitive(attrName)); aggregation.get(NGSIConstants.ATTR_TYPE).add(new JsonPrimitive(attrType)); aggregation.get(NGSIConstants.ATTR_VALUE).add(attrValue); - aggregation.get(NGSIConstants.ATTR_MD).add(new JsonPrimitive(attrMetadata)); + aggregation.get(NGSIConstants.ATTR_MD).add(jsonAttrMetadata); + //aggregation.get(NGSIConstants.ATTR_MD).add(new JsonPrimitive(attrMetadata)); } // for setAggregation(aggregation); } // aggregate From 27faf7d6e085d9801305b373b8ec5389f82d9ee5 Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Fri, 4 Sep 2020 10:31:18 +0200 Subject: [PATCH 3/6] add batch validation to ckan tests --- .../com/telefonica/iot/cygnus/sinks/NGSICKANSinkTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSICKANSinkTest.java b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSICKANSinkTest.java index 179fa4a38..003af88e3 100644 --- a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSICKANSinkTest.java +++ b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSICKANSinkTest.java @@ -1261,6 +1261,8 @@ public void testNativeTypeColumnBatch() throws CygnusBadConfiguration, CygnusRun } } System.out.println(aggregation); + String correctBatch = "{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someNumber\":2,\"somneBoolean\":true,\"someDate\":\"2016-09-21T01:23:00.00Z\",\"someGeoJson\":\"{\\\"type\\\": \\\"Point\\\",\\\"coordinates\\\": [-0.036177,39.986159]}\",\"someJson\":\"{\\\"String\\\": \\\"string\\\"}\",\"someString\":\"foo\",\"someString2\":\"\"},{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someName1\":\"-3.7167, 40.3833\",\"someName1_md\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}],\"someName2\":\"someValue2\"}"; + assertEquals(aggregation, correctBatch); } catch (Exception e) { fail(); } @@ -1462,6 +1464,8 @@ public void testNativeTypeRowBatch() throws CygnusBadConfiguration, CygnusRuntim } } System.out.println(aggregation); + String correctBatch = "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someNumber\",\"attrType\":\"number\",\"attrValue\":2},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"somneBoolean\",\"attrType\":\"Boolean\",\"attrValue\":true},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someDate\",\"attrType\":\"DateTime\",\"attrValue\":\"2016-09-21T01:23:00.00Z\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someGeoJson\",\"attrType\":\"geo:json\",\"attrValue\":\"{\\\"type\\\": \\\"Point\\\",\\\"coordinates\\\": [-0.036177,39.986159]}\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someJson\",\"attrType\":\"json\",\"attrValue\":\"{\\\"String\\\": \\\"string\\\"}\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString\",\"attrType\":\"string\",\"attrValue\":\"foo\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString2\",\"attrType\":\"string\",\"attrValue\":\"\"},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName1\",\"attrType\":\"someType1\",\"attrValue\":\"-3.7167, 40.3833\",\"attrMd\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]},{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName2\",\"attrType\":\"someType2\",\"attrValue\":\"someValue2\"}"; + assertEquals(correctBatch, aggregation); } catch (Exception e) { fail(); } From b149fb437bb72f380a41fd70b21f43bfdef40dc4 Mon Sep 17 00:00:00 2001 From: Ivan Hernandez Date: Fri, 4 Sep 2020 10:32:05 +0200 Subject: [PATCH 4/6] fix json like sinks to manage metadata as array --- .../telefonica/iot/cygnus/sinks/NGSIHDFSSink.java | 6 +++++- .../telefonica/iot/cygnus/sinks/NGSIMongoSink.java | 7 ++++++- .../iot/cygnus/sinks/NGSIHDFSSinkTest.java | 2 +- .../iot/cygnus/sinks/NGSIMongoSinkTest.java | 14 ++++++++++++-- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java index 25ae54bfa..5728ddc36 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSink.java @@ -666,7 +666,11 @@ protected NGSIGenericAggregator processCSVFields (NGSIGenericAggregator genericA genericAggregator.getAggregation().get(NGSIConstants.ATTR_TYPE).get(i).toString()); String printableAttrMdFileName = "hdfs:///user/" + username + "/" + attrMdFileName; line += csvSeparator + printableAttrMdFileName; - genericAggregator.setMdAggregations(persistMetadata(attrMdFileName, genericAggregator.getMdAggregations(),metadata.getAsString(), recvTimeTs)); + if (metadata.isJsonPrimitive()) { + genericAggregator.setMdAggregations(persistMetadata(attrMdFileName, genericAggregator.getMdAggregations(), metadata.getAsString(), recvTimeTs)); + } else { + genericAggregator.setMdAggregations(persistMetadata(attrMdFileName, genericAggregator.getMdAggregations(), metadata.toString(), recvTimeTs)); + } }else { if (genericAggregator.isAttrMetadataStore()) line += csvSeparator + "NULL"; diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSink.java index 657e05a6c..955ed0ba5 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSink.java @@ -219,7 +219,12 @@ private void persistAggregation(NGSIGenericAggregator aggregator) throws CygnusP BasicDBObject basicDBObject = BasicDBObject.parse(jsonObjects.get(i).toString()); aggregation.add(new Document(basicDBObject.toMap())); if (rowAttrPersistence) { - Long timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString()); + Long timeInstant; + if (aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).isJsonPrimitive()) { + timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString()); + } else { + timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).toString()); + } if (timeInstant != null) { aggregation.get(i).append(NGSIConstants.RECV_TIME, new Date(timeInstant)); } else { diff --git a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSinkTest.java b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSinkTest.java index 873a529d1..ae73d7762 100644 --- a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSinkTest.java +++ b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIHDFSSinkTest.java @@ -1940,7 +1940,7 @@ public void testNativeTypeRowBatchJson() throws CygnusBadConfiguration, CygnusRu "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someJson\",\"attrType\":\"json\",\"attrValue\":\"{\"String\": \"string\"}\",\"attrMd\":[]}\n" + "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString\",\"attrType\":\"string\",\"attrValue\":\"foo\",\"attrMd\":[]}\n" + "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someString2\",\"attrType\":\"string\",\"attrValue\":\"\",\"attrMd\":[]}\n" + - "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName1\",\"attrType\":\"someType1\",\"attrValue\":\"-3.7167, 40.3833\",\"attrMd\":\"[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]\"}\n" + + "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName1\",\"attrType\":\"someType1\",\"attrValue\":\"-3.7167, 40.3833\",\"attrMd\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]}\n" + "{\"recvTimeTs\":\"1461136795801\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"attrName\":\"someName2\",\"attrType\":\"someType2\",\"attrValue\":\"someValue2\",\"attrMd\":[]}"; if (ngsihdfsSink.jsonToPersist(aggregator.getAggregationToPersist()).equals(correctBatch)) { assertTrue(true); diff --git a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSinkTest.java b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSinkTest.java index 35e00f74a..6a2814e3e 100644 --- a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSinkTest.java +++ b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIMongoSinkTest.java @@ -233,7 +233,12 @@ public void testNativeTypeColumnBatch() throws CygnusBadConfiguration, CygnusRun BasicDBObject basicDBObject = BasicDBObject.parse(jsonObjects.get(i).toString()); aggregation.add(new Document(basicDBObject.toMap())); if (aggregator instanceof NGSIGenericRowAggregator) { - Long timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString()); + Long timeInstant; + if (aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).isJsonPrimitive()) { + timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString()); + } else { + timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).toString()); + } if (timeInstant != null) { aggregation.get(i).append(NGSIConstants.RECV_TIME, new Date(timeInstant)); } else { @@ -286,7 +291,12 @@ public void testNativeTypeRowBatch() throws CygnusBadConfiguration, CygnusRuntim for (int i = 0 ; i < jsonObjects.size() ; i++) { aggregation.add(Document.parse(jsonObjects.get(i).toString())); if (aggregator instanceof NGSIGenericRowAggregator) { - Long timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString()); + Long timeInstant; + if (aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).isJsonPrimitive()) { + timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).getAsString()); + } else { + timeInstant = CommonUtils.getTimeInstant(aggregator.getAggregation().get(NGSIConstants.ATTR_MD).get(i).toString()); + } if (timeInstant != null) { aggregation.get(i).append(NGSIConstants.RECV_TIME, new Date(timeInstant)); } else { From 1a398902d5a376f34b0d9fb54a368f3aec63d2d7 Mon Sep 17 00:00:00 2001 From: Ivan Hernandez <44349593+IvanHdzC@users.noreply.github.com> Date: Fri, 4 Sep 2020 10:33:19 +0200 Subject: [PATCH 5/6] Update CHANGES_NEXT_RELEASE --- CHANGES_NEXT_RELEASE | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index ceac62861..dc276304a 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1,3 +1,4 @@ +[cygnus-ngsi][RowAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902) [cygnus-ngsi][ColumnAggregator] Fix attribute type key to make it unique. (#1904) [cygnus-ngsi][PostgisSink, PostgreSQLSink] Implement records expiration for Postgis and PostgreSQL sinks (#1915) [cygnus-ngsi][ColumnAggregator] Handle metadata as json. This is to upgrade json-like sinks behaviour. (#1902) From 611a63ca19c1bcebc88e8d2bca4119031679c8b0 Mon Sep 17 00:00:00 2001 From: Ivan Hernandez <44349593+IvanHdzC@users.noreply.github.com> Date: Fri, 4 Sep 2020 10:38:06 +0200 Subject: [PATCH 6/6] remove comments --- .../iot/cygnus/aggregation/NGSIGenericRowAggregator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java index 092d9e6f2..da42a4c40 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java @@ -97,7 +97,6 @@ public void aggregate(NGSIEvent event) { aggregation.get(NGSIConstants.ATTR_TYPE).add(new JsonPrimitive(attrType)); aggregation.get(NGSIConstants.ATTR_VALUE).add(attrValue); aggregation.get(NGSIConstants.ATTR_MD).add(jsonAttrMetadata); - //aggregation.get(NGSIConstants.ATTR_MD).add(new JsonPrimitive(attrMetadata)); } // for setAggregation(aggregation); } // aggregate