Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Profiling] Support index migrations #97773

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/97773.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 97773
summary: "[Profiling] Support index migrations"
area: Application
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -21,12 +27,21 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public abstract class AbstractProfilingPersistenceManager<T extends AbstractProfilingPersistenceManager.ProfilingIndexAbstraction>
implements
Expand All @@ -35,11 +50,14 @@ public abstract class AbstractProfilingPersistenceManager<T extends AbstractProf
protected final Logger logger = LogManager.getLogger(getClass());

private final AtomicBoolean inProgress = new AtomicBoolean(false);

private final ClusterService clusterService;
protected final ThreadPool threadPool;
protected final Client client;
private volatile boolean templatesEnabled;

public AbstractProfilingPersistenceManager(ClusterService clusterService) {
public AbstractProfilingPersistenceManager(ThreadPool threadPool, Client client, ClusterService clusterService) {
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
}

Expand Down Expand Up @@ -90,9 +108,9 @@ public final void clusterChanged(ClusterChangedEvent event) {
try (var refs = new RefCountingRunnable(() -> inProgress.set(false))) {
ClusterState clusterState = event.state();
for (T index : getManagedIndices()) {
Status status = getStatus(clusterState, index);
if (status.actionable) {
onStatus(clusterState, status, index, ActionListener.releasing(refs.acquire()));
IndexState<T> state = getIndexState(clusterState, index);
if (state.getStatus().actionable) {
onIndexState(clusterState, state, ActionListener.releasing(refs.acquire()));
}
}
}
Expand Down Expand Up @@ -120,29 +138,32 @@ protected boolean isAllResourcesCreated(ClusterChangedEvent event) {
* Handler that takes appropriate action for a certain index status.
*
* @param clusterState The current cluster state. Never <code>null</code>.
* @param status Status of the current index.
* @param index The current index.
* @param indexState The state of the current index.
* @param listener Listener to be called on completion / errors.
*/
protected abstract void onStatus(ClusterState clusterState, Status status, T index, ActionListener<? super ActionResponse> listener);
protected abstract void onIndexState(
ClusterState clusterState,
IndexState<T> indexState,
ActionListener<? super ActionResponse> listener
);

private Status getStatus(ClusterState state, T index) {
private IndexState<T> getIndexState(ClusterState state, T index) {
IndexMetadata metadata = indexMetadata(state, index);
if (metadata == null) {
return Status.NEEDS_CREATION;
return new IndexState<>(index, null, Status.NEEDS_CREATION);
}
if (metadata.getState() == IndexMetadata.State.CLOSE) {
logger.warn(
"Index [{}] is closed. This is likely to prevent Universal Profiling from functioning correctly",
metadata.getIndex()
);
return Status.CLOSED;
return new IndexState<>(index, metadata.getIndex(), Status.CLOSED);
}
final IndexRoutingTable routingTable = state.getRoutingTable().index(metadata.getIndex());
ClusterHealthStatus indexHealth = new ClusterIndexHealth(metadata, routingTable).getStatus();
if (indexHealth == ClusterHealthStatus.RED) {
logger.debug("Index [{}] health status is RED, any pending mapping upgrades will wait until this changes", metadata.getIndex());
return Status.UNHEALTHY;
logger.trace("Index [{}] health status is RED, any pending mapping upgrades will wait until this changes", metadata.getIndex());
return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY);
}
MappingMetadata mapping = metadata.mapping();
if (mapping != null) {
Expand All @@ -159,23 +180,108 @@ private Status getStatus(ClusterState state, T index) {
currentIndexVersion = getVersionField(metadata.getIndex(), meta, "index-version");
currentTemplateVersion = getVersionField(metadata.getIndex(), meta, "index-template-version");
if (currentIndexVersion == -1 || currentTemplateVersion == -1) {
return Status.UNHEALTHY;
return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY);
}
}
if (index.getVersion() > currentIndexVersion) {
return Status.NEEDS_VERSION_BUMP;
} else if (ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION > currentTemplateVersion) {
// TODO 8.10+: Check if there are any pending migrations. If none are pending we can consider the index up to date.
return Status.NEEDS_MAPPINGS_UPDATE;
return new IndexState<>(index, metadata.getIndex(), Status.NEEDS_VERSION_BUMP);
} else if (getIndexTemplateVersion() > currentTemplateVersion) {
// if there are no migrations we can consider the index up-to-date even if the index template version does not match.
List<Migration> pendingMigrations = index.getMigrations(currentTemplateVersion);
if (pendingMigrations.isEmpty()) {
logger.trace(
"Index [{}] with index template version [{}] (current is [{}]) is up-to-date (no pending migrations).",
metadata.getIndex(),
currentTemplateVersion,
getIndexTemplateVersion()
);
return new IndexState<>(index, metadata.getIndex(), Status.UP_TO_DATE);
}
logger.trace(
"Index [{}] with index template version [{}] (current is [{}]) has [{}] pending migrations.",
metadata.getIndex(),
currentTemplateVersion,
getIndexTemplateVersion(),
pendingMigrations.size()
);
return new IndexState<>(index, metadata.getIndex(), Status.NEEDS_MAPPINGS_UPDATE, pendingMigrations);
} else {
return Status.UP_TO_DATE;
return new IndexState<>(index, metadata.getIndex(), Status.UP_TO_DATE);
}
} else {
logger.warn("No mapping found for existing index [{}]. Index cannot be migrated.", metadata.getIndex());
return Status.UNHEALTHY;
return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY);
}
}

// overridable for testing
protected int getIndexTemplateVersion() {
return ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION;
}

protected final void applyMigrations(IndexState<T> indexState, ActionListener<? super ActionResponse> listener) {
String writeIndex = indexState.getWriteIndex().getName();
try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) {
for (Migration migration : indexState.getPendingMigrations()) {
logger.debug("Applying migration [{}] for [{}].", migration, writeIndex);
migration.apply(
writeIndex,
(r -> updateMapping(r, ActionListener.releasing(refs.acquire()))),
(r -> updateSettings(r, ActionListener.releasing(refs.acquire())))
);
}
}
}

protected final void updateMapping(PutMappingRequest request, ActionListener<AcknowledgedResponse> listener) {
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsync("put mapping", request, listener, (req, l) -> client.admin().indices().putMapping(req, l));
}

protected final void updateSettings(UpdateSettingsRequest request, ActionListener<AcknowledgedResponse> listener) {
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsync("update settings", request, listener, (req, l) -> client.admin().indices().updateSettings(req, l));
}

protected final <Request extends ActionRequest & IndicesRequest, Response extends AcknowledgedResponse> void executeAsync(
final String actionName,
final Request request,
final ActionListener<Response> listener,
BiConsumer<Request, ActionListener<Response>> consumer
) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ClientHelper.PROFILING_ORIGIN, request, new ActionListener<>() {
@Override
public void onResponse(Response response) {
if (response.isAcknowledged() == false) {
logger.error(
"Could not execute action [{}] for indices [{}] for [{}], request was not acknowledged",
actionName,
request.indices(),
ClientHelper.PROFILING_ORIGIN
);
}
listener.onResponse(response);
}

@Override
public void onFailure(Exception ex) {
logger.error(
() -> format(
"Could not execute action [%s] for indices [%s] for [%s]",
actionName,
request.indices(),
ClientHelper.PROFILING_ORIGIN
),
ex
);
listener.onFailure(ex);
}
}, consumer);
});
}

private int getVersionField(Index index, Map<String, Object> meta, String fieldName) {
Object value = meta.get(fieldName);
if (value instanceof Integer) {
Expand All @@ -189,6 +295,40 @@ private int getVersionField(Index index, Map<String, Object> meta, String fieldN
return -1;
}

protected static final class IndexState<T extends ProfilingIndexAbstraction> {
private final T index;
private final Index writeIndex;
private final Status status;
private final List<Migration> pendingMigrations;

IndexState(T index, Index writeIndex, Status status) {
this(index, writeIndex, status, null);
}

IndexState(T index, Index writeIndex, Status status, List<Migration> pendingMigrations) {
this.index = index;
this.writeIndex = writeIndex;
this.status = status;
this.pendingMigrations = pendingMigrations;
}

public T getIndex() {
return index;
}

public Index getWriteIndex() {
return writeIndex;
}

public Status getStatus() {
return status;
}

public List<Migration> getPendingMigrations() {
return pendingMigrations;
}
}

enum Status {
CLOSED(false),
UNHEALTHY(false),
Expand All @@ -214,5 +354,7 @@ interface ProfilingIndexAbstraction {
String getName();

int getVersion();

List<Migration> getMigrations(int currentIndexTemplateVersion);
}
}
Loading