Skip to content

Commit

Permalink
Fix blocking call in AsyncRestProxy (#36384)
Browse files Browse the repository at this point in the history
  • Loading branch information
srnagar authored Aug 18, 2023
1 parent ae56b9d commit 317aba0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.azure.json.JsonSerializable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -143,19 +144,32 @@ private Mono<?> handleRestResponseReturnType(final HttpResponseDecoder.HttpDecod
return response.getSourceResponse().getBody().ignoreElements()
.then(Mono.fromCallable(() -> createResponse(response, entityType, null)));
} else {
return handleBodyReturnType(response.getSourceResponse(), response::getDecodedBody, methodParser,
bodyType)
return handleBodyReturnType(response.getSourceResponse(),
decodeBytes(response),
methodParser, bodyType)
.map(bodyAsObject -> createResponse(response, entityType, bodyAsObject))
.switchIfEmpty(Mono.fromCallable(() -> createResponse(response, entityType, null)));
}
} else {
// For now, we're just throwing if the Maybe didn't emit a value.
return handleBodyReturnType(response.getSourceResponse(), response::getDecodedBody, methodParser,
entityType);
return handleBodyReturnType(response.getSourceResponse(), decodeBytes(response), methodParser, entityType);
}
}

static Mono<?> handleBodyReturnType(HttpResponse sourceResponse, Function<byte[], Object> getDecodedBody,
private static Function<byte[], Mono<Object>> decodeBytes(HttpResponseDecoder.HttpDecodedResponse response) {
return bytes -> Mono.fromCallable(() -> response.getDecodedBody(bytes))
.publishOn(Schedulers.boundedElastic())
.handle((object, sink) -> {
if (object == null) {
sink.complete();
} else {
sink.next(object);
sink.complete();
}
});
}

static Mono<?> handleBodyReturnType(HttpResponse sourceResponse, Function<byte[], Mono<Object>> getDecodedBody,
SwaggerMethodParser methodParser, Type entityType) {
final int responseStatusCode = sourceResponse.getStatusCode();
final HttpMethod httpMethod = methodParser.getHttpMethod();
Expand Down Expand Up @@ -197,7 +211,7 @@ static Mono<?> handleBodyReturnType(HttpResponse sourceResponse, Function<byte[]
asyncResult = sourceResponse.getBodyAsInputStream();
} else {
// Mono<Object> or Mono<Page<T>>
asyncResult = sourceResponse.getBodyAsByteArray().mapNotNull(getDecodedBody);
asyncResult = sourceResponse.getBodyAsByteArray().flatMap(getDecodedBody);
}
return asyncResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Args=\
ch.qos.logback.classic.Logger,\
ch.qos.logback.classic.PatternLayout,\
ch.qos.logback.core.CoreConstants,\
ch.qos.logback.core.pattern.parser.Parser,\
ch.qos.logback.core.spi.AppenderAttachableImpl,\
ch.qos.logback.core.status.InfoStatus,\
ch.qos.logback.core.status.StatusBase,\
Expand Down

0 comments on commit 317aba0

Please sign in to comment.