Skip to content

Commit

Permalink
Add support for Cassandra cluster using SSL
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr authored and martint committed Oct 7, 2019
1 parent ba3c4db commit a053dfc
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 3 deletions.
5 changes: 5 additions & 0 deletions presto-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>security</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -71,6 +72,11 @@ public class CassandraClientConfig
private int speculativeExecutionLimit = 1;
private Duration speculativeExecutionDelay = new Duration(500, MILLISECONDS);
private ProtocolVersion protocolVersion;
private boolean tlsEnabled;
private File keystorePath;
private String keystorePassword;
private File truststorePath;
private String truststorePassword;

@NotNull
@Size(min = 1)
Expand Down Expand Up @@ -409,4 +415,66 @@ public CassandraClientConfig setProtocolVersion(ProtocolVersion version)
this.protocolVersion = version;
return this;
}

public boolean isTlsEnabled()
{
return tlsEnabled;
}

@Config("cassandra.tls.enabled")
public CassandraClientConfig setTlsEnabled(boolean tlsEnabled)
{
this.tlsEnabled = tlsEnabled;
return this;
}

public Optional<File> getKeystorePath()
{
return Optional.ofNullable(keystorePath);
}

@Config("cassandra.tls.keystore-path")
public CassandraClientConfig setKeystorePath(File keystorePath)
{
this.keystorePath = keystorePath;
return this;
}

public Optional<String> getKeystorePassword()
{
return Optional.ofNullable(keystorePassword);
}

@Config("cassandra.tls.keystore-password")
@ConfigSecuritySensitive
public CassandraClientConfig setKeystorePassword(String keystorePassword)
{
this.keystorePassword = keystorePassword;
return this;
}

public Optional<File> getTruststorePath()
{
return Optional.ofNullable(truststorePath);
}

@Config("cassandra.tls.truststore-path")
public CassandraClientConfig setTruststorePath(File truststorePath)
{
this.truststorePath = truststorePath;
return this;
}

public Optional<String> getTruststorePassword()
{
return Optional.ofNullable(truststorePassword);
}

@Config("cassandra.tls.truststore-password")
@ConfigSecuritySensitive
public CassandraClientConfig setTruststorePassword(String truststorePassword)
{
this.truststorePassword = truststorePassword;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.prestosql.plugin.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy;
Expand All @@ -28,17 +29,40 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.json.JsonCodec;
import io.airlift.security.pem.PemReader;
import io.prestosql.spi.PrestoException;

import javax.inject.Singleton;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import javax.security.auth.x500.X500Principal;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateExpiredException;
import java.security.cert.CertificateNotYetValidException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.prestosql.plugin.cassandra.CassandraErrorCode.CASSANDRA_SSL_INITIALIZATION_FAILURE;
import static java.lang.Math.toIntExact;
import static java.util.Collections.list;
import static java.util.Objects.requireNonNull;

public class CassandraClientModule
Expand Down Expand Up @@ -115,6 +139,10 @@ public static CassandraSession createCassandraSession(CassandraClientConfig conf
if (config.getClientSoLinger() != null) {
socketOptions.setSoLinger(config.getClientSoLinger());
}
if (config.isTlsEnabled()) {
buildSslContext(config.getKeystorePath(), config.getKeystorePassword(), config.getTruststorePath(), config.getTruststorePassword())
.ifPresent(context -> clusterBuilder.withSSL(JdkSSLOptions.builder().withSSLContext(context).build()));
}
clusterBuilder.withSocketOptions(socketOptions);

if (config.getUsername() != null && config.getPassword() != null) {
Expand All @@ -140,4 +168,116 @@ public static CassandraSession createCassandraSession(CassandraClientConfig conf
}),
config.getNoHostAvailableRetryTimeout());
}

private static Optional<SSLContext> buildSslContext(
Optional<File> keystorePath,
Optional<String> keystorePassword,
Optional<File> truststorePath,
Optional<String> truststorePassword)
{
if (!keystorePath.isPresent() && !truststorePath.isPresent()) {
return Optional.empty();
}

try {
// load KeyStore if configured and get KeyManagers
KeyStore keystore = null;
KeyManager[] keyManagers = null;
if (keystorePath.isPresent()) {
char[] keyManagerPassword;
try {
// attempt to read the key store as a PEM file
keystore = PemReader.loadKeyStore(keystorePath.get(), keystorePath.get(), keystorePassword);
// for PEM encoded keys, the password is used to decrypt the specific key (and does not protect the keystore itself)
keyManagerPassword = new char[0];
}
catch (IOException | GeneralSecurityException ignored) {
keyManagerPassword = keystorePassword.map(String::toCharArray).orElse(null);

keystore = KeyStore.getInstance(KeyStore.getDefaultType());
try (InputStream in = new FileInputStream(keystorePath.get())) {
keystore.load(in, keyManagerPassword);
}
}
validateCertificates(keystore);
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keystore, keyManagerPassword);
keyManagers = keyManagerFactory.getKeyManagers();
}

// load TrustStore if configured, otherwise use KeyStore
KeyStore truststore = keystore;
if (truststorePath.isPresent()) {
truststore = loadTrustStore(truststorePath.get(), truststorePassword);
}

// create TrustManagerFactory
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(truststore);

// get X509TrustManager
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
if ((trustManagers.length != 1) || !(trustManagers[0] instanceof X509TrustManager)) {
throw new RuntimeException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
}
X509TrustManager trustManager = (X509TrustManager) trustManagers[0];

// create SSLContext
SSLContext result = SSLContext.getInstance("SSL");
result.init(keyManagers, new TrustManager[] {trustManager}, null);
return Optional.of(result);
}
catch (GeneralSecurityException | IOException e) {
throw new PrestoException(CASSANDRA_SSL_INITIALIZATION_FAILURE, e);
}
}

