Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use source partition count and replica count as default for sink topics. #1930

Conversation

hjafarpour
Copy link
Contributor

Description

Currently we use the defaults from the KsqlConfig for number of partitions (default is 4) and number of replicas (default is 1) for sink topics if they are not specified in WITH clause.
This PR changes this behavior by using the source partition count and replica count for sink topic if we don't have them specified in the WITH clause.
In case of JOIN we will use the left hand side stream/table topic properties.

Testing done

Unit tests were added.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@hjafarpour hjafarpour requested a review from a team September 21, 2018 22:53
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour - left one comment.

@big-andy-coates big-andy-coates requested a review from a team September 24, 2018 17:06
final int partitions = (Integer) outputProperties.getOrDefault(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY,
ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY));
sourceTopicProperties.getPartitions());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our admin client wrapper will validate that this number matches the partition count of the underlying topic. We should disable that check when creating sink topics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it as we talked offline.

@hjafarpour hjafarpour force-pushed the Use-Source-Topic-Props-If-Not-Specified branch from ab9b4c7 to e794ca2 Compare September 25, 2018 21:05
@big-andy-coates
Copy link
Contributor

@hjafarpour, in case you've missed it... #1930 (comment)

@big-andy-coates
Copy link
Contributor

@hjafarpour any thoughts on this?

@hjafarpour
Copy link
Contributor Author

@big-andy-coates I updated this PR. Also refactored the test to use Mockito.

@hjafarpour hjafarpour requested review from big-andy-coates and a team December 20, 2018 00:41
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour, let's some comments.

Copy link
Contributor

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour ! LGTM, minus a few minor things in comments.


