Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent flaky behavior when determining if an request will be executed on the current node. #3066

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin
private volatile ConfigurationRepository cr;
private volatile AdminDNs adminDns;
private volatile ClusterService cs;
private static volatile DiscoveryNode localNode;
private volatile AtomicReference<DiscoveryNode> localNode = new AtomicReference<>();
private volatile AuditLog auditLog;
private volatile BackendRegistry backendRegistry;
private volatile SslExceptionHandler sslExceptionHandler;
Expand Down Expand Up @@ -776,7 +776,7 @@ public <T extends TransportResponse> void sendRequest(
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
si.sendRequestDecorate(sender, connection, action, request, options, handler);
si.sendRequestDecorate(sender, connection, action, request, options, handler, localNode.get());
}
};
}
Expand Down Expand Up @@ -1806,7 +1806,7 @@ public void onNodeStarted(DiscoveryNode localNode) {
if (!SSLConfig.isSslOnlyMode() && !client && !disabled) {
cr.initOnNodeStart();
peternied marked this conversation as resolved.
Show resolved Hide resolved
}
this.localNode = localNode;
this.localNode.set(localNode);
final Set<ModuleInfo> securityModules = ReflectionHelper.getModulesLoaded();
log.info("{} OpenSearch Security modules loaded so far: {}", securityModules.size(), securityModules);
}
Expand Down Expand Up @@ -1886,14 +1886,6 @@ private static String handleKeyword(final String field) {
return field;
}

public static DiscoveryNode getLocalNode() {
return localNode;
}

public static void setLocalNode(DiscoveryNode node) {
localNode = node;
}

public static class GuiceHolder implements LifecycleComponent {

private static RepositoriesService repositoriesService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public <T extends TransportResponse> void sendRequestDecorate(
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
TransportResponseHandler<T> handler,
DiscoveryNode localNode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this a parameter for sendRequestDecorate makes more sense IMO

) {
final Map<String, String> origHeaders0 = getThreadContext().getHeaders();
final User user0 = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER);
Expand All @@ -146,8 +147,7 @@ public <T extends TransportResponse> void sendRequestDecorate(
final String origCCSTransientMf = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_CCS);

final boolean isDebugEnabled = log.isDebugEnabled();
final DiscoveryNode localNode = OpenSearchSecurityPlugin.getLocalNode();
boolean isSameNodeRequest = localNode != null && localNode.equals(connection.getNode());
final boolean isSameNodeRequest = localNode != null && localNode.equals(connection.getNode());

try (ThreadContext.StoredContext stashedContext = getThreadContext().stashContext()) {
final TransportResponseHandler<T> restoringHandler = new RestoringTransportResponseHandler<T>(handler, stashedContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,8 @@ public void testSendRequestDecorate() {
DiscoveryNode otherNode = new DiscoveryNode("local-node", OpenSearchTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
Connection connection2 = transportService.getConnection(otherNode);

// setting localNode value explicitly
OpenSearchSecurityPlugin.setLocalNode(localNode);

// isSameNodeRequest = true
securityInterceptor.sendRequestDecorate(sender, connection1, action, request, options, handler);
securityInterceptor.sendRequestDecorate(sender, connection1, action, request, options, handler, localNode);
// from thread context inside sendRequestDecorate
doAnswer(i -> {
User transientUser = threadPool.getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER);
Expand All @@ -165,7 +162,7 @@ public void testSendRequestDecorate() {
assertEquals(threadPool.getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER), null);

// isSameNodeRequest = false
securityInterceptor.sendRequestDecorate(sender, connection2, action, request, options, handler);
securityInterceptor.sendRequestDecorate(sender, connection2, action, request, options, handler, otherNode);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is now fully deterministic 👍

// checking thread context inside sendRequestDecorate
doAnswer(i -> {
String serializedUserHeader = threadPool.getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
Expand Down