Skip to content

Commit

Permalink
cleanup code/deps
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Gilat committed Jul 31, 2018
1 parent 996763b commit 7789210
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 187 deletions.
29 changes: 7 additions & 22 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,34 @@
</plugins>
</build>


<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.easymock/easymock -->
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.6</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.20.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>

</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>

<scope>test</scope>
</dependency>
</dependencies>

Expand Down
81 changes: 47 additions & 34 deletions src/main/java/io/okro/kafka/SpiffePrincipalBuilder.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package io.okro.kafka;

import org.apache.kafka.common.security.auth.*;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
Expand All @@ -10,56 +15,64 @@
import java.util.Collection;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.commons.lang3.StringUtils.startsWith;

public class SpiffePrincipalBuilder implements KafkaPrincipalBuilder {
private static final Logger LOG = LoggerFactory.getLogger(SpiffePrincipalBuilder.class);

private static final String SPIFFE_TYPE = "SPIFFE";

public KafkaPrincipal build(AuthenticationContext context) {
if (context instanceof PlaintextAuthenticationContext) {
if (!(context instanceof SslAuthenticationContext)) {
LOG.trace("non-SSL connection coerced to ANONYMOUS");
return KafkaPrincipal.ANONYMOUS;
}

if (!(context instanceof SslAuthenticationContext)) {
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
SSLSession session = ((SslAuthenticationContext) context).session();
X509Certificate cert = firstX509(session);
if (cert == null) {
LOG.trace("first peer certificate missing / not x509");
return KafkaPrincipal.ANONYMOUS;
}

SSLSession sslSession = ((SslAuthenticationContext) context).session();
String spiffeId = spiffeId(cert);
if (spiffeId == null) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
}

return new KafkaPrincipal(SPIFFE_TYPE, spiffeId);
}

private X509Certificate firstX509(SSLSession session) {
try {
Certificate[] peerCerts = sslSession.getPeerCertificates();
if (peerCerts == null || peerCerts.length == 0) {
return KafkaPrincipal.ANONYMOUS;
Certificate[] peerCerts = session.getPeerCertificates();
if (peerCerts.length == 0) {
return null;
}
if (!(peerCerts[0] instanceof X509Certificate)) {
return KafkaPrincipal.ANONYMOUS;
Certificate first = peerCerts[0];
if (!(first instanceof X509Certificate)) {
return null;
}
X509Certificate cert = (X509Certificate) peerCerts[0];

Collection<List<?>> sanCollection = cert.getSubjectAlternativeNames();
KafkaPrincipal principal;
return (X509Certificate) first;
} catch (SSLPeerUnverifiedException e) {
LOG.warn("failed to extract certificate", e);
return null;
}
}

if (sanCollection != null) {
principal = sanCollection.stream()
.map(san -> (String) san.get(1))
.filter(uri -> startsWith(uri, "spiffe://"))
.findFirst()
.map(s -> new KafkaPrincipal(SPIFFE_TYPE, s))
.orElse(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName()));
} else {
principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, cert.getSubjectX500Principal().getName());
private String spiffeId(X509Certificate cert) {
try {
Collection<List<?>> sans = cert.getSubjectAlternativeNames();
if (sans == null) {
return null;
}

LOG.debug("PrincipalBuilder found principal: {}", principal.toString());

return principal;
} catch (SSLPeerUnverifiedException | CertificateParsingException se) {
LOG.warn("Unhandled exception: " + se.toString());
return KafkaPrincipal.ANONYMOUS;
return sans.stream()
.map(san -> (String) san.get(1))
.filter(uri -> uri.startsWith("spiffe://"))
.findFirst()
.orElse(null);
} catch (CertificateParsingException e) {
LOG.warn("failed to parse SAN", e);
return null;
}
}
}
187 changes: 56 additions & 131 deletions src/test/java/io.okro.kafka/SpiffePrincipalBuilderTest.java
Original file line number Diff line number Diff line change
@@ -1,162 +1,87 @@
package io.okro.kafka;

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.net.InetAddress;
import javax.net.ssl.SSLSession;

import org.apache.kafka.common.security.auth.*;

import org.apache.commons.io.IOUtils;

import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class SpiffePrincipalBuilderTest extends EasyMockSupport {
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

private X509Certificate getResourceAsCert(String resourcePath)
throws java.io.IOException, java.security.cert.CertificateException {
public class SpiffePrincipalBuilderTest {

private SslAuthenticationContext mockedSslContext(String certPath) throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
// load cert
ClassLoader classLoader = getClass().getClassLoader();
try {
// Read cert
ByteArrayInputStream certInputStream =
new ByteArrayInputStream(IOUtils.toByteArray(classLoader.getResourceAsStream(resourcePath)));

// Parse as X509 certificate
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
return (X509Certificate) certificateFactory.generateCertificate(certInputStream);

} catch (java.io.IOException | java.security.cert.CertificateException e) {
System.out.println("Problem with reading the certificate file. " + e.toString());
throw e;
}
InputStream in = classLoader.getResourceAsStream(certPath);
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
X509Certificate cert = (X509Certificate) certificateFactory.generateCertificate(in);

// mock ssl session
SSLSession session = mock(SSLSession.class);
when(session.getPeerCertificates()).thenReturn(new Certificate[]{cert});
return new SslAuthenticationContext(session, InetAddress.getLocalHost());
}

/**
* X509 V3 with a SPIFFE-based SAN extension.
* Should result in 'SPIFFE:[spiffe://uri]'
*/
@Test
public void TestSubjectOnlyCert() {
// Tests an X509 V1 certificate with no SAN extension
public void TestSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("spiffe-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

try {
X509Certificate cert = getResourceAsCert("subject-only-cert.pem");

// Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
SSLSession session = mock(SSLSession.class);
EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});

replayAll();

// Build KafkaPrincipal
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();

KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost()));

// Identity type should be "User"
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());

// Identity should be a string
assertNotNull(principal.getName());

System.out.println("Principal: " + principal.toString());

} catch (java.io.IOException | java.security.cert.CertificateException e) {
System.out.println("Problem with reading the certificate file. " + e.toString());
}
assertEquals("SPIFFE", principal.getPrincipalType());
assertEquals(principal.getName(), "spiffe://srv1.okro.io");
}

