Skip to content

Commit

Permalink
[Backport 2.x] Adding Remote translog upload, download and failover c…
Browse files Browse the repository at this point in the history
…hanges (#5757)

* Introduce TranslogFactory for Local/Remote Translog support (#4172)

* Introduce TranslogFactory for Local/Remote Translog support

Signed-off-by: Bukhtawar Khan <[email protected]>

* [Remote Translog] Introduce remote translog with upload functionality (#5392)

* Introduce remote translog with upload functionality 

Signed-off-by: Gaurav Bafna <[email protected]>
Co-authored-by: Bukhtawar Khan <[email protected]>

* Enable creation of indices using Remote Translog    (#5638)

* Enable creation of indices using Remote Translog behind a setting and feature flag
Signed-off-by: Gaurav Bafna <[email protected]>

* [Remote Translog] Add support for downloading files from remote translog (#5649)

* Add support to download translog from remote store during recovery

Signed-off-by: Sachin Kale <[email protected]>

* Integrate remote translog download on failover (#5699)

* Integrate remote translog download on failover

Signed-off-by: Ashish Singh <[email protected]>

Signed-off-by: Bukhtawar Khan <[email protected]>
Signed-off-by: Gaurav Bafna <[email protected]>
Signed-off-by: Sachin Kale <[email protected]>
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
gbbafna authored and kotwanikunal committed Jan 25, 2023
1 parent ca6093e commit f9dcc85
Show file tree
Hide file tree
Showing 57 changed files with 3,020 additions and 345 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# CHANGELOG

All notable changes to this project are documented in this file.
Expand All @@ -20,7 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))
- Added Request level Durability using Remote Translog functionality ([#5757](https://github.com/opensearch-project/OpenSearch/pull/5757))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
Expand All @@ -42,7 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))
- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480))
- Integrate remote segment store in the failover flow ([#5579](https://github.com/opensearch-project/OpenSearch/pull/5579))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
Expand Down Expand Up @@ -675,6 +676,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ public Iterator<Setting<?>> settings() {
public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository";

public static final String SETTING_REMOTE_TRANSLOG_STORE_ENABLED = "index.remote_store.translog.enabled";

public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -406,6 +409,43 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<String> INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING = Setting.simpleString(
SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
new Setting.Validator<>() {

@Override
public void validate(final String value) {}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(
"Setting " + INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey() + " should be provided with non-empty repository ID"
);
} else {
final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) {
throw new IllegalArgumentException(
"Settings "
+ INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey()
+ " can only be set/enabled when "
+ INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING.getKey()
+ " is set to true"
);
}
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void close() {
// nothing to do here...
}

private synchronized Path buildAndCreate(BlobPath path) throws IOException {
protected synchronized Path buildAndCreate(BlobPath path) throws IOException {
Path f = buildPath(path);
if (readOnly == false) {
Files.createDirectories(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,19 @@ public void skipBytes(long count) {
pos += count;
}

// NOTE: AIOOBE not EOF if you read too much
@Override
public byte readByte() {
public byte readByte() throws EOFException {
if (eof()) {
throw new EOFException();
}
return bytes[pos++];
}

// NOTE: AIOOBE not EOF if you read too much
@Override
public void readBytes(byte[] b, int offset, int len) {
public void readBytes(byte[] b, int offset, int len) throws EOFException {
if (available() < len) {
throw new EOFException();
}
System.arraycopy(bytes, pos, b, offset, len);
pos += len;
}
Expand All @@ -111,6 +115,9 @@ protected void ensureCanReadBytes(int length) throws EOFException {

@Override
public int read() throws IOException {
if (eof()) {
throw new EOFException();
}
return bytes[pos++] & 0xFF;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FeatureFlags.REMOTE_STORE,
List.of(
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
),
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.Version;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.TriFunction;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -500,7 +502,8 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -555,7 +558,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
recoveryStateFactory,
translogFactorySupplier
);
success = true;
return indexService;
Expand Down
8 changes: 7 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand All @@ -113,6 +114,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -172,6 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -203,7 +206,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -275,6 +279,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.translogFactorySupplier = translogFactorySupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -547,6 +552,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,9 @@ public final class IndexSettings {
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final String remoteStoreRepository;
private final boolean isRemoteTranslogStoreEnabled;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private Version extendedCompatibilitySnapshotVersion;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -750,8 +751,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
Expand Down Expand Up @@ -1040,6 +1042,10 @@ public Version getExtendedCompatibilitySnapshotVersion() {
return extendedCompatibilitySnapshotVersion;
}

public String getRemoteStoreTranslogRepository() {
return remoteStoreTranslogRepository;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
19 changes: 17 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -150,6 +152,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final TranslogConfig translogConfig;

private final TranslogFactory translogFactory;

public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
Expand Down Expand Up @@ -253,7 +257,8 @@ public EngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
false,
new InternalTranslogFactory()
);
}

Expand Down Expand Up @@ -284,7 +289,8 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
Expand Down Expand Up @@ -328,6 +334,7 @@ public EngineConfig(
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
this.translogFactory = translogFactory;
}

/**
Expand Down Expand Up @@ -532,6 +539,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying translog factory
* @return the translog factory
*/
public TranslogFactory getTranslogFactory() {
return translogFactory;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -147,7 +148,8 @@ public EngineConfig newEngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -178,7 +180,8 @@ public EngineConfig newEngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
isReadOnlyReplica
isReadOnlyReplica,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public void onFailure(String reason, Exception ex) {
() -> getLocalCheckpointTracker(),
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen
this::ensureOpen,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void onAfterTranslogSync() {
}
}
},
this
this,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Loading

0 comments on commit f9dcc85

Please sign in to comment.