Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uploading a blob through a Controller using StreamingFileUpload fails on Azure #423

Closed
johanra opened this issue Feb 19, 2024 · 0 comments · Fixed by #424
Closed

Uploading a blob through a Controller using StreamingFileUpload fails on Azure #423

johanra opened this issue Feb 19, 2024 · 0 comments · Fixed by #424

Comments

@johanra
Copy link
Contributor

johanra commented Feb 19, 2024

Expected Behavior

I created a class StreamingFileUploadRequest which is based on the already existing io.micronaut.objectstorage.request.CompletedFileUploadRequest from micronaut-object-storage-core

class StreamingFileUploadRequest implements UploadRequest {
	    @NonNull
	    private final StreamingFileUpload streamingFileUpload;
	    @NonNull
	    private final String key;

	    @NonNull
	    private Map<String, String> metadata;

	    public StreamingFileUploadRequest(@NonNull StreamingFileUpload streamingFileUpload) {
	        this(streamingFileUpload, streamingFileUpload.getName(), Collections.emptyMap());
	    }

	    public StreamingFileUploadRequest(@NonNull StreamingFileUpload streamingFileUpload, @NonNull String key) {
	        this(streamingFileUpload, key, Collections.emptyMap());
	    }

	    public StreamingFileUploadRequest(@NonNull StreamingFileUpload streamingFileUpload, @NonNull String key, @NonNull Map<String, String> metadata) {
	        this.streamingFileUpload = streamingFileUpload;
	        this.key = key;
	        this.metadata = metadata;
	    }

	    @NonNull
	    @Override
	    public Optional<String> getContentType() {
	        return streamingFileUpload.getContentType()
	            .map(MediaType::getName);
	    }

	    @NonNull
	    @Override
	    public String getKey() {
	        return key;
	    }

	    @NonNull
	    @Override
	    public Optional<Long> getContentSize() {
	        return Optional.of(streamingFileUpload.getSize());
	    }

	    @NonNull
	    @Override
	    public InputStream getInputStream() {
	    	return streamingFileUpload.asInputStream();
	    }

	    @Override
	    @NonNull
	    public Map<String, String> getMetadata() {
	        return this.metadata;
	    }

	    @Override
	    public void setMetadata(@NonNull Map<String, String> metadata) {
	        this.metadata = metadata;
	    }
}

This class is used in a controller like this

@Post(uri = "/blobDirect/{id}", consumes = MediaType.MULTIPART_FORM_DATA)
    public HttpResponse<String> uploadBlobDirect(@NotNull final StreamingFileUpload file, @NotNull @PathVariable final String tenant, @NotNull @PathVariable final String id) {

		StreamingFileUploadRequest uploadRequest = new StreamingFileUploadRequest(file, id);
		objectStorageOperations.upload(uploadRequest);
		
		return  HttpResponse.ok("Uploaded");
	}

I expect the given file to be uploaded into the azure blob storage

Actual Behaviour

An error is returned

java.io.IOException: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
	at io.micronaut.http.netty.PublisherAsStream.read(PublisherAsStream.java:58)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:291)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:347)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399)
	at com.azure.storage.common.Utility.lambda$convertStreamToByteBuffer$1(Utility.java:273)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:271)
	at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:213)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:272)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onSubscribe(FluxHandle.java:90)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onSubscribe(FluxConcatArray.java:187)
	at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:85)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8773)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8773)
	at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at reactor.core.publisher.Operators.complete(Operators.java:137)
	at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122)
	at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
	at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:336)
	at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:71)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
	at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:445)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:710)
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:195)
	at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:456)
	at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
	at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
	at reactor.netty.tcp.SslProvider$SslReadHandler.userEventTriggered(SslProvider.java:856)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:398)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:376)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:368)
	at io.netty.handler.ssl.SslHandler.setHandshakeSuccess(SslHandler.java:1940)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1462)
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1349)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1389)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
	at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:387)
	at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:218)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:296)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
	at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:422)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8773)
	at io.micronaut.http.server.netty.multipart.NettyStreamingFileUpload.asInputStream(NettyStreamingFileUpload.java:133)
	at io.micronaut.upload.UploadController$StreamingFileUploadRequest.getInputStream(UploadController.java:119)
	at io.micronaut.objectstorage.azure.AzureBlobStorageOperations.doUpload(AzureBlobStorageOperations.java:158)
	at io.micronaut.objectstorage.azure.AzureBlobStorageOperations.upload(AzureBlobStorageOperations.java:73)
	at io.micronaut.upload.UploadController.uploadBlobDirect(UploadController.java:67)
	at io.micronaut.upload.$UploadController$Definition$Exec.dispatch(Unknown Source)
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invokeUnsafe(AbstractExecutableMethodsDefinition.java:447)
	at io.micronaut.context.DefaultBeanContext$BeanContextUnsafeExecutionHandle.invokeUnsafe(DefaultBeanContext.java:4214)
	at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:228)
	at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:74)
	at io.micronaut.http.server.RouteExecutor.executeRouteAndConvertBody(RouteExecutor.java:480)
	at io.micronaut.http.server.RouteExecutor.callRoute(RouteExecutor.java:470)
	at io.micronaut.http.server.RequestLifecycle.lambda$normalFlow$2(RequestLifecycle.java:146)
	at io.micronaut.core.execution.ImperativeExecutionFlowImpl.flatMap(ImperativeExecutionFlowImpl.java:72)
	at io.micronaut.core.execution.DelayedExecutionFlowImpl$FlatMap.apply(DelayedExecutionFlowImpl.java:279)
	at io.micronaut.core.execution.DelayedExecutionFlowImpl.work(DelayedExecutionFlowImpl.java:51)
	at io.micronaut.core.execution.DelayedExecutionFlowImpl.next(DelayedExecutionFlowImpl.java:91)
	at io.micronaut.core.execution.DelayedExecutionFlowImpl.flatMap(DelayedExecutionFlowImpl.java:103)
	at io.micronaut.http.server.RequestLifecycle.lambda$normalFlow$4(RequestLifecycle.java:146)
	at io.micronaut.http.server.RequestLifecycle.lambda$runWithFilters$14(RequestLifecycle.java:264)
	at io.micronaut.http.filter.TerminalFilter.processRequestFilter(TerminalFilter.java:58)
	at io.micronaut.http.filter.FilterRunner.filterRequest0(FilterRunner.java:153)
	at io.micronaut.http.filter.FilterRunner.filterRequest0(FilterRunner.java:153)
	at io.micronaut.http.filter.FilterRunner.lambda$filterRequest0$2(FilterRunner.java:153)
	at io.micronaut.core.execution.ImperativeExecutionFlowImpl.flatMap(ImperativeExecutionFlowImpl.java:72)
	at io.micronaut.http.filter.MethodFilter.processRequestFilter(MethodFilter.java:263)
	at io.micronaut.http.filter.FilterRunner.filterRequest0(FilterRunner.java:153)
	at io.micronaut.http.filter.FilterRunner.filterRequest(FilterRunner.java:137)
	at io.micronaut.http.filter.FilterRunner.run(FilterRunner.java:132)
	at io.micronaut.http.server.RequestLifecycle.runWithFilters(RequestLifecycle.java:281)
	at io.micronaut.http.server.RequestLifecycle.normalFlow(RequestLifecycle.java:143)
	at io.micronaut.http.server.netty.NettyRequestLifecycle.handleNormal(NettyRequestLifecycle.java:85)
	at io.micronaut.http.server.netty.RoutingInBoundHandler.accept(RoutingInBoundHandler.java:220)
	at io.micronaut.http.server.netty.handler.PipeliningServerHandler$OptimisticBufferingInboundHandler.read(PipeliningServerHandler.java:422)
	at io.micronaut.http.server.netty.handler.PipeliningServerHandler.channelRead(PipeliningServerHandler.java:206)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	at io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)

