Skip to content

Commit

Permalink
[#3566] Replace direct Vert.x CompositeFuture method calls with Futur…
Browse files Browse the repository at this point in the history
…e method calls
  • Loading branch information
harism authored and sophokles73 committed Oct 19, 2023
1 parent 6757e40 commit 6ffdace
Show file tree
Hide file tree
Showing 58 changed files with 222 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import com.github.benmanes.caffeine.cache.Caffeine;

import io.smallrye.config.ConfigMapping;
import io.vertx.core.CompositeFuture;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -317,7 +316,7 @@ protected void doStart() {
})
.onFailure(t -> LOG.error("failed to deploy notification receiver verticle(s)", t));

CompositeFuture.all(adapterTracker, notificationReceiverTracker)
Future.all(adapterTracker, notificationReceiverTracker)
.map(deploymentResult)
.onComplete(deploymentCheck);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -58,7 +58,6 @@
import io.micrometer.core.instrument.Timer.Sample;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -367,15 +366,14 @@ protected final Future<Void> stopInternal() {

private Future<Void> closeServiceClients() {

@SuppressWarnings("rawtypes")
final List<Future> results = new ArrayList<>();
final List<Future<Void>> results = new ArrayList<>();
results.add(tenantClient.stop());
results.add(registrationClient.stop());
results.add(credentialsClient.stop());
results.add(commandConsumerFactory.stop());
results.add(commandRouterClient.stop());
results.add(messagingClientProviders.stop());
return CompositeFuture.all(results).mapEmpty();
return Future.all(results).mapEmpty();
}

/**
Expand Down Expand Up @@ -459,7 +457,7 @@ protected Future<Void> checkConnectionLimit(final TenantObject tenantConfig, fin
}
});

return CompositeFuture.all(
return Future.all(
connectionLimitCheckResult,
checkConnectionDurationLimit(tenantConfig, spanContext),
messageLimitCheckResult).mapEmpty();
Expand Down Expand Up @@ -923,7 +921,7 @@ public final Future<Void> sendTtdEvent(
context);
final Future<TenantObject> tenantConfigTracker = getTenantConfiguration(tenant, context);

return CompositeFuture.all(tokenTracker, tenantConfigTracker).compose(ok -> {
return Future.all(tokenTracker, tenantConfigTracker).compose(ok -> {
if (tenantConfigTracker.result().isAdapterEnabled(getTypeName())) {
final Map<String, Object> props = new HashMap<>();
props.put(MessageHelper.APP_PROPERTY_ORIG_ADAPTER, getTypeName());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.ext.healthchecks.HealthCheckHandler;

Expand Down Expand Up @@ -141,15 +140,15 @@ public MessagingClientProvider<CommandResponseSender> getCommandResponseSenderPr
@Override
public Future<Void> start() {

return CompositeFuture.all(
return Future.all(
telemetrySenderProvider.start(),
eventSenderProvider.start(),
commandResponseSenderProvider.start()).mapEmpty();
}

@Override
public Future<Void> stop() {
return CompositeFuture.all(
return Future.all(
telemetrySenderProvider.stop(),
eventSenderProvider.stop(),
commandResponseSenderProvider.stop()).mapEmpty();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2018, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -90,7 +90,6 @@
import io.opentracing.log.Fields;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -200,7 +199,7 @@ protected void doStart(final Promise<Void> startPromise) {
}
return Future.succeededFuture();
})
.compose(success -> CompositeFuture.all(bindSecureServer(), bindInsecureServer()))
.compose(success -> Future.all(bindSecureServer(), bindInsecureServer()))
.map(ok -> (Void) null)
.onComplete(startPromise);
}
Expand Down Expand Up @@ -291,7 +290,7 @@ protected void doStop(final Promise<Void> stopPromise) {
log.trace("stop already called");
return;
}
CompositeFuture.all(stopSecureServer(), stopInsecureServer())
Future.all(stopSecureServer(), stopInsecureServer())
.map(ok -> (Void) null)
.onComplete(ar -> log.info("AMQP server(s) closed"))
.onComplete(stopPromise);
Expand Down Expand Up @@ -487,8 +486,7 @@ void onConnectionLoss(final ProtonConnection con) {
private Future<Void> handleConnectionLossInternal(final ProtonConnection con, final Span span,
final boolean sendDisconnectedEvent, final boolean closeCommandConsumers) {
authenticatedDeviceConnections.remove(con);
@SuppressWarnings("rawtypes")
final List<Future> handlerResults;
final List<Future<Void>> handlerResults;
if (closeCommandConsumers) {
handlerResults = getCommandSubscriptions(con).stream()
.map(commandSubscription -> closeCommandConsumer(commandSubscription.getConsumer(),
Expand All @@ -498,7 +496,7 @@ private Future<Void> handleConnectionLossInternal(final ProtonConnection con, fi
handlerResults = Collections.emptyList();
}
decrementConnectionCount(con, span.context(), sendDisconnectedEvent);
return CompositeFuture.join(handlerResults)
return Future.join(handlerResults)
.recover(thr -> {
Tags.ERROR.set(span, true);
return Future.failedFuture(thr);
Expand Down Expand Up @@ -578,10 +576,10 @@ private Future<Void> checkAuthorizationAndResourceLimits(
// the device/gateway exists and is enabled and
// that the connection limit for the tenant is not exceeded.

CompositeFuture.all(
Future.all(
checkDeviceRegistration(authenticatedDevice, span.context()),
getTenantConfiguration(authenticatedDevice.getTenantId(), span.context())
.compose(tenantConfig -> CompositeFuture.all(
.compose(tenantConfig -> Future.all(
isAdapterEnabled(tenantConfig),
checkConnectionLimit(tenantConfig, span.context()))))
.map(ok -> {
Expand Down Expand Up @@ -1284,12 +1282,12 @@ private Future<Void> doUploadMessage(
final Future<TenantObject> tenantTracker = getTenantConfiguration(resource.getTenantId(),
currentSpan.context());
final Future<TenantObject> tenantValidationTracker = tenantTracker
.compose(tenantObject -> CompositeFuture
.compose(tenantObject -> Future
.all(isAdapterEnabled(tenantObject),
checkMessageLimit(tenantObject, context.getPayloadSize(), currentSpan.context()))
.map(success -> tenantObject));

return CompositeFuture.all(tenantValidationTracker, tokenFuture)
return Future.all(tenantValidationTracker, tokenFuture)
.compose(ok -> {
final Map<String, Object> props = getDownstreamMessageProperties(context);

Expand Down Expand Up @@ -1370,7 +1368,7 @@ private Future<Void> doUploadCommandResponseMessage(
final Future<TenantObject> tenantTracker = getTenantConfiguration(resource.getTenantId(),
currentSpan.context());

return CompositeFuture.all(tenantTracker, responseTracker)
return Future.all(tenantTracker, responseTracker)
.compose(ok -> {
final CommandResponse commandResponse = responseTracker.result();
log.trace("sending command response [device-id: {}, status: {}, correlation-id: {}, reply-to: {}]",
Expand All @@ -1388,13 +1386,13 @@ private Future<Void> doUploadCommandResponseMessage(
resource.getResourceId(),
context.getAuthenticatedDevice(),
currentSpan.context());
final Future<TenantObject> tenantValidationTracker = CompositeFuture
final Future<TenantObject> tenantValidationTracker = Future
.all(isAdapterEnabled(tenantTracker.result()),
checkMessageLimit(tenantTracker.result(), context.getPayloadSize(),
currentSpan.context()))
.map(success -> tenantTracker.result());

return CompositeFuture.all(tenantValidationTracker, tokenFuture)
return Future.all(tenantValidationTracker, tokenFuture)
.compose(success -> sendCommandResponse(
tenantTracker.result(),
tokenFuture.result(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -292,14 +291,14 @@ protected final Future<Void> doUploadMessage(
currentSpan.context());
final Future<TenantObject> tenantTracker = getAdapter().getTenantClient().get(tenantId, currentSpan.context());
final Future<TenantObject> tenantValidationTracker = tenantTracker
.compose(tenantObject -> CompositeFuture.all(
.compose(tenantObject -> Future.all(
getAdapter().isAdapterEnabled(tenantObject),
getAdapter().checkMessageLimit(tenantObject, payload.length(), currentSpan.context()))
.map(tenantObject));

// we only need to consider TTD if the device and tenant are enabled and the adapter
// is enabled for the tenant
final Future<Integer> ttdTracker = CompositeFuture.all(tenantValidationTracker, tokenTracker)
final Future<Integer> ttdTracker = Future.all(tenantValidationTracker, tokenTracker)
.compose(ok -> {
final Integer ttdParam = context.getTimeUntilDisconnect();
return getAdapter().getTimeUntilDisconnect(tenantTracker.result(), ttdParam)
Expand Down Expand Up @@ -346,7 +345,7 @@ protected final Future<Void> doUploadMessage(
props,
currentSpan.context());
}
return CompositeFuture.all(sendResult, responseReady.future()).mapEmpty();
return Future.all(sendResult, responseReady.future()).mapEmpty();
}).compose(proceed -> {

// downstream message sent and (if ttd was set) command was received or ttd has timed out
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2018, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2018, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -25,7 +25,6 @@
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.Futures;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -211,7 +210,7 @@ protected Resource createRoot() {
this.insecureEndpoint = ep;
});

return CompositeFuture.any(insecureEndpointFuture, secureEndpointFuture)
return Future.any(insecureEndpointFuture, secureEndpointFuture)
.map(ok -> {
this.server = newServer;
return newServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -170,8 +169,8 @@ public final Future<Void> uploadCommandResponseMessage(final CoapContext context
String.format("command-request-id [%s] or status code [%s] is missing/invalid",
commandRequestId, responseStatus))));

return CompositeFuture.all(tenantTracker, commandResponseTracker, deviceRegistrationTracker)
.compose(ok -> CompositeFuture.all(
return Future.all(tenantTracker, commandResponseTracker, deviceRegistrationTracker)
.compose(ok -> Future.all(
getAdapter().isAdapterEnabled(tenantTracker.result()),
getAdapter().checkMessageLimit(tenantTracker.result(), payload.length(), currentSpan.context()))
.mapEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
Expand Down Expand Up @@ -188,7 +187,7 @@ public final void doStart(final Promise<Void> startPromise) {
System.setProperty(HttpUtils.SYSTEM_PROPERTY_ROUTER_SETUP_LENIENT, "true");

addRoutes(router);
return CompositeFuture.all(bindSecureHttpServer(router), bindInsecureHttpServer(router));
return Future.all(bindSecureHttpServer(router), bindInsecureHttpServer(router));
}
})
.compose(ok -> {
Expand Down Expand Up @@ -473,7 +472,7 @@ public final void doStop(final Promise<Void> stopPromise) {
insecureServerStopTracker.complete();
}

CompositeFuture.all(serverStopTracker.future(), insecureServerStopTracker.future())
Future.all(serverStopTracker.future(), insecureServerStopTracker.future())
.compose(v -> postShutdown())
.onComplete(stopPromise);
}
Expand Down Expand Up @@ -617,14 +616,14 @@ private void doUploadMessage(
.orElse(0);
final Future<TenantObject> tenantTracker = getTenantConfiguration(tenant, currentSpan.context());
final Future<TenantObject> tenantValidationTracker = tenantTracker
.compose(tenantObject -> CompositeFuture
.compose(tenantObject -> Future
.all(isAdapterEnabled(tenantObject),
checkMessageLimit(tenantObject, payloadSize, currentSpan.context()))
.map(success -> tenantObject));

// we only need to consider TTD if the device and tenant are enabled and the adapter
// is enabled for the tenant
final Future<Integer> ttdTracker = CompositeFuture.all(tenantValidationTracker, tokenTracker)
final Future<Integer> ttdTracker = Future.all(tenantValidationTracker, tokenTracker)
.compose(ok -> {
final Integer ttdParam = getTimeUntilDisconnectFromRequest(ctx);
return getTimeUntilDisconnect(tenantTracker.result(), ttdParam)
Expand Down Expand Up @@ -656,7 +655,7 @@ private void doUploadMessage(
if (EndpointType.EVENT.equals(endpoint)) {
ctx.getTimeToLive()
.ifPresent(ttl -> props.put(MessageHelper.SYS_HEADER_PROPERTY_TTL, ttl.toSeconds()));
return CompositeFuture.all(
return Future.all(
getEventSender(tenantValidationTracker.result()).sendEvent(
tenantTracker.result(),
tokenTracker.result(),
Expand All @@ -668,7 +667,7 @@ private void doUploadMessage(
.map(s -> (Void) null);
} else {
// unsettled
return CompositeFuture.all(
return Future.all(
getTelemetrySender(tenantValidationTracker.result()).sendTelemetry(
tenantTracker.result(),
tokenTracker.result(),
Expand Down Expand Up @@ -1244,19 +1243,19 @@ public final void uploadCommandResponseMessage(
commandRequestId, responseStatus)));

final int payloadSize = Optional.ofNullable(payload).map(Buffer::length).orElse(0);
CompositeFuture.all(tenantTracker, commandResponseTracker)
Future.all(tenantTracker, commandResponseTracker)
.compose(commandResponse -> {
final Future<RegistrationAssertion> deviceRegistrationTracker = getRegistrationAssertion(
tenant,
deviceId,
authenticatedDevice,
currentSpan.context());
final Future<Void> tenantValidationTracker = CompositeFuture
final Future<Void> tenantValidationTracker = Future
.all(isAdapterEnabled(tenantTracker.result()),
checkMessageLimit(tenantTracker.result(), payloadSize, currentSpan.context()))
.map(ok -> null);

return CompositeFuture.all(tenantValidationTracker, deviceRegistrationTracker)
return Future.all(tenantValidationTracker, deviceRegistrationTracker)
.compose(ok -> sendCommandResponse(
tenantTracker.result(),
deviceRegistrationTracker.result(),
Expand Down
Loading

0 comments on commit 6ffdace

Please sign in to comment.