Skip to content

Commit

Permalink
Assignment test + NPE fix (#490)
Browse files Browse the repository at this point in the history
* Assignment test + NPE fix

Signed-off-by: Stanislav Knot <[email protected]>

* Paolo

Signed-off-by: Stanislav Knot <[email protected]>

* Paolo is right

Signed-off-by: Stanislav Knot <[email protected]>

* do not rely on the returned exception but watch the state exclusivity

Signed-off-by: Stanislav Knot <[email protected]>

* rebase

Signed-off-by: Stanislav Knot <[email protected]>
  • Loading branch information
sknot-rh authored Jun 1, 2021
1 parent 0c04268 commit f96cb3f
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
14 changes: 13 additions & 1 deletion src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public abstract class SinkBridgeEndpoint<K, V> implements BridgeEndpoint {
protected List<SinkTopicSubscription> topicSubscriptions;
protected Pattern topicSubscriptionsPattern;

protected boolean subscribed;
protected boolean assigned;


private int recordIndex;
private int batchSize;

Expand Down Expand Up @@ -115,6 +119,8 @@ public SinkBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig,
this.format = format;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.subscribed = false;
this.assigned = false;
}

@Override
Expand Down Expand Up @@ -191,19 +197,22 @@ protected void subscribe(boolean shouldAttachHandler) {
this.shouldAttachSubscriberHandler = shouldAttachHandler;

log.info("Subscribe to topics {}", this.topicSubscriptions);
this.subscribed = true;
this.setPartitionsAssignmentHandlers();

Set<String> topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet());
this.consumer.subscribe(topics, this::subscribeHandler);
}

/**
* Unubscribe all the topics which the consumer currently subscribes
* Unsubscribe all the topics which the consumer currently subscribes
*/
protected void unsubscribe() {
log.info("Unsubscribe from topics {}", this.topicSubscriptions);
topicSubscriptions.clear();
topicSubscriptionsPattern = null;
this.subscribed = false;
this.assigned = false;
this.consumer.unsubscribe(this::unsubscribeHandler);
}

Expand All @@ -228,6 +237,7 @@ protected void subscribe(Pattern pattern, boolean shouldAttachHandler) {

log.info("Subscribe to topics with pattern {}", pattern);
this.setPartitionsAssignmentHandlers();
this.subscribed = true;
this.consumer.subscribe(pattern, this::subscribeHandler);
}

Expand Down Expand Up @@ -276,6 +286,7 @@ protected void assign(boolean shouldAttachHandler) {
this.shouldAttachSubscriberHandler = shouldAttachHandler;

log.info("Assigning to topics partitions {}", this.topicSubscriptions);
this.assigned = true;
this.partitionsAssignmentAndSeek();
}

