Skip to content

Commit

Permalink
Separate json records into multiple messages (#40)
Browse files Browse the repository at this point in the history
* implement JsonRecords object which parses json records type event into separate messages.

* JsonRecords.java: make rv immutable; use try-with-resources with JsonReader

Co-authored-by: Moonbow-1 <[email protected]>

* comments for JsonRecords.java event passthroughs

* make expected messages explicit

* add EqualsVerifier and test for JsonRecords.

---------

Co-authored-by: Moonbow-1 <[email protected]>
  • Loading branch information
eemhu and MoonBow-1 authored Dec 2, 2024
1 parent 0b8dc5d commit 67f9d1d
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 2 deletions.
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@
<version>5.9.2</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/nl.jqno.equalsverifier/equalsverifier -->
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<version>3.17.4</version>
<scope>test</scope>
</dependency>
<!-- logging -->
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j2-impl -->
<dependency>
Expand All @@ -162,6 +169,17 @@
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<!-- JSON libraries -->
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.1.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.parsson</groupId>
<artifactId>parsson</artifactId>
<version>1.1.7</version>
</dependency>
</dependencies>
<build>
<directory>${project.basedir}/target</directory>
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/teragrep/aer_02/SyslogBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.teragrep.aer_02.config.source.EnvironmentSource;
import com.teragrep.aer_02.json.JsonRecords;
import com.teragrep.aer_02.metrics.JmxReport;
import com.teragrep.aer_02.metrics.PrometheusReport;
import com.teragrep.aer_02.metrics.Report;
Expand Down Expand Up @@ -144,8 +145,11 @@ public void eventHubTriggerToSyslog(
if (events[index] != null) {
final ZonedDateTime et = ZonedDateTime.parse(enqueuedTimeUtcArray.get(index) + "Z"); // needed as the UTC time presented does not have a TZ
context.getLogger().fine("Accepting event: " + events[index]);
consumer
.accept(events[index], partitionContext, et, offsetArray.get(index), propertiesArray[index], systemPropertiesArray[index]);
final String[] records = new JsonRecords(events[index]).records();
for (final String record : records) {
consumer
.accept(record, partitionContext, et, offsetArray.get(index), propertiesArray[index], systemPropertiesArray[index]);
}
}
else {
context.getLogger().warning("eventHubTriggerToSyslog event data is null");
Expand Down
113 changes: 113 additions & 0 deletions src/main/java/com/teragrep/aer_02/json/JsonRecords.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Teragrep Eventhub Reader as an Azure Function
* Copyright (C) 2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.aer_02.json;

import jakarta.json.*;
import jakarta.json.stream.JsonParsingException;

import java.io.StringReader;
import java.util.Objects;

public final class JsonRecords {

private final String event;

public JsonRecords(final String event) {
this.event = event;
}

/**
* Expects <code>{"records":[{},{}, ..., {}]}</code> type JSON string.
*
* @return individual records as an array or the original event.
*/
public String[] records() {
final String[] rv = new String[] {
event
};

final JsonStructure mainStructure;
try (final JsonReader reader = Json.createReader(new StringReader(event))) {
mainStructure = reader.read();
}
catch (JsonParsingException e) {
// pass event through as-is if JSON parsing fails
return rv;
}

final JsonValue recordsStructure = mainStructure.getValue("/records");

if (recordsStructure.getValueType().equals(JsonValue.ValueType.ARRAY)) {
final JsonArray recordsArray = recordsStructure.asJsonArray();
String[] records = new String[recordsArray.size()];

for (int i = 0; i < recordsArray.size(); i++) {
// Take string representation of inner value regardless of actual datatype
records[i] = recordsArray.get(i).toString();
}

return records;
}

// pass event through as-is if "records" is not an array type
return rv;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
JsonRecords that = (JsonRecords) o;
return Objects.equals(event, that.event);
}

@Override
public int hashCode() {
return Objects.hashCode(event);
}
}
57 changes: 57 additions & 0 deletions src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
import jakarta.json.Json;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -154,6 +155,62 @@ void testSyslogBridge() {
Assertions.assertEquals(3, loops);
}

@Test
void testSyslogBridgeWithJsonRecordsData() {
PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0");
Map<String, Object> props = new HashMap<>();
final SyslogBridge bridge = new SyslogBridge();

final String jsonRecords = Json
.createObjectBuilder()
.add("records", Json.createArrayBuilder().add("record1").add("record2").add("record3").build())
.build()
.toString();

bridge.eventHubTriggerToSyslog(new String[] {
jsonRecords, jsonRecords, jsonRecords
}, pcf.asMap(), new Map[] {
props, props, props
}, new Map[] {
new SystemPropsFake("0").asMap(), new SystemPropsFake("1").asMap(), new SystemPropsFake("2").asMap()
}, Arrays.asList("2010-01-01T00:00:00", "2010-01-02T00:00:00", "2010-01-03T00:00:00"),
Arrays.asList("0", "1", "2"), new ExecutionContextFake()
);

// there are 3 JSON records-type events with 3 records each, totalling 9 messages
Assertions.assertEquals(9, messages.size());

final String[] expectedSeqNums = new String[] {
"0", "0", "0", "1", "1", "1", "2", "2", "2"
};

final String[] expectedMessages = new String[] {
"\"record1\"",
"\"record2\"",
"\"record3\"",
"\"record1\"",
"\"record2\"",
"\"record3\"",
"\"record1\"",
"\"record2\"",
"\"record3\""
};

int loops = 0;
for (String message : messages) {
final RFC5424Frame frame = new RFC5424Frame(false);
frame.load(new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8)));
Assertions.assertTrue(Assertions.assertDoesNotThrow(frame::next));
Assertions.assertEquals(expectedMessages[loops], frame.msg.toString());
Assertions.assertEquals("localhost.localdomain", frame.hostname.toString());
Assertions.assertEquals("aer-02", frame.appName.toString());
Assertions.assertEquals(expectedSeqNums[loops], frame.msgId.toString());
loops++;
}

Assertions.assertEquals(9, loops);
}

@Test
void testSyslogBridgeMetrics() {
PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0");
Expand Down
108 changes: 108 additions & 0 deletions src/test/java/com/teragrep/aer_02/json/JsonRecordsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Teragrep Eventhub Reader as an Azure Function
* Copyright (C) 2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.aer_02.json;

import jakarta.json.Json;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class JsonRecordsTest {

@Test
void testRecordsAsObjectsCase() {
final String records = Json
.createObjectBuilder()
.add("records", Json.createArrayBuilder().add(Json.createObjectBuilder().add("a", "b")).add(Json.createObjectBuilder().add("c", "d"))).build().toString();
JsonRecords jr = new JsonRecords(records);
final String[] result = jr.records();
Assertions.assertEquals(2, result.length);
Assertions.assertEquals("{\"a\":\"b\"}", result[0]);
Assertions.assertEquals("{\"c\":\"d\"}", result[1]);
}

@Test
void testRecordsAsStringsAndNumbersCase() {
final String records = Json
.createObjectBuilder()
.add("records", Json.createArrayBuilder().add("abc").add(123).build())
.build()
.toString();
JsonRecords jr = new JsonRecords(records);
final String[] result = jr.records();
Assertions.assertEquals(2, result.length);
Assertions.assertEquals("\"abc\"", result[0]);
Assertions.assertEquals("123", result[1]);
}

@Test
void testNonJsonRecordsCase() {
final String records = "{{]///...<>;";
JsonRecords jr = new JsonRecords(records);
final String[] result = jr.records();
Assertions.assertEquals(1, result.length);
Assertions.assertEquals("{{]///...<>;", result[0]);
}

@Test
void testEquals() {
JsonRecords jr = new JsonRecords("{\"a\":\"b\",\"c\":\"d\"}");
JsonRecords jr2 = new JsonRecords("{\"a\":\"b\",\"c\":\"d\"}");
Assertions.assertEquals(jr, jr2);
}

@Test
void testNotEquals() {
JsonRecords jr = new JsonRecords("{\"a\":\"b\",\"c\":\"d\"}");
JsonRecords jr2 = new JsonRecords("{\"e\":\"f\",\"g\":\"h\"}");
Assertions.assertNotEquals(jr, jr2);
}

@Test
void testEqualsContract() {
EqualsVerifier.forClass(JsonRecords.class).verify();
}
}

0 comments on commit 67f9d1d

Please sign in to comment.