Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Event Log file cleanup issue #36

Merged
merged 9 commits into from
Jul 13, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.opensearch.performanceanalyzer.collectors.MasterServiceEventMetrics;
import org.opensearch.performanceanalyzer.collectors.MasterServiceMetrics;
import org.opensearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.MetricsPurgeActivity;
import org.opensearch.performanceanalyzer.collectors.NetworkInterfaceCollector;
import org.opensearch.performanceanalyzer.collectors.NodeDetailsCollector;
import org.opensearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector;
Expand Down Expand Up @@ -223,8 +222,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new OSMetricsCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MetricsPurgeActivity());

scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new NodeDetailsCollector(configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static void configureMetrics() {
MetricsConfiguration.CONFIG_MAP.put(NodeStatsAllShardsMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(NodeStatsFixedShardsMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(
MasterServiceEventMetrics.class, new MetricsConfiguration.MetricConfig(1000, 0, 0));
MasterServiceEventMetrics.class, new MetricsConfiguration.MetricConfig(1000, 0));
MetricsConfiguration.CONFIG_MAP.put(MasterServiceMetrics.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, cdefault);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,42 @@
package org.opensearch.performanceanalyzer.writer;


import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;
import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;
import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler;

public class EventLogQueueProcessor {
private static final Logger LOG = LogManager.getLogger(EventLogQueueProcessor.class);

private final ScheduledExecutorService writerExecutor = Executors.newScheduledThreadPool(1);
private final int filesCleanupPeriodicityMillis = PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds
private final EventLogFileHandler eventLogFileHandler;
private final long initialDelayMillis;
private final long purgePeriodicityMillis;
private final PerformanceAnalyzerController controller;
private long lastCleanupTimeBucket;
private long lastTimeBucket;

public EventLogQueueProcessor(
EventLogFileHandler eventLogFileHandler,
long initialDelayMillis,
Expand All @@ -64,6 +71,7 @@ public EventLogQueueProcessor(
this.eventLogFileHandler = eventLogFileHandler;
this.initialDelayMillis = initialDelayMillis;
this.purgePeriodicityMillis = purgePeriodicityMillis;
this.lastCleanupTimeBucket = 0;
this.lastTimeBucket = 0;
this.controller = controller;
}
Expand Down Expand Up @@ -97,7 +105,7 @@ public void scheduleExecutor() {

// This executes every purgePeriodicityMillis interval.
public void purgeQueueAndPersist() {
// Return if the writer is not enabled.
// Drain the Queue, and if writer is enabled then persist to event log file.
if (PerformanceAnalyzerConfigAction.getInstance() == null) {
return;
} else if (!controller.isPerformanceAnalyzerEnabled()) {
Expand Down Expand Up @@ -162,6 +170,33 @@ public void purgeQueueAndPersist() {
eventLogFileHandler.writeTmpFile(nextMetrics, nextTimeBucket);
}
LOG.debug("Writing to disk complete.");

// Delete the older event log files every filesCleanupPeriod (defaults to 60)
// In case files deletion takes longer/fails, we are okay with eventQueue reaching
// its max size (100000), post that {@link PerformanceAnalyzerMetrics#emitMetric()}
// will emit metric {@link WriterMetrics#METRICS_WRITE_ERROR} and return.
cleanup();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the cleanup is called after the files are written, the first time this thread runs, it will delete even the most recently written files because the call to cleanup will not have the lastCleanupTimeBucket and hence deleteAllFiles will be called deleting everything but the .tmp files.

Because the full cleanup is just a one time cleanup and happens at the start, let's do it in the constructor itself ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This would cause to lose the first timebucket metrics. I would address this as part of separate PR I will have for StatCollector refactoring.

}

private void cleanup() {
long currentTimeMillis = System.currentTimeMillis();
if (lastCleanupTimeBucket != 0) {
// Delete Event log files belonging to time bucket older than past filesCleanupPeriod(defaults to 60s)
long currCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(currentTimeMillis);
if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) {
// Get list of files(time buckets) for purging, considered range : [lastCleanupTimeBucket, currCleanupTimeBucket)
List<String> filesForCleanup = LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket)
.filter(timeMillis -> timeMillis % MetricsConfiguration.SAMPLING_INTERVAL == 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList());
eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup));
lastCleanupTimeBucket = currCleanupTimeBucket;
}
} else {
// First purge since the start-up, cleanup any lingering files.
eventLogFileHandler.deleteAllFiles();
lastCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(currentTimeMillis);
}
}

private void writeAndRotate(
Expand Down