Skip to content

Commit

Permalink
test: fixed integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
nateab committed Sep 1, 2021
1 parent 97890c0 commit 910a7fa
Showing 1 changed file with 67 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,25 @@

package io.confluent.ksql.api.integration;

import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER2;
import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_ENABLED;
import static io.confluent.ksql.util.KsqlConfig.KSQL_QUERY_PUSH_SCALABLE_MAX_HOURLY_BANDWIDTH_MEGABYTES_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.api.utils.QueryResponse;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.integration.QueryStreamSubscriber;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.serde.FormatFactory;
Expand All @@ -42,17 +43,14 @@
import io.confluent.ksql.test.util.secure.SecureKafkaHelper;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -65,7 +63,7 @@

@Category({IntegrationTest.class})
public class ScalablePushBandwidthThrottleIntegrationTest {

private static final String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for queries.";
private static final PageViewDataProvider TEST_DATA_PROVIDER = new PageViewDataProvider();
private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName();
private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName();
Expand Down Expand Up @@ -98,7 +96,11 @@ public class ScalablePushBandwidthThrottleIntegrationTest {
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
.around(TEST_HARNESS)
.around(REST_APP);
private static final String RATE_LIMIT_MESSAGE = "Host is at bandwidth rate limit for queries.";

private Vertx vertx;
private KsqlRestClient restClient;
private StreamPublisher<StreamedRow> publisher;
private QueryStreamSubscriber subscriber;

@BeforeClass
public static void setUpClass() {
Expand All @@ -118,20 +120,14 @@ public static void classTearDown() {
REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";"));
}

private Vertx vertx;
private WebClient client;

@Before
public void setUp() {
  // Fresh Vertx instance and KSQL REST client for each test case.
  // NOTE(review): the stale line `client = createClient();` was diff residue from the
  // removed WebClient-based helpers — `client`/`createClient()` no longer exist.
  vertx = Vertx.vertx();
  restClient = REST_APP.buildKsqlClient();
}

@After
public void tearDown() {
if (client != null) {
client.close();
}
if (vertx != null) {
vertx.close();
}
Expand All @@ -141,26 +137,31 @@ public void tearDown() {
@SuppressFBWarnings({"DLS_DEAD_LOCAL_STORE"})
@Test
public void scalablePushBandwidthThrottleTest() {
assertAllPersistentQueriesRunning();
String veryLong = createDataSize(100000);

final CompletableFuture<StreamedRow> header = new CompletableFuture<>();
final CompletableFuture<List<StreamedRow>> complete = new CompletableFuture<>();
String sql = "SELECT CONCAT(\'"+ veryLong + "\') as placeholder from " + AGG_TABLE + " EMIT CHANGES LIMIT 1;";

//the scalable push query should go through 2 times
for (int i = 0; i < 2; i += 1) {
AtomicReference<QueryResponse> atomicReference1 = new AtomicReference<>();
assertThatEventually(() -> {
QueryResponse queryResponse1 = executeQuery(sql);
atomicReference1.set(queryResponse1);
return queryResponse1.rows;
}, hasSize(5));
// scalable push query should succeed 10 times
for (int i = 0; i < 11; i += 1) {
makeRequestAndSetupSubscriber(sql,
ImmutableMap.of("auto.offset.reset", "latest"),
header, complete );
TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON, FormatFactory.JSON);
System.out.println(i);
}

//the third try should fail
try {
QueryResponse queryResponse3 = executeQuery(sql);
} catch (KsqlException e) {
assertEquals(RATE_LIMIT_MESSAGE, e.getMessage());
}
// scalable push query should fail on 11th try since it exceeds 1MB bandwidth limit
try {
makeQueryRequest(sql,
ImmutableMap.of("auto.offset.reset", "latest"));
TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON, FormatFactory.JSON);

throw new AssertionError("New scalable push query should have exceeded bandwidth limit ");
} catch (KsqlException e) {
assertThat(e.getMessage(), is(RATE_LIMIT_MESSAGE));
}
}

private static String createDataSize(int msgSize) {
Expand All @@ -171,56 +172,42 @@ private static String createDataSize(int msgSize) {
return sb.toString();
}


private void shouldFailToExecuteQuery(final String sql, final String message) {
// When:
QueryResponse response = executeQuery(sql);

// Then:
assertThat(response.rows, hasSize(0));
assertThat(response.responseObject.getInteger("error_code"),
is(ERROR_CODE_BAD_STATEMENT));
assertThat(response.responseObject.getString("message"),
startsWith(message));
}

private QueryResponse executeQuery(final String sql) {
return executeQueryWithVariables(sql, new JsonObject());
}

private QueryResponse executeQueryWithVariables(final String sql, final JsonObject variables) {
JsonObject properties = new JsonObject();
JsonObject requestBody = new JsonObject()
.put("sql", sql).put("properties", properties).put("sessionVariables", variables);
HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
return new QueryResponse(response.bodyAsString());
// Submits a non-streaming KSQL statement (e.g. the TERMINATE calls in classTearDown)
// to the shared REST app via the test utility.
private static void makeKsqlRequest(final String sql) {
RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
}

private WebClient createClient() {
WebClientOptions options = new WebClientOptions().
setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false)
.setDefaultHost("localhost").setDefaultPort(REST_APP.getListeners().get(0).getPort());
return WebClient.create(vertx, options);
/**
 * Starts a streamed query for {@code sql} and attaches a {@link QueryStreamSubscriber}
 * to its publisher. Both ends are kept in fields for later access by the test.
 * The subscriber is handed {@code header} and {@code future} to complete
 * (presumably with the header row and the full row list — see QueryStreamSubscriber).
 */
private void makeRequestAndSetupSubscriber(
    final String sql,
    final Map<String, ?> properties,
    final CompletableFuture<StreamedRow> header,
    final CompletableFuture<List<StreamedRow>> future
) {
  final StreamPublisher<StreamedRow> newPublisher = makeQueryRequest(sql, properties);
  final QueryStreamSubscriber newSubscriber =
      new QueryStreamSubscriber(newPublisher.getContext(), future, header);
  publisher = newPublisher;
  subscriber = newSubscriber;
  newPublisher.subscribe(newSubscriber);
}

private HttpResponse<Buffer> sendRequest(final String uri, final Buffer requestBody) {
return sendRequest(client, uri, requestBody);
}
StreamPublisher<StreamedRow> makeQueryRequest(
final String sql,
final Map<String, ?> properties
) {
final RestResponse<StreamPublisher<StreamedRow>> res =
restClient.makeQueryRequestStreamed(sql, null, properties);

private HttpResponse<Buffer> sendRequest(final WebClient client, final String uri,
final Buffer requestBody) {
VertxCompletableFuture<HttpResponse<Buffer>> requestFuture = new VertxCompletableFuture<>();
client
.post(uri)
.sendBuffer(requestBody, requestFuture);
try {
return requestFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
if (res.isErroneous()) {
throw new KsqlException(res.getErrorMessage().getMessage());
}
return res.getResponse();
}

private static void makeKsqlRequest(final String sql) {
RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
private void assertAllPersistentQueriesRunning() {
assertThatEventually(() -> {
for (final PersistentQueryMetadata metadata : REST_APP.getEngine().getPersistentQueries()) {
if (metadata.getState() != State.RUNNING) {
return false;
}
}
return true;
}, is(true));
}
}

0 comments on commit 910a7fa

Please sign in to comment.