Skip to content

Commit

Permalink
feat: add 'show all topics' to include internal topics
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jan 15, 2020
1 parent b9cc6bc commit dab9bd6
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ singleExpression
statement
: query #queryStatement
| (LIST | SHOW) PROPERTIES #listProperties
| (LIST | SHOW) TOPICS EXTENDED? #listTopics
| (LIST | SHOW) ALL? TOPICS EXTENDED? #listTopics
| (LIST | SHOW) STREAMS EXTENDED? #listStreams
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ public Node visitSelectSingle(final SqlBaseParser.SelectSingleContext context) {

@Override
public Node visitListTopics(final SqlBaseParser.ListTopicsContext context) {
return new ListTopics(getLocation(context), context.EXTENDED() != null);
return new ListTopics(getLocation(context),
context.ALL() != null, context.EXTENDED() != null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,23 @@

public class ListTopics extends Statement {

private final boolean showAll;
private final boolean showExtended;

public ListTopics(final Optional<NodeLocation> location, final boolean showExtended) {
public ListTopics(
final Optional<NodeLocation> location,
final boolean showAll,
final boolean showExtended
) {
super(location);
this.showAll = showAll;
this.showExtended = showExtended;
}

public boolean getShowAll() {
return showAll;
}

public boolean getShowExtended() {
return showExtended;
}
Expand All @@ -43,17 +53,18 @@ public boolean equals(final Object o) {
return false;
}
final ListTopics that = (ListTopics) o;
return showExtended == that.showExtended;
return showAll == that.showAll && showExtended == that.showExtended;
}

@Override
public int hashCode() {
return Objects.hash(showExtended);
return Objects.hash(showAll, showExtended);
}

@Override
public String toString() {
return toStringHelper(this)
.add("showAll", showAll)
.add("showExtended", showExtended)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public void shouldImplementHashCodeAndEqualsProperty() {
// Note: At the moment location does not take part in equality testing
new EqualsTester()
.addEqualityGroup(
new ListTopics(Optional.of(SOME_LOCATION), true),
new ListTopics(Optional.of(OTHER_LOCATION), true)
new ListTopics(Optional.of(SOME_LOCATION), true, true),
new ListTopics(Optional.of(OTHER_LOCATION), true, true)
)
.addEqualityGroup(
new ListTopics(Optional.of(SOME_LOCATION), false),
new ListTopics(Optional.of(OTHER_LOCATION), false)
new ListTopics(Optional.of(SOME_LOCATION), false, false),
new ListTopics(Optional.of(OTHER_LOCATION), false,false)
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ public static Optional<KsqlEntity> execute(
) {
final KafkaTopicClient client = serviceContext.getTopicClient();

final Map<String, TopicDescription> kafkaTopicDescriptions
= client.describeTopics(client.listNonInternalTopicNames());
final Map<String, TopicDescription> kafkaTopicDescriptions;
if (statement.getStatement().getShowAll()) {
kafkaTopicDescriptions = client.describeTopics(client.listTopicNames());
} else {
kafkaTopicDescriptions = client.describeTopics(client.listNonInternalTopicNames());
}

final Map<String, TopicDescription> filteredDescriptions = new TreeMap<>(
filterKsqlInternalTopics(kafkaTopicDescriptions, statement.getConfig()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ public void setUp() {
}

@Test
public void shouldListKafkaTopics() {
public void shouldListKafkaTopicsWithoutInternalTopics() {
// Given:
engine.givenKafkaTopic("topic1");
engine.givenKafkaTopic("topic2");
engine.givenKafkaTopic("_confluent_any_topic");

// When:
final KafkaTopicsList topicsList =
Expand All @@ -83,6 +84,30 @@ public void shouldListKafkaTopics() {
));
}

@Test
public void shouldListKafkaTopicsIncludingInternalTopics() {
// Given:
engine.givenKafkaTopic("topic1");
engine.givenKafkaTopic("topic2");
engine.givenKafkaTopic("_confluent_any_topic");

// When:
final KafkaTopicsList topicsList =
(KafkaTopicsList) CustomExecutors.LIST_TOPICS.execute(
engine.configure("LIST ALL TOPICS;"),
ImmutableMap.of(),
engine.getEngine(),
serviceContext
).orElseThrow(IllegalStateException::new);

// Then:
assertThat(topicsList.getTopics(), containsInAnyOrder(
new KafkaTopicInfo("topic1", ImmutableList.of(1)),
new KafkaTopicInfo("topic2", ImmutableList.of(1)),
new KafkaTopicInfo("_confluent_any_topic", ImmutableList.of(1))
));
}

@Test
public void shouldListKafkaTopicsThatDifferByCase() {
// Given:
Expand Down

0 comments on commit dab9bd6

Please sign in to comment.