Skip to content

Commit

Permalink
Make sure IndexOutput of metadata file is always closed (#8653)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale authored Jul 12, 2023
1 parent 8c18f51 commit 25a6a82
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -475,30 +475,32 @@ public void uploadMetadata(
RemoteSegmentMetadata.CURRENT_VERSION
);
try {
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) {
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
}
}
}

ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos"));
byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();

metadataStreamWrapper.writeStream(
indexOutput,
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
primaryTerm,
segmentInfosSnapshot.getGeneration()
)
);
indexOutput.close();
ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfosSnapshot.write(
new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos")
);
byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();

metadataStreamWrapper.writeStream(
indexOutput,
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
primaryTerm,
segmentInfosSnapshot.getGeneration()
)
);
}
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.junit.After;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -48,7 +47,6 @@
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8638")
public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
private ClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,28 @@ public void testUploadMetadataNonEmpty() throws IOException {
}
}

public void testUploadMetadataMissingSegment() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();

Directory storeDirectory = mock(Directory.class);
IndexOutput indexOutput = mock(IndexOutput.class);

String generation = RemoteStoreUtils.invertLong(segmentInfos.getGeneration());
String primaryTerm = RemoteStoreUtils.invertLong(12);
when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__" + generation), eq(IOContext.DEFAULT))).thenReturn(
indexOutput
);

Collection<String> segmentFiles = List.of("_123.si");
assertThrows(
NoSuchFileException.class,
() -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L)
);
verify(indexOutput).close();
verify(storeDirectory).deleteFile(startsWith("metadata__" + primaryTerm + "__" + generation));
}

public void testNoMetadataHeaderCorruptIndexException() throws IOException {
List<String> metadataFiles = List.of(metadataFilename);
when(
Expand Down

0 comments on commit 25a6a82

Please sign in to comment.