Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Adds a check to skip serialization-deserialization if request is for same node (#2765) #2973

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.action.support.ActionFilter;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.component.Lifecycle.State;
Expand Down Expand Up @@ -207,6 +208,7 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin
private volatile ConfigurationRepository cr;
private volatile AdminDNs adminDns;
private volatile ClusterService cs;
private static volatile DiscoveryNode localNode;
private volatile AuditLog auditLog;
private volatile BackendRegistry backendRegistry;
private volatile SslExceptionHandler sslExceptionHandler;
Expand Down Expand Up @@ -1790,11 +1792,12 @@ public List<String> getSettingsFilter() {
}

@Override
public void onNodeStarted() {
public void onNodeStarted(DiscoveryNode localNode) {
log.info("Node started");
if (!SSLConfig.isSslOnlyMode() && !client && !disabled) {
cr.initOnNodeStart();
}
this.localNode = localNode;
final Set<ModuleInfo> securityModules = ReflectionHelper.getModulesLoaded();
log.info("{} OpenSearch Security modules loaded so far: {}", securityModules.size(), securityModules);
}
Expand Down Expand Up @@ -1874,6 +1877,14 @@ private static String handleKeyword(final String field) {
return field;
}

public static DiscoveryNode getLocalNode() {
return localNode;
}

public static void setLocalNode(DiscoveryNode node) {
localNode = node;
}

public static class GuiceHolder implements LifecycleComponent {

private static RepositoriesService repositoriesService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -131,7 +132,6 @@ public <T extends TransportResponse> void sendRequestDecorate(
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {

final Map<String, String> origHeaders0 = getThreadContext().getHeaders();
final User user0 = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER);
final String injectedUserString = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER);
Expand All @@ -146,6 +146,9 @@ public <T extends TransportResponse> void sendRequestDecorate(
final String origCCSTransientMf = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_CCS);

final boolean isDebugEnabled = log.isDebugEnabled();
final DiscoveryNode localNode = OpenSearchSecurityPlugin.getLocalNode();
boolean isSameNodeRequest = localNode != null && localNode.equals(connection.getNode());

try (ThreadContext.StoredContext stashedContext = getThreadContext().stashContext()) {
final TransportResponseHandler<T> restoringHandler = new RestoringTransportResponseHandler<T>(handler, stashedContext);
getThreadContext().putHeader("_opendistro_security_remotecn", cs.getClusterName().value());
Expand Down Expand Up @@ -223,7 +226,7 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROL

getThreadContext().putHeader(headerMap);

ensureCorrectHeaders(remoteAddress0, user0, origin0, injectedUserString, injectedRolesString);
ensureCorrectHeaders(remoteAddress0, user0, origin0, injectedUserString, injectedRolesString, isSameNodeRequest);

if (isActionTraceEnabled()) {
getThreadContext().putHeader(
Expand All @@ -249,7 +252,8 @@ private void ensureCorrectHeaders(
final User origUser,
final String origin,
final String injectedUserString,
final String injectedRolesString
final String injectedRolesString,
boolean isSameNodeRequest
) {
// keep original address

Expand All @@ -263,30 +267,49 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADE
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADER, Origin.LOCAL.toString());
}

TransportAddress transportAddress = null;
if (remoteAdr != null && remoteAdr instanceof TransportAddress) {

String remoteAddressHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER);

if (remoteAddressHeader == null) {
getThreadContext().putHeader(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER,
Base64Helper.serializeObject(((TransportAddress) remoteAdr).address())
);
transportAddress = (TransportAddress) remoteAdr;
}
}

String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
// we put headers as transient for same node requests
if (isSameNodeRequest) {
if (transportAddress != null) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, transportAddress);
}

if (userHeader == null) {
if (origUser != null) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(origUser));
// if request is going to be handled by same node, we directly put transient value as the thread context is not going to be
// stah.
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_USER, origUser);
} else if (StringUtils.isNotEmpty(injectedRolesString)) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER, injectedRolesString);
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES, injectedRolesString);
} else if (StringUtils.isNotEmpty(injectedUserString)) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER, injectedUserString);
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserString);
}
} else {
if (transportAddress != null) {
getThreadContext().putHeader(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER,
Base64Helper.serializeObject(transportAddress.address())
);
}
}

final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
if (userHeader == null) {
// put as headers for other requests
if (origUser != null) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(origUser));
} else if (StringUtils.isNotEmpty(injectedRolesString)) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER, injectedRolesString);
} else if (StringUtils.isNotEmpty(injectedUserString)) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER, injectedUserString);
}
}
}
}

private ThreadContext getThreadContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

package org.opensearch.security.transport;

// CS-SUPPRESS-SINGLE: RegexpSingleline Extensions manager used to allow/disallow TLS connections to extensions
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.Objects;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.opensearch.transport.TransportRequestHandler;

import static org.opensearch.security.OpenSearchSecurityPlugin.isActionTraceEnabled;
// CS-ENFORCE-SINGLE

public class SecurityRequestHandler<T extends TransportRequest> extends SecuritySSLRequestHandler<T> {

Expand Down Expand Up @@ -140,7 +142,31 @@ protected void messageReceivedDecorate(
}

// bypass non-netty requests
if (channelType.equals("direct")) {
if (getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER) != null
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER) != null
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES) != null
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS) != null) {

final String rolesValidation = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION_HEADER
);
if (!Strings.isNullOrEmpty(rolesValidation)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation);
}

if (isActionTraceEnabled()) {
getThreadContext().putHeader(
"_opendistro_security_trace" + System.currentTimeMillis() + "#" + UUID.randomUUID().toString(),
Thread.currentThread().getName()
+ " DIR -> "
+ transportChannel.getChannelType()
+ " "
+ getThreadContext().getHeaders()
);
}

