Skip to content

Commit

Permalink
Initial POC for segment replication.
Browse files Browse the repository at this point in the history
In this POC replicas are configured as read only by not creating an indexwriter.
After primary shards refresh, a checkpoint is sent over the transport layer to replicas.
Once received, replicas fetch files in the checkpoint from the primary shard.
This initial commit ignores failover, retention leases, and shard allocation.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 9, 2022
1 parent 4822c28 commit 507bdab
Show file tree
Hide file tree
Showing 49 changed files with 3,525 additions and 159 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.checkpoint.TransportPublishCheckpointAction;
import org.opensearch.persistent.CompletionPersistentTaskAction;
import org.opensearch.persistent.RemovePersistentTaskAction;
import org.opensearch.persistent.StartPersistentTaskAction;
Expand Down Expand Up @@ -588,6 +590,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(SimulateTemplateAction.INSTANCE, TransportSimulateTemplateAction.class);
actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
actions.register(PublishCheckpointAction.INSTANCE, TransportPublishCheckpointAction.class);
actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected void shardOperationOnPrimary(
@Override
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
replica.flush(request.getRequest());
// replica.flush(request.getRequest());
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public class RefreshResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}

RefreshResponse(StreamInput in) throws IOException {
public RefreshResponse(StreamInput in) throws IOException {
super(in);
}

RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
public RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.metadata.IndexMetadata.APIBlock;
import org.opensearch.common.Nullable;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;

/**
* Administrative actions/operations against indices.
Expand Down Expand Up @@ -405,6 +406,13 @@ public interface IndicesAdminClient extends OpenSearchClient {
*/
void refresh(RefreshRequest request, ActionListener<RefreshResponse> listener);

/**
* Publish the latest primary checkpoint to replica shards.
* @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest
* @param listener A listener to be notified with a result
*/
void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener);

/**
* Explicitly refresh one or more indices (making the content indexed since the last refresh searchable).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -1632,6 +1634,11 @@ public void refresh(final RefreshRequest request, final ActionListener<RefreshRe
execute(RefreshAction.INSTANCE, request, listener);
}

@Override
public void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener) {
execute(PublishCheckpointAction.INSTANCE, request, listener);
}

@Override
public RefreshRequestBuilder prepareRefresh(String... indices) {
return new RefreshRequestBuilder(this, RefreshAction.INSTANCE).setIndices(indices);
Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -466,7 +467,8 @@ public IndexService newIndexService(
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry
ValuesSourceRegistry valuesSourceRegistry,
TransportCheckpointPublisher checkpointPublisher
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -520,8 +522,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
);
recoveryStateFactory,
checkpointPublisher);
success = true;
return indexService;
} finally {
Expand Down
15 changes: 10 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -165,6 +166,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final TransportCheckpointPublisher checkpointPublisher;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -195,8 +197,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
) {
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
TransportCheckpointPublisher checkpointPublisher) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
Expand All @@ -206,6 +208,7 @@ public IndexService(
this.circuitBreakerService = circuitBreakerService;
this.expressionResolver = expressionResolver;
this.valuesSourceRegistry = valuesSourceRegistry;
this.checkpointPublisher = checkpointPublisher;
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(
Expand Down Expand Up @@ -520,8 +523,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService
);
circuitBreakerService,
checkpointPublisher);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
Expand Down Expand Up @@ -911,7 +914,9 @@ private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
shard.scheduledRefresh();
if (shard.routingEntry().primary()) {
shard.scheduledRefresh();
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public final class IndexSettings {
*/
public static final Setting<Boolean> INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting(
"index.replication.segment_replication",
false,
true,
Property.IndexScope,
Property.Final
);
Expand Down
29 changes: 29 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen) throws IOException {};


/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down Expand Up @@ -1196,6 +1199,10 @@ public abstract void forceMerge(
*/
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

public SegmentInfosRef getLatestSegmentInfosSafe() { return null; };

public SegmentInfos getLatestSegmentInfos() { return null; };

/**
* Snapshots the most recent safe index commit from the engine.
*/
Expand Down Expand Up @@ -1999,6 +2006,28 @@ public IndexCommit getIndexCommit() {
}
}

public static class SegmentInfosRef implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
private final SegmentInfos segmentInfos;

public SegmentInfosRef(SegmentInfos segmentInfos, CheckedRunnable<IOException> onClose) {
this.segmentInfos = segmentInfos;
this.onClose = onClose;
}

@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
onClose.run();
}
}

public SegmentInfos getSegmentInfos() {
return segmentInfos;
}
}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}
Expand Down
14 changes: 10 additions & 4 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private boolean isPrimary;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -169,8 +170,8 @@ public EngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
TombstoneDocSupplier tombstoneDocSupplier,
boolean isPrimary) {
this(
shardId,
threadPool,
Expand All @@ -193,7 +194,7 @@ public EngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
isPrimary, primaryTermSupplier,
tombstoneDocSupplier
);
}
Expand Down Expand Up @@ -223,7 +224,7 @@ public EngineConfig(
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
boolean isPrimary, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this.shardId = shardId;
Expand All @@ -237,6 +238,7 @@ public EngineConfig(
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
this.isPrimary = isPrimary;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
Expand Down Expand Up @@ -458,6 +460,10 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

public boolean isPrimary() {
return isPrimary;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public EngineConfig newEngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
Boolean isPrimary
) {

return new EngineConfig(
Expand All @@ -137,6 +138,7 @@ public EngineConfig newEngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
isPrimary,
primaryTermSupplier,
tombstoneDocSupplier
);
Expand Down
Loading

0 comments on commit 507bdab

Please sign in to comment.