public class KsqlStructuredDataOutputNodeTest {
private final KafkaTopicClient topicClient = EasyMock.createNiceMock(KafkaTopicClient.class);

private KafkaTopicClient topicClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move below static constants, e.g., to line 117.

@hjafarpour hjafarpour requested a review from a team January 16, 2019 04:38
@hjafarpour
Copy link
Contributor Author

@vcrfxia please check this: #1930 (comment)

@big-andy-coates big-andy-coates requested review from big-andy-coates and rodesai and removed request for a team January 22, 2019 15:48
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour,

It seems that KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY and SINK_NUMBER_OF_REPLICAS_PROPERTY are still used in the code. But I thought this PR was change the behaviour to copy the partition count and rf from the source. So in what situations are these two config items still used?

Also, it's unclear to me how we're maintaining backwards compatibility with existing running queries i.e. say the source topic has 8 partitions, and the sink was created with the default 4, then once the user upgrades won't KSQL complain because it expects the sink to have 8 partitions?

Otherwise the code is looking good.

Thanks,

Andy

Copy link
Contributor Author

@hjafarpour hjafarpour left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@big-andy-coates I added backward compatibility using a new config property, sink.legacy.properties. Also since we have backward compatibility now we still use KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY and SINK_NUMBER_OF_REPLICAS_PROPERTY.

@@ -93,6 +93,14 @@
+ "'CREATE STREAM S AS ...' will create a topic 'thing-S', where as the statement "
+ "'CREATE STREAM S WITH(KAFKA_TOPIC = 'foo') AS ...' will create a topic 'foo'.";

public static final String KSQL_SINK_TOPIC_PROPERTIES_LEGACY_CONFIG =
KSQL_CONFIG_PROPERTY_PREFIX + "sink.legacy.properties";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather call this something like sink.topic.static.default.properties. It expresses the semantics more cleanly. legacy doesn't mean much over time.

Copy link
Contributor

@big-andy-coates big-andy-coates Feb 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider removing KSQL_SINK_TOPIC_PROPERTIES_LEGACY_CONFIG and instead moving both SINK_NUMBER_OF_PARTITIONS_PROPERTY and SINK_NUMBER_OF_REPLICAS_PROPERTY to be CompatibilityBreakingConfigDef instances. The new default for these properties could be null, meaning they are not set. The documentation should say they should not be set going forward. (Ideally, we need a way to hide these internal configs from users)

Going forward, we don't really want people setting either of these properties in their config or in their SET statements, i.e. we're not looking to support "static counts" as a feature. We want them to age out over time. Hence moving them out of the main config and into CompatibilityBreakingConfigDef. This also avoids adding a new config to manage something we'd like to get rid of.

private static final String KSQL_SINK_TOPIC_PROPERTIES_LEGACY_DOCS = "Use the default sink topic"
+ " properties instead of inferring from source when the properties are not set in the "
+ " WITH clause. Until version 5.1.x if the partition number and replicas were not specified"
+ " in the WITH cluase, KSQL used the default value. After version 5.1.x, KSQL will use the "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: cluase

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour,

Please see my comment above about dropping this new LEGACY config you've added in favour of moving the two existing configs into 'ComatibilityBreakingConfigDef's.

Also, can you add a json based test that proves things work as expected when these legacy properties are, and are not, set please?

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour, this is looking good.

Usual nits below:

@@ -361,7 +362,10 @@ public void shouldThrowIfSinkTopicHasDifferentPropertiesThanRequested() {
@Test
public void shouldUseLegacySinkPropertiesIfLegacyIsTrue() {
// Given:
when(ksqlConfig.getBoolean(KsqlConfig.KSQL_SINK_TOPIC_PROPERTIES_LEGACY_CONFIG)).thenReturn(true);
Mockito.<Object>when(ksqlConfig.values()).thenReturn((Map<String, ?>) ImmutableMap.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: drop the Mockito.<Object> and use the ImmutableMap.of generic args rather than casting:

Suggested change
Mockito.<Object>when(ksqlConfig.values()).thenReturn((Map<String, ?>) ImmutableMap.of(
when(ksqlConfig.values()).thenReturn(ImmutableMap.<Map<String, Object>of(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the casting but still needed Mockito.<Object>!

@@ -256,7 +256,11 @@ private static SourceTopicProperties getSourceTopicProperties(
final KafkaTopicClient kafkaTopicClient,
final KsqlConfig ksqlConfig
) {
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_SINK_TOPIC_PROPERTIES_LEGACY_CONFIG)) {
final Map ksqlProperties = ksqlConfig.values();
if ((ksqlProperties.containsKey(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this not just be:

if (ksqlConfig.get(SINK_NUMBER_OF_PARTITIONS_PROPERTY) != null
   || ksqlConfig.get(SINK_NUMBER_OF_REPLICAS_PROPERTY) != null) {
  ...
}

???

I don't think the containsKey adds anything as you're checking its not null and get returns null for any value that is not present.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, removed the contains check.

{
"tests": [
{
"name": "Set correct sink partitions from the source topic",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ambiguous name - it doesn't really explain what its testing ;)

Also, given this test isn't really about the inputs and outputs , it might benefit from a description field explaining how its testing this.

   "name":  "should copy partition and replica count from source topic",
   "description": "note: this isn't about input and outputs. It would fail if the counts are not copied across because the output topic 'S' already exists with the correct counts, and the execution path checks this"

Same for the other tests in this file. Alternatively, add this explanation once at the topic of the file.

 {
     "description":  "as above",
     "tests" : [

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the names.

when(ksqlConfig.getBoolean(KsqlConfig.KSQL_SINK_TOPIC_PROPERTIES_LEGACY_CONFIG)).thenReturn(true);
Mockito.<Object>when(ksqlConfig.values()).thenReturn((Map<String, ?>) ImmutableMap.of(
KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, KsqlConstants.defaultSinkNumberOfPartitions,
KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, KsqlConstants.defaultSinkNumberOfReplications
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth splitting this into two tests, where each test only has one of these two properties set. This then tests the cases where the other is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splited it into two tests, however, we cannot have one of them null and the other legacy. If we use legacy both should be null.

Type.INT,
4,
null,
Importance.MEDIUM,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
Importance.MEDIUM,
Importance.LOW,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!


// Then:
verify(mockTopicClient).createTopic(SINK_KAFKA_TOPIC_NAME, 1, (short) 2, Collections.emptyMap());
assertThat(schemaKStream, instanceOf(SchemaKStream.class));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pointless assert: schemaKStream is declared as an instance of SchemaKStream.

// Given:
doThrow(KsqlException.class).when(mockTopicClient).createTopic(SINK_KAFKA_TOPIC_NAME, 1, (short) 2, Collections.emptyMap());
createOutputNode(Collections.singletonMap(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 2), true);
expectedException.expect(KsqlException.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: Add a //Then on the line above the `expectedException. e.g. a negative test might look like:

// Given
... set up preconditions

// Then:
... set up expected post conditions, i.e. the expected exception.

// When:
... the line on test.

KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, KsqlConstants.defaultSinkNumberOfPartitions,
KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, KsqlConstants.defaultSinkNumberOfReplications
));
when(ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)).thenReturn(KsqlConstants.defaultSinkNumberOfPartitions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird double mocking of KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY in this test - but I think this will go away once you simplify the if statement in the prod code this is testing.

@@ -110,6 +114,8 @@ public void setUp() {
statementExecutor = new StatementExecutor(ksqlConfig, ksqlEngine, statementParser);
statementExecutorWithMocks
= new StatementExecutor(ksqlConfig, mockEngine, mockParser);
// doReturn(new TopicDescription("test1", true, Collections.singletonList(mock(TopicPartitionInfo.class))))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tidy up the commented out code please?

@@ -54,6 +57,14 @@ public boolean isTopicExists(final String topic) {
return Collections.emptyMap();
}

@Override
public TopicDescription describeTopic(final String topicName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than hard coding your test requirements into the generic MockKafkaTopicClient you might be better of switching your test to use FakeKafkaTopicClient, which allows you to set up preconditions, like topics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @hjafarpour - meant to approve! Please take a look at my comments above, as they still need addressing, but assuming you do, then this LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants