diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java index 9a0efc53b2909..11d1fe1693b6a 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/MultiInvoker.java @@ -125,7 +125,9 @@ public Multi method(String name, Entity entity, GenericType respons if (!emitter.isCancelled()) { if (response.getStatus() == 200 && MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(response.getMediaType())) { - registerForSse(multiRequest, responseType, response, vertxResponse); + registerForSse(multiRequest, responseType, response, vertxResponse, + (String) restClientRequestContext.getProperties() + .get(RestClientRequestContext.DEFAULT_CONTENT_TYPE_PROP)); } else if (response.getStatus() == 200 && RestMediaType.APPLICATION_STREAM_JSON_TYPE.isCompatible(response.getMediaType())) { registerForJsonStream(multiRequest, restClientRequestContext, responseType, response, @@ -152,12 +154,12 @@ private boolean isNewlineDelimited(ResponseImpl response) { private void registerForSse(MultiRequest multiRequest, GenericType responseType, Response response, - HttpClientResponse vertxResponse) { + HttpClientResponse vertxResponse, String defaultContentType) { // honestly, isn't reconnect contradictory with completion? // FIXME: Reconnect settings? // For now we don't want multi to reconnect SseEventSourceImpl sseSource = new SseEventSourceImpl(invocationBuilder.getTarget(), - invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS); + invocationBuilder, Integer.MAX_VALUE, TimeUnit.SECONDS, defaultContentType); multiRequest.onCancel(sseSource::close); sseSource.register(event -> { diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java index 86ce7c526591c..fbdf2cc6ff5e5 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseEventSourceImpl.java @@ -42,6 +42,11 @@ public class SseEventSourceImpl implements SseEventSource, Handler { public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder, long reconnectDelay, TimeUnit reconnectUnit) { + this(webTarget, invocationBuilder, reconnectDelay, reconnectUnit, null); + } + + public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocationBuilder, + long reconnectDelay, TimeUnit reconnectUnit, String defaultContentType) { // tests set a null endpoint Objects.requireNonNull(reconnectUnit); if (reconnectDelay <= 0) @@ -49,7 +54,7 @@ public SseEventSourceImpl(WebTargetImpl webTarget, Invocation.Builder invocation this.webTarget = webTarget; this.reconnectDelay = reconnectDelay; this.reconnectUnit = reconnectUnit; - this.sseParser = new SseParser(this); + this.sseParser = new SseParser(this, defaultContentType); this.invocationBuilder = invocationBuilder; } @@ -136,7 +141,9 @@ private void registerOnClient(HttpClientResponse vertxClientResponse) { vertxClientResponse.request().exceptionHandler(null); connection = vertxClientResponse.request().connection(); String sseContentTypeHeader = vertxClientResponse.getHeader(CommonSseUtil.SSE_CONTENT_TYPE); - sseParser.setSseContentTypeHeader(sseContentTypeHeader); + if ((sseContentTypeHeader != null) && !sseContentTypeHeader.isEmpty()) { + sseParser.setSseContentTypeHeader(sseContentTypeHeader); + } // we don't add a closeHandler handler on the connection as it can race with this handler // and close before the emitter emits anything // see: https://github.com/quarkusio/quarkus/pull/16438 diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseParser.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseParser.java index 7728e87ff7098..08525139dc0e9 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseParser.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/SseParser.java @@ -50,7 +50,8 @@ public class SseParser implements Handler { */ private String contentType; /** - * The content type we're reading. Defaults to the X-Sse-Element-Type header + * The content type we're reading. If the X-Sse-Element-Type header is not set, then it defaults to the declared @Produces + * (if any) */ private String contentTypeHeader; /** @@ -67,8 +68,9 @@ public class SseParser implements Handler { private long eventReconnectTime = SseEvent.RECONNECT_NOT_SET; private SseEventSourceImpl sseEventSource; - public SseParser(SseEventSourceImpl sseEventSource) { + public SseParser(SseEventSourceImpl sseEventSource, String defaultContentType) { this.sseEventSource = sseEventSource; + this.contentTypeHeader = defaultContentType; } public void setSseContentTypeHeader(String sseContentTypeHeader) {