Steps To Reproduce

  1. run micronaut application from the attached project is running with the azure dependendency
  2. Invoke the endpoint http://localhost:8080/tenant1/upload/blobDirect/abcd
  3. This returns
{
    "_links": {
        "self": [
            {
                "href": "/tenant1/upload/blobDirect/abcd",
                "templated": false
            }
        ]
    },
    "_embedded": {
        "errors": [
            {
                "message": "Internal Server Error: java.io.IOException: java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber"
            }
        ]
    },
    "message": "Internal Server Error"
}
  1. change the pom to use micronaut-object-storage-local and the same endpoint now returns
Uploaded

It seems that in the azure implementation the StreamingFileUploadRequest#getInputStream is called twice which isn't supported by the current implementation of io.micronaut.http.multipart.StreamingFileUpload#asInputStream

first invocation

at io.micronaut.upload.UploadController$StreamingFileUploadRequest.getInputStream(UploadController.java:121)
	at io.micronaut.objectstorage.azure.AzureBlobStorageOperations.getUploadOptions(AzureBlobStorageOperations.java:140)
	at io.micronaut.objectstorage.azure.AzureBlobStorageOperations.upload(AzureBlobStorageOperations.java:72)
	at io.micronaut.upload.UploadController.uploadBlobDirect(UploadController.java:67)

second invocation

at io.micronaut.upload.UploadController$StreamingFileUploadRequest.getInputStream(UploadController.java:121)
	at io.micronaut.objectstorage.azure.AzureBlobStorageOperations.doUpload(AzureBlobStorageOperations.java:158)
	at io.micronaut.objectstorage.azure.AzureBlobStorageOperations.upload(AzureBlobStorageOperations.java:73)
	at io.micronaut.upload.UploadController.uploadBlobDirect(UploadController.java:67)

To fix the issue the doUpload method in AzureBlobStorageOperations can be modified to avoid the second invocation

UploadResponse<BlockBlobItem> doUpload(@NonNull UploadRequest request,
                                             @NonNull BlobParallelUploadOptions options) {
       ...
        if (request.getContentSize().isPresent()) {
            long length = request.getContentSize().get();
            BlockBlobSimpleUploadOptions simpleUploadOptions = toBlockBlobSimpleUploadOptions(options, request.getInputStream(), length);
            ...
        } 
        ...
    }

could be changed to avoid the getInputStream on the request when a dataStream is already present in the BlobParallelUploadOptions

UploadResponse<BlockBlobItem> doUpload(@NonNull UploadRequest request,
                                             @NonNull BlobParallelUploadOptions options) {
       ...
        if (request.getContentSize().isPresent()) {
            long length = request.getContentSize().get();
            InputStream inputStream = Optional.ofNullable(options.getDataStream()).orElse(request.getInputStream());
            BlockBlobSimpleUploadOptions simpleUploadOptions = toBlockBlobSimpleUploadOptions(options, inputStream, length);
            ...
        } 
        ...
    }

Environment Information

No response

Example Application

https://github.com/johanra/micronaut-StreamingFileUpload-issues

Version

4.2.4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant