Skip to content

Commit

Permalink
Add node id to segment and translog metadata (opensearch-project#10229)…
Browse files Browse the repository at this point in the history
… (opensearch-project#10353)

---------


(cherry picked from commit 7159e2e)

Signed-off-by: Gaurav Bafna <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 689adc1 commit 9c678ab
Show file tree
Hide file tree
Showing 25 changed files with 445 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,15 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
}
}
};
final IndexShard newShard = newIndexShard(indexService, shard, wrapper, getInstanceFromNode(CircuitBreakerService.class), listener);
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
final IndexShard newShard = newIndexShard(
indexService,
shard,
wrapper,
getInstanceFromNode(CircuitBreakerService.class),
env.nodeId(),
listener
);
shardRef.set(newShard);
recoverShard(newShard);

Expand All @@ -651,6 +659,7 @@ public static final IndexShard newIndexShard(
final IndexShard shard,
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper,
final CircuitBreakerService cbs,
final String nodeId,
final IndexingOperationListener... listeners
) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
Expand Down Expand Up @@ -679,7 +688,8 @@ public static final IndexShard newIndexShard(
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId
);
}

Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ public synchronized IndexShard createShard(
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId()
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

package org.opensearch.index.remote;

import org.opensearch.common.collect.Tuple;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Utils for remote store
Expand Down Expand Up @@ -69,4 +75,31 @@ public static String getSegmentName(String filename) {

return filename.substring(0, endIdx);
}

/**
*
* @param mdFiles List of segment/translog metadata files
* @param fn Function to extract PrimaryTerm_Generation and Node Id from metadata file name .
* fn returns null if node id is not part of the file name
*/
static public void verifyNoMultipleWriters(List<String> mdFiles, Function<String, Tuple<String, String>> fn) {
Map<String, String> nodesByPrimaryTermAndGen = new HashMap<>();
mdFiles.forEach(mdFile -> {
Tuple<String, String> nodeIdByPrimaryTermAndGen = fn.apply(mdFile);
if (nodeIdByPrimaryTermAndGen != null) {
if (nodesByPrimaryTermAndGen.containsKey(nodeIdByPrimaryTermAndGen.v1())
&& (!nodesByPrimaryTermAndGen.get(nodeIdByPrimaryTermAndGen.v1()).equals(nodeIdByPrimaryTermAndGen.v2()))) {
throw new IllegalStateException(
"Multiple metadata files from different nodes"
+ nodeIdByPrimaryTermAndGen.v1()
+ " and "
+ nodeIdByPrimaryTermAndGen.v2()
+ "having same primary term and generations detected"
);
}
nodesByPrimaryTermAndGen.put(nodeIdByPrimaryTermAndGen.v1(), nodeIdByPrimaryTermAndGen.v2());
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ public IndexShard(
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
final String nodeId
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -415,7 +416,7 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
final String aId = shardRouting.allocationId().getId();
final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
Expand Down Expand Up @@ -558,6 +559,10 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() {
return remoteStoreStatsTrackerFactory;
}

public String getNodeId() {
return translogConfig.getNodeId();
}

@Override
public void updateShardState(
final ShardRouting newRouting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
segmentInfosSnapshot,
storeDirectory,
translogFileGeneration,
replicationCheckpoint
replicationCheckpoint,
indexShard.getNodeId()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
Expand Down Expand Up @@ -114,6 +115,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement

private final AtomicLong metadataUploadCounter = new AtomicLong(0);

public static final int METADATA_FILES_TO_FETCH = 10;

public RemoteSegmentStoreDirectory(
RemoteDirectory remoteDataDirectory,
RemoteDirectory remoteMetadataDirectory,
Expand Down Expand Up @@ -187,9 +190,11 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {

List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
1
METADATA_FILES_TO_FETCH
);

RemoteStoreUtils.verifyNoMultipleWriters(metadataFiles, MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen);

if (metadataFiles.isEmpty() == false) {
String latestMetadataFile = metadataFiles.get(0);
logger.trace("Reading latest Metadata file {}", latestMetadataFile);
Expand Down Expand Up @@ -306,12 +311,13 @@ static String getMetadataFilePrefixForCommit(long primaryTerm, long generation)
}

// Visible for testing
static String getMetadataFilename(
public static String getMetadataFilename(
long primaryTerm,
long generation,
long translogGeneration,
long uploadCounter,
int metadataVersion
int metadataVersion,
String nodeId
) {
return String.join(
SEPARATOR,
Expand All @@ -320,6 +326,7 @@ static String getMetadataFilename(
RemoteStoreUtils.invertLong(generation),
RemoteStoreUtils.invertLong(translogGeneration),
RemoteStoreUtils.invertLong(uploadCounter),
nodeId,
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(metadataVersion)
);
Expand All @@ -334,6 +341,19 @@ static long getPrimaryTerm(String[] filenameTokens) {
static long getGeneration(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[2]);
}

public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename) {
String[] tokens = filename.split(SEPARATOR);
if (tokens.length < 8) {
// For versions < 2.11, we don't have node id.
return null;
}
String primaryTermAndGen = String.join(SEPARATOR, tokens[1], tokens[2], tokens[3]);

String nodeId = tokens[5];
return new Tuple<>(primaryTermAndGen, nodeId);
}

}

/**
Expand Down Expand Up @@ -593,22 +613,25 @@ public boolean containsFile(String localFilename, String checksum) {
* @param storeDirectory instance of local directory to temporarily create metadata file before upload
* @param translogGeneration translog generation
* @param replicationCheckpoint ReplicationCheckpoint of primary shard
* @param nodeId node id
* @throws IOException in case of I/O error while uploading the metadata file
*/
public void uploadMetadata(
Collection<String> segmentFiles,
SegmentInfos segmentInfosSnapshot,
Directory storeDirectory,
long translogGeneration,
ReplicationCheckpoint replicationCheckpoint
ReplicationCheckpoint replicationCheckpoint,
String nodeId
) throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
replicationCheckpoint.getPrimaryTerm(),
segmentInfosSnapshot.getGeneration(),
translogGeneration,
metadataUploadCounter.incrementAndGet(),
RemoteSegmentMetadata.CURRENT_VERSION
RemoteSegmentMetadata.CURRENT_VERSION,
nodeId
);
try {
try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
generation,
location,
readers,
Translog::getCommitCheckpointFileName
Translog::getCommitCheckpointFileName,
config.getNodeId()
).build()
) {
return translogTransferManager.transferSnapshot(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class TranslogConfig {
private final ShardId shardId;
private final Path translogPath;
private final ByteSizeValue bufferSize;
private final String nodeId;

/**
* Creates a new TranslogConfig instance
Expand All @@ -64,16 +65,24 @@ public final class TranslogConfig {
* @param indexSettings the index settings used to set internal variables
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
*/
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE);
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
}

TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize) {
TranslogConfig(
ShardId shardId,
Path translogPath,
IndexSettings indexSettings,
BigArrays bigArrays,
ByteSizeValue bufferSize,
String nodeId
) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.bigArrays = bigArrays;
this.nodeId = nodeId;
}

/**
Expand Down Expand Up @@ -110,4 +119,8 @@ public Path getTranslogPath() {
public ByteSizeValue getBufferSize() {
return bufferSize;
}

public String getNodeId() {
return nodeId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
shardPath.getShardId(),
translogPath,
indexSettings,
BigArrays.NON_RECYCLING_INSTANCE
BigArrays.NON_RECYCLING_INSTANCE,
""
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Clo
private final long primaryTerm;
private long minTranslogGeneration;

TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) {
private String nodeId;

TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size, String nodeId) {
translogCheckpointFileInfoTupleSet = new HashSet<>(size);
this.size = size;
this.generation = generation;
this.primaryTerm = primaryTerm;
this.nodeId = nodeId;
}

private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) {
Expand All @@ -63,7 +66,13 @@ public Set<TransferFileSnapshot> getTranslogFileSnapshots() {

@Override
public TranslogTransferMetadata getTranslogTransferMetadata() {
return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, translogCheckpointFileInfoTupleSet.size() * 2);
return new TranslogTransferMetadata(
primaryTerm,
generation,
minTranslogGeneration,
translogCheckpointFileInfoTupleSet.size() * 2,
nodeId
);
}

@Override
Expand Down Expand Up @@ -110,19 +119,22 @@ public static class Builder {
private final List<TranslogReader> readers;
private final Function<Long, String> checkpointGenFileNameMapper;
private final Path location;
private final String nodeId;

public Builder(
long primaryTerm,
long generation,
Path location,
List<TranslogReader> readers,
Function<Long, String> checkpointGenFileNameMapper
Function<Long, String> checkpointGenFileNameMapper,
String nodeId
) {
this.primaryTerm = primaryTerm;
this.generation = generation;
this.readers = readers;
this.checkpointGenFileNameMapper = checkpointGenFileNameMapper;
this.location = location;
this.nodeId = nodeId;
}

public TranslogCheckpointTransferSnapshot build() throws IOException {
Expand All @@ -134,7 +146,8 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
TranslogCheckpointTransferSnapshot translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(
primaryTerm,
generation,
readers.size()
readers.size(),
nodeId
);
for (TranslogReader reader : readers) {
final long readerGeneration = reader.getGeneration();
Expand Down
Loading

0 comments on commit 9c678ab

Please sign in to comment.