From f96cb3f0958bfcad3c875d54ebb78cedc967a8a7 Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Tue, 1 Jun 2021 20:45:39 +0200 Subject: [PATCH] Assignment test + NPE fix (#490) * Assignment test + NPE fix Signed-off-by: Stanislav Knot * Paolo Signed-off-by: Stanislav Knot * Paolo is right Signed-off-by: Stanislav Knot * do not rely on the returned exception but watch the state exclusivity Signed-off-by: Stanislav Knot * rebase Signed-off-by: Stanislav Knot --- .../kafka/bridge/SinkBridgeEndpoint.java | 14 ++++- .../bridge/http/HttpSinkBridgeEndpoint.java | 10 +++- .../bridge/http/ConsumerSubscriptionIT.java | 56 ++++++++++++++++++- .../bridge/http/services/ConsumerService.java | 7 +++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java index d0b0a7bb1..481ed2d95 100644 --- a/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/SinkBridgeEndpoint.java @@ -69,6 +69,10 @@ public abstract class SinkBridgeEndpoint implements BridgeEndpoint { protected List topicSubscriptions; protected Pattern topicSubscriptionsPattern; + protected boolean subscribed; + protected boolean assigned; + + private int recordIndex; private int batchSize; @@ -115,6 +119,8 @@ public SinkBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig, this.format = format; this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; + this.subscribed = false; + this.assigned = false; } @Override @@ -191,6 +197,7 @@ protected void subscribe(boolean shouldAttachHandler) { this.shouldAttachSubscriberHandler = shouldAttachHandler; log.info("Subscribe to topics {}", this.topicSubscriptions); + this.subscribed = true; this.setPartitionsAssignmentHandlers(); Set topics = this.topicSubscriptions.stream().map(ts -> ts.getTopic()).collect(Collectors.toSet()); @@ -198,12 +205,14 @@ protected void subscribe(boolean shouldAttachHandler) { } /** - * Unubscribe all the topics which the consumer currently subscribes + * Unsubscribe all the topics which the consumer currently subscribes */ protected void unsubscribe() { log.info("Unsubscribe from topics {}", this.topicSubscriptions); topicSubscriptions.clear(); topicSubscriptionsPattern = null; + this.subscribed = false; + this.assigned = false; this.consumer.unsubscribe(this::unsubscribeHandler); } @@ -228,6 +237,7 @@ protected void subscribe(Pattern pattern, boolean shouldAttachHandler) { log.info("Subscribe to topics with pattern {}", pattern); this.setPartitionsAssignmentHandlers(); + this.subscribed = true; this.consumer.subscribe(pattern, this::subscribeHandler); } @@ -276,6 +286,7 @@ protected void assign(boolean shouldAttachHandler) { this.shouldAttachSubscriberHandler = shouldAttachHandler; log.info("Assigning to topics partitions {}", this.topicSubscriptions); + this.assigned = true; this.partitionsAssignmentAndSeek(); } @@ -360,6 +371,7 @@ private Set topicPartitionsToAssign(List availabl Optional requestedPartitionInfo = availablePartitions.stream() .filter(p -> p.getTopic().equals(topicSubscription.getTopic()) && + topicSubscription.getPartition() != null && p.getPartition() == topicSubscription.getPartition()) .findFirst(); diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 1d7bf0873..8d832b379 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -355,6 +355,14 @@ public Iterator> iterator() { } private void doAssign(RoutingContext routingContext, JsonObject bodyAsJson) { + if (subscribed) { + HttpBridgeError error = new HttpBridgeError( + HttpResponseStatus.CONFLICT.code(), "Subscriptions to topics, partitions, and patterns are mutually exclusive." + ); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.CONFLICT.code(), + BridgeContentType.KAFKA_JSON, error.toJson().toBuffer()); + return; + } JsonArray partitionsList = bodyAsJson.getJsonArray("partitions"); this.topicSubscriptions.addAll( partitionsList.stream() @@ -374,7 +382,7 @@ private void doAssign(RoutingContext routingContext, JsonObject bodyAsJson) { private void doSubscribe(RoutingContext routingContext, JsonObject bodyAsJson) { // cannot specify both topics list and topic pattern - if (bodyAsJson.containsKey("topics") && bodyAsJson.containsKey("topic_pattern")) { + if ((bodyAsJson.containsKey("topics") && bodyAsJson.containsKey("topic_pattern")) || assigned) { HttpBridgeError error = new HttpBridgeError( HttpResponseStatus.CONFLICT.code(), "Subscriptions to topics, partitions, and patterns are mutually exclusive." diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java index 9623ea1cb..0bba689c6 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerSubscriptionIT.java @@ -224,7 +224,7 @@ void listConsumerSubscriptions(VertxTestContext context) throws InterruptedExcep consumerService() .createConsumer(context, groupId, json) - .subscribeConsumer(context, groupId, name, topic, topic2); + .subscribeConsumer(context, groupId, name, topicsRoot); // poll to subscribe CompletableFuture consume = new CompletableFuture<>(); @@ -292,6 +292,60 @@ void tryToPollWithoutSubscriptionTest(VertxTestContext context) throws Interrupt consume.complete(true); }); consume.get(TEST_TIMEOUT, TimeUnit.SECONDS); + consumerService() + .deleteConsumer(context, groupId, name); + context.completeNow(); + } + + @Test + void assignAfterSubscriptionTest(VertxTestContext context) throws InterruptedException, ExecutionException, TimeoutException { + String topic = "subscribe-and-assign-topic"; + + adminClientFacade.createTopic(topic, 4, 1); + + assertThat(adminClientFacade.listTopic().size(), is(1)); + + String name = "my-kafka-consumer-assign"; + + JsonObject json = new JsonObject(); + json.put("name", name); + json.put("format", "json"); + + JsonObject partitionsRoot = new JsonObject(); + JsonArray partitions = new JsonArray(); + JsonObject part0 = new JsonObject(); + part0.put("topic", topic); + part0.put("partition", 0); + + JsonObject part1 = new JsonObject(); + part1.put("topic", topic); + part1.put("partition", 1); + partitions.add(part0); + partitions.add(part1); + + partitionsRoot.put("partitions", partitions); + + consumerService() + .createConsumer(context, groupId, json) + .subscribeConsumer(context, groupId, name, topic); + + CompletableFuture assignCF = new CompletableFuture<>(); + consumerService() + .assignRequest(groupId, name, partitionsRoot) + .sendJsonObject(partitionsRoot, ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + HttpBridgeError error = HttpBridgeError.fromJson(response.body()); + assertThat(response.statusCode(), is(HttpResponseStatus.CONFLICT.code())); + assertThat(error.getCode(), is(HttpResponseStatus.CONFLICT.code())); + assertThat(error.getMessage(), is("Subscriptions to topics, partitions, and patterns are mutually exclusive.")); + }); + assignCF.complete(true); + }); + + assignCF.get(TEST_TIMEOUT, TimeUnit.SECONDS); + consumerService() .deleteConsumer(context, groupId, name); context.completeNow(); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/services/ConsumerService.java b/src/test/java/io/strimzi/kafka/bridge/http/services/ConsumerService.java index 681dd99e0..3e4800efa 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/services/ConsumerService.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/services/ConsumerService.java @@ -93,6 +93,13 @@ public HttpRequest offsetsRequest(String groupId, String name, JsonO .as(BodyCodec.jsonObject()); } + public HttpRequest assignRequest(String groupId, String name, JsonObject json) { + return postRequest(Urls.consumerInstanceAssignments(groupId, name)) + .putHeader(CONTENT_LENGTH.toString(), String.valueOf(json.toBuffer().length())) + .putHeader(CONTENT_TYPE.toString(), BridgeContentType.KAFKA_JSON) + .as(BodyCodec.jsonObject()); + } + public HttpRequest offsetsRequest(String groupId, String name) { return postRequest(Urls.consumerInstanceOffsets(groupId, name)); }