Skip to content

Commit

Permalink
fix: Make sure internal client is configured for TLS (#5059)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Apr 15, 2020
1 parent 0b27b3f commit 37c7713
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.RestResponse;
Expand Down Expand Up @@ -108,7 +107,7 @@ public static void classSetUp() {

@Before
public void setUp() {
clientProps = ImmutableMap.of(KsqlClient.DISABLE_HOSTNAME_VERIFICATION_PROP_NAME, "true");
clientProps = ImmutableMap.of();
sslContextFactory = new Server();
}

Expand Down Expand Up @@ -174,7 +173,6 @@ private void givenClientConfiguredWithCertificate() {
.putAll(ClientTrustStore.trustStoreProps())
.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, clientCertPath)
.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, clientCertPassword)
.put(KsqlClient.DISABLE_HOSTNAME_VERIFICATION_PROP_NAME, "true")
.build();

// WS:
Expand All @@ -190,7 +188,6 @@ private void givenClientConfiguredWithoutCertificate() {
// HTTP:
clientProps = ImmutableMap.<String, String>builder()
.putAll(ClientTrustStore.trustStoreProps())
.put(KsqlClient.DISABLE_HOSTNAME_VERIFICATION_PROP_NAME, "true")
.build();

// WS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.client.RestResponse;
Expand Down Expand Up @@ -110,7 +109,7 @@ public static void classSetUp() {

@Before
public void setUp() {
clientProps = ImmutableMap.of(KsqlClient.DISABLE_HOSTNAME_VERIFICATION_PROP_NAME, "true");
clientProps = ImmutableMap.of();
sslContextFactory = new Server();
}

Expand Down Expand Up @@ -174,7 +173,6 @@ private void givenTrustStoreConfigured() {
// HTTP:
clientProps = ImmutableMap.<String, String>builder()
.putAll(ClientTrustStore.trustStoreProps())
.put(KsqlClient.DISABLE_HOSTNAME_VERIFICATION_PROP_NAME, "true")
.build();

// WS:
Expand All @@ -184,7 +182,7 @@ private void givenTrustStoreConfigured() {
}

private void givenClientConfguredWithoutTruststore() {
clientProps = ImmutableMap.of(KsqlClient.DISABLE_HOSTNAME_VERIFICATION_PROP_NAME, "true");
clientProps = ImmutableMap.of();
}

private Code canMakeCliRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.util.KsqlHostInfo;
import io.vertx.core.http.HttpClientOptions;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,11 +46,11 @@ final class DefaultKsqlClient implements SimpleKsqlClient {
private final Optional<String> authHeader;
private final KsqlClient sharedClient;

DefaultKsqlClient(final Optional<String> authHeader) {
DefaultKsqlClient(final Optional<String> authHeader, final Map<String, Object> clientProps) {
this(
authHeader,
new KsqlClient(
ImmutableMap.of(),
toClientProps(clientProps),
Optional.empty(),
new LocalProperties(ImmutableMap.of()),
createClientOptions()
Expand All @@ -74,7 +75,8 @@ public RestResponse<KsqlEntityList> makeKsqlRequest(
final KsqlTarget target = sharedClient
.target(serverEndPoint);

return getTarget(target, authHeader).postKsqlRequest(sql, requestProperties, Optional.empty());
return getTarget(target, authHeader)
.postKsqlRequest(sql, requestProperties, Optional.empty());
}

@Override
Expand Down Expand Up @@ -154,4 +156,13 @@ private static HttpClientOptions createClientOptions() {
return new HttpClientOptions().setMaxPoolSize(100);
}

private static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
for (Map.Entry<String, Object> entry : config.entrySet()) {
clientProps.put(entry.getKey(), entry.getValue().toString());
}
return clientProps;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ public static ServiceContext create(
kafkaClientSupplier,
srClientFactory,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
authHeader),
() -> new DefaultKsqlClient(authHeader)
authHeader),
() -> new DefaultKsqlClient(authHeader, ksqlConfig.originals())
);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public void shouldHandleMalformedJsonInInsertsStream() throws Exception {
validateInsertStreamError(ERROR_CODE_MALFORMED_REQUEST, "Invalid JSON in inserts stream",
insertsResponse.error, (long) rows.size() - 1);

assertThat(testEndpoints.getInsertsSubscriber().isCompleted(), is(true));
assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isCompleted(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.integration;

import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.Matchers.is;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.test.util.secure.ServerKeyStore;
import io.confluent.ksql.util.PageViewDataProvider;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import kafka.zookeeper.ZooKeeperClientException;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

@Category({IntegrationTest.class})
public class ShowQueriesMultiNodeWithTlsFunctionalTest {

private static final PageViewDataProvider PAGE_VIEWS_PROVIDER = new PageViewDataProvider();
private static final String PAGE_VIEW_TOPIC = PAGE_VIEWS_PROVIDER.topicName();
private static final String PAGE_VIEW_STREAM = PAGE_VIEWS_PROVIDER.kstreamName();
private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlRestConfig.LISTENERS_CONFIG,
"http://localhost:8088,https://localhost:8089")
.withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "https://localhost:8089")
.withProperties(ServerKeyStore.keyStoreProps())
.withEnabledKsqlClient()
.build();
private static final TestKsqlRestApp REST_APP_1 = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlRestConfig.LISTENERS_CONFIG,
"http://localhost:8098,https://localhost:8099")
.withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "https://localhost:8099")
.withProperties(ServerKeyStore.keyStoreProps())
.withEnabledKsqlClient()
.build();

@ClassRule
public static final RuleChain CHAIN = RuleChain
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
.around(TEST_HARNESS)
.around(REST_APP_0)
.around(REST_APP_1);

@BeforeClass
public static void setUpClass() {
TEST_HARNESS.ensureTopics(2, PAGE_VIEW_TOPIC);
TEST_HARNESS.produceRows(PAGE_VIEW_TOPIC, PAGE_VIEWS_PROVIDER, FormatFactory.JSON);
RestIntegrationTestUtil.createStream(REST_APP_0, PAGE_VIEWS_PROVIDER);
RestIntegrationTestUtil.makeKsqlRequest(
REST_APP_0,
"CREATE STREAM S AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"
);
}

@Test
public void shouldShowAllQueries() {
// When:
final Supplier<String> app0Response = () -> getShowQueriesResult(REST_APP_0);
final Supplier<String> app1Response = () -> getShowQueriesResult(REST_APP_1);

// Then:
assertThatEventually("App0", app0Response, is("RUNNING:2"));
assertThatEventually("App1", app1Response, is("RUNNING:2"));
}

private static String getShowQueriesResult(final TestKsqlRestApp restApp) {
final List<KsqlEntity> results = RestIntegrationTestUtil.makeKsqlRequest(
restApp,
"Show Queries;"
);

if (results.size() != 1) {
return "Expected 1 response, got " + results.size();
}

final KsqlEntity result = results.get(0);

if (!(result instanceof Queries)) {
return "Expected Queries, got " + result;
}

final List<RunningQuery> runningQueries = ((Queries) result)
.getQueries();

if (runningQueries.size() != 1) {
return "Expected 1 running query, got " + runningQueries.size();
}

return runningQueries.get(0).getState().orElse("N/A");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public Builder withProperties(final Map<String, ?> props) {
public Builder withEnabledKsqlClient() {
this.serviceContext =
() -> defaultServiceContext(bootstrapServers, buildBaseConfig(additionalProps),
TestDefaultKsqlClientFactory::instance);
() -> TestDefaultKsqlClientFactory.instance(additionalProps));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.confluent.ksql.rest.server.services;

import io.confluent.ksql.services.SimpleKsqlClient;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -10,13 +11,8 @@
public class TestDefaultKsqlClientFactory {

// Creates an instance with no auth
public static SimpleKsqlClient instance() {
return new DefaultKsqlClient(Optional.empty());
}

// Creates an instance with the given auth headers
public static SimpleKsqlClient instance(final Optional<String> authHeader) {
return new DefaultKsqlClient(authHeader);
public static SimpleKsqlClient instance(Map<String, Object> clientProps) {
return new DefaultKsqlClient(Optional.empty(), clientProps);
}

}
Loading

0 comments on commit 37c7713

Please sign in to comment.