Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: /query-stream endpoint now defaults to auto.offset.reset=latest #5588

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ public class ClientTlsMutualAuthTest extends ClientTlsTest {

@Override
protected KsqlRestConfig createServerConfig() {
final Map<String, Object> config = serverConfigWithTls();
config.put(
KsqlRestConfig config = super.createServerConfig();
Map<String, Object> origs = config.originals();
origs.put(
KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_CONFIG,
KsqlRestConfig.SSL_CLIENT_AUTHENTICATION_REQUIRED
);
config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUST_STORE_PATH);
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, TRUST_STORE_PASSWORD);
return new KsqlRestConfig(config);
origs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUST_STORE_PATH);
origs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, TRUST_STORE_PASSWORD);
return new KsqlRestConfig(origs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public class ClientTlsTest extends ClientTest {

@Override
protected KsqlRestConfig createServerConfig() {
return new KsqlRestConfig(serverConfigWithTls());
KsqlRestConfig config = super.createServerConfig();
Map<String, Object> origs = config.originals();
origs.put(KsqlRestConfig.LISTENERS_CONFIG, "https://localhost:0");
origs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEY_STORE_PATH);
origs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, KEY_STORE_PASSWORD);
return new KsqlRestConfig(origs);
}

@Override
Expand Down Expand Up @@ -85,14 +90,4 @@ private ClientOptions clientOptionsWithoutTrustStore() {
.setVerifyHost(false)
.setUseAlpn(true);
}

protected static Map<String, Object> serverConfigWithTls() {
Map<String, Object> config = new HashMap<>();
config.put(KsqlRestConfig.LISTENERS_CONFIG, "https://localhost:0");
config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEY_STORE_PATH);
config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, KEY_STORE_PASSWORD);
config.put(KsqlRestConfig.VERTICLE_INSTANCES, 4);

return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public QueryPublisher createQueryPublisher(
// Must be run on worker as all this stuff is slow
VertxUtils.checkIsWorker();

if (!properties.containsKey("auto.offset.reset")
&& !properties.containsKey("ksql.streams.auto.offset.reset")) {
properties.put("auto.offset.reset", "earliest");
}

final ConfiguredStatement<Query> statement = createStatement(sql, properties.getMap());

if (statement.getStatement().isPullQuery()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static void main(String[] args) {
private static final String DEFAULT_PULL_QUERY = "select * from foo where rowkey=123;";
private static final JsonObject DEFAULT_PULL_QUERY_REQUEST_BODY = new JsonObject()
.put("sql", DEFAULT_PULL_QUERY)
.put("properties", new JsonObject());
.put("properties", new JsonObject().put("auto.offset.reset", "earliest"));
private static final List<GenericRow> DEFAULT_ROWS = generateResults();
private static final int MAX_CONCURRENT_REQUESTS = 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class QueryStreamRunner extends BasePerfRunner {
private static final String DEFAULT_PUSH_QUERY = "select * from foo emit changes;";
private static final JsonObject DEFAULT_PUSH_QUERY_REQUEST_BODY = new JsonObject()
.put("sql", DEFAULT_PUSH_QUERY)
.put("properties", new JsonObject());
.put("properties", new JsonObject().put("auto.offset.reset", "earliest"));

public static void main(String[] args) {
new QueryStreamRunner().go();
Expand Down