Skip to content

Commit

Permalink
lazy-loaded singleton implemented for DefaultOutput (#63)
Browse files Browse the repository at this point in the history
* lazy-loaded singleton implemented for DefaultOutput

* add comment about init on demand holder idiom with link

* separate DefaultOutputHolder to separate file, rename to InitializationOnDemandHolder

* add logging for tls mode
  • Loading branch information
eemhu authored Jan 23, 2025
1 parent af9688d commit daca951
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 115 deletions.
53 changes: 6 additions & 47 deletions src/main/java/com/teragrep/aer_02/DefaultOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,16 @@

import com.codahale.metrics.*;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.client.*;
import com.teragrep.rlp_01.pool.Pool;
import com.teragrep.rlp_01.pool.UnboundPool;

import java.util.logging.Logger;

/**
* Implementation of an shareable output. Required to be thread-safe.
* Implementation of a shareable output. Required to be thread-safe.
*/
final class DefaultOutput implements Output {

private final Pool<IManagedRelpConnection> relpConnectionPool;
private final String relpAddress;
private final int relpPort;
private final Logger logger;
public final class DefaultOutput implements Output {

DefaultOutput(
Logger logger,
Expand All @@ -72,7 +67,6 @@ final class DefaultOutput implements Output {
) {
this(
logger,
relpConnectionConfig,
new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
logger,
Expand All @@ -87,55 +81,20 @@ final class DefaultOutput implements Output {
);
}

DefaultOutput(
Logger logger,
String name,
RelpConnectionConfig relpConnectionConfig,
MetricRegistry metricRegistry
) {
this(
logger,
relpConnectionConfig,
new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
logger,
relpConnectionConfig.asRelpConfig(),
name,
metricRegistry,
relpConnectionConfig.asSocketConfig()
),
new ManagedRelpConnectionStub()
)
);
}

DefaultOutput(
Logger logger,
RelpConnectionConfig relpConnectionConfig,
Pool<IManagedRelpConnection> relpConnectionPool
) {
this.relpAddress = relpConnectionConfig.relpAddress();
this.relpPort = relpConnectionConfig.relpPort();

DefaultOutput(Logger logger, Pool<IManagedRelpConnection> relpConnectionPool) {
this.relpConnectionPool = relpConnectionPool;
this.logger = logger;
logger.info("DefaultOutput constructor done");
}

private final Pool<IManagedRelpConnection> relpConnectionPool;

@Override
public void accept(byte[] syslogMessage) {
RelpBatch batch = new RelpBatch();
batch.insert(syslogMessage);
IManagedRelpConnection connection = relpConnectionPool.get();
connection.ensureSent(syslogMessage);
relpConnectionPool.offer(connection);
}

@Override
public String toString() {
return "DefaultOutput{" + "relpAddress='" + relpAddress + '\'' + ", relpPort=" + relpPort + '}';
}

@Override
public void close() {
relpConnectionPool.close();
Expand Down
146 changes: 146 additions & 0 deletions src/main/java/com/teragrep/aer_02/InitializationOnDemandHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Teragrep Eventhub Reader as an Azure Function
* Copyright (C) 2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.aer_02;

import com.codahale.metrics.MetricRegistry;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.source.EnvironmentSource;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.aer_02.metrics.JmxReport;
import com.teragrep.aer_02.metrics.PrometheusReport;
import com.teragrep.aer_02.metrics.Report;
import com.teragrep.aer_02.metrics.Slf4jReport;
import com.teragrep.aer_02.tls.AzureSSLContextSupplier;
import com.teragrep.rlp_01.client.IManagedRelpConnection;
import com.teragrep.rlp_01.client.ManagedRelpConnectionStub;
import com.teragrep.rlp_01.pool.Pool;
import com.teragrep.rlp_01.pool.UnboundPool;
import io.prometheus.client.dropwizard.DropwizardExports;

import java.util.logging.Logger;

/**
* Uses Initialization on demand holder idiom. See
* <a href="https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom">Wikipedia article</a> for more details.
*/
public final class InitializationOnDemandHolder {

private InitializationOnDemandHolder() {

}

private static final LazyInstance INSTANCE = new LazyInstance();

public static LazyInstance lazyInstance() {
return INSTANCE;
}

static final class LazyInstance {

private final DefaultOutput defaultOutput;
private final MetricRegistry metricRegistry;
private final Report report;

private LazyInstance() {
final Logger logger = Logger.getAnonymousLogger();
metricRegistry = new MetricRegistry();
Sourceable environmentSource = new EnvironmentSource();
RelpConnectionConfig relpConnectionConfig = new RelpConnectionConfig(environmentSource);
report = new JmxReport(
new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry),
metricRegistry
);

report.start();

Pool<IManagedRelpConnection> relpConnectionPool;
if (environmentSource.source("relp.tls.mode", "none").equals("keyVault")) {
logger.info("Using keyVault TLS mode");
relpConnectionPool = new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
logger,
relpConnectionConfig.asRelpConfig(),
"defaultOutput",
metricRegistry,
relpConnectionConfig.asSocketConfig(),
new AzureSSLContextSupplier()
),
new ManagedRelpConnectionStub()
);
}
else {
logger.info("Using plain mode");
relpConnectionPool = new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
logger,
relpConnectionConfig.asRelpConfig(),
"defaultOutput",
metricRegistry,
relpConnectionConfig.asSocketConfig()
),
new ManagedRelpConnectionStub()
);
}

defaultOutput = new DefaultOutput(logger, relpConnectionPool);

Runtime.getRuntime().addShutdownHook(new Thread(this::close));
}

public DefaultOutput defaultOutput() {
return defaultOutput;
}

public MetricRegistry metricRegistry() {
return metricRegistry;
}

private void close() {
report.close();
defaultOutput.close();
}
}
}
50 changes: 8 additions & 42 deletions src/main/java/com/teragrep/aer_02/SyslogBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,12 @@
*/
package com.teragrep.aer_02;