private static KeyStore loadTrustStore(File trustStorePath, Optional<String> trustStorePassword)
throws IOException, GeneralSecurityException
{
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
try {
// attempt to read the trust store as a PEM file
List<X509Certificate> certificateChain = PemReader.readCertificateChain(trustStorePath);
if (!certificateChain.isEmpty()) {
trustStore.load(null, null);
for (X509Certificate certificate : certificateChain) {
X500Principal principal = certificate.getSubjectX500Principal();
trustStore.setCertificateEntry(principal.getName(), certificate);
}
return trustStore;
}
}
catch (IOException | GeneralSecurityException ignored) {
}

try (InputStream in = new FileInputStream(trustStorePath)) {
trustStore.load(in, trustStorePassword.map(String::toCharArray).orElse(null));
}
return trustStore;
}

private static void validateCertificates(KeyStore keyStore)
throws GeneralSecurityException
{
for (String alias : list(keyStore.aliases())) {
if (!keyStore.isKeyEntry(alias)) {
continue;
}
Certificate certificate = keyStore.getCertificate(alias);
if (!(certificate instanceof X509Certificate)) {
continue;
}

try {
((X509Certificate) certificate).checkValidity();
}
catch (CertificateExpiredException e) {
throw new CertificateExpiredException("KeyStore certificate is expired: " + e.getMessage());
}
catch (CertificateNotYetValidException e) {
throw new CertificateNotYetValidException("KeyStore certificate is not yet valid: " + e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
public enum CassandraErrorCode
implements ErrorCodeSupplier
{
CASSANDRA_METADATA_ERROR(0, EXTERNAL), CASSANDRA_VERSION_ERROR(1, EXTERNAL);
CASSANDRA_METADATA_ERROR(0, EXTERNAL),
CASSANDRA_VERSION_ERROR(1, EXTERNAL),
CASSANDRA_SSL_INITIALIZATION_FAILURE(2, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.io.File;
import java.util.Map;

import static com.datastax.driver.core.ProtocolVersion.V2;
Expand Down Expand Up @@ -60,7 +61,12 @@ public void testDefaults()
.setNoHostAvailableRetryTimeout(new Duration(1, MINUTES))
.setSpeculativeExecutionLimit(1)
.setSpeculativeExecutionDelay(new Duration(500, MILLISECONDS))
.setProtocolVersion(null));
.setProtocolVersion(null)
.setTlsEnabled(false)
.setKeystorePath(null)
.setKeystorePassword(null)
.setTruststorePath(null)
.setTruststorePassword(null));
}

@Test
Expand Down Expand Up @@ -93,6 +99,11 @@ public void testExplicitPropertyMappings()
.put("cassandra.speculative-execution.limit", "10")
.put("cassandra.speculative-execution.delay", "101s")
.put("cassandra.protocol-version", "V2")
.put("cassandra.tls.enabled", "true")
.put("cassandra.tls.keystore-path", "/tmp/keystore")
.put("cassandra.tls.keystore-password", "keystore-password")
.put("cassandra.tls.truststore-path", "/tmp/truststore")
.put("cassandra.tls.truststore-password", "truststore-password")
.build();

CassandraClientConfig expected = new CassandraClientConfig()
Expand Down Expand Up @@ -121,7 +132,12 @@ public void testExplicitPropertyMappings()
.setNoHostAvailableRetryTimeout(new Duration(3, MINUTES))
.setSpeculativeExecutionLimit(10)
.setSpeculativeExecutionDelay(new Duration(101, SECONDS))
.setProtocolVersion(V2);
.setProtocolVersion(V2)
.setTlsEnabled(true)
.setKeystorePath(new File("/tmp/keystore"))
.setKeystorePassword("keystore-password")
.setTruststorePath(new File("/tmp/truststore"))
.setTruststorePassword("truststore-password");

assertFullMapping(properties, expected);
}
Expand Down
10 changes: 10 additions & 0 deletions presto-docs/src/main/sphinx/connector/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ Property Name Description
``cassandra.speculative-execution.limit`` The number of speculative executions (defaults to ``1``).

``cassandra.speculative-execution.delay`` The delay between each speculative execution (defaults to ``500ms``).

``cassandra.tls.enabled`` Whether TLS security is enabled (defaults to ``false``).

``cassandra.tls.keystore-path`` Path to the PEM or JKS key store.

``cassandra.tls.truststore-path`` Path to the PEM or JKS trust store.

``cassandra.tls.keystore-password`` Password for the key store.

``cassandra.tls.truststore-password`` Password for the trust store.
============================================================= ======================================================================

Querying Cassandra Tables
Expand Down

0 comments on commit a053dfc

Please sign in to comment.