/**
* X509 V1 certificate with no SAN extension.
* Should fall back to 'User:CN=[CN]'
*/
@Test
public void TestSpiffeCert() {
// Tests an X509 V3 with SAN extension holding a SPIFFE ID

try {
X509Certificate cert = getResourceAsCert("spiffe-cert.pem");

// Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
SSLSession session = mock(SSLSession.class);
EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});
public void TestSubjectOnlyCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("subject-only-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

replayAll();

// Build KafkaPrincipal
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();

KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost()));

// Identity type should be "SPIFFE"
assertEquals("SPIFFE", principal.getPrincipalType());

// Identity should be a string
assertNotNull(principal.getName());

System.out.println("Principal: " + principal.toString());

} catch (java.io.IOException | java.security.cert.CertificateException e) {
System.out.println("Problem with reading the certificate file. " + e.toString());
}
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals(principal.getName(), "CN=srv2,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}

/**
* X509 V3 with a non-SPIFFE SAN extension.
* Should fall back to 'User:CN=[CN]'
*/
@Test
public void TestSanNoSpiffeCert() {
// Tests an X509 V3 with SAN extension holding a regular FQDN

try {
X509Certificate cert = getResourceAsCert("san-no-spiffe-cert.pem");

// Mock SSLSession getPeerCertificates(), we bypass alllll the handshake parts because... out of scope.
SSLSession session = mock(SSLSession.class);
EasyMock.expect(session.getPeerCertificates()).andReturn(new Certificate[] {cert});

replayAll();

// Build KafkaPrincipal
SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();

KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost()));
public void TestSanNoSpiffeCert() throws CertificateException, SSLPeerUnverifiedException, UnknownHostException {
SslAuthenticationContext context = mockedSslContext("san-no-spiffe-cert.pem");
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

// Identity type should be "User"
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());

// Identity should be a string
assertNotNull(principal.getName());

System.out.println("Principal: " + principal.toString());

} catch (java.io.IOException | java.security.cert.CertificateException e) {
System.out.println("Problem with reading the certificate file. " + e.toString());
}
assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
assertEquals(principal.getName(), "CN=srv3,OU=architects,O=okro.io,L=Tel-Aviv,ST=Tel-Aviv,C=IL");
}

/**
* Non-SSL context.
* Should be unauthenticated.
*/
@Test
public void TestNoSSLContext() throws java.net.UnknownHostException {
// Tests non-SSL context behavior

SpiffePrincipalBuilder builder = new SpiffePrincipalBuilder();

KafkaPrincipal principal = builder.build(
new PlaintextAuthenticationContext(InetAddress.getLocalHost()));
PlaintextAuthenticationContext context = new PlaintextAuthenticationContext(InetAddress.getLocalHost());
KafkaPrincipal principal = new SpiffePrincipalBuilder().build(context);

// Identity type should be KafkaPrincipal.ANONYMOUS
assertEquals(KafkaPrincipal.ANONYMOUS, principal);

System.out.println("Principal: " + principal.toString());
}

@Test
public void TestAwareness() throws InterruptedException {
// Tests a reviewer's awareness
TimeUnit.SECONDS.sleep(1);

// Identity type should be KafkaPrincipal.ANONYMOUS
assertEquals(42, 42);
}
}

0 comments on commit 7789210

Please sign in to comment.