Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modification of NGSIElasticsearchSink #2194

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private TimeRelatedValues getTimeRelatedValues(long notifiedRecvTimeTs, String a
} // if else

v.recvTimeDt = new Date(v.recvTimeTs);
v.idx = this.index + "-" + this.indexDateFormatter.format(v.recvTimeDt);
v.idx = this.index;
return v;
} // getTimeRelatedValues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public void testPersistBatchWithOneAttributeRow() throws Exception {
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -837,7 +837,7 @@ public void testPersistBatchWithOneAttributeColumn() throws Exception {
byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -890,7 +890,7 @@ public void testPersistBatchWithTwoAttributesIgnoreEmptyRow() throws Exception {
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -944,7 +944,7 @@ public void testPersistBatchWithTwoAttributesWithEmptyRow() throws Exception {
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1019,7 +1019,7 @@ public void testPersistBatchWithTwoAttributesIgnoreEmptyColumn() throws Exceptio
byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1075,7 +1075,7 @@ public void testPersistBatchWithTwoAttributesWithEmptyColumn() throws Exception
byte[] bytes = MessageDigest.getInstance("md5").digest("roomtype:temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1130,9 +1130,9 @@ public void testPersistBatchWithThreeAttributesWithMetadataRow() throws Exceptio
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

if (timezone == "Asia/Tokyo") {
assertEquals("cygnus-room_service-room_service_path-room1-room-2018.01.02", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
} else {
assertEquals("cygnus-room_service-room_service_path-room1-room-2018.01.01", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
}
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

Expand Down Expand Up @@ -1243,9 +1243,9 @@ public void testPersistBatchWithThreeAttributesWithMetadataColumn() throws Excep
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

if (timezone == "Asia/Tokyo") {
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-2018.01.02", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
} else {
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-2018.01.01", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
}
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

Expand Down Expand Up @@ -1308,7 +1308,7 @@ public void testPersistBatchWithOneNullAttributeRow() throws Exception {
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1356,7 +1356,7 @@ public void testPersistBatchWithOneNullAttributeIgnoreWhiteSpacesRow() throws Ex
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1407,7 +1407,7 @@ public void testPersistBatchWithOneNullAttributeColumn() throws Exception {
byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1457,7 +1457,7 @@ public void testPersistBatchWithOneNullAttributeIgnoreWhiteSpacesColumn() throws
byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1504,7 +1504,7 @@ public void testPersistBatchWithTwoNullAttributesRow() throws Exception {
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1569,7 +1569,7 @@ public void testPersistBatchWithTwoNullAttributesIgnoreWhiteSpacesRow() throws E
sink.persistBatch(batch);
verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture());

assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1637,7 +1637,7 @@ public void testPersistBatchWithTwoNullAttributesColumn() throws Exception {
byte[] bytes = MessageDigest.getInstance("md5").digest("roomtype:temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down Expand Up @@ -1686,7 +1686,7 @@ public void testPersistBatchWithTwoNullAttributesIgnoreWhiteSpacesColumn() throw
byte[] bytes = MessageDigest.getInstance("md5").digest("roomtype:temperature:".getBytes(StandardCharsets.UTF_8));
String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase();

assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue());
assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue());
assertEquals("cygnus_type", mappingTypeCaptor.getValue());

List<Map<String, String>> requestedData = dataCaptor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ So `NGSIElasticsearchSink` constructs the index name according to the following
4. when you use `Column-like storing`, append a hash string calculated by the attribute names to be stored
* MD5 hash is calculated by concatinated attribute names such as `attrName1:attrName2:...`.
5. when it starts with `-, _, +`, append 'idx' at the beggning of the base string.
6. append the created &lt;date&gt; such as `yyyy.mm.dd`.

According to the above rules, `NGSIElasticsearchSink` can handle the multiple subscriptions with different attributes of the same entity.

Expand Down