Skip to content

Commit

Permalink
Pass compatibility versions in ClusterCoordinationPlugin (#99396)
Browse files Browse the repository at this point in the history
In #99321, we passed
CompatibilityVersions into GatewayMetaState so that a node's initial
cluster state will hold correct initial values for CompatibilityVersions
for a single node, and not have to wait until a node join event to
populate that part of cluster state.

However, a ClusterCoordinationPlugin may provide a different
`PersistedClusterStateService`, which might also need its own access to
CompatibilityVersions. Here, we add a `CompatibilityVersions` argument
to `PersistedClusterStateServiceFactory#newPersistedClusterStateService`
so that an implementation of `PersistedClusterStateServiceFactory` can
pass CompatibilityVersions into whatever mechanism it uses for creating
the initial cluster state.
  • Loading branch information
williamrandolph authored Sep 11, 2023
1 parent 79aba8c commit e386a3f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
10 changes: 6 additions & 4 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -622,14 +622,16 @@ protected Node(
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());

CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current());
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
modules.add(settingsModule);
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(
xContentRegistry,
clusterService.getClusterSettings(),
threadPool
threadPool,
compatibilityVersions
);

// collect engine factory providers from plugins
Expand Down Expand Up @@ -927,7 +929,6 @@ protected Node(
);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current());
final DiscoveryModule discoveryModule = new DiscoveryModule(
settings,
transportService,
Expand Down Expand Up @@ -1359,7 +1360,8 @@ private WriteLoadForecaster getWriteLoadForecaster(ThreadPool threadPool, Settin
private PersistedClusterStateService newPersistedClusterStateService(
NamedXContentRegistry xContentRegistry,
ClusterSettings clusterSettings,
ThreadPool threadPool
ThreadPool threadPool,
CompatibilityVersions compatibilityVersions
) {
final List<ClusterCoordinationPlugin.PersistedClusterStateServiceFactory> persistedClusterStateServiceFactories = pluginsService
.filterPlugins(ClusterCoordinationPlugin.class)
Expand All @@ -1374,7 +1376,7 @@ private PersistedClusterStateService newPersistedClusterStateService(

if (persistedClusterStateServiceFactories.size() == 1) {
return persistedClusterStateServiceFactories.get(0)
.newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool);
.newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool, compatibilityVersions);
}

return new PersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool::relativeTimeInMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -76,12 +77,24 @@ CoordinationState.PersistedState createPersistedState(
}

interface PersistedClusterStateServiceFactory {

@Deprecated(forRemoval = true)
PersistedClusterStateService newPersistedClusterStateService(
NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry,
ClusterSettings clusterSettings,
ThreadPool threadPool
);

default PersistedClusterStateService newPersistedClusterStateService(
NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry,
ClusterSettings clusterSettings,
ThreadPool threadPool,
CompatibilityVersions compatibilityVersions
) {
return newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool);
}
}

interface ReconfiguratorFactory {
Expand Down
30 changes: 25 additions & 5 deletions server/src/test/java/org/elasticsearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.core.RestApiVersion;
Expand Down Expand Up @@ -633,15 +635,33 @@ private static class BaseTestClusterCoordinationPlugin extends Plugin implements

@Override
public Optional<PersistedClusterStateServiceFactory> getPersistedClusterStateServiceFactory() {
return Optional.of(
(nodeEnvironment, namedXContentRegistry, clusterSettings, threadPool) -> persistedClusterStateService =
new PersistedClusterStateService(
return Optional.of(new PersistedClusterStateServiceFactory() {
@Override
public PersistedClusterStateService newPersistedClusterStateService(
NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry,
ClusterSettings clusterSettings,
ThreadPool threadPool
) {
throw new AssertionError("not called");
}

@Override
public PersistedClusterStateService newPersistedClusterStateService(
NodeEnvironment nodeEnvironment,
NamedXContentRegistry namedXContentRegistry,
ClusterSettings clusterSettings,
ThreadPool threadPool,
CompatibilityVersions compatibilityVersions
) {
return persistedClusterStateService = new PersistedClusterStateService(
nodeEnvironment,
namedXContentRegistry,
clusterSettings,
threadPool::relativeTimeInMillis
)
);
);
}
});
}
}

Expand Down

0 comments on commit e386a3f

Please sign in to comment.