Skip to content

Commit

Permalink
Merge pull request #30 from opensearch-project/khushbr-writer-purge-fix
Browse files Browse the repository at this point in the history
Fixing Event Log file cleanup issue
  • Loading branch information
khushbr authored Jul 13, 2021
2 parents c1c32ab + c84b461 commit 5b41dd0
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 94 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@

public enum StatExceptionCode {
TOTAL_ERROR("TotalError"),
METRICS_WRITE_ERROR("MetricsWriteError"),
METRICS_REMOVE_ERROR("MetricsRemoveError"),

// Tracks the number of VM attach/dataDump or detach failures.
JVM_ATTACH_ERROR("JvmAttachErrror"),

// This error is thrown if the java_pid file is missing.
JVM_ATTACH_ERROR_JAVA_PID_FILE_MISSING("JvmAttachErrorJavaPidFileMissing"),

// The lock could not be acquired within the timeout.
JVM_ATTACH_LOCK_ACQUISITION_FAILED("JvmAttachLockAcquisitionFailed"),

// ThreadState could not be found for an OpenSearch thread in the critical OpenSearch path.
NO_THREAD_STATE_INFO("NoThreadStateInfo"),

// This metric indicates that we successfully completed a thread-dump. Likewise,
// an omission of this should indicate that the thread taking the dump got stuck.
JVM_THREAD_DUMP_SUCCESSFUL("JvmThreadDumpSuccessful"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
import org.opensearch.performanceanalyzer.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.collectors.GCInfoCollector;
import org.opensearch.performanceanalyzer.collectors.HeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.MetricsPurgeActivity;
import org.opensearch.performanceanalyzer.collectors.MountedPartitionMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.NetworkE2ECollector;
import org.opensearch.performanceanalyzer.collectors.NetworkInterfaceCollector;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.jvm.GCMetrics;
import org.opensearch.performanceanalyzer.jvm.HeapMetrics;
import org.opensearch.performanceanalyzer.jvm.ThreadList;
Expand All @@ -50,26 +48,22 @@ public class MetricsConfiguration {
public static final int SAMPLING_INTERVAL = 5000;
public static final int ROTATION_INTERVAL = 30000;
public static final int STATS_ROTATION_INTERVAL = 60000;
public static final int DELETION_INTERVAL =
PluginSettings.instance().getMetricsDeletionInterval();

public static class MetricConfig {
public int samplingInterval;
public int rotationInterval;
public int deletionInterval;

public MetricConfig(int samplingInterval, int rotationInterval, int deletionInterval) {
public MetricConfig(int samplingInterval, int rotationInterval) {
this.samplingInterval = samplingInterval;
this.rotationInterval = rotationInterval;
this.deletionInterval = deletionInterval;
}
}

public static final Map<Class, MetricConfig> CONFIG_MAP = new HashMap<>();
public static final MetricConfig cdefault;

static {
cdefault = new MetricConfig(SAMPLING_INTERVAL, 0, 0);
cdefault = new MetricConfig(SAMPLING_INTERVAL, 0);

CONFIG_MAP.put(ThreadCPU.class, cdefault);
CONFIG_MAP.put(ThreadDiskIO.class, cdefault);
Expand All @@ -80,11 +74,8 @@ public MetricConfig(int samplingInterval, int rotationInterval, int deletionInte
CONFIG_MAP.put(NetworkE2ECollector.class, cdefault);
CONFIG_MAP.put(NetworkInterfaceCollector.class, cdefault);
CONFIG_MAP.put(OSGlobals.class, cdefault);
CONFIG_MAP.put(PerformanceAnalyzerMetrics.class, new MetricConfig(0, ROTATION_INTERVAL, 0));
CONFIG_MAP.put(
MetricsPurgeActivity.class,
new MetricConfig(ROTATION_INTERVAL, 0, DELETION_INTERVAL));
CONFIG_MAP.put(StatsCollector.class, new MetricConfig(STATS_ROTATION_INTERVAL, 0, 0));
CONFIG_MAP.put(PerformanceAnalyzerMetrics.class, new MetricConfig(0, ROTATION_INTERVAL));
CONFIG_MAP.put(StatsCollector.class, new MetricConfig(STATS_ROTATION_INTERVAL, 0));
CONFIG_MAP.put(DisksCollector.class, cdefault);
CONFIG_MAP.put(HeapMetricsCollector.class, cdefault);
CONFIG_MAP.put(GCInfoCollector.class, cdefault);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
import java.nio.file.Paths;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;

@SuppressWarnings("checkstyle:constantname")
Expand Down Expand Up @@ -139,7 +139,8 @@ public static void addMetricEntry(StringBuilder value, String metricKey, long me

/**
 * Offers a metric event to the writer queue. If the queue is full and the
 * event cannot be enqueued, a METRICS_WRITE_ERROR stat (keyed by the event's
 * key) is recorded and the dropped event is logged at debug level.
 *
 * @param q the bounded queue feeding the metrics writer
 * @param entry the metric event to enqueue
 */
private static void emitMetric(BlockingQueue<Event> q, Event entry) {
    boolean enqueued = q.offer(entry);
    if (enqueued) {
        return;
    }
    // Queue is at capacity: count the drop against this event's key so the
    // failure is visible in writer stats, then record it for debugging.
    PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
            WriterMetrics.METRICS_WRITE_ERROR, entry.key, 1);
    LOG.debug("Could not enter metric {}", entry);
}
Expand Down Expand Up @@ -199,15 +200,16 @@ public static void removeMetrics(File keyPathFile) {
LOG.debug("Purge Could not delete file {}", keyPathFile);
}
} catch (Exception ex) {
StatsCollector.instance().logException(StatExceptionCode.METRICS_REMOVE_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.METRICS_REMOVE_ERROR, "", 1);
LOG.debug(
(Supplier<?>)
() ->
new ParameterizedMessage(
"Error in deleting file: {} for keyPath:{} with ExceptionCode: {}",
ex.toString(),
keyPathFile.getAbsolutePath(),
StatExceptionCode.METRICS_REMOVE_ERROR.toString()),
WriterMetrics.METRICS_REMOVE_ERROR.toString()),
ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@
import org.opensearch.performanceanalyzer.rca.stats.measurements.MeasurementSet;

public enum WriterMetrics implements MeasurementSet {
/** Measures the time spent in deleting the event log files */
EVENT_LOG_FILES_DELETION_TIME(
"EventLogFilesDeletionTime",
"millis",
Arrays.asList(Statistics.MAX, Statistics.MEAN, Statistics.SUM)),
/** Measures the count of event log files deleted */
EVENT_LOG_FILES_DELETED(
"EventLogFilesDeleted", "count", Arrays.asList(Statistics.MAX, Statistics.SUM)),

SHARD_STATE_COLLECTOR_EXECUTION_TIME(
"ShardStateCollectorExecutionTime",
"millis",
Expand Down Expand Up @@ -145,6 +154,15 @@ public enum WriterMetrics implements MeasurementSet {
Statistics.SUM)),

STALE_METRICS("StaleMetrics", "count", Arrays.asList(Statistics.COUNT)),

METRICS_WRITE_ERROR(
"MetricsWriteError",
"namedCount",
Collections.singletonList(Statistics.NAMED_COUNTERS)),

METRICS_REMOVE_ERROR("MetricsRemoveError", "count", Arrays.asList(Statistics.COUNT)),

METRICS_REMOVE_FAILURE("MetricsRemoveFailure", "count", Arrays.asList(Statistics.COUNT)),
;

/** What we want to appear as the metric name. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.core.Util;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader.EventDispatcher;

public class EventLogFileHandler {
Expand Down Expand Up @@ -78,7 +82,7 @@ public void writeTmpFile(List<Event> dataEntries, long epoch) {
* data.
*
* <p>If any of the above steps fail, then the tmp file is not deleted from the filesystem. This
* is fine as the MetricsPurgeActivity, will eventually clean it. The copies are atomic and
* is fine as the {@link deleteFiles()}, will eventually clean it. The copies are atomic and
* therefore the reader never reads incompletely written file.
*
* @param dataEntries The metrics to be written to file.
Expand Down Expand Up @@ -166,4 +170,34 @@ private void readInternal(Path pathToFile, int bufferSize, EventDispatcher proce
LOG.error("Error reading file", ex);
}
}

/**
 * Deletes all leftover event log files under {@code metricsLocation},
 * skipping in-progress {@code .tmp} files (the current bucket may still be
 * written to them).
 */
public void deleteAllFiles() {
    LOG.debug("Cleaning up any leftover files.");
    File root = new File(metricsLocation);
    // Filter out '.tmp' files, we do not want to delete currBucket .tmp files
    String[] filesToDelete = root.list((dir, name) -> !name.endsWith(TMP_FILE_EXT));
    if (filesToDelete == null) {
        // File.list() returns null when the path is not a directory or an
        // I/O error occurs; without this guard Arrays.asList(null) throws NPE.
        LOG.debug("Metrics location {} is not a listable directory; nothing to delete.",
                metricsLocation);
        return;
    }
    deleteFiles(Arrays.asList(filesToDelete));
}

/**
 * Deletes the named writer files from {@code metricsLocation} and publishes
 * deletion stats: total wall-clock time spent (EVENT_LOG_FILES_DELETION_TIME,
 * millis) and the number of files processed (EVENT_LOG_FILES_DELETED).
 *
 * <p>A {@code null} list is treated as "nothing to delete" and returns
 * without emitting stats. Individual deletions are delegated to
 * {@link PerformanceAnalyzerMetrics#removeMetrics}.
 *
 * @param filesToDelete file names (relative to the metrics location) to remove
 */
public void deleteFiles(List<String> filesToDelete) {
    LOG.debug("Starting to delete old writer files");
    final long deletionStart = System.currentTimeMillis();

    if (filesToDelete == null) {
        return;
    }

    final File metricsRoot = new File(metricsLocation);
    int deletedSoFar = 0;
    for (String name : filesToDelete) {
        PerformanceAnalyzerMetrics.removeMetrics(new File(metricsRoot, name));
        deletedSoFar++;
    }

    final long elapsedMillis = System.currentTimeMillis() - deletionStart;
    PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
            WriterMetrics.EVENT_LOG_FILES_DELETION_TIME, "", elapsedMillis);
    PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
            WriterMetrics.EVENT_LOG_FILES_DELETED, "", deletedSoFar);
    LOG.debug("'{}' Old writer files cleaned up.", deletedSoFar);
}
}

0 comments on commit 5b41dd0

Please sign in to comment.