Skip to content

Commit

Permalink
Implement write and read flow for shard diff file.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shailendra Singh committed Jul 9, 2024
1 parent 51af2e2 commit c0265a2
Show file tree
Hide file tree
Showing 8 changed files with 475 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand All @@ -37,6 +38,7 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTableDiff;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
Expand Down Expand Up @@ -92,9 +94,15 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
);

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";

public static final String INDEX_ROUTING_DIFF_PATH_TOKEN = "index-routing-diff";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";

public static final String INDEX_ROUTING_DIFF_FILE_PREFIX = "index_routing_diff";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

public static final String INDEX_ROUTING_DIFF_METADATA_PREFIX = "indexRoutingDiff--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -149,6 +157,18 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
);
}

public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapIncrementalDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER
);
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
Expand Down Expand Up @@ -190,6 +210,39 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

public CheckedRunnable<IOException> getIndexRoutingDiffAsyncAction(
ClusterState clusterState,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
) {

BlobPath indexRoutingDiffPath = clusterBasePath.add(INDEX_ROUTING_DIFF_PATH_TOKEN);
BlobPath path = pathType.path(
RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingDiffPath).indexUUID(clusterState.getMetadata().clusterUUID()).build(),
pathHashAlgo
);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingDiffFileName(clusterState.term(), clusterState.version());

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(
clusterState.getClusterName().toString(),
clusterState.getMetadata().clusterUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_DIFF_METADATA_PREFIX
)
),
ex -> latchedActionListener.onFailure(
new RemoteStateTransferException("Exception in writing index routing diff to remote store", ex)
)
);

return () -> uploadIndexRoutingDiff(indexRoutingTableDiff, fileName, blobContainer, completionListener);
}

/**
* Combines IndicesRoutingMetadata from previous manifest and current uploaded indices, removes deleted indices.
* @param previousManifest previous manifest, used to get all existing indices routing paths
Expand Down Expand Up @@ -273,6 +326,63 @@ private void uploadIndex(
}
}

private void uploadIndexRoutingDiff(
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
String fileName,
BlobContainer blobContainer,
ActionListener<Void> completionListener
) {
BytesReference bytesInput = null;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
RemoteIndexRoutingTableDiff remoteIndexRoutingTableDiff = new RemoteIndexRoutingTableDiff(indexRoutingTableDiff);
remoteIndexRoutingTableDiff.writeTo(streamOutput);
bytesInput = streamOutput.bytes();
} catch (IOException e) {
logger.error(() -> "Failed to serialize IndexRoutingTableDiff: " + e.getMessage(), e);
completionListener.onFailure(e);
return;
}

if (!(blobContainer instanceof AsyncMultiStreamBlobContainer)) {
try {
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
completionListener.onResponse(null);
} catch (IOException e) {
logger.error(() -> "Failed to write IndexRoutingTableDiff to remote store: " + e.getMessage(), e);
completionListener.onFailure(e);
}
return;
}

try (IndexInput input = new ByteArrayIndexInput("indexroutingdiff", BytesReference.toBytes(bytesInput))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
} catch (IOException e) {
logger.error(() -> "Failed to write IndexRoutingTableDiff to remote store: " + e.getMessage(), e);
completionListener.onFailure(e);
}
} catch (IOException e) {
logger.error(() -> "Failed to create transfer object for IndexRoutingTableDiff for remote store upload: " + e.getMessage(), e);
completionListener.onFailure(e);
}
}



@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String uploadedFilename,
Expand Down Expand Up @@ -321,6 +431,48 @@ private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, I
}
}

//@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingTableDiffReadAction(
String uploadedFilename,
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> latchedActionListener
) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx + 1);
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readDiffAsync(
blobContainer,
blobFileName,
ActionListener.wrap(
response -> latchedActionListener.onResponse(response.getDiffs()),
latchedActionListener::onFailure
)
);
}

private void readDiffAsync(
BlobContainer blobContainer,
String name,
ActionListener<RemoteIndexRoutingTableDiff> listener
) {
try {
listener.onResponse(readDiff(blobContainer, name));
} catch (Exception e) {
listener.onFailure(e);
}
}

private RemoteIndexRoutingTableDiff readDiff(BlobContainer blobContainer, String path) {
try {
logger.info("Reading from file {}", blobContainer.path().buildAsString() + path);
return new RemoteIndexRoutingTableDiff(blobContainer.readBlob(path));
} catch (IOException | AssertionError e) {
logger.error(() -> new ParameterizedMessage("RoutingTableDiff read failed for path {}", blobContainer.path().buildAsString() + path), e);
throw new RemoteStateTransferException("Failed to read RemoteRoutingTableDiff from Manifest with error ", e);
}
}

@Override
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
Expand All @@ -345,6 +497,16 @@ private String getIndexRoutingFileName(long term, long version) {
);
}

private String getIndexRoutingDiffFileName(long term, long version) {
return String.join(
DELIMITER,
INDEX_ROUTING_DIFF_FILE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -41,6 +42,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
return DiffableUtils.diff(Map.of(), Map.of(), DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER);
}

@Override
public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapIncrementalDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(Map.of(), Map.of(), DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER);
}

@Override
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
Expand All @@ -52,6 +61,16 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
return () -> {};
}

public CheckedRunnable<IOException> getIndexRoutingDiffAsyncAction(
ClusterState clusterState,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
){
// noop
return () -> {};
}

@Override
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(
ClusterMetadataManifest previousManifest,
Expand All @@ -72,6 +91,14 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
return () -> {};
}

public CheckedRunnable<IOException> getAsyncIndexRoutingTableDiffReadAction(
String uploadedFilename,
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> latchedActionListener
) {
// noop
return () -> {};
}

@Override
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -44,6 +45,24 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
}
};

public static final DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER =
new DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}

@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public Diff<IndexRoutingTable> readDiff(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readDiffFrom(in);
}
};

List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);

CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
Expand All @@ -52,6 +71,11 @@ CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
LatchedActionListener<IndexRoutingTable> latchedActionListener
);

CheckedRunnable<IOException> getAsyncIndexRoutingTableDiffReadAction(
String uploadedFilename,
LatchedActionListener<Map<String, Diff<IndexRoutingTable>>> latchedActionListener
);

List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
List<String> updatedIndicesRouting,
List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting
Expand All @@ -62,13 +86,25 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
RoutingTable after
);

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapIncrementalDiff(
RoutingTable before,
RoutingTable after
);

CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
);

CheckedRunnable<IOException> getIndexRoutingDiffAsyncAction(
ClusterState clusterState,
Map<String, Diff<IndexRoutingTable>> indexRoutingTableDiff,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
);

List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(
ClusterMetadataManifest previousManifest,
List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingUploaded,
Expand Down
Loading

0 comments on commit c0265a2

Please sign in to comment.