diff --git a/README.md b/README.md new file mode 100644 index 0000000..5196a14 --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +## Kafka SPIFFE Principal Builder + +A custom `KafkaPrincipalBuilder` implementation for Apache Kafka. +This class and documentation deals only with `SslAuthenticationContext`, we do not support any other context at the moment (Kerberos, SASL, Oauth) + +#### Default behavior +The default `DefaultKafkaPrincipalBuilder` class that comes with Apache Kafka builds a principal +name according to the x509 Subject in the SSL certificate. Since there is no logic that deals with *Subject Alternative Name*, +this approach cannot handle a *SPIFFE ID*. + +#### New behavior +The principal builder first looks for any valid *SPIFFE ID* in the certificate, if found, the *KafkaPrincipal* that will +be returned would be seen by an *ACL Authorizer* as **SPIFFE:spiffe://some.spiffe.id.uri**. If that fails, a normal usage of the Subject will +used with a normal **USER:CN=...** + \ No newline at end of file diff --git a/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java b/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java index 26a0faf..4560f51 100644 --- a/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java +++ b/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java @@ -7,10 +7,19 @@ import java.security.cert.Certificate; import java.security.cert.CertificateParsingException; import java.security.cert.X509Certificate; +import java.util.Collection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.commons.lang3.StringUtils.startsWith; public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder { + private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class); + + private static final String SPIFFE_TYPE = "SPIFFE"; + public KafkaPrincipal build(AuthenticationContext context) { if (context instanceof PlaintextAuthenticationContext) { return KafkaPrincipal.ANONYMOUS; @@ -30,14 +39,27 @@ public KafkaPrincipal build(AuthenticationContext context) { return KafkaPrincipal.ANONYMOUS; } X509Certificate cert = (X509Certificate) peerCerts[0]; - return cert.getSubjectAlternativeNames().stream() - .map(san -> (String) san.get(1)) - .filter(uri -> startsWith(uri, "spiffe://")) - .findFirst() - .map(s -> new KafkaPrincipal(KafkaPrincipal.USER_TYPE, s)) - .orElse(KafkaPrincipal.ANONYMOUS); + + Collection> sanCollection = cert.getSubjectAlternativeNames(); + KafkaPrincipal principal; + + if (sanCollection != null) { + principal = sanCollection.stream() + .map(san -> (String) san.get(1)) + .filter(uri -> startsWith(uri, "spiffe://")) + .findFirst() + .map(s -> new KafkaPrincipal(SPIFFE_TYPE, s)) + .orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName())); + } else { + principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()); + } + + LOG.debug("PrincipalBuilder found principal: {}", principal.toString()); + + return principal; } catch (SSLPeerUnverifiedException | CertificateParsingException se) { + LOG.warn("Unhandled exception: " + se.toString()); return KafkaPrincipal.ANONYMOUS; } } -} +} \ No newline at end of file