putInitialActionClassHeader(initialActionClassValue, resolvedActionClass);
} else {
final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
final String injectedRolesHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER);
final String injectedUserHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER);
Expand All @@ -160,15 +186,15 @@ protected void messageReceivedDecorate(
);
}

final String originalRemoteAddress = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER
);
String originalRemoteAddress = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER);

if (!Strings.isNullOrEmpty(originalRemoteAddress)) {
getThreadContext().putTransient(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS,
new TransportAddress((InetSocketAddress) Base64Helper.deserializeObject(originalRemoteAddress))
);
} else {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, request.remoteAddress());
}

final String rolesValidation = getThreadContext().getHeader(
Expand All @@ -177,20 +203,9 @@ protected void messageReceivedDecorate(
if (!Strings.isNullOrEmpty(rolesValidation)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation);
}
}

if (isActionTraceEnabled()) {
getThreadContext().putHeader(
"_opendistro_security_trace" + System.currentTimeMillis() + "#" + UUID.randomUUID().toString(),
Thread.currentThread().getName()
+ " DIR -> "
+ transportChannel.getChannelType()
+ " "
+ getThreadContext().getHeaders()
);
}

putInitialActionClassHeader(initialActionClassValue, resolvedActionClass);

if (channelType.equals("direct")) {
super.messageReceivedDecorate(request, handler, transportChannel, task);
return;
}
Expand Down Expand Up @@ -223,11 +238,13 @@ protected void messageReceivedDecorate(
// if transport channel is not a netty channel but a direct or local channel (e.g. send via network) then allow it (regardless
// of beeing a internal: or shard request)
// also allow when issued from a remote cluster for cross cluster search
// CS-SUPPRESS-SINGLE: RegexpSingleline Used to allow/disallow TLS connections to extensions
if (!HeaderHelper.isInterClusterRequest(getThreadContext())
&& !HeaderHelper.isTrustedClusterRequest(getThreadContext())
&& !HeaderHelper.isExtensionRequest(getThreadContext())
&& !task.getAction().equals("internal:transport/handshake")
&& (task.getAction().startsWith("internal:") || task.getAction().contains("["))) {
// CS-ENFORCE-SINGLE

auditLog.logMissingPrivileges(task.getAction(), request, task);
log.error(
Expand Down Expand Up @@ -267,57 +284,11 @@ protected void messageReceivedDecorate(
}

// network intercluster request or cross search cluster request
if (HeaderHelper.isInterClusterRequest(getThreadContext())
// CS-SUPPRESS-SINGLE: RegexpSingleline Used to allow/disallow TLS connections to extensions
if (!(HeaderHelper.isInterClusterRequest(getThreadContext())
|| HeaderHelper.isTrustedClusterRequest(getThreadContext())
|| HeaderHelper.isExtensionRequest(getThreadContext())) {

final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
final String injectedRolesHeader = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER
);
final String injectedUserHeader = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER
);

if (Strings.isNullOrEmpty(userHeader)) {
// Keeping role injection with higher priority as plugins under OpenSearch will be using this
// on transport layer
if (!Strings.isNullOrEmpty(injectedRolesHeader)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES, injectedRolesHeader);
} else if (!Strings.isNullOrEmpty(injectedUserHeader)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserHeader);
}
} else {
getThreadContext().putTransient(
ConfigConstants.OPENDISTRO_SECURITY_USER,
Objects.requireNonNull((User) Base64Helper.deserializeObject(userHeader))
);
}

String originalRemoteAddress = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER);

if (!Strings.isNullOrEmpty(originalRemoteAddress)) {
getThreadContext().putTransient(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS,
new TransportAddress((InetSocketAddress) Base64Helper.deserializeObject(originalRemoteAddress))
);
} else {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, request.remoteAddress());
}

final String rolesValidation = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION_HEADER
);
if (!Strings.isNullOrEmpty(rolesValidation)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation);
}

} else {
// this is a netty request from a non-server node (maybe also be internal: or a shard request)
// and therefore issued by a transport client

// since OS 2.0 we do not support this any longer because transport client no longer available

|| HeaderHelper.isExtensionRequest(getThreadContext()))) {
// CS-ENFORCE-SINGLE
final OpenSearchException exception = ExceptionUtils.createTransportClientNoLongerSupportedException();
log.error(exception.toString());
transportChannel.sendResponse(exception);
Expand All @@ -340,9 +311,8 @@ protected void messageReceivedDecorate(
}

putInitialActionClassHeader(initialActionClassValue, resolvedActionClass);

super.messageReceivedDecorate(request, handler, transportChannel, task);
}
super.messageReceivedDecorate(request, handler, transportChannel, task);
} finally {

if (isActionTraceEnabled()) {
Expand Down Expand Up @@ -404,13 +374,15 @@ protected void addAdditionalContextValues(
}
}

// CS-SUPPRESS-SINGLE: RegexpSingleline Extensions manager used to allow/disallow TLS connections to extensions
String extensionUniqueId = getThreadContext().getHeader("extension_unique_id");
if (extensionUniqueId != null) {
ExtensionsManager extManager = OpenSearchSecurityPlugin.GuiceHolder.getExtensionsManager();
if (extManager.lookupExtensionSettingsById(extensionUniqueId).isPresent()) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_SSL_TRANSPORT_EXTENSION_REQUEST, Boolean.TRUE);
}
}
// CS-ENFORCE-SINGLE

super.addAdditionalContextValues(action, request, localCerts, peerCerts, principal);
}
Expand Down
Loading