Skip to content

Commit

Permalink
fix: Authentication changes to better support FlightSQL clients (#6032)
Browse files Browse the repository at this point in the history
Handshake calls previously required the client to send at least one
message before sending a response, even if the required auth information
was included in headers, now the server will respond and close the
stream right away. This change could cause problems for older DH clients
(specifically JS and Go clients), but the fix for this was released in
0.36.

Bearer authentication would fail if the token wasn't in the form of a
UUID, even if an alternative authentication handler was going to accept
the token.

Some Flight clients are unable to send auth types other than Basic and
Bearer, our authentication interceptor can now support the auth type as
a space-separated prefix of the auth token.

Fixes #5922
  • Loading branch information
niloc132 authored Oct 8, 2024
1 parent 3c6f593 commit 39a2fc0
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//
package io.deephaven.server.arrow;

import com.github.f4b6a3.uuid.UuidCreator;
import com.github.f4b6a3.uuid.exception.InvalidUuidException;
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteStringAccess;
import com.google.protobuf.InvalidProtocolBufferException;
Expand All @@ -27,6 +29,7 @@
import io.deephaven.util.SafeCloseable;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.arrow.flight.auth2.Auth2Constants;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.jetbrains.annotations.NotNull;
Expand All @@ -35,8 +38,10 @@
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;

@Singleton
Expand Down Expand Up @@ -73,6 +78,30 @@ public FlightServiceGrpcImpl(
@Override
public StreamObserver<Flight.HandshakeRequest> handshake(
@NotNull final StreamObserver<Flight.HandshakeResponse> responseObserver) {
// handle the scenario where authentication headers initialized a session
SessionState session = sessionService.getOptionalSession();
if (session != null) {
// Do not reply over the stream, some clients will break if they receive a message here - but since the
// session was already created, our "200 OK" will include the Bearer response already. Do not close
// yet to avoid hitting https://github.com/envoyproxy/envoy/issues/30149.
return new StreamObserver<>() {
@Override
public void onNext(Flight.HandshakeRequest value) {
// noop, already sent response
}

@Override
public void onError(Throwable t) {
// ignore, already closed
}

@Override
public void onCompleted() {
GrpcUtil.safelyComplete(responseObserver);
}
};
}

return new HandshakeObserver(responseObserver);
}

Expand All @@ -87,13 +116,6 @@ private HandshakeObserver(StreamObserver<Flight.HandshakeResponse> responseObser

@Override
public void onNext(final Flight.HandshakeRequest value) {
// handle the scenario where authentication headers initialized a session
SessionState session = sessionService.getOptionalSession();
if (session != null) {
respondWithAuthTokenBin(session);
return;
}

final AuthenticationRequestHandler.HandshakeResponseListener handshakeResponseListener =
(protocol, response) -> {
GrpcUtil.safelyComplete(responseObserver, Flight.HandshakeResponse.newBuilder()
Expand All @@ -109,6 +131,23 @@ public void onNext(final Flight.HandshakeRequest value) {
auth = login(BasicAuthMarshaller.AUTH_TYPE, protocolVersion, payload, handshakeResponseListener);
if (auth.isEmpty()) {
final WrappedAuthenticationRequest req = WrappedAuthenticationRequest.parseFrom(payload);
// If the auth request is bearer, the v1 auth might be trying to renew an existing session
if (req.getType().equals(Auth2Constants.BEARER_PREFIX.trim())) {
try {
UUID uuid = UuidCreator.fromString(req.getPayload().toString(StandardCharsets.US_ASCII));
SessionState session = sessionService.getSessionForToken(uuid);
if (session != null) {
SessionService.TokenExpiration expiration = session.getExpiration();
if (expiration != null) {
respondWithAuthTokenBin(expiration);
}
}
return;
} catch (IllegalArgumentException | InvalidUuidException ignored) {
}
}

// Attempt to log in with the given type and token
auth = login(req.getType(), protocolVersion, req.getPayload(), handshakeResponseListener);
}
} catch (final AuthenticationException | InvalidProtocolBufferException err) {
Expand All @@ -122,8 +161,8 @@ public void onNext(final Flight.HandshakeRequest value) {
return;
}

session = sessionService.newSession(auth.get());
respondWithAuthTokenBin(session);
SessionState session = sessionService.newSession(auth.get());
respondWithAuthTokenBin(session.getExpiration());
}

private Optional<AuthContext> login(String type, long version, ByteString payload,
Expand All @@ -137,10 +176,10 @@ private Optional<AuthContext> login(String type, long version, ByteString payloa
}

/** send the bearer token as an AuthTokenBin, as headers might have already been sent */
private void respondWithAuthTokenBin(SessionState session) {
private void respondWithAuthTokenBin(SessionService.TokenExpiration expiration) {
isComplete = true;
responseObserver.onNext(Flight.HandshakeResponse.newBuilder()
.setPayload(session.getExpiration().getTokenAsByteString())
.setPayload(expiration.getTokenAsByteString())
.build());
responseObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.server.session;

import com.github.f4b6a3.uuid.UuidCreator;
import com.github.f4b6a3.uuid.exception.InvalidUuidException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -325,6 +327,9 @@ public long getExpirationDelayMs() {
* @return the session or null if the session is invalid
*/
public SessionState getSessionForAuthToken(final String token) throws AuthenticationException {
String bearerKey = null;
String bearerPayload = null;

if (token.startsWith(Auth2Constants.BEARER_PREFIX)) {
final String authToken = token.substring(Auth2Constants.BEARER_PREFIX.length());
try {
Expand All @@ -333,21 +338,41 @@ public SessionState getSessionForAuthToken(final String token) throws Authentica
if (session != null) {
return session;
}
} catch (IllegalArgumentException ignored) {
} catch (InvalidUuidException ignored) {
}

// In case we don't have another handler for Bearer, look for nested tokens to try later
int offset = authToken.indexOf(' ');
bearerKey = authToken.substring(0, offset < 0 ? authToken.length() : offset);
bearerPayload = offset < 0 ? "" : authToken.substring(offset + 1);
}

int offset = token.indexOf(' ');
final String key = token.substring(0, offset < 0 ? token.length() : offset);
final String payload = offset < 0 ? "" : token.substring(offset + 1);
// Use the auth type to look up a handler. If this happens to be Bearer, this gives a chance for an auth handler
// to claim that type
AuthenticationRequestHandler handler = authRequestHandlers.get(key);
if (handler == null) {
log.info().append("No AuthenticationRequestHandler registered for type ").append(key).endl();
throw new AuthenticationException();
if (handler != null) {
Optional<AuthContext> s = handler.login(payload, SessionServiceGrpcImpl::insertCallHeader);
if (s.isPresent()) {
return newSession(s.get());
}
}
return handler.login(payload, SessionServiceGrpcImpl::insertCallHeader)
.map(this::newSession)
.orElseThrow(AuthenticationException::new);
// If nothing succeeded or errored yet, and we found a key in the bearer value, try that next
if (bearerKey != null) {
handler = authRequestHandlers.get(bearerKey);
if (handler != null) {
Optional<AuthContext> s = handler.login(bearerPayload, SessionServiceGrpcImpl::insertCallHeader);
if (s.isPresent()) {
return newSession(s.get());
}
}
}

// No more options, log an error and return
log.info().append("No AuthenticationRequestHandler registered for type ").append(key).endl();
throw new AuthenticationException();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//
package io.deephaven.server.session;

import com.github.f4b6a3.uuid.UuidCreator;
import com.github.f4b6a3.uuid.exception.InvalidUuidException;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import io.deephaven.auth.AuthContext;
Expand Down Expand Up @@ -38,10 +40,10 @@
import javax.inject.Singleton;
import java.io.Closeable;
import java.lang.Object;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class SessionServiceGrpcImpl extends SessionServiceGrpc.SessionServiceImplBase {
/**
Expand Down Expand Up @@ -344,8 +346,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<Re
final byte[] altToken = metadata.get(AuthConstants.TOKEN_KEY);
if (altToken != null) {
try {
session = service.getSessionForToken(UUID.fromString(new String(altToken)));
} catch (IllegalArgumentException ignored) {
session = service.getSessionForToken(
UuidCreator.fromString(new String(altToken, StandardCharsets.US_ASCII)));
} catch (IllegalArgumentException | InvalidUuidException ignored) {
}
}

Expand Down

0 comments on commit 39a2fc0

Please sign in to comment.