Skip to content

Commit

Permalink
Adding stats for unreferenced files cleanup count
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <[email protected]>
  • Loading branch information
Rishav Sagar committed Sep 25, 2023
1 parent 1dde018 commit 67212b2
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Adding stats for unreferenced cleanup count ([#10157](https://github.com/opensearch-project/OpenSearch/pull/10157))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
15 changes: 14 additions & 1 deletion server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.UnreferencedFileCleanUpStats;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.DocsStats;
Expand Down Expand Up @@ -145,6 +146,7 @@ public abstract class Engine implements LifecycleAware, Closeable {
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CounterMetric totalUnreferencedFileCleanupCount = new CounterMetric();
private final CountDownLatch closedLatch = new CountDownLatch(1);
protected final EventListener eventListener;
protected final ReentrantLock failEngineLock = new ReentrantLock();
Expand Down Expand Up @@ -267,6 +269,15 @@ protected final DocsStats docsStats(IndexReader indexReader) {
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}

/**
* Returns the {@link org.opensearch.index.merge.UnreferencedFileCleanUpStats} for this engine
*/
public UnreferencedFileCleanUpStats unreferencedFileCleanupStats() {
final UnreferencedFileCleanUpStats cleanUpStats = new UnreferencedFileCleanUpStats();
cleanUpStats.add(totalUnreferencedFileCleanupCount.count());
return cleanUpStats;
}

/**
* Performs the pre-closing checks on the {@link Engine}.
*
Expand Down Expand Up @@ -1340,7 +1351,9 @@ private void cleanUpUnreferencedFiles() {
.setOpenMode(IndexWriterConfig.OpenMode.APPEND)
)
) {
// do nothing and close this will kick off IndexFileDeleter which will remove all unreferenced files.
// do nothing except increasing metric count and close this will kick off IndexFileDeleter which will
// remove all unreferenced files.
totalUnreferencedFileCleanupCount.inc();
} catch (Exception ex) {
logger.error("Error while deleting unreferenced file ", ex);
}
Expand Down
24 changes: 23 additions & 1 deletion server/src/main/java/org/opensearch/index/merge/MergeStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.merge;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -65,8 +66,10 @@ public class MergeStats implements Writeable, ToXContentFragment {

private long totalBytesPerSecAutoThrottle;

public MergeStats() {
private final UnreferencedFileCleanUpStats cleanUpStats;

public MergeStats() {
cleanUpStats = new UnreferencedFileCleanUpStats();
}

public MergeStats(StreamInput in) throws IOException {
Expand All @@ -81,6 +84,12 @@ public MergeStats(StreamInput in) throws IOException {
totalStoppedTimeInMillis = in.readVLong();
totalThrottledTimeInMillis = in.readVLong();
totalBytesPerSecAutoThrottle = in.readVLong();

if (in.getVersion().onOrAfter(Version.CURRENT)) {
cleanUpStats = in.readOptionalWriteable(UnreferencedFileCleanUpStats::new);
} else {
cleanUpStats = new UnreferencedFileCleanUpStats();
}
}

public void add(
Expand Down Expand Up @@ -133,6 +142,7 @@ public void addTotals(MergeStats mergeStats) {
this.totalSizeInBytes += mergeStats.totalSizeInBytes;
this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis;
this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis;
addTotalCleanUpStats(mergeStats.cleanUpStats);
if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || mergeStats.totalBytesPerSecAutoThrottle == Long.MAX_VALUE) {
this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE;
} else {
Expand Down Expand Up @@ -224,6 +234,14 @@ public ByteSizeValue getCurrentSize() {
return new ByteSizeValue(currentSizeInBytes);
}

public void addTotalCleanUpStats(UnreferencedFileCleanUpStats cleanUpStats) {
this.cleanUpStats.add(cleanUpStats);
}

public UnreferencedFileCleanUpStats getTotalCleanUpStats() {
return this.cleanUpStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.MERGES);
Expand All @@ -240,6 +258,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.TOTAL_THROTTLE_BYTES_PER_SEC).value(new ByteSizeValue(totalBytesPerSecAutoThrottle).toString());
}
builder.field(Fields.TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES, totalBytesPerSecAutoThrottle);
cleanUpStats.toXContent(builder, params);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -282,5 +301,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalStoppedTimeInMillis);
out.writeVLong(totalThrottledTimeInMillis);
out.writeVLong(totalBytesPerSecAutoThrottle);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(cleanUpStats);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.merge;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Stores stats about unreferenced file cleanup due to segment merge failure.
*
* @opensearch.internal
*/
public class UnreferencedFileCleanUpStats implements Writeable, ToXContentFragment {

private long totalUnreferencedFileCleanupCount;

public UnreferencedFileCleanUpStats() {

}

public UnreferencedFileCleanUpStats(StreamInput in) throws IOException {
totalUnreferencedFileCleanupCount = in.readVLong();
}

public void add(long totalUnreferencedFileCleanupCount) {
this.totalUnreferencedFileCleanupCount += totalUnreferencedFileCleanupCount;
}

public long getTotalUnreferencedFileCleanupCount() {
return totalUnreferencedFileCleanupCount;
}

public void add(UnreferencedFileCleanUpStats cleanUpStats) {
if (cleanUpStats == null) {
return;
}

this.totalUnreferencedFileCleanupCount += cleanUpStats.totalUnreferencedFileCleanupCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.UNREFERENCED_FILE_CLEANUP);
builder.field(Fields.COUNT, totalUnreferencedFileCleanupCount);
builder.endObject();
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalUnreferencedFileCleanupCount);
}

/**
* Fields for unreferenced file cleanup statistics
*
* @opensearch.internal
*/
static final class Fields {
static final String UNREFERENCED_FILE_CLEANUP = "unreferenced_file_cleanup";
static final String COUNT = "count";
}

}
18 changes: 17 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.UnreferencedFileCleanUpStats;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteSegmentStats;
Expand Down Expand Up @@ -1329,6 +1330,15 @@ public DocsStats docStats() {
return getEngine().docStats();
}

public UnreferencedFileCleanUpStats unreferencedFileCleanupStats() {
final Engine engine = getEngineOrNull();
if (engine == null) {
return new UnreferencedFileCleanUpStats();
}

return engine.unreferencedFileCleanupStats();
}

/**
* @return {@link CommitStats}
* @throws AlreadyClosedException if shard is closed
Expand Down Expand Up @@ -1383,7 +1393,13 @@ public MergeStats mergeStats() {
if (engine == null) {
return new MergeStats();
}
return engine.getMergeStats();

final MergeStats mergeStats = engine.getMergeStats();
if (indexSettings.shouldCleanupUnreferencedFiles()) {
mergeStats.addTotalCleanUpStats(engine.unreferencedFileCleanupStats());
}

return mergeStats;
}

public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.merge.UnreferencedFileCleanUpStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.RemoteTranslogStats;
Expand Down Expand Up @@ -480,6 +481,17 @@ public void testSerialization() throws IOException {
assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind());
assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag());
}

if (nodeIndicesStats == null) {
assertNull(deserializedNodeIndicesStats);
} else {
UnreferencedFileCleanUpStats cleanUpStats = nodeIndicesStats.getMerge().getTotalCleanUpStats();
UnreferencedFileCleanUpStats deserializedCleanUpStats = deserializedNodeIndicesStats.getMerge().getTotalCleanUpStats();
assertEquals(
cleanUpStats.getTotalUnreferencedFileCleanupCount(),
deserializedCleanUpStats.getTotalUnreferencedFileCleanupCount()
);
}
}
}
}
Expand Down

0 comments on commit 67212b2

Please sign in to comment.