Skip to content

Commit

Permalink
Merge pull request #2 from Traiana/unitests
Browse files Browse the repository at this point in the history
Unit testing
  • Loading branch information
adamglt authored Jul 31, 2018
2 parents a7f43de + 2ebfcf9 commit c148e6e
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 38 deletions.
25 changes: 21 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,34 @@
</plugins>
</build>


<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.20.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
82 changes: 48 additions & 34 deletions src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package io.okro.kafka;

import org.apache.kafka.common.security.auth.*;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.security.cert.Certificate;
Expand All @@ -10,56 +16,64 @@
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.commons.lang3.StringUtils.startsWith;

public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);

private static final String SPIFFE_TYPE = "SPIFFE";

public KafkaPrincipal build(AuthenticationContext context) {
if (context instanceof PlaintextAuthenticationContext) {
if (!(context instanceof SslAuthenticationContext)) {
LOG.trace("non-SSL connection coerced to ANONYMOUS");
return KafkaPrincipal.ANONYMOUS;
}

if (!(context instanceof SslAuthenticationContext)) {
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
SSLSession session = ((SslAuthenticationContext) context).session();
X509Certificate cert = firstX509(session);
if (cert == null) {
LOG.trace("first peer certificate missing / not x509");
return KafkaPrincipal.ANONYMOUS;
}

SSLSession sslSession = ((SslAuthenticationContext) context).session();
String spiffeId = spiffeId(cert);
if (spiffeId == null) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
}

return new KafkaPrincipal(SPIFFE_TYPE, spiffeId);
}

private @Nullable X509Certificate firstX509(SSLSession session) {
try {
Certificate[] peerCerts = sslSession.getPeerCertificates();
if (peerCerts == null || peerCerts.length == 0) {
return KafkaPrincipal.ANONYMOUS;
Certificate[] peerCerts = session.getPeerCertificates();
if (peerCerts.length == 0) {
return null;
}
if (!(peerCerts[0] instanceof X509Certificate)) {
return KafkaPrincipal.ANONYMOUS;
Certificate first = peerCerts[0];
if (!(first instanceof X509Certificate)) {
return null;
}
X509Certificate cert = (X509Certificate) peerCerts[0];

Collection<List<?>> sanCollection = cert.getSubjectAlternativeNames();
KafkaPrincipal principal;
return (X509Certificate) first;
} catch (SSLPeerUnverifiedException e) {
LOG.warn("failed to extract certificate", e);
return null;
}
}

if (sanCollection != null) {
principal = sanCollection.stream()
.map(san -> (String) san.get(1))
.filter(uri -> startsWith(uri, "spiffe://"))
.findFirst()
.map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
.orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
} else {
principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
private @Nullable String spiffeId(X509Certificate cert) {
try {
Collection<List<?>> sans = cert.getSubjectAlternativeNames();
if (sans == null) {
return null;
}

LOG.debug("PrincipalBuilder found principal: {}", principal.toString());

return principal;
} catch (SSLPeerUnverifiedException | CertificateParsingException se) {
LOG.warn("Unhandled exception: " + se.toString());
return KafkaPrincipal.ANONYMOUS;
return sans.stream()
.map(san -> (String) san.get(1))
.filter(uri -> uri.startsWith("spiffe://"))
.findFirst()
.orElse(null);
} catch (CertificateParsingException e) {
LOG.warn("failed to parse SAN", e);
return null;
}
}
}
87 changes: 87 additions & 0 deletions src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.okro.kafka;

import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.junit.Test;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SpiffePrincipalBuilderTest {

private SslAuthenticationContext mockedSslContext(String certPath) throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
// load cert
ClassLoader classLoader = getClass().getClassLoader();
InputStream in = classLoader.getResourceAsStream(certPath);
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
X509Certificate cert = (X509Certificate) certificateFactory.generateCertificate(in);

// mock ssl session
SSLSession session = mock(SSLSession.class);
when(session.getPeerCertificates()).thenReturn(new Certificate[]{cert});
return new SslAuthenticationContext(session, InetAddress.getLocalHost());
}

/**
* X509 V3 with a SPIFFE-based SAN extension.
* Should result in 'SPIFFE:[spiffe://uri]'
*/
@Test
public void TestSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("spiffe-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals("SPIFFE", principal.getPrincipalType());
assertEquals(principal.getName(), "spiffe://srv1.okro.io");
}

/**
* X509 V1 certificate with no SAN extension.
* Should fall back to 'User:CN=[CN]'
*/
@Test
public void TestSubjectOnlyCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("subject-only-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals(principal.getName(), "CN=srv2,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}

/**
* X509 V3 with a non-SPIFFE SAN extension.
* Should fall back to 'User:CN=[CN]'
*/
@Test
public void TestSanNoSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("san-no-spiffe-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals(principal.getName(), "CN=srv3,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}

/**
* Non-SSL context.
* Should be unauthenticated.
*/
@Test
public void TestNoSSLContext() throws java.net.UnknownHostException {
PlaintextAuthenticationContext context = new PlaintextAuthenticationContext(InetAddress.getLocalHost());
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals(KafkaPrincipal.ANONYMOUS, principal);
}
}
26 changes: 26 additions & 0 deletions src/test/resources/san-no-spiffe-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEYTCCAkmgAwIBAgIJAMGzBF0pPMySMA0GCSqGSIb3DQEBCwUAMFoxCzAJBgNV
BAYTAklMMREwDwYDVQQIDAhUZWwtQXZpdjERMA8GA1UEBwwIVGVsLUF2aXYxEDAO
BgNVBAoMB29rcm8uaW8xEzARBgNVBAMMCm9rcm8uaW8gQ0EwHhcNMTgwNzMwMjAy
NjA0WhcNMjgwNzI3MjAyNjA0WjBpMQswCQYDVQQGEwJJTDERMA8GA1UECAwIVGVs
LUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQKDAdva3JvLmlvMRMwEQYD
VQQLDAphcmNoaXRlY3RzMQ0wCwYDVQQDDARzcnYzMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEA1lWqlsmeDKL1yR4aR1Swvb3nW3VGmQs9VejpVWh1Yh/g
paj51zuSHVXYIhfC26mtOdsJkDq5sNyemxGGKDyhnsJQ6w3RVhtI1t89myut0wjZ
qY/mXNkpfZhPKRI31N/UFx5LMr9PZY51t4+5Q362H3YwRiP497xeJ0SUbG+Hm5z5
nrsY5ENecHFF4KATL6/topYd0rns4IEjvbaocOIv+yhyOPAXEEyD7KIozGuPf09h
3M50ne5oMjjTlFk7+lJ6gMSZdnyKUMc01sP9Jvu80Pbq6prXqV0akSAWo8BGUX8+
0RQMktivMyjhyTD/LNUzEdlOPuKrYxODjrtAxTCLpwIDAQABoxswGTAXBgNVHREE
EDAOggxzcnYzLm9rcm8uaW8wDQYJKoZIhvcNAQELBQADggIBAJl/MXI7JVccfaQB
VnnYiLLjX/auW0BWh06Pg0vNWfyfXBbHWJWKps7k1SBtkSBsuqd6ecB4LJAYvbWf
51+j/aw0I71xXw3rXdhuh7mOwi0rg3t/RRNs5EJz/GPk734MecuJcNLrWS/YRq0a
TojJFW22rmlem9vL7OkHPoNDh4LbHN9Wyxgdjt+VmVK+o7Gx4DSHZxo/Dmz6Gp0F
dmKNCUG6yYcxdgDvr0mz4rJeLlqXwb/gjLFmYfnyVWZRB3WOUQuejY7W17W16LYP
11fqlOJSuhyj7E5NekikxO9asOclTlw7qDG7i0PpBTLuLgqDppOo9xZOHrK+DUrf
r3CP9d7P86g2oTv4d6btKUGez3wIU5qfrb4mAZDp6/nngR9hpcFaeGqtjtgJj7Bo
elbJEPy2f3Uwocbatr7nI1szNDJuEoXi0nnAT/h1K4m3EuTRuUsEn8ySJJYwB2fc
ExBhHSrljm111VN/a5TLhT5sZJYKDiVYQ0SJEZRo9rJg8P+G6Xa30JyFa1r6iYOp
UUE8nsWPGcdC6HKu7Tr1YsvbjHTG900O+Rco8at1EWu7s+b278VcYzRJy5z+HM3N
+0/EDFpu9xsSt3PtRGVk57gtf1nAECSV1OQSBCGy+E/MUpBQEqq3VL4vM9H3ANSz
stX902cJF9ZYpG8Hn5Xt4q8AnFIw
-----END CERTIFICATE-----
26 changes: 26 additions & 0 deletions src/test/resources/spiffe-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEajCCAlKgAwIBAgIJAMGzBF0pPMyOMA0GCSqGSIb3DQEBCwUAMFoxCzAJBgNV
BAYTAklMMREwDwYDVQQIDAhUZWwtQXZpdjERMA8GA1UEBwwIVGVsLUF2aXYxEDAO
BgNVBAoMB29rcm8uaW8xEzARBgNVBAMMCm9rcm8uaW8gQ0EwHhcNMTgwNzI5MTMx
OTU4WhcNMjgwNzI2MTMxOTU4WjBpMQswCQYDVQQGEwJJTDERMA8GA1UECAwIVGVs
LUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQKDAdva3JvLmlvMRMwEQYD
VQQLDAphcmNoaXRlY3RzMQ0wCwYDVQQDDARzcnYxMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAw0VF+zmGt6q8WGeoIwzyAyWrzRIMGwsUyMK04PkVlHji
Tl6TxW8twISdTiHTOHTuIvwSuVE5PcqtrPLCAvOU5MgGv0BLG92z344peHELAVRI
akgQkBEBIehr9Km2vdQtsYRMRJ8nDQiK8A7jiKVI0YBVpGjFREG9yBGG/DnUu95r
7vJVE3KrWC/zIAS+e83j4DuMHoPXGInDD2dCbw86XSj9EDbGCrPZb9op8TwSP4ym
f61DoeSBca6AzLeY3fsScZKSHd9p4BA69eYKgRVxI+fMYzKAysd3wX+mrqt1eT+w
Lhmyw9/8V+TaMq3Q8eRnVD4zJ65YFushANm37da38wIDAQABoyQwIjAgBgNVHREE
GTAXghVzcGlmZmU6Ly9zcnYxLm9rcm8uaW8wDQYJKoZIhvcNAQELBQADggIBABxQ
vYqXGsCB1gbBpgNFELQ1I6jcPqA/7VCRPgB7l1SvXNw70zBjMgIIgPMdCm+ZnoMK
dd4loizOygU3pWIUJYKo2BobcxdqV4fLvmZkVJQtqlhrxN36DA1sgI7D+TYQJGSR
QFY2aEvlzCZnBatRCQ3c4DPiz7qoB9H1JsRejCipzMvGxXu+aOBjKM7HiFLTznsy
sBh01UHCIxIMdSJTtdGs3PIIgKMlAC5TQR4nqDCiqgzTo0JPCeyPIYmFy8iwigM+
CC280ZHWIkvOYMOBM4VswcSzDF8XRZG/vIf7tV3jp0XtJ1EddL+GM18eLh4vIWt8
b7PZnabtIBvITzjjW5Np05kQu1W3riqBveyQHz2fB1IpwTO2mXPyal1vw1Wrj5WQ
QlFJjUTNxk0J4yVNHQbErrtBnGE0Zl4brcq7HNGsd0d0knOJ8F/edSB+USdeqNMH
qBd1GTI1a/dNQ2oC34zvq+cUSzz4L+pK+2nJSeWpb/WAn0V/EgnH6APluxwpUs6i
QqTqyCu2ecrvR0hhEiRNmBZW4+QaZe4aLrLClCEED+/nuvZf/9qLhIdEsq8bbHtP
SIHhTv8pMfT0uej0JxpLKJDLvsmOUUVKCRMW//kArCZHmHKKOPKKwuNCNCRbIzib
EN4HE4RU0BwVRZscMz7nTuSGqucN/Nrae1yC/k9C
-----END CERTIFICATE-----
25 changes: 25 additions & 0 deletions src/test/resources/subject-only-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-----BEGIN CERTIFICATE-----
MIIEPzCCAicCCQDBswRdKTzMkDANBgkqhkiG9w0BAQsFADBaMQswCQYDVQQGEwJJ
TDERMA8GA1UECAwIVGVsLUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQK
DAdva3JvLmlvMRMwEQYDVQQDDApva3JvLmlvIENBMB4XDTE4MDcyOTEzMzE1MFoX
DTI4MDcyNjEzMzE1MFowaTELMAkGA1UEBhMCSUwxETAPBgNVBAgMCFRlbC1Bdml2
MREwDwYDVQQHDAhUZWwtQXZpdjEQMA4GA1UECgwHb2tyby5pbzETMBEGA1UECwwK
YXJjaGl0ZWN0czENMAsGA1UEAwwEc3J2MjCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBANxdN6zBI4lm86BX9dJ3tsT3OoJ6Ei6AxcfZmVydQJZ/XlOOWIKl
zceANkSeRV4XKfy6qePHHNIMY1jD1I/tLsCWXMy0RuNMmU9ea+ZIY9s3+WkMzXTD
2vdtMM+8nxnwa3aR276r3Bna0ics/KEZG7GrmnQ3OzFMJojGLfowKXIMuZXUkTZE
vGmvpwuaxUVmUsiogMGTIGtCCKCjzSrsrJc6yEK7gygiBInUtZj/2fqHita2hN6M
e1MyCDkIATtO6Q4ahrlWrKwEW1Zn4bgjvXUjyFVlGUfOlm4UeICsp2T1zHPNWGqc
yz+Drp3mWHCWJZBnXLc7xZiXPazw2fRUuMECAwEAATANBgkqhkiG9w0BAQsFAAOC
AgEAD+DOZqMUFF1CboIX9YVokXuWwINLYAYruhQF5kgwU1hLaWDJh+FGOKO4doNO
G4DDGkXFXM+5u6gl7R84sF2thx+mcPABsUheWqOOToEeHpe5+sgjtdMlnOutRHuD
q445OseLARd2XtiGFC7SYFk/mrmeH9N29rA5x/w0uS4eFPQYtVcnhQd8LkUExKjL
2KnYPr0lD5TIHN20mlf0AxRbMa9Mjz2vUTLlqwqG7IQJWYco+VbI76i/walOv0+Z
SQNsoKPF+du10Tg0NGQtcQ9CY/C3lNZLxofKXI0JiAlcjJ5emFYh8MsbUS4ESfjH
dut5RyiTDLa0/LZa5w2CBi0fGSFhGK7+q7xPtvGczD+YIWDePQEkcmsKSoyyRzlu
xl6F8C1nT5xj0qAaxcRpfNuTUhkaqnEvzw0BJRS4Kajd/s5/sTCzqcAdK1Rwy6AI
M+7nqZM4iHE/yx/uxapu7RWiGBi48SBIThOYOizrYktH/eIYYJ0fEGLAqGu3mdV0
bBQxcFXSzMypfzAcwIXUNedpqsHCucbJHcxsMYymm2jAjlgU8VcJRvmW4u2JNizf
yzEttSYG2Zti7hJJtWCf9Ek7kaXusC6Uhii6qnCqEWulE8DrZl4Y6ZYKYpe0UMsm
V7kOqym53MchmkaBrc8yVjDE9R6USb/1OvQrjf6PjyFLIkE=
-----END CERTIFICATE-----

0 comments on commit c148e6e

Please sign in to comment.