Skip to content

Commit

Permalink
* Add a fallback to Subject principal when SAN principal could not be…
Browse files Browse the repository at this point in the history
… determined.

* Add logging for debug
  • Loading branch information
royantman committed Jul 30, 2018
1 parent 458cada commit f1da6e2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## Kafka SPIFFE Principal Builder

A custom `KafkaPrincipalBuilder` implementation for Apache Kafka.
This class and documentation deals only with `SslAuthenticationContext`, we do not support any other context at the moment (Kerberos, SASL, Oauth)

#### Default behavior
The default `DefaultKafkaPrincipalBuilder` class that comes with Apache Kafka builds a principal
name according to the x509 Subject in the SSL certificate. Since there is no logic that deals with *Subject Alternative Name*,
this approach cannot handle a *SPIFFE ID*.

#### New behavior
The principal builder first looks for any valid *SPIFFE ID* in the certificate, if found, the *KafkaPrincipal* that will
be returned would be seen by an *ACL Authorizer* as **SPIFFE:spiffe://some.spiffe.id.uri**. If that fails, a normal usage of the Subject will
used with a normal **USER:CN=...**

36 changes: 29 additions & 7 deletions src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@
import java.security.cert.Certificate;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.commons.lang3.StringUtils.startsWith;

public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);

private static final String SPIFFE_TYPE = "SPIFFE";

public KafkaPrincipal build(AuthenticationContext context) {
if (context instanceof PlaintextAuthenticationContext) {
return KafkaPrincipal.ANONYMOUS;
Expand All @@ -30,14 +39,27 @@ public KafkaPrincipal build(AuthenticationContext context) {
return KafkaPrincipal.ANONYMOUS;
}
X509Certificate cert = (X509Certificate) peerCerts[0];
return cert.getSubjectAlternativeNames().stream()
.map(san -> (String) san.get(1))
.filter(uri -> startsWith(uri, "spiffe://"))
.findFirst()
.map(s -> new KafkaPrincipal(KafkaPrincipal.USER_TYPE, s))
.orElse(KafkaPrincipal.ANONYMOUS);

Collection<List<?>> sanCollection = cert.getSubjectAlternativeNames();
KafkaPrincipal principal;

if (sanCollection != null) {
principal = sanCollection.stream()
.map(san -> (String) san.get(1))
.filter(uri -> startsWith(uri, "spiffe://"))
.findFirst()
.map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
.orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
} else {
principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
}

LOG.debug("PrincipalBuilder found principal: {}", principal.toString());

return principal;
} catch (SSLPeerUnverifiedException | CertificateParsingException se) {
LOG.warn("Unhandled exception: " + se.toString());
return KafkaPrincipal.ANONYMOUS;
}
}
}
}

0 comments on commit f1da6e2

Please sign in to comment.