Expand Down Expand Up @@ -360,6 +371,7 @@ private Set<TopicPartition> topicPartitionsToAssign(List<PartitionInfo> availabl
Optional<PartitionInfo> requestedPartitionInfo =
availablePartitions.stream()
.filter(p -> p.getTopic().equals(topicSubscription.getTopic()) &&
topicSubscription.getPartition() != null &&
p.getPartition() == topicSubscription.getPartition())
.findFirst();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,14 @@ public Iterator<Map.Entry<String, String>> iterator() {
}

private void doAssign(RoutingContext routingContext, JsonObject bodyAsJson) {
if (subscribed) {
HttpBridgeError error = new HttpBridgeError(
HttpResponseStatus.CONFLICT.code(), "Subscriptions to topics, partitions, and patterns are mutually exclusive."
);
HttpUtils.sendResponse(routingContext, HttpResponseStatus.CONFLICT.code(),
BridgeContentType.KAFKA_JSON, error.toJson().toBuffer());
return;
}
JsonArray partitionsList = bodyAsJson.getJsonArray("partitions");
this.topicSubscriptions.addAll(
partitionsList.stream()
Expand All @@ -374,7 +382,7 @@ private void doAssign(RoutingContext routingContext, JsonObject bodyAsJson) {

private void doSubscribe(RoutingContext routingContext, JsonObject bodyAsJson) {
// cannot specify both topics list and topic pattern
if (bodyAsJson.containsKey("topics") && bodyAsJson.containsKey("topic_pattern")) {
if ((bodyAsJson.containsKey("topics") && bodyAsJson.containsKey("topic_pattern")) || assigned) {
HttpBridgeError error = new HttpBridgeError(
HttpResponseStatus.CONFLICT.code(),
"Subscriptions to topics, partitions, and patterns are mutually exclusive."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void listConsumerSubscriptions(VertxTestContext context) throws InterruptedExcep

consumerService()
.createConsumer(context, groupId, json)
.subscribeConsumer(context, groupId, name, topic, topic2);
.subscribeConsumer(context, groupId, name, topicsRoot);

// poll to subscribe
CompletableFuture<Boolean> consume = new CompletableFuture<>();
Expand Down Expand Up @@ -292,6 +292,60 @@ void tryToPollWithoutSubscriptionTest(VertxTestContext context) throws Interrupt
consume.complete(true);
});
consume.get(TEST_TIMEOUT, TimeUnit.SECONDS);
consumerService()
.deleteConsumer(context, groupId, name);
context.completeNow();
}

@Test
void assignAfterSubscriptionTest(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
String topic = "subscribe-and-assign-topic";

adminClientFacade.createTopic(topic, 4, 1);

assertThat(adminClientFacade.listTopic().size(), is(1));

String name = "my-kafka-consumer-assign";

JsonObject json = new JsonObject();
json.put("name", name);
json.put("format", "json");

JsonObject partitionsRoot = new JsonObject();
JsonArray partitions = new JsonArray();
JsonObject part0 = new JsonObject();
part0.put("topic", topic);
part0.put("partition", 0);

JsonObject part1 = new JsonObject();
part1.put("topic", topic);
part1.put("partition", 1);
partitions.add(part0);
partitions.add(part1);

partitionsRoot.put("partitions", partitions);

consumerService()
.createConsumer(context, groupId, json)
.subscribeConsumer(context, groupId, name, topic);

CompletableFuture<Boolean> assignCF = new CompletableFuture<>();
consumerService()
.assignRequest(groupId, name, partitionsRoot)
.sendJsonObject(partitionsRoot, ar -> {
context.verify(() -> {
assertThat(ar.succeeded(), is(true));
HttpResponse<JsonObject> response = ar.result();
HttpBridgeError error = HttpBridgeError.fromJson(response.body());
assertThat(response.statusCode(), is(HttpResponseStatus.CONFLICT.code()));
assertThat(error.getCode(), is(HttpResponseStatus.CONFLICT.code()));
assertThat(error.getMessage(), is("Subscriptions to topics, partitions, and patterns are mutually exclusive."));
});
assignCF.complete(true);
});

assignCF.get(TEST_TIMEOUT, TimeUnit.SECONDS);

consumerService()
.deleteConsumer(context, groupId, name);
context.completeNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ public HttpRequest<JsonObject> offsetsRequest(String groupId, String name, JsonO
.as(BodyCodec.jsonObject());
}

public HttpRequest<JsonObject> assignRequest(String groupId, String name, JsonObject json) {
return postRequest(Urls.consumerInstanceAssignments(groupId, name))
.putHeader(CONTENT_LENGTH.toString(), String.valueOf(json.toBuffer().length()))
.putHeader(CONTENT_TYPE.toString(), BridgeContentType.KAFKA_JSON)
.as(BodyCodec.jsonObject());
}

public HttpRequest<JsonObject> offsetsRequest(String groupId, String name) {
return postRequest(Urls.consumerInstanceOffsets(groupId, name));
}
Expand Down

0 comments on commit f96cb3f

Please sign in to comment.