import com.codahale.metrics.MetricRegistry;
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.source.EnvironmentSource;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.aer_02.json.JsonRecords;
import com.teragrep.aer_02.metrics.JmxReport;
import com.teragrep.aer_02.metrics.PrometheusReport;
import com.teragrep.aer_02.metrics.Report;
import com.teragrep.aer_02.metrics.Slf4jReport;
import com.teragrep.aer_02.tls.AzureSSLContextSupplier;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.dropwizard.DropwizardExports;
import io.prometheus.client.exporter.common.TextFormat;

import java.io.*;
Expand Down Expand Up @@ -125,41 +117,18 @@ public void eventHubTriggerToSyslog(
final Sourceable configSource = new EnvironmentSource();
final String hostname = new Hostname("localhost").hostname();

final MetricRegistry metricRegistry = new MetricRegistry();

context.getLogger().info("initializing at " + this);

final Report report = new JmxReport(
new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry),
metricRegistry
);
report.start();

DefaultOutput defaultOutput;
if (configSource.source("relp.tls.mode", "none").equals("keyVault")) {
context.getLogger().info("connection tls enabled");

defaultOutput = new DefaultOutput(
context.getLogger(),
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry,
new AzureSSLContextSupplier()
);
}
else {
context.getLogger().info("connection tls disabled");
defaultOutput = new DefaultOutput(
context.getLogger(),
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry
);
}

final InitializationOnDemandHolder.LazyInstance lazyInstance = InitializationOnDemandHolder.lazyInstance();
final DefaultOutput defaultOutput = lazyInstance.defaultOutput();
context.getLogger().info("initialized at " + this);

EventDataConsumer consumer = new EventDataConsumer(configSource, defaultOutput, hostname, metricRegistry);
final EventDataConsumer consumer = new EventDataConsumer(
configSource,
defaultOutput,
hostname,
lazyInstance.metricRegistry()
);

for (int index = 0; index < events.length; index++) {
if (events[index] != null) {
Expand All @@ -175,9 +144,6 @@ public void eventHubTriggerToSyslog(
context.getLogger().warning("eventHubTriggerToSyslog event data is null");
}
}

// close connections to prevent resource leak
defaultOutput.close();
}
catch (Throwable t) {
context.getLogger().severe("exiting because caught Throwable: " + t);
Expand Down
38 changes: 17 additions & 21 deletions src/test/java/com/teragrep/aer_02/DefaultOutputTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,16 @@ public void testSendLatencyMetricIsCapped() { // Should only keep information on
new ManagedRelpConnectionStub()
);

try (
DefaultOutput output = new DefaultOutput(
Logger.getAnonymousLogger(),
new RelpConnectionConfig(new PropertySource()),
pool
)
) {
DefaultOutput output = new DefaultOutput(Logger.getAnonymousLogger(), pool);

try {
for (int i = 0; i < measurementLimit + 100; i++) { // send more messages than the limit is
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}
}
finally {
output.close();
}

Assertions.assertEquals(measurementLimit, sendReservoir.size()); // should have measurementLimit amount of records saved
Assertions.assertEquals(1, connectReservoir.size()); // only connected once
Expand Down Expand Up @@ -145,15 +143,14 @@ public void testConnectionLatencyMetricIsCapped() { // Should take information o
new ManagedRelpConnectionStub()
);

try (
DefaultOutput output = new DefaultOutput(
Logger.getAnonymousLogger(),
new RelpConnectionConfig(new PropertySource()),
pool
)
) {
DefaultOutput output = new DefaultOutput(Logger.getAnonymousLogger(), pool);

try {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}
finally {
output.close();
}

Assertions.assertEquals(1, sendReservoir.size()); // only sent 1 message
Assertions.assertEquals(measurementLimit, connectReservoir.size()); // should have measurementLimit amount of records saved
Expand Down Expand Up @@ -182,15 +179,14 @@ public void testConnectionLatencyMetricWithException() { // should not update va
new ManagedRelpConnectionStub()
);

try (
DefaultOutput output = new DefaultOutput(
Logger.getAnonymousLogger(),
new RelpConnectionConfig(new PropertySource()),
pool
)
) {
DefaultOutput output = new DefaultOutput(Logger.getAnonymousLogger(), pool);

try {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}
finally {
output.close();
}

Timer sendTimer = metricRegistry.timer(name(DefaultOutput.class, "<[defaultOutput]>", "sendLatency"));
Timer connectionTimer = metricRegistry.timer(name(DefaultOutput.class, "<[defaultOutput]>", "connectLatency"));
Expand Down
Loading

0 comments on commit daca951

Please sign in to comment.