diff --git a/pom.xml b/pom.xml
index 79029f3..691103e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,6 @@
-
org.apache.kafka
@@ -28,9 +27,27 @@
1.1.1
- org.apache.commons
- commons-lang3
- 3.7
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ org.mockito
+ mockito-core
+ 2.20.1
+ test
+
+
+ org.slf4j
+ slf4j-simple
+ 1.6.4
+ test
diff --git a/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java b/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
index 4560f51..65b2f1f 100644
--- a/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
+++ b/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
@@ -1,7 +1,13 @@
package io.okro.kafka;
-import org.apache.kafka.common.security.auth.*;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.security.cert.Certificate;
@@ -10,56 +16,64 @@
import java.util.Collection;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.commons.lang3.StringUtils.startsWith;
-
public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);
private static final String SPIFFE_TYPE = "SPIFFE";
public KafkaPrincipal build(AuthenticationContext context) {
- if (context instanceof PlaintextAuthenticationContext) {
+ if (!(context instanceof SslAuthenticationContext)) {
+ LOG.trace("non-SSL connection coerced to ANONYMOUS");
return KafkaPrincipal.ANONYMOUS;
}
- if (!(context instanceof SslAuthenticationContext)) {
- throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
+ SSLSession session = ((SslAuthenticationContext) context).session();
+ X509Certificate cert = firstX509(session);
+ if (cert == null) {
+ LOG.trace("first peer certificate missing / not x509");
+ return KafkaPrincipal.ANONYMOUS;
}
- SSLSession sslSession = ((SslAuthenticationContext) context).session();
+ String spiffeId = spiffeId(cert);
+ if (spiffeId == null) {
+ return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
+ }
+
+ return new KafkaPrincipal(SPIFFE_TYPE, spiffeId);
+ }
+
+ private @Nullable X509Certificate firstX509(SSLSession session) {
try {
- Certificate[] peerCerts = sslSession.getPeerCertificates();
- if (peerCerts == null || peerCerts.length == 0) {
- return KafkaPrincipal.ANONYMOUS;
+ Certificate[] peerCerts = session.getPeerCertificates();
+ if (peerCerts.length == 0) {
+ return null;
}
- if (!(peerCerts[0] instanceof X509Certificate)) {
- return KafkaPrincipal.ANONYMOUS;
+ Certificate first = peerCerts[0];
+ if (!(first instanceof X509Certificate)) {
+ return null;
}
- X509Certificate cert = (X509Certificate) peerCerts[0];
-
- Collection> sanCollection = cert.getSubjectAlternativeNames();
- KafkaPrincipal principal;
+ return (X509Certificate) first;
+ } catch (SSLPeerUnverifiedException e) {
+ LOG.warn("failed to extract certificate", e);
+ return null;
+ }
+ }
- if (sanCollection != null) {
- principal = sanCollection.stream()
- .map(san -> (String) san.get(1))
- .filter(uri -> startsWith(uri, "spiffe://"))
- .findFirst()
- .map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
- .orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
- } else {
- principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
+ private @Nullable String spiffeId(X509Certificate cert) {
+ try {
+ Collection> sans = cert.getSubjectAlternativeNames();
+ if (sans == null) {
+ return null;
}
- LOG.debug("PrincipalBuilder found principal: {}", principal.toString());
-
- return principal;
- } catch (SSLPeerUnverifiedException | CertificateParsingException se) {
- LOG.warn("Unhandled exception: " + se.toString());
- return KafkaPrincipal.ANONYMOUS;
+ return sans.stream()
+ .map(san -> (String) san.get(1))
+ .filter(uri -> uri.startsWith("spiffe://"))
+ .findFirst()
+ .orElse(null);
+ } catch (CertificateParsingException e) {
+ LOG.warn("failed to parse SAN", e);
+ return null;
}
}
}
\ No newline at end of file
diff --git a/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java b/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
new file mode 100644
index 0000000..dfcdde2
--- /dev/null
+++ b/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
@@ -0,0 +1,87 @@
+package io.okro.kafka;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
+import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.junit.Test;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SpiffePrincipalBuilderTest {
+
+ private SslAuthenticationContext mockedSslContext(String certPath) throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ // load cert
+ ClassLoader classLoader = getClass().getClassLoader();
+ InputStream in = classLoader.getResourceAsStream(certPath);
+ CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
+ X509Certificate cert = (X509Certificate) certificateFactory.generateCertificate(in);
+
+ // mock ssl session
+ SSLSession session = mock(SSLSession.class);
+ when(session.getPeerCertificates()).thenReturn(new Certificate[]{cert});
+ return new SslAuthenticationContext(session, InetAddress.getLocalHost());
+ }
+
+ /**
+ * X509 V3 with a SPIFFE-based SAN extension.
+ * Should result in 'SPIFFE:[spiffe://uri]'
+ */
+ @Test
+ public void TestSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ SslAuthenticationContext context = mockedSslContext("spiffe-cert.pem");
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
+
+ assertEquals("SPIFFE", principal.getPrincipalType());
+ assertEquals(principal.getName(), "spiffe://srv1.okro.io");
+ }
+
+ /**
+ * X509 V1 certificate with no SAN extension.
+ * Should fall back to 'User:CN=[CN]'
+ */
+ @Test
+ public void TestSubjectOnlyCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ SslAuthenticationContext context = mockedSslContext("subject-only-cert.pem");
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
+
+ assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+ assertEquals(principal.getName(), "CN=srv2,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
+ }
+
+ /**
+ * X509 V3 with a non-SPIFFE SAN extension.
+ * Should fall back to 'User:CN=[CN]'
+ */
+ @Test
+ public void TestSanNoSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ SslAuthenticationContext context = mockedSslContext("san-no-spiffe-cert.pem");
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
+
+ assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+ assertEquals(principal.getName(), "CN=srv3,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
+ }
+
+ /**
+ * Non-SSL context.
+ * Should be unauthenticated.
+ */
+ @Test
+ public void TestNoSSLContext() throws java.net.UnknownHostException {
+ PlaintextAuthenticationContext context = new PlaintextAuthenticationContext(InetAddress.getLocalHost());
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
+
+ assertEquals(KafkaPrincipal.ANONYMOUS, principal);
+ }
+}
diff --git a/src/test/resources/san-no-spiffe-cert.pem b/src/test/resources/san-no-spiffe-cert.pem
new file mode 100644
index 0000000..e4bd81b
--- /dev/null
+++ b/src/test/resources/san-no-spiffe-cert.pem
@@ -0,0 +1,26 @@
+-----BEGIN CERTIFICATE-----
+MIIEYTCCAkmgAwIBAgIJAMGzBF0pPMySMA0GCSqGSIb3DQEBCwUAMFoxCzAJBgNV
+BAYTAklMMREwDwYDVQQIDAhUZWwtQXZpdjERMA8GA1UEBwwIVGVsLUF2aXYxEDAO
+BgNVBAoMB29rcm8uaW8xEzARBgNVBAMMCm9rcm8uaW8gQ0EwHhcNMTgwNzMwMjAy
+NjA0WhcNMjgwNzI3MjAyNjA0WjBpMQswCQYDVQQGEwJJTDERMA8GA1UECAwIVGVs
+LUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQKDAdva3JvLmlvMRMwEQYD
+VQQLDAphcmNoaXRlY3RzMQ0wCwYDVQQDDARzcnYzMIIBIjANBgkqhkiG9w0BAQEF
+AAOCAQ8AMIIBCgKCAQEA1lWqlsmeDKL1yR4aR1Swvb3nW3VGmQs9VejpVWh1Yh/g
+paj51zuSHVXYIhfC26mtOdsJkDq5sNyemxGGKDyhnsJQ6w3RVhtI1t89myut0wjZ
+qY/mXNkpfZhPKRI31N/UFx5LMr9PZY51t4+5Q362H3YwRiP497xeJ0SUbG+Hm5z5
+nrsY5ENecHFF4KATL6/topYd0rns4IEjvbaocOIv+yhyOPAXEEyD7KIozGuPf09h
+3M50ne5oMjjTlFk7+lJ6gMSZdnyKUMc01sP9Jvu80Pbq6prXqV0akSAWo8BGUX8+
+0RQMktivMyjhyTD/LNUzEdlOPuKrYxODjrtAxTCLpwIDAQABoxswGTAXBgNVHREE
+EDAOggxzcnYzLm9rcm8uaW8wDQYJKoZIhvcNAQELBQADggIBAJl/MXI7JVccfaQB
+VnnYiLLjX/auW0BWh06Pg0vNWfyfXBbHWJWKps7k1SBtkSBsuqd6ecB4LJAYvbWf
+51+j/aw0I71xXw3rXdhuh7mOwi0rg3t/RRNs5EJz/GPk734MecuJcNLrWS/YRq0a
+TojJFW22rmlem9vL7OkHPoNDh4LbHN9Wyxgdjt+VmVK+o7Gx4DSHZxo/Dmz6Gp0F
+dmKNCUG6yYcxdgDvr0mz4rJeLlqXwb/gjLFmYfnyVWZRB3WOUQuejY7W17W16LYP
+11fqlOJSuhyj7E5NekikxO9asOclTlw7qDG7i0PpBTLuLgqDppOo9xZOHrK+DUrf
+r3CP9d7P86g2oTv4d6btKUGez3wIU5qfrb4mAZDp6/nngR9hpcFaeGqtjtgJj7Bo
+elbJEPy2f3Uwocbatr7nI1szNDJuEoXi0nnAT/h1K4m3EuTRuUsEn8ySJJYwB2fc
+ExBhHSrljm111VN/a5TLhT5sZJYKDiVYQ0SJEZRo9rJg8P+G6Xa30JyFa1r6iYOp
+UUE8nsWPGcdC6HKu7Tr1YsvbjHTG900O+Rco8at1EWu7s+b278VcYzRJy5z+HM3N
++0/EDFpu9xsSt3PtRGVk57gtf1nAECSV1OQSBCGy+E/MUpBQEqq3VL4vM9H3ANSz
+stX902cJF9ZYpG8Hn5Xt4q8AnFIw
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/src/test/resources/spiffe-cert.pem b/src/test/resources/spiffe-cert.pem
new file mode 100644
index 0000000..6b4c311
--- /dev/null
+++ b/src/test/resources/spiffe-cert.pem
@@ -0,0 +1,26 @@
+-----BEGIN CERTIFICATE-----
+MIIEajCCAlKgAwIBAgIJAMGzBF0pPMyOMA0GCSqGSIb3DQEBCwUAMFoxCzAJBgNV
+BAYTAklMMREwDwYDVQQIDAhUZWwtQXZpdjERMA8GA1UEBwwIVGVsLUF2aXYxEDAO
+BgNVBAoMB29rcm8uaW8xEzARBgNVBAMMCm9rcm8uaW8gQ0EwHhcNMTgwNzI5MTMx
+OTU4WhcNMjgwNzI2MTMxOTU4WjBpMQswCQYDVQQGEwJJTDERMA8GA1UECAwIVGVs
+LUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQKDAdva3JvLmlvMRMwEQYD
+VQQLDAphcmNoaXRlY3RzMQ0wCwYDVQQDDARzcnYxMIIBIjANBgkqhkiG9w0BAQEF
+AAOCAQ8AMIIBCgKCAQEAw0VF+zmGt6q8WGeoIwzyAyWrzRIMGwsUyMK04PkVlHji
+Tl6TxW8twISdTiHTOHTuIvwSuVE5PcqtrPLCAvOU5MgGv0BLG92z344peHELAVRI
+akgQkBEBIehr9Km2vdQtsYRMRJ8nDQiK8A7jiKVI0YBVpGjFREG9yBGG/DnUu95r
+7vJVE3KrWC/zIAS+e83j4DuMHoPXGInDD2dCbw86XSj9EDbGCrPZb9op8TwSP4ym
+f61DoeSBca6AzLeY3fsScZKSHd9p4BA69eYKgRVxI+fMYzKAysd3wX+mrqt1eT+w
+Lhmyw9/8V+TaMq3Q8eRnVD4zJ65YFushANm37da38wIDAQABoyQwIjAgBgNVHREE
+GTAXghVzcGlmZmU6Ly9zcnYxLm9rcm8uaW8wDQYJKoZIhvcNAQELBQADggIBABxQ
+vYqXGsCB1gbBpgNFELQ1I6jcPqA/7VCRPgB7l1SvXNw70zBjMgIIgPMdCm+ZnoMK
+dd4loizOygU3pWIUJYKo2BobcxdqV4fLvmZkVJQtqlhrxN36DA1sgI7D+TYQJGSR
+QFY2aEvlzCZnBatRCQ3c4DPiz7qoB9H1JsRejCipzMvGxXu+aOBjKM7HiFLTznsy
+sBh01UHCIxIMdSJTtdGs3PIIgKMlAC5TQR4nqDCiqgzTo0JPCeyPIYmFy8iwigM+
+CC280ZHWIkvOYMOBM4VswcSzDF8XRZG/vIf7tV3jp0XtJ1EddL+GM18eLh4vIWt8
+b7PZnabtIBvITzjjW5Np05kQu1W3riqBveyQHz2fB1IpwTO2mXPyal1vw1Wrj5WQ
+QlFJjUTNxk0J4yVNHQbErrtBnGE0Zl4brcq7HNGsd0d0knOJ8F/edSB+USdeqNMH
+qBd1GTI1a/dNQ2oC34zvq+cUSzz4L+pK+2nJSeWpb/WAn0V/EgnH6APluxwpUs6i
+QqTqyCu2ecrvR0hhEiRNmBZW4+QaZe4aLrLClCEED+/nuvZf/9qLhIdEsq8bbHtP
+SIHhTv8pMfT0uej0JxpLKJDLvsmOUUVKCRMW//kArCZHmHKKOPKKwuNCNCRbIzib
+EN4HE4RU0BwVRZscMz7nTuSGqucN/Nrae1yC/k9C
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/src/test/resources/subject-only-cert.pem b/src/test/resources/subject-only-cert.pem
new file mode 100644
index 0000000..ad4cc32
--- /dev/null
+++ b/src/test/resources/subject-only-cert.pem
@@ -0,0 +1,25 @@
+-----BEGIN CERTIFICATE-----
+MIIEPzCCAicCCQDBswRdKTzMkDANBgkqhkiG9w0BAQsFADBaMQswCQYDVQQGEwJJ
+TDERMA8GA1UECAwIVGVsLUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQK
+DAdva3JvLmlvMRMwEQYDVQQDDApva3JvLmlvIENBMB4XDTE4MDcyOTEzMzE1MFoX
+DTI4MDcyNjEzMzE1MFowaTELMAkGA1UEBhMCSUwxETAPBgNVBAgMCFRlbC1Bdml2
+MREwDwYDVQQHDAhUZWwtQXZpdjEQMA4GA1UECgwHb2tyby5pbzETMBEGA1UECwwK
+YXJjaGl0ZWN0czENMAsGA1UEAwwEc3J2MjCCASIwDQYJKoZIhvcNAQEBBQADggEP
+ADCCAQoCggEBANxdN6zBI4lm86BX9dJ3tsT3OoJ6Ei6AxcfZmVydQJZ/XlOOWIKl
+zceANkSeRV4XKfy6qePHHNIMY1jD1I/tLsCWXMy0RuNMmU9ea+ZIY9s3+WkMzXTD
+2vdtMM+8nxnwa3aR276r3Bna0ics/KEZG7GrmnQ3OzFMJojGLfowKXIMuZXUkTZE
+vGmvpwuaxUVmUsiogMGTIGtCCKCjzSrsrJc6yEK7gygiBInUtZj/2fqHita2hN6M
+e1MyCDkIATtO6Q4ahrlWrKwEW1Zn4bgjvXUjyFVlGUfOlm4UeICsp2T1zHPNWGqc
+yz+Drp3mWHCWJZBnXLc7xZiXPazw2fRUuMECAwEAATANBgkqhkiG9w0BAQsFAAOC
+AgEAD+DOZqMUFF1CboIX9YVokXuWwINLYAYruhQF5kgwU1hLaWDJh+FGOKO4doNO
+G4DDGkXFXM+5u6gl7R84sF2thx+mcPABsUheWqOOToEeHpe5+sgjtdMlnOutRHuD
+q445OseLARd2XtiGFC7SYFk/mrmeH9N29rA5x/w0uS4eFPQYtVcnhQd8LkUExKjL
+2KnYPr0lD5TIHN20mlf0AxRbMa9Mjz2vUTLlqwqG7IQJWYco+VbI76i/walOv0+Z
+SQNsoKPF+du10Tg0NGQtcQ9CY/C3lNZLxofKXI0JiAlcjJ5emFYh8MsbUS4ESfjH
+dut5RyiTDLa0/LZa5w2CBi0fGSFhGK7+q7xPtvGczD+YIWDePQEkcmsKSoyyRzlu
+xl6F8C1nT5xj0qAaxcRpfNuTUhkaqnEvzw0BJRS4Kajd/s5/sTCzqcAdK1Rwy6AI
+M+7nqZM4iHE/yx/uxapu7RWiGBi48SBIThOYOizrYktH/eIYYJ0fEGLAqGu3mdV0
+bBQxcFXSzMypfzAcwIXUNedpqsHCucbJHcxsMYymm2jAjlgU8VcJRvmW4u2JNizf
+yzEttSYG2Zti7hJJtWCf9Ek7kaXusC6Uhii6qnCqEWulE8DrZl4Y6ZYKYpe0UMsm
+V7kOqym53MchmkaBrc8yVjDE9R6USb/1OvQrjf6PjyFLIkE=
+-----END CERTIFICATE-----
\ No newline at end of file