-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds a check to skip serialization-deserialization if request is for same node #2765
Changes from all commits
b76e804
4a8dda5
2e693cf
25796cb
d8a4f9c
44d5048
d60bbb2
a55a0a9
73ff9cb
62b6172
8a09ea8
fbb6041
ab7177d
4a0125b
79ad17f
20af819
0d4aa13
2ae647a
fc1234d
f114b72
35de56d
219b0b3
691019b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,6 +43,7 @@ | |||||
import org.opensearch.action.get.GetRequest; | ||||||
import org.opensearch.action.search.SearchAction; | ||||||
import org.opensearch.action.search.SearchRequest; | ||||||
import org.opensearch.cluster.node.DiscoveryNode; | ||||||
import org.opensearch.cluster.service.ClusterService; | ||||||
import org.opensearch.common.io.stream.StreamInput; | ||||||
import org.opensearch.common.settings.Settings; | ||||||
|
@@ -131,7 +132,6 @@ public <T extends TransportResponse> void sendRequestDecorate( | |||||
TransportRequestOptions options, | ||||||
TransportResponseHandler<T> handler | ||||||
) { | ||||||
|
||||||
final Map<String, String> origHeaders0 = getThreadContext().getHeaders(); | ||||||
final User user0 = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER); | ||||||
final String injectedUserString = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER); | ||||||
|
@@ -146,6 +146,9 @@ public <T extends TransportResponse> void sendRequestDecorate( | |||||
final String origCCSTransientMf = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_CCS); | ||||||
|
||||||
final boolean isDebugEnabled = log.isDebugEnabled(); | ||||||
final DiscoveryNode localNode = OpenSearchSecurityPlugin.getLocalNode(); | ||||||
boolean isSameNodeRequest = localNode != null && localNode.equals(connection.getNode()); | ||||||
|
||||||
try (ThreadContext.StoredContext stashedContext = getThreadContext().stashContext()) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could not yet find a solution to not stash the context for same node. Tried with these changes:
But the test fails with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is perhaps good idea to continue stashing from security point of view too. That way, any existing transients assumed to be cleared are not carry forwarded. For example let us assume current thread context had a,b,c entries and we explicitly passed around only a & b. Now carry forwarding c may be a security concern. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I would guess that this is related to the wrong check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we say |
||||||
final TransportResponseHandler<T> restoringHandler = new RestoringTransportResponseHandler<T>(handler, stashedContext); | ||||||
getThreadContext().putHeader("_opendistro_security_remotecn", cs.getClusterName().value()); | ||||||
|
@@ -223,7 +226,7 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROL | |||||
|
||||||
getThreadContext().putHeader(headerMap); | ||||||
|
||||||
ensureCorrectHeaders(remoteAddress0, user0, origin0, injectedUserString, injectedRolesString); | ||||||
ensureCorrectHeaders(remoteAddress0, user0, origin0, injectedUserString, injectedRolesString, isSameNodeRequest); | ||||||
|
||||||
if (isActionTraceEnabled()) { | ||||||
getThreadContext().putHeader( | ||||||
|
@@ -249,7 +252,8 @@ private void ensureCorrectHeaders( | |||||
final User origUser, | ||||||
final String origin, | ||||||
final String injectedUserString, | ||||||
final String injectedRolesString | ||||||
final String injectedRolesString, | ||||||
boolean isSameNodeRequest | ||||||
) { | ||||||
// keep original address | ||||||
|
||||||
|
@@ -263,30 +267,49 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADE | |||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADER, Origin.LOCAL.toString()); | ||||||
} | ||||||
|
||||||
TransportAddress transportAddress = null; | ||||||
if (remoteAdr != null && remoteAdr instanceof TransportAddress) { | ||||||
|
||||||
String remoteAddressHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER); | ||||||
|
||||||
if (remoteAddressHeader == null) { | ||||||
getThreadContext().putHeader( | ||||||
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER, | ||||||
Base64Helper.serializeObject(((TransportAddress) remoteAdr).address()) | ||||||
); | ||||||
transportAddress = (TransportAddress) remoteAdr; | ||||||
} | ||||||
} | ||||||
|
||||||
String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER); | ||||||
// we put headers as transient for same node requests | ||||||
if (isSameNodeRequest) { | ||||||
if (transportAddress != null) { | ||||||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, transportAddress); | ||||||
} | ||||||
|
||||||
if (userHeader == null) { | ||||||
if (origUser != null) { | ||||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(origUser)); | ||||||
// if request is going to be handled by same node, we directly put transient value as the thread context is not going to be | ||||||
// stah. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_USER, origUser); | ||||||
} else if (StringUtils.isNotEmpty(injectedRolesString)) { | ||||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER, injectedRolesString); | ||||||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES, injectedRolesString); | ||||||
} else if (StringUtils.isNotEmpty(injectedUserString)) { | ||||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER, injectedUserString); | ||||||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserString); | ||||||
} | ||||||
} else { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not intend to retain the existing |
||||||
if (transportAddress != null) { | ||||||
getThreadContext().putHeader( | ||||||
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER, | ||||||
Base64Helper.serializeObject(transportAddress.address()) | ||||||
); | ||||||
} | ||||||
} | ||||||
|
||||||
final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER); | ||||||
if (userHeader == null) { | ||||||
// put as headers for other requests | ||||||
if (origUser != null) { | ||||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(origUser)); | ||||||
} else if (StringUtils.isNotEmpty(injectedRolesString)) { | ||||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER, injectedRolesString); | ||||||
} else if (StringUtils.isNotEmpty(injectedUserString)) { | ||||||
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER, injectedUserString); | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
private ThreadContext getThreadContext() { | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,7 +95,6 @@ protected void messageReceivedDecorate( | |
final TransportChannel transportChannel, | ||
Task task | ||
) throws Exception { | ||
|
||
String resolvedActionClass = request.getClass().getSimpleName(); | ||
|
||
if (request instanceof BulkShardRequest) { | ||
|
@@ -142,7 +141,31 @@ protected void messageReceivedDecorate( | |
} | ||
|
||
// bypass non-netty requests | ||
if (channelType.equals("direct")) { | ||
if (getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER) != null | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic in the receiver was simplified using @krishna-ggk 's suggestion here: #2765 (comment) The receiver does not need any logic to determine where the request came from, if the transient headers are present then there is no need to deserialize. If the serialized (non-transient) headers are present then it will deserialize them. |
||
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER) != null | ||
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES) != null | ||
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS) != null) { | ||
|
||
final String rolesValidation = getThreadContext().getHeader( | ||
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION_HEADER | ||
); | ||
if (!Strings.isNullOrEmpty(rolesValidation)) { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation); | ||
} | ||
|
||
if (isActionTraceEnabled()) { | ||
getThreadContext().putHeader( | ||
"_opendistro_security_trace" + System.currentTimeMillis() + "#" + UUID.randomUUID().toString(), | ||
Thread.currentThread().getName() | ||
+ " DIR -> " | ||
+ transportChannel.getChannelType() | ||
+ " " | ||
+ getThreadContext().getHeaders() | ||
); | ||
} | ||
|
||
putInitialActionClassHeader(initialActionClassValue, resolvedActionClass); | ||
} else { | ||
final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER); | ||
final String injectedRolesHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER); | ||
final String injectedUserHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER); | ||
|
@@ -162,15 +185,15 @@ protected void messageReceivedDecorate( | |
); | ||
} | ||
|
||
final String originalRemoteAddress = getThreadContext().getHeader( | ||
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER | ||
); | ||
String originalRemoteAddress = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER); | ||
|
||
if (!Strings.isNullOrEmpty(originalRemoteAddress)) { | ||
getThreadContext().putTransient( | ||
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, | ||
new TransportAddress((InetSocketAddress) Base64Helper.deserializeObject(originalRemoteAddress)) | ||
); | ||
} else { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, request.remoteAddress()); | ||
} | ||
|
||
final String rolesValidation = getThreadContext().getHeader( | ||
|
@@ -179,20 +202,9 @@ protected void messageReceivedDecorate( | |
if (!Strings.isNullOrEmpty(rolesValidation)) { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation); | ||
} | ||
} | ||
|
||
if (isActionTraceEnabled()) { | ||
getThreadContext().putHeader( | ||
"_opendistro_security_trace" + System.currentTimeMillis() + "#" + UUID.randomUUID().toString(), | ||
Thread.currentThread().getName() | ||
+ " DIR -> " | ||
+ transportChannel.getChannelType() | ||
+ " " | ||
+ getThreadContext().getHeaders() | ||
); | ||
} | ||
|
||
putInitialActionClassHeader(initialActionClassValue, resolvedActionClass); | ||
|
||
if (channelType.equals("direct")) { | ||
super.messageReceivedDecorate(request, handler, transportChannel, task); | ||
return; | ||
} | ||
|
@@ -272,58 +284,10 @@ protected void messageReceivedDecorate( | |
|
||
// network intercluster request or cross search cluster request | ||
// CS-SUPPRESS-SINGLE: RegexpSingleline Used to allow/disallow TLS connections to extensions | ||
if (HeaderHelper.isInterClusterRequest(getThreadContext()) | ||
if (!(HeaderHelper.isInterClusterRequest(getThreadContext()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We either need to update the comment above this or remove the exclamation point I think. |
||
|| HeaderHelper.isTrustedClusterRequest(getThreadContext()) | ||
|| HeaderHelper.isExtensionRequest(getThreadContext())) { | ||
|| HeaderHelper.isExtensionRequest(getThreadContext()))) { | ||
// CS-ENFORCE-SINGLE | ||
|
||
final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER); | ||
final String injectedRolesHeader = getThreadContext().getHeader( | ||
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER | ||
); | ||
final String injectedUserHeader = getThreadContext().getHeader( | ||
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER | ||
); | ||
|
||
if (Strings.isNullOrEmpty(userHeader)) { | ||
// Keeping role injection with higher priority as plugins under OpenSearch will be using this | ||
// on transport layer | ||
if (!Strings.isNullOrEmpty(injectedRolesHeader)) { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES, injectedRolesHeader); | ||
} else if (!Strings.isNullOrEmpty(injectedUserHeader)) { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserHeader); | ||
} | ||
} else { | ||
getThreadContext().putTransient( | ||
ConfigConstants.OPENDISTRO_SECURITY_USER, | ||
Objects.requireNonNull((User) Base64Helper.deserializeObject(userHeader)) | ||
); | ||
} | ||
|
||
String originalRemoteAddress = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER); | ||
|
||
if (!Strings.isNullOrEmpty(originalRemoteAddress)) { | ||
getThreadContext().putTransient( | ||
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, | ||
new TransportAddress((InetSocketAddress) Base64Helper.deserializeObject(originalRemoteAddress)) | ||
); | ||
} else { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, request.remoteAddress()); | ||
} | ||
|
||
final String rolesValidation = getThreadContext().getHeader( | ||
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION_HEADER | ||
); | ||
if (!Strings.isNullOrEmpty(rolesValidation)) { | ||
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation); | ||
} | ||
|
||
} else { | ||
// this is a netty request from a non-server node (maybe also be internal: or a shard request) | ||
// and therefore issued by a transport client | ||
|
||
// since OS 2.0 we do not support this any longer because transport client no longer available | ||
|
||
final OpenSearchException exception = ExceptionUtils.createTransportClientNoLongerSupportedException(); | ||
log.error(exception.toString()); | ||
transportChannel.sendResponse(exception); | ||
|
@@ -346,9 +310,8 @@ protected void messageReceivedDecorate( | |
} | ||
|
||
putInitialActionClassHeader(initialActionClassValue, resolvedActionClass); | ||
|
||
super.messageReceivedDecorate(request, handler, transportChannel, task); | ||
} | ||
super.messageReceivedDecorate(request, handler, transportChannel, task); | ||
} finally { | ||
|
||
if (isActionTraceEnabled()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need this for security Interceptor tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to add
localNode
toSecurityInterceptor
constructor and remove this static method only needed for testing?