Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Add KoP's own callback handlers for OAuth 2.0 authentication #405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ protected void authenticate(KafkaHeaderAndRequest kafkaHeaderAndRequest,
if (authenticator != null) {
authenticator.authenticate(
kafkaHeaderAndRequest.getHeader(), kafkaHeaderAndRequest.getRequest(), responseFuture);
final String role = authenticator.getRole();
if (role == null) {
return;
}
// TODO: role is used for authorization, but KoP doesn't support authorization currently.
// See https://github.com/streamnative/kop/issues/236
if (log.isDebugEnabled()) {
log.debug("[{}] has authenticated successfully with role {}",
getRemoteAddress(), authenticator.getRole());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -49,6 +50,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;

/**
Expand All @@ -59,12 +61,16 @@ public class SaslAuthenticator {

private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

private final AuthenticationService authenticationService;
@Getter
private static volatile AuthenticationService authenticationService = null;
private final BrokerService brokerService;
private final PulsarAdmin admin;
private final Set<String> allowedMechanisms;
private final AuthenticateCallbackHandler oauth2CallbackHandler;
private State state = State.HANDSHAKE_OR_VERSIONS_REQUEST;
private SaslServer saslServer;
@Getter
private String role = null;

private enum State {
HANDSHAKE_OR_VERSIONS_REQUEST,
Expand Down Expand Up @@ -96,7 +102,7 @@ public UnsupportedSaslMechanismException(String mechanism) {
public SaslAuthenticator(PulsarService pulsarService,
Set<String> allowedMechanisms,
KafkaServiceConfiguration config) throws PulsarServerException {
this.authenticationService = pulsarService.getBrokerService().getAuthenticationService();
this.brokerService = pulsarService.getBrokerService();
this.admin = pulsarService.getAdminClient();
this.allowedMechanisms = allowedMechanisms;
this.oauth2CallbackHandler = createOauth2CallbackHandler(config);
Expand All @@ -105,6 +111,9 @@ public SaslAuthenticator(PulsarService pulsarService,
public void authenticate(RequestHeader header,
AbstractRequest request,
CompletableFuture<AbstractResponse> response) throws AuthenticationException {
if (authenticationService == null) {
authenticationService = brokerService.getAuthenticationService();
}
switch (state) {
case HANDSHAKE_OR_VERSIONS_REQUEST:
case HANDSHAKE_REQUEST:
Expand Down Expand Up @@ -235,6 +244,7 @@ private void handleAuthenticate(RequestHeader header,
try {
byte[] responseToken = saslServer.evaluateResponse(Utils.toArray(saslAuthenticateRequest.saslAuthBytes()));
ByteBuffer responseBuf = (responseToken == null) ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
this.role = saslServer.getAuthorizationID();
responseFuture.complete(new SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
} catch (SaslException e) {
responseFuture.complete(new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.naming.AuthenticationException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;

import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

/**
* OAuth 2.0 server callback handler.
*/
public class OauthValidatorCallbackHandler implements AuthenticateCallbackHandler {

private ServerConfig config = null;

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) {
throw new IllegalArgumentException("Unexpected SASL mechanism: " + saslMechanism);
}
if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) {
throw new IllegalArgumentException(
String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
jaasConfigEntries.size()));
}
final Map<String, String> options = (Map<String, String>) jaasConfigEntries.get(0).getOptions();
if (options == null) {
throw new IllegalArgumentException("JAAS configuration options is null");
}
this.config = new ServerConfig(options);
}

@Override
public void close() {
// empty
}

@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerValidatorCallback) {
OAuthBearerValidatorCallback validatorCallback = (OAuthBearerValidatorCallback) callback;
try {
handleCallback(validatorCallback);
} catch (OAuthBearerIllegalTokenException e) {
final OAuthBearerValidationResult failureReason = e.reason();
final String failureScope = failureReason.failureScope();
validatorCallback.error(failureScope != null ? "insufficient_scope" : "invalid_token",
failureScope, failureReason.failureOpenIdConfig());
}
} else {
throw new UnsupportedCallbackException(callback);
}
}
}

private void handleCallback(OAuthBearerValidatorCallback callback) {
if (callback.tokenValue() == null) {
throw new IllegalArgumentException("Callback has null token value!");
}
if (SaslAuthenticator.getAuthenticationService() == null) {
throw new IllegalStateException("AuthenticationService is null during token validation");
}
final AuthenticationProvider authenticationProvider =
SaslAuthenticator.getAuthenticationService().getAuthenticationProvider(config.getValidateMethod());
if (authenticationProvider == null) {
throw new IllegalStateException("No AuthenticationProvider found for method " + config.getValidateMethod());
}

final String token = callback.tokenValue();
try {
final AuthenticationState authState = authenticationProvider.newAuthState(
AuthData.of(token.getBytes(StandardCharsets.UTF_8)), null, null);
final String role = authState.getAuthRole();
callback.token(new OAuthBearerToken() {
@Override
public String value() {
return token;
}

@Override
public Set<String> scope() {
return null;
}

@Override
public long lifetimeMs() {
// TODO: convert "exp" claim to ms.
return Long.MAX_VALUE;
}

@Override
public String principalName() {
return role;
}

@Override
public Long startTimeMs() {
// TODO: convert "iat" claim to ms.
return Long.MAX_VALUE;
}
});
} catch (AuthenticationException e) {
throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult.newFailure(e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import java.util.Map;
import lombok.Getter;

/**
* The server configs associated with OauthValidatorCallbackHandler.
*
* @see OauthValidatorCallbackHandler
*/
@Getter
public class ServerConfig {

public static final String OAUTH_VALIDATE_METHOD = "oauth.validate.method";

private final String validateMethod;

public ServerConfig(Map<String, String> configs) {
this.validateMethod = configs.get(OAUTH_VALIDATE_METHOD);
if (this.validateMethod == null) {
throw new IllegalArgumentException("no key for " + OAUTH_VALIDATE_METHOD);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import java.util.HashMap;
import java.util.Map;

import org.testng.Assert;
import org.testng.annotations.Test;

/**
* Test ServerConfig.
*
* @see ServerConfig
*/
public class ServerConfigTest {

@Test
public void testValidConfig() {
final Map<String, String> configs = new HashMap<>();
configs.put(ServerConfig.OAUTH_VALIDATE_METHOD, "token");

final ServerConfig serverConfig = new ServerConfig(configs);
Assert.assertEquals(serverConfig.getValidateMethod(), "token");
}

@Test
public void testInvalidConfig() {
try {
new ServerConfig(new HashMap<>());
} catch (IllegalArgumentException e) {
Assert.assertEquals(e.getMessage(), "no key for " + ServerConfig.OAUTH_VALIDATE_METHOD);
}
}
}
36 changes: 36 additions & 0 deletions oauth-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>pulsar-protocol-handler-kafka-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>2.8.0-SNAPSHOT</version>
</parent>

<artifactId>oauth-client</artifactId>
<name>StreamNative :: Pulsar Protocol Handler :: OAuth 2.0 Client</name>
<description>OAuth 2.0 login callback handler for Kafka client</description>

<!-- include the dependencies -->
<dependencies>
<!-- runtime dependencies -->
</dependencies>
</project>
Loading