Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit testing #2

Merged
merged 2 commits into from
Jul 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,34 @@
</plugins>
</build>


<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commons not needed here

<artifactId>commons-lang3</artifactId>
<version>3.7</version>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.20.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
82 changes: 48 additions & 34 deletions src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package io.okro.kafka;

import org.apache.kafka.common.security.auth.*;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.security.cert.Certificate;
Expand All @@ -10,56 +16,64 @@
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.commons.lang3.StringUtils.startsWith;

public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);

private static final String SPIFFE_TYPE = "SPIFFE";

public KafkaPrincipal build(AuthenticationContext context) {
if (context instanceof PlaintextAuthenticationContext) {
if (!(context instanceof SslAuthenticationContext)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skip the plaintext/sasl difference

LOG.trace("non-SSL connection coerced to ANONYMOUS");
return KafkaPrincipal.ANONYMOUS;
}

if (!(context instanceof SslAuthenticationContext)) {
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
SSLSession session = ((SslAuthenticationContext) context).session();
X509Certificate cert = firstX509(session);
if (cert == null) {
LOG.trace("first peer certificate missing / not x509");
return KafkaPrincipal.ANONYMOUS;
}

SSLSession sslSession = ((SslAuthenticationContext) context).session();
String spiffeId = spiffeId(cert);
if (spiffeId == null) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
}

return new KafkaPrincipal(SPIFFE_TYPE, spiffeId);
}

private @Nullable X509Certificate firstX509(SSLSession session) {
try {
Certificate[] peerCerts = sslSession.getPeerCertificates();
if (peerCerts == null || peerCerts.length == 0) {
return KafkaPrincipal.ANONYMOUS;
Certificate[] peerCerts = session.getPeerCertificates();
if (peerCerts.length == 0) {
return null;
}
if (!(peerCerts[0] instanceof X509Certificate)) {
return KafkaPrincipal.ANONYMOUS;
Certificate first = peerCerts[0];
if (!(first instanceof X509Certificate)) {
return null;
}
X509Certificate cert = (X509Certificate) peerCerts[0];

Collection<List<?>> sanCollection = cert.getSubjectAlternativeNames();
KafkaPrincipal principal;
return (X509Certificate) first;
} catch (SSLPeerUnverifiedException e) {
LOG.warn("failed to extract certificate", e);
return null;
}
}

if (sanCollection != null) {
principal = sanCollection.stream()
.map(san -> (String) san.get(1))
.filter(uri -> startsWith(uri, "spiffe://"))
.findFirst()
.map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
.orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
} else {
principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
private @Nullable String spiffeId(X509Certificate cert) {
try {
Collection<List<?>> sans = cert.getSubjectAlternativeNames();
if (sans == null) {
return null;
}

LOG.debug("PrincipalBuilder found principal: {}", principal.toString());

return principal;
} catch (SSLPeerUnverifiedException | CertificateParsingException se) {
LOG.warn("Unhandled exception: " + se.toString());
return KafkaPrincipal.ANONYMOUS;
return sans.stream()
.map(san -> (String) san.get(1))
.filter(uri -> uri.startsWith("spiffe://"))
.findFirst()
.orElse(null);
} catch (CertificateParsingException e) {
LOG.warn("failed to parse SAN", e);
return null;
}
}
}
87 changes: 87 additions & 0 deletions src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.okro.kafka;

import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.junit.Test;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SpiffePrincipalBuilderTest {

private SslAuthenticationContext mockedSslContext(String certPath) throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
// load cert
ClassLoader classLoader = getClass().getClassLoader();
InputStream in = classLoader.getResourceAsStream(certPath);
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
X509Certificate cert = (X509Certificate) certificateFactory.generateCertificate(in);

// mock ssl session
SSLSession session = mock(SSLSession.class);
when(session.getPeerCertificates()).thenReturn(new Certificate[]{cert});
return new SslAuthenticationContext(session, InetAddress.getLocalHost());
}

/**
* X509 V3 with a SPIFFE-based SAN extension.
* Should result in 'SPIFFE:[spiffe://uri]'
*/
@Test
public void TestSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("spiffe-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals("SPIFFE", principal.getPrincipalType());
assertEquals(principal.getName(), "spiffe://srv1.okro.io");
}

/**
* X509 V1 certificate with no SAN extension.
* Should fall back to 'User:CN=[CN]'
*/
@Test
public void TestSubjectOnlyCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("subject-only-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals(principal.getName(), "CN=srv2,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}

/**
* X509 V3 with a non-SPIFFE SAN extension.
* Should fall back to 'User:CN=[CN]'
*/
@Test
public void TestSanNoSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("san-no-spiffe-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals(principal.getName(), "CN=srv3,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}

/**
* Non-SSL context.
* Should be unauthenticated.
*/
@Test
public void TestNoSSLContext() throws java.net.UnknownHostException {
PlaintextAuthenticationContext context = new PlaintextAuthenticationContext(InetAddress.getLocalHost());
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

assertEquals(KafkaPrincipal.ANONYMOUS, principal);
}
}
26 changes: 26 additions & 0 deletions src/test/resources/san-no-spiffe-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEYTCCAkmgAwIBAgIJAMGzBF0pPMySMA0GCSqGSIb3DQEBCwUAMFoxCzAJBgNV
BAYTAklMMREwDwYDVQQIDAhUZWwtQXZpdjERMA8GA1UEBwwIVGVsLUF2aXYxEDAO
BgNVBAoMB29rcm8uaW8xEzARBgNVBAMMCm9rcm8uaW8gQ0EwHhcNMTgwNzMwMjAy
NjA0WhcNMjgwNzI3MjAyNjA0WjBpMQswCQYDVQQGEwJJTDERMA8GA1UECAwIVGVs
LUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQKDAdva3JvLmlvMRMwEQYD
VQQLDAphcmNoaXRlY3RzMQ0wCwYDVQQDDARzcnYzMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEA1lWqlsmeDKL1yR4aR1Swvb3nW3VGmQs9VejpVWh1Yh/g
paj51zuSHVXYIhfC26mtOdsJkDq5sNyemxGGKDyhnsJQ6w3RVhtI1t89myut0wjZ
qY/mXNkpfZhPKRI31N/UFx5LMr9PZY51t4+5Q362H3YwRiP497xeJ0SUbG+Hm5z5
nrsY5ENecHFF4KATL6/topYd0rns4IEjvbaocOIv+yhyOPAXEEyD7KIozGuPf09h
3M50ne5oMjjTlFk7+lJ6gMSZdnyKUMc01sP9Jvu80Pbq6prXqV0akSAWo8BGUX8+
0RQMktivMyjhyTD/LNUzEdlOPuKrYxODjrtAxTCLpwIDAQABoxswGTAXBgNVHREE
EDAOggxzcnYzLm9rcm8uaW8wDQYJKoZIhvcNAQELBQADggIBAJl/MXI7JVccfaQB
VnnYiLLjX/auW0BWh06Pg0vNWfyfXBbHWJWKps7k1SBtkSBsuqd6ecB4LJAYvbWf
51+j/aw0I71xXw3rXdhuh7mOwi0rg3t/RRNs5EJz/GPk734MecuJcNLrWS/YRq0a
TojJFW22rmlem9vL7OkHPoNDh4LbHN9Wyxgdjt+VmVK+o7Gx4DSHZxo/Dmz6Gp0F
dmKNCUG6yYcxdgDvr0mz4rJeLlqXwb/gjLFmYfnyVWZRB3WOUQuejY7W17W16LYP
11fqlOJSuhyj7E5NekikxO9asOclTlw7qDG7i0PpBTLuLgqDppOo9xZOHrK+DUrf
r3CP9d7P86g2oTv4d6btKUGez3wIU5qfrb4mAZDp6/nngR9hpcFaeGqtjtgJj7Bo
elbJEPy2f3Uwocbatr7nI1szNDJuEoXi0nnAT/h1K4m3EuTRuUsEn8ySJJYwB2fc
ExBhHSrljm111VN/a5TLhT5sZJYKDiVYQ0SJEZRo9rJg8P+G6Xa30JyFa1r6iYOp
UUE8nsWPGcdC6HKu7Tr1YsvbjHTG900O+Rco8at1EWu7s+b278VcYzRJy5z+HM3N
+0/EDFpu9xsSt3PtRGVk57gtf1nAECSV1OQSBCGy+E/MUpBQEqq3VL4vM9H3ANSz
stX902cJF9ZYpG8Hn5Xt4q8AnFIw
-----END CERTIFICATE-----
26 changes: 26 additions & 0 deletions src/test/resources/spiffe-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEajCCAlKgAwIBAgIJAMGzBF0pPMyOMA0GCSqGSIb3DQEBCwUAMFoxCzAJBgNV
BAYTAklMMREwDwYDVQQIDAhUZWwtQXZpdjERMA8GA1UEBwwIVGVsLUF2aXYxEDAO
BgNVBAoMB29rcm8uaW8xEzARBgNVBAMMCm9rcm8uaW8gQ0EwHhcNMTgwNzI5MTMx
OTU4WhcNMjgwNzI2MTMxOTU4WjBpMQswCQYDVQQGEwJJTDERMA8GA1UECAwIVGVs
LUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQKDAdva3JvLmlvMRMwEQYD
VQQLDAphcmNoaXRlY3RzMQ0wCwYDVQQDDARzcnYxMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEAw0VF+zmGt6q8WGeoIwzyAyWrzRIMGwsUyMK04PkVlHji
Tl6TxW8twISdTiHTOHTuIvwSuVE5PcqtrPLCAvOU5MgGv0BLG92z344peHELAVRI
akgQkBEBIehr9Km2vdQtsYRMRJ8nDQiK8A7jiKVI0YBVpGjFREG9yBGG/DnUu95r
7vJVE3KrWC/zIAS+e83j4DuMHoPXGInDD2dCbw86XSj9EDbGCrPZb9op8TwSP4ym
f61DoeSBca6AzLeY3fsScZKSHd9p4BA69eYKgRVxI+fMYzKAysd3wX+mrqt1eT+w
Lhmyw9/8V+TaMq3Q8eRnVD4zJ65YFushANm37da38wIDAQABoyQwIjAgBgNVHREE
GTAXghVzcGlmZmU6Ly9zcnYxLm9rcm8uaW8wDQYJKoZIhvcNAQELBQADggIBABxQ
vYqXGsCB1gbBpgNFELQ1I6jcPqA/7VCRPgB7l1SvXNw70zBjMgIIgPMdCm+ZnoMK
dd4loizOygU3pWIUJYKo2BobcxdqV4fLvmZkVJQtqlhrxN36DA1sgI7D+TYQJGSR
QFY2aEvlzCZnBatRCQ3c4DPiz7qoB9H1JsRejCipzMvGxXu+aOBjKM7HiFLTznsy
sBh01UHCIxIMdSJTtdGs3PIIgKMlAC5TQR4nqDCiqgzTo0JPCeyPIYmFy8iwigM+
CC280ZHWIkvOYMOBM4VswcSzDF8XRZG/vIf7tV3jp0XtJ1EddL+GM18eLh4vIWt8
b7PZnabtIBvITzjjW5Np05kQu1W3riqBveyQHz2fB1IpwTO2mXPyal1vw1Wrj5WQ
QlFJjUTNxk0J4yVNHQbErrtBnGE0Zl4brcq7HNGsd0d0knOJ8F/edSB+USdeqNMH
qBd1GTI1a/dNQ2oC34zvq+cUSzz4L+pK+2nJSeWpb/WAn0V/EgnH6APluxwpUs6i
QqTqyCu2ecrvR0hhEiRNmBZW4+QaZe4aLrLClCEED+/nuvZf/9qLhIdEsq8bbHtP
SIHhTv8pMfT0uej0JxpLKJDLvsmOUUVKCRMW//kArCZHmHKKOPKKwuNCNCRbIzib
EN4HE4RU0BwVRZscMz7nTuSGqucN/Nrae1yC/k9C
-----END CERTIFICATE-----
25 changes: 25 additions & 0 deletions src/test/resources/subject-only-cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-----BEGIN CERTIFICATE-----
MIIEPzCCAicCCQDBswRdKTzMkDANBgkqhkiG9w0BAQsFADBaMQswCQYDVQQGEwJJ
TDERMA8GA1UECAwIVGVsLUF2aXYxETAPBgNVBAcMCFRlbC1Bdml2MRAwDgYDVQQK
DAdva3JvLmlvMRMwEQYDVQQDDApva3JvLmlvIENBMB4XDTE4MDcyOTEzMzE1MFoX
DTI4MDcyNjEzMzE1MFowaTELMAkGA1UEBhMCSUwxETAPBgNVBAgMCFRlbC1Bdml2
MREwDwYDVQQHDAhUZWwtQXZpdjEQMA4GA1UECgwHb2tyby5pbzETMBEGA1UECwwK
YXJjaGl0ZWN0czENMAsGA1UEAwwEc3J2MjCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBANxdN6zBI4lm86BX9dJ3tsT3OoJ6Ei6AxcfZmVydQJZ/XlOOWIKl
zceANkSeRV4XKfy6qePHHNIMY1jD1I/tLsCWXMy0RuNMmU9ea+ZIY9s3+WkMzXTD
2vdtMM+8nxnwa3aR276r3Bna0ics/KEZG7GrmnQ3OzFMJojGLfowKXIMuZXUkTZE
vGmvpwuaxUVmUsiogMGTIGtCCKCjzSrsrJc6yEK7gygiBInUtZj/2fqHita2hN6M
e1MyCDkIATtO6Q4ahrlWrKwEW1Zn4bgjvXUjyFVlGUfOlm4UeICsp2T1zHPNWGqc
yz+Drp3mWHCWJZBnXLc7xZiXPazw2fRUuMECAwEAATANBgkqhkiG9w0BAQsFAAOC
AgEAD+DOZqMUFF1CboIX9YVokXuWwINLYAYruhQF5kgwU1hLaWDJh+FGOKO4doNO
G4DDGkXFXM+5u6gl7R84sF2thx+mcPABsUheWqOOToEeHpe5+sgjtdMlnOutRHuD
q445OseLARd2XtiGFC7SYFk/mrmeH9N29rA5x/w0uS4eFPQYtVcnhQd8LkUExKjL
2KnYPr0lD5TIHN20mlf0AxRbMa9Mjz2vUTLlqwqG7IQJWYco+VbI76i/walOv0+Z
SQNsoKPF+du10Tg0NGQtcQ9CY/C3lNZLxofKXI0JiAlcjJ5emFYh8MsbUS4ESfjH
dut5RyiTDLa0/LZa5w2CBi0fGSFhGK7+q7xPtvGczD+YIWDePQEkcmsKSoyyRzlu
xl6F8C1nT5xj0qAaxcRpfNuTUhkaqnEvzw0BJRS4Kajd/s5/sTCzqcAdK1Rwy6AI
M+7nqZM4iHE/yx/uxapu7RWiGBi48SBIThOYOizrYktH/eIYYJ0fEGLAqGu3mdV0
bBQxcFXSzMypfzAcwIXUNedpqsHCucbJHcxsMYymm2jAjlgU8VcJRvmW4u2JNizf
yzEttSYG2Zti7hJJtWCf9Ek7kaXusC6Uhii6qnCqEWulE8DrZl4Y6ZYKYpe0UMsm
V7kOqym53MchmkaBrc8yVjDE9R6USb/1OvQrjf6PjyFLIkE=
-----END CERTIFICATE-----