Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for segment replication. #2075

Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.checkpoint.TransportPublishCheckpointAction;
import org.opensearch.persistent.CompletionPersistentTaskAction;
import org.opensearch.persistent.RemovePersistentTaskAction;
import org.opensearch.persistent.StartPersistentTaskAction;
Expand Down Expand Up @@ -588,6 +590,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(SimulateTemplateAction.INSTANCE, TransportSimulateTemplateAction.class);
actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
actions.register(PublishCheckpointAction.INSTANCE, TransportPublishCheckpointAction.class);
actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public class RefreshResponse extends BroadcastResponse {
declareBroadcastFields(PARSER);
}

RefreshResponse(StreamInput in) throws IOException {
public RefreshResponse(StreamInput in) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you only use the other constructor. Does this one need to be public?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to inside of PublishCheckpointAction to wire up the superclass with a response constructor.

private PublishCheckpointAction() {super(NAME, RefreshResponse::new);}

With that said - this was me being quick and dirty again for the poc. We shouldn't be reusing RefreshResponse here and instead create a new response type. Will make a separate task.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super(in);
}

RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
public RefreshResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.metadata.IndexMetadata.APIBlock;
import org.opensearch.common.Nullable;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;

/**
* Administrative actions/operations against indices.
Expand Down Expand Up @@ -405,6 +406,13 @@ public interface IndicesAdminClient extends OpenSearchClient {
*/
void refresh(RefreshRequest request, ActionListener<RefreshResponse> listener);

/**
* Publish the latest primary checkpoint to replica shards.
* @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest
* @param listener A listener to be notified with a result
*/
void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you're re-using RefreshResponse as the response for this request type? Would there be value in declaring a separate CheckpointResponse (or similar) type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100%, I was cutting corners. Same as comment above making a separate issue to clean this up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Explicitly refresh one or more indices (making the content indexed since the last refresh searchable).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -1632,6 +1634,11 @@ public void refresh(final RefreshRequest request, final ActionListener<RefreshRe
execute(RefreshAction.INSTANCE, request, listener);
}

@Override
public void publishCheckpoint(PublishCheckpointRequest request, ActionListener<RefreshResponse> listener) {
execute(PublishCheckpointAction.INSTANCE, request, listener);
}

@Override
public RefreshRequestBuilder prepareRefresh(String... indices) {
return new RefreshRequestBuilder(this, RefreshAction.INSTANCE).setIndices(indices);
Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -466,7 +467,8 @@ public IndexService newIndexService(
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry
ValuesSourceRegistry valuesSourceRegistry,
TransportCheckpointPublisher checkpointPublisher
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -520,8 +522,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
);
recoveryStateFactory,
checkpointPublisher);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooof, we should really think about refactoring this constructor, and the newIndexService method 🤢

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree - I think we should revisit this after we have separate implementations of IndexShard and address how to load things as a module.

success = true;
return indexService;
} finally {
Expand Down
15 changes: 10 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -165,6 +166,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final TransportCheckpointPublisher checkpointPublisher;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -195,8 +197,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
) {
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
TransportCheckpointPublisher checkpointPublisher) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
Expand All @@ -206,6 +208,7 @@ public IndexService(
this.circuitBreakerService = circuitBreakerService;
this.expressionResolver = expressionResolver;
this.valuesSourceRegistry = valuesSourceRegistry;
this.checkpointPublisher = checkpointPublisher;
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(
Expand Down Expand Up @@ -520,8 +523,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService
);
circuitBreakerService,
checkpointPublisher);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
Expand Down Expand Up @@ -911,7 +914,9 @@ private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
shard.scheduledRefresh();
if (shard.routingEntry().primary()) {
shard.scheduledRefresh();
}
Comment on lines +917 to +919
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retain code as-is and move the gating into IndexShard, either based on isSegmentReplicationEnabled or by using a separate IndexShard subclass that implements/overrides scheduleRefresh as a no-op

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack - #2197

} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
Expand Down
40 changes: 40 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an interface - does the presence of seqNo make sense for this APi, or is this a POC signature driven by the need to call markSeqNoAsProcessed in the implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter, but I don't know of another way than passing this down or explicitly calling a method on engine to set the processed cp here. Local checkpoints are only stored in user data on commit -

Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));


public long getProcessedLocalCheckpoint() { return 0L; };

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down Expand Up @@ -1196,6 +1200,20 @@ public abstract void forceMerge(
*/
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

/**
* Fetch a snapshot of the latest SegmentInfos from the engine and ensure that segment files are retained in the directory
* until closed.
* @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted.
*/
public SegmentInfosRef getLatestSegmentInfosSafe() { return null; };

/**
* Fetch a snapshot of the latest SegmentInfos from the engine.
* This method does not ensure that segment files are retained in the directory.
* @return {@link SegmentInfos}
*/
public SegmentInfos getLatestSegmentInfos() { return null; };

/**
* Snapshots the most recent safe index commit from the engine.
*/
Expand Down Expand Up @@ -1999,6 +2017,28 @@ public IndexCommit getIndexCommit() {
}
}

public static class SegmentInfosRef implements Closeable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move to top-level class.

As a broader note, we should see if there's an opportunity to generalize this Decorator design pattern instead of having *Ref classes everywhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack - #2202

private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
private final SegmentInfos segmentInfos;

public SegmentInfosRef(SegmentInfos segmentInfos, CheckedRunnable<IOException> onClose) {
this.segmentInfos = segmentInfos;
this.onClose = onClose;
}

@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
onClose.run();
}
}

public SegmentInfos getSegmentInfos() {
return segmentInfos;
}
}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private boolean isPrimary;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: move this attribute to IndexShard

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexShard actually populates this when wiring up the engine so we can conditionally make InternalEngine readOnly for replicas. I think a better route here is wiring up a different engine class for replicas.


/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -169,8 +170,8 @@ public EngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
TombstoneDocSupplier tombstoneDocSupplier,
boolean isPrimary) {
this(
shardId,
threadPool,
Expand All @@ -193,7 +194,7 @@ public EngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
isPrimary, primaryTermSupplier,
tombstoneDocSupplier
);
}
Expand Down Expand Up @@ -223,7 +224,7 @@ public EngineConfig(
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
boolean isPrimary, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this.shardId = shardId;
Expand All @@ -237,6 +238,7 @@ public EngineConfig(
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
this.isPrimary = isPrimary;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
Expand Down Expand Up @@ -458,6 +460,10 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

public boolean isPrimary() {
return isPrimary;
}
Comment on lines +463 to +465
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(See my other comment) The notion should of being a "primary" should be encapsulated in the IndexShard, not the Engine.


/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public EngineConfig newEngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
Boolean isPrimary
) {

return new EngineConfig(
Expand All @@ -137,6 +138,7 @@ public EngineConfig newEngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
isPrimary,
primaryTermSupplier,
tombstoneDocSupplier
);
Expand Down
Loading