diff --git a/pom.xml b/pom.xml
index 9c03685..691103e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,6 @@
-
org.apache.kafka
@@ -28,41 +27,27 @@
1.1.1
- org.apache.commons
- commons-lang3
- 3.7
-
-
-
- commons-io
- commons-io
- 2.6
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
-
junit
junit
4.12
test
-
- org.easymock
- easymock
- 3.6
+ org.mockito
+ mockito-core
+ 2.20.1
test
-
- org.slf4j
- slf4j-api
- 1.7.5
-
-
org.slf4j
slf4j-simple
1.6.4
-
+ test
diff --git a/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java b/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
index 4560f51..aa9ede8 100644
--- a/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
+++ b/src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
@@ -1,6 +1,11 @@
package io.okro.kafka;
-import org.apache.kafka.common.security.auth.*;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@@ -10,56 +15,64 @@
import java.util.Collection;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.commons.lang3.StringUtils.startsWith;
-
public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);
private static final String SPIFFE_TYPE = "SPIFFE";
public KafkaPrincipal build(AuthenticationContext context) {
- if (context instanceof PlaintextAuthenticationContext) {
+ if (!(context instanceof SslAuthenticationContext)) {
+ LOG.trace("non-SSL connection coerced to ANONYMOUS");
return KafkaPrincipal.ANONYMOUS;
}
- if (!(context instanceof SslAuthenticationContext)) {
- throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
+ SSLSession session = ((SslAuthenticationContext) context).session();
+ X509Certificate cert = firstX509(session);
+ if (cert == null) {
+ LOG.trace("first peer certificate missing / not x509");
+ return KafkaPrincipal.ANONYMOUS;
}
- SSLSession sslSession = ((SslAuthenticationContext) context).session();
+ String spiffeId = spiffeId(cert);
+ if (spiffeId == null) {
+ return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
+ }
+
+ return new KafkaPrincipal(SPIFFE_TYPE, spiffeId);
+ }
+
+ private X509Certificate firstX509(SSLSession session) {
try {
- Certificate[] peerCerts = sslSession.getPeerCertificates();
- if (peerCerts == null || peerCerts.length == 0) {
- return KafkaPrincipal.ANONYMOUS;
+ Certificate[] peerCerts = session.getPeerCertificates();
+ if (peerCerts.length == 0) {
+ return null;
}
- if (!(peerCerts[0] instanceof X509Certificate)) {
- return KafkaPrincipal.ANONYMOUS;
+ Certificate first = peerCerts[0];
+ if (!(first instanceof X509Certificate)) {
+ return null;
}
- X509Certificate cert = (X509Certificate) peerCerts[0];
-
- Collection> sanCollection = cert.getSubjectAlternativeNames();
- KafkaPrincipal principal;
+ return (X509Certificate) first;
+ } catch (SSLPeerUnverifiedException e) {
+ LOG.warn("failed to extract certificate", e);
+ return null;
+ }
+ }
- if (sanCollection != null) {
- principal = sanCollection.stream()
- .map(san -> (String) san.get(1))
- .filter(uri -> startsWith(uri, "spiffe://"))
- .findFirst()
- .map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
- .orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
- } else {
- principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
+ private String spiffeId(X509Certificate cert) {
+ try {
+ Collection> sans = cert.getSubjectAlternativeNames();
+ if (sans == null) {
+ return null;
}
- LOG.debug("PrincipalBuilder found principal: {}", principal.toString());
-
- return principal;
- } catch (SSLPeerUnverifiedException | CertificateParsingException se) {
- LOG.warn("Unhandled exception: " + se.toString());
- return KafkaPrincipal.ANONYMOUS;
+ return sans.stream()
+ .map(san -> (String) san.get(1))
+ .filter(uri -> uri.startsWith("spiffe://"))
+ .findFirst()
+ .orElse(null);
+ } catch (CertificateParsingException e) {
+ LOG.warn("failed to parse SAN", e);
+ return null;
}
}
}
\ No newline at end of file
diff --git a/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java b/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
index da8d5fd..dfcdde2 100644
--- a/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
+++ b/src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
@@ -1,162 +1,87 @@
package io.okro.kafka;
-import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
+import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.junit.Test;
-import java.io.ByteArrayInputStream;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
-import java.net.InetAddress;
-import javax.net.ssl.SSLSession;
-
-import org.apache.kafka.common.security.auth.*;
-import org.apache.commons.io.IOUtils;
-
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
-import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class SpiffePrincipalBuilderTest extends EasyMockSupport {
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
- private X509Certificate getResourceAsCert(String resourcePath)
- throws java.io.IOException, java.security.cert.CertificateException {
+public class SpiffePrincipalBuilderTest {
+ private SslAuthenticationContext mockedSslContext(String certPath) throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ // load cert
ClassLoader classLoader = getClass().getClassLoader();
- try {
- // Read cert
- ByteArrayInputStream certInputStream =
- new ByteArrayInputStream(IOUtils.toByteArray(classLoader.getResourceAsStream(resourcePath)));
-
- // Parse as X509 certificate
- CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
- return (X509Certificate) certificateFactory.generateCertificate(certInputStream);
-
- } catch (java.io.IOException | java.security.cert.CertificateException e) {
- System.out.println("Problem with reading the certificate file. " + e.toString());
- throw e;
- }
+ InputStream in = classLoader.getResourceAsStream(certPath);
+ CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
+ X509Certificate cert = (X509Certificate) certificateFactory.generateCertificate(in);
+
+ // mock ssl session
+ SSLSession session = mock(SSLSession.class);
+ when(session.getPeerCertificates()).thenReturn(new Certificate[]{cert});
+ return new SslAuthenticationContext(session, InetAddress.getLocalHost());
}
+ /**
+ * X509 V3 with a SPIFFE-based SAN extension.
+ * Should result in 'SPIFFE:[spiffe://uri]'
+ */
@Test
- public void TestSubjectOnlyCert() {
- // Tests an X509 V1 certificate with no SAN extension
+ public void TestSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ SslAuthenticationContext context = mockedSslContext("spiffe-cert.pem");
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
- try {
- X509Certificate cert = getResourceAsCert("subject-only-cert.pem");
-
- // Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
- SSLSession session = mock(SSLSession.class);
- EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
-
- replayAll();
-
- // Build KafkaPrincipal
- SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
-
- KafkaPrincipal principal = builder.build(
- new SslAuthenticationContext(session, InetAddress.getLocalHost()));
-
- // Identity type should be "User"
- assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
-
- // Identity should be a string
- assertNotNull(principal.getName());
-
- System.out.println("Principal: " + principal.toString());
-
- } catch (java.io.IOException | java.security.cert.CertificateException e) {
- System.out.println("Problem with reading the certificate file. " + e.toString());
- }
+ assertEquals("SPIFFE", principal.getPrincipalType());
+ assertEquals(principal.getName(), "spiffe://srv1.okro.io");
}
+ /**
+ * X509 V1 certificate with no SAN extension.
+ * Should fall back to 'User:CN=[CN]'
+ */
@Test
- public void TestSpiffeCert() {
- // Tests an X509 V3 with SAN extension holding a SPIFFE ID
-
- try {
- X509Certificate cert = getResourceAsCert("spiffe-cert.pem");
-
- // Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
- SSLSession session = mock(SSLSession.class);
- EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
+ public void TestSubjectOnlyCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ SslAuthenticationContext context = mockedSslContext("subject-only-cert.pem");
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
- replayAll();
-
- // Build KafkaPrincipal
- SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
-
- KafkaPrincipal principal = builder.build(
- new SslAuthenticationContext(session, InetAddress.getLocalHost()));
-
- // Identity type should be "SPIFFE"
- assertEquals("SPIFFE", principal.getPrincipalType());
-
- // Identity should be a string
- assertNotNull(principal.getName());
-
- System.out.println("Principal: " + principal.toString());
-
- } catch (java.io.IOException | java.security.cert.CertificateException e) {
- System.out.println("Problem with reading the certificate file. " + e.toString());
- }
+ assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+ assertEquals(principal.getName(), "CN=srv2,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}
+ /**
+ * X509 V3 with a non-SPIFFE SAN extension.
+ * Should fall back to 'User:CN=[CN]'
+ */
@Test
- public void TestSanNoSpiffeCert() {
- // Tests an X509 V3 with SAN extension holding a regular FQDN
-
- try {
- X509Certificate cert = getResourceAsCert("san-no-spiffe-cert.pem");
-
- // Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
- SSLSession session = mock(SSLSession.class);
- EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
-
- replayAll();
-
- // Build KafkaPrincipal
- SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
-
- KafkaPrincipal principal = builder.build(
- new SslAuthenticationContext(session, InetAddress.getLocalHost()));
+ public void TestSanNoSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
+ SslAuthenticationContext context = mockedSslContext("san-no-spiffe-cert.pem");
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
- // Identity type should be "User"
- assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
-
- // Identity should be a string
- assertNotNull(principal.getName());
-
- System.out.println("Principal: " + principal.toString());
-
- } catch (java.io.IOException | java.security.cert.CertificateException e) {
- System.out.println("Problem with reading the certificate file. " + e.toString());
- }
+ assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+ assertEquals(principal.getName(), "CN=srv3,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}
+ /**
+ * Non-SSL context.
+ * Should be unauthenticated.
+ */
@Test
public void TestNoSSLContext() throws java.net.UnknownHostException {
- // Tests non-SSL context behavior
-
- SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();
-
- KafkaPrincipal principal = builder.build(
- new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
+ PlaintextAuthenticationContext context = new PlaintextAuthenticationContext(InetAddress.getLocalHost());
+ KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);
- // Identity type should be KafkaPrincipal.ANONYMOUS
assertEquals(KafkaPrincipal.ANONYMOUS, principal);
-
- System.out.println("Principal: " + principal.toString());
- }
-
- @Test
- public void TestAwareness() throws InterruptedException {
- // Tests a reviewer's awareness
- TimeUnit.SECONDS.sleep(1);
-
- // Identity type should be KafkaPrincipal.ANONYMOUS
- assertEquals(42, 42);
}
}