InvalidOperationException during INatsObjStore.PutAsync #672

Closed
uhfath opened this issue Nov 9, 2024 · 2 comments · Fixed by #675

Comments

uhfath commented Nov 9, 2024

Observed behavior

In our main project we get the exception from the title while uploading a binary object (about 23 MB). The exception occurs on subsequent uploads, after the first one succeeds. Each upload creates a new INatsObjContext on top of an existing INatsConnection (it's a singleton). The code is as simple as:

var objectStore = await _natsObjContext.GetObjectStoreAsync(_blobsCacheOptions.ObjectBucket, cancellationToken);
var objectData = await objectStore.PutAsync(blobMeta, stream, true, cancellationToken);

The exception we get is:

System.InvalidOperationException
  HResult=0x80131509
  Message=The response headers cannot be modified because the response has already started.
  Source=NATS.Client.Core
  StackTrace:
>	NATS.Client.Core.dll!NATS.Client.Core.NatsHeaders.ThrowIfReadOnly() Line 414	C#
 	NATS.Client.Core.dll!NATS.Client.Core.NatsHeaders.this[string].set(string key, Microsoft.Extensions.Primitives.StringValues value) Line 140	C#
 	NATS.Client.Core.dll!NATS.Client.Core.Internal.Telemetry.AddTraceContextHeaders.AnonymousMethod__5_0(object carrier, string fieldName, string fieldValue) Line 90	C#
 	System.Diagnostics.DiagnosticSource.dll!System.Diagnostics.LegacyPropagator.Inject(System.Diagnostics.Activity activity, object carrier, System.Diagnostics.DistributedContextPropagator.PropagatorSetterCallback setter) Line 31	C#
 	NATS.Client.Core.dll!NATS.Client.Core.Internal.Telemetry.AddTraceContextHeaders(System.Diagnostics.Activity activity, ref NATS.Client.Core.NatsHeaders headers) Line 79	C#
 	NATS.Client.Core.dll!NATS.Client.Core.NatsConnection.PublishAsync<NATS.Client.ObjectStore.Models.ObjectMetadata>(string subject, NATS.Client.ObjectStore.Models.ObjectMetadata data, NATS.Client.Core.NatsHeaders headers, string replyTo, NATS.Client.Core.INatsSerialize<NATS.Client.ObjectStore.Models.ObjectMetadata> serializer, NATS.Client.Core.NatsPubOpts opts, System.Threading.CancellationToken cancellationToken) Line 40	C#
 	NATS.Client.Core.dll!NATS.Client.Core.NatsConnection.CreateRequestSubAsync<NATS.Client.ObjectStore.Models.ObjectMetadata, NATS.Client.JetStream.Models.PubAckResponse>(string subject, NATS.Client.ObjectStore.Models.ObjectMetadata data, NATS.Client.Core.NatsHeaders headers, NATS.Client.Core.INatsSerialize<NATS.Client.ObjectStore.Models.ObjectMetadata> requestSerializer, NATS.Client.Core.INatsDeserialize<NATS.Client.JetStream.Models.PubAckResponse> replySerializer, NATS.Client.Core.NatsPubOpts requestOpts, NATS.Client.Core.NatsSubOpts replyOpts, System.Threading.CancellationToken cancellationToken) Line 23	C#
 	NATS.Client.JetStream.dll!NATS.Client.JetStream.NatsJSContext.PublishAsync<NATS.Client.ObjectStore.Models.ObjectMetadata>(string subject, NATS.Client.ObjectStore.Models.ObjectMetadata data, NATS.Client.Core.INatsSerialize<NATS.Client.ObjectStore.Models.ObjectMetadata> serializer, NATS.Client.JetStream.NatsJSPubOpts opts, NATS.Client.Core.NatsHeaders headers, System.Threading.CancellationToken cancellationToken) Line 123	C#
 	NATS.Client.ObjectStore.dll!NATS.Client.ObjectStore.NatsObjStore.PublishMeta(NATS.Client.ObjectStore.Models.ObjectMetadata meta, System.Threading.CancellationToken cancellationToken) Line 606	C#
 	NATS.Client.ObjectStore.dll!NATS.Client.ObjectStore.NatsObjStore.PutAsync(NATS.Client.ObjectStore.Models.ObjectMetadata meta, System.IO.Stream stream, bool leaveOpen, System.Threading.CancellationToken cancellationToken) Line 279	C#

The main problem is that we can't reproduce this case in a separate simple project.
I understand it's hard to diagnose in this case so I wanted to ask if there is something specific I should be looking for to create a repro?

Expected behavior

Uploading should end without exceptions.

Server and client version

Client: 2.5.3
Server: 2.10.20

Here are some logs (removed most irrelevant parts):

[
    {
        "@t": "2024-11-09T08:52:00.4003363Z",
        "@mt": "Caching a blob.",
        "@i": "e81ceeb3",
        "@l": "Debug",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "45ad88708c80ffbc",
        "@st": "2024-11-09T08:52:00.3518087Z",
        "@ps": "597f1ea1abb4908c",
        "@sk": "Internal",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "Operation": "Handle",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "ITC.AppStore.Logic.Services.BlobsCache.BlobsCacheService",
        "UserId": null,
        "UserName": null
    },
    {
        "@t": "2024-11-09T08:52:00.4046471Z",
        "@mt": "PUB {Subject} {ReplyTo}",
        "@i": "9d2be71e",
        "@l": "Verbose",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "27050fc7ad59b624",
        "@st": "2024-11-09T08:52:00.4046372Z",
        "@ps": "45ad88708c80ffbc",
        "@sk": "Producer",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobExternalId": "HN48N94H90WMNlq4g1wIw",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CachedBlobId": "0ef96986-7fba-464f-8759-5394e9fe5aaa",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "EventId": {
            "Id": 1005,
            "Name": "Protocol"
        },
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "Operation": "$JS.API publish",
        "ReplyTo": "_INBOX.LRPdxhtE7gn57Y842RjYDS.2enPB8kn0BIUVfMuWHDbQH",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "NATS.Client.Core.Commands.CommandWriter",
        "Subject": "$JS.API.STREAM.INFO.OBJ_APP-STORE-ASSETS",
        "UserId": null,
        "UserName": null,
        "messaging.client_id": "14141",
        "messaging.destination.name": "$JS.API.STREAM.INFO.OBJ_APP-STORE-ASSETS",
        "messaging.nats.message.reply_to": "_INBOX.LRPdxhtE7gn57Y842RjYDS.2enPB8kn0BIUVfMuWHDbQH",
        "messaging.operation": "publish",
        "messaging.system": "nats",
        "network.local.address": "192.168.1.3",
        "network.peer.address": "0.0.0.0",
        "network.peer.port": "4222",
        "network.protocol.name": "nats",
        "network.protocol.version": "1",
        "server.address": "0.0.0.0",
        "server.port": "4222"
    },
    {
        "@t": "2024-11-09T08:52:00.4415332Z",
        "@mt": "Uploading blob binary.",
        "@i": "3510e6e1",
        "@l": "Debug",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "45ad88708c80ffbc",
        "@st": "2024-11-09T08:52:00.3518087Z",
        "@ps": "597f1ea1abb4908c",
        "@sk": "Internal",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobExternalId": "HN48N94H90WMNlq4g1wIw",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CachedBlobId": "0ef96986-7fba-464f-8759-5394e9fe5aaa",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "NATSClientId": 14141,
        "Operation": "Handle",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "ITC.AppStore.Logic.Services.BlobsCache.BlobsCacheService",
        "UserId": null,
        "UserName": null
    },
    {
        "@t": "2024-11-09T08:52:00.4418472Z",
        "@mt": "PUB {Subject} {ReplyTo}", // removed alot of these
        "@i": "9d2be71e",
        "@l": "Verbose",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "98624746ed9eaf4e",
        "@st": "2024-11-09T08:52:00.4418407Z",
        "@ps": "45ad88708c80ffbc",
        "@sk": "Producer",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobExternalId": "HN48N94H90WMNlq4g1wIw",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CachedBlobId": "0ef96986-7fba-464f-8759-5394e9fe5aaa",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "EventId": {
            "Id": 1005,
            "Name": "Protocol"
        },
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "NATSClientId": 14141,
        "Operation": "$JS.API publish",
        "ReplyTo": "_INBOX.LRPdxhtE7gn57Y842RjYDS.XAuHLgcXANy11xlNUSXVIj",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "NATS.Client.Core.Commands.CommandWriter",
        "Subject": "$JS.API.STREAM.MSG.GET.OBJ_APP-STORE-ASSETS",
        "UserId": null,
        "UserName": null,
        "messaging.client_id": "14141",
        "messaging.destination.name": "$JS.API.STREAM.MSG.GET.OBJ_APP-STORE-ASSETS",
        "messaging.nats.message.reply_to": "_INBOX.LRPdxhtE7gn57Y842RjYDS.XAuHLgcXANy11xlNUSXVIj",
        "messaging.operation": "publish",
        "messaging.system": "nats",
        "network.local.address": "192.168.1.3",
        "network.peer.address": "0.0.0.0",
        "network.peer.port": "4222",
        "network.protocol.name": "nats",
        "network.protocol.version": "1",
        "server.address": "0.0.0.0",
        "server.port": "4222"
    },
    {
        "@t": "2024-11-09T08:52:05.2736371Z",
        "@mt": "PUB {Subject} {ReplyTo}",
        "@i": "9d2be71e",
        "@l": "Verbose",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "64de4bd8a955e184",
        "@st": "2024-11-09T08:52:05.2736243Z",
        "@ps": "45ad88708c80ffbc",
        "@sk": "Producer",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobExternalId": "HN48N94H90WMNlq4g1wIw",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CachedBlobId": "0ef96986-7fba-464f-8759-5394e9fe5aaa",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "EventId": {
            "Id": 1005,
            "Name": "Protocol"
        },
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "NATSClientId": 14141,
        "Operation": "$O.APP-STORE-ASSETS publish",
        "ReplyTo": "_INBOX.LRPdxhtE7gn57Y842RjYDS.cEXqY7hPYptxqTehJ2ucMF",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "NATS.Client.Core.Commands.CommandWriter",
        "Subject": "$O.APP-STORE-ASSETS.C.LRPdxhtE7gn57Y842RjahQ",
        "UserId": null,
        "UserName": null,
        "messaging.client_id": "14141",
        "messaging.destination.name": "$O.APP-STORE-ASSETS.C.LRPdxhtE7gn57Y842RjahQ",
        "messaging.nats.message.reply_to": "_INBOX.LRPdxhtE7gn57Y842RjYDS.cEXqY7hPYptxqTehJ2ucMF",
        "messaging.operation": "publish",
        "messaging.system": "nats",
        "network.local.address": "192.168.1.3",
        "network.peer.address": "0.0.0.0",
        "network.peer.port": "4222",
        "network.protocol.name": "nats",
        "network.protocol.version": "1",
        "server.address": "0.0.0.0",
        "server.port": "4222"
    },
    {
        "@t": "2024-11-09T08:52:29.6965943Z",
        "@mt": "End subscription {Reason}",
        "@i": "73e4dfcb",
        "@l": "Debug",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "45ad88708c80ffbc",
        "@st": "2024-11-09T08:52:00.3518087Z",
        "@ps": "597f1ea1abb4908c",
        "@sk": "Internal",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobExternalId": "HN48N94H90WMNlq4g1wIw",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CachedBlobId": "0ef96986-7fba-464f-8759-5394e9fe5aaa",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "EventId": {
            "Id": 1002,
            "Name": "Subscription"
        },
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "NATSClientId": 14141,
        "Operation": "Handle",
        "Reason": "Timeout",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "NATS.Client.Core.NatsSubBase",
        "UserId": null,
        "UserName": null
    },
    {
        "@t": "2024-11-09T08:52:41.5540831Z",
        "@mt": "An unhandled exception has occurred while executing the request.",
        "@i": "bd577466",
        "@l": "Error",
        "@x": "System.InvalidOperationException: The response headers cannot be modified because the response has already started.\r\n   at NATS.Client.Core.NatsHeaders.ThrowIfReadOnly() in H:\\Projects\\nats.net\\src\\NATS.Client.Core\\NatsHeaders.cs:line 414\r\n   at NATS.Client.Core.NatsHeaders.set_Item(String key, StringValues value) in H:\\Projects\\nats.net\\src\\NATS.Client.Core\\NatsHeaders.cs:line 140\r\n   at NATS.Client.Core.Internal.Telemetry.<>c.<AddTraceContextHeaders>b__5_0(Object carrier, String fieldName, String fieldValue) in H:\\Projects\\nats.net\\src\\NATS.Client.Core\\Internal\\Telemetry.cs:line 90\r\n   at System.Diagnostics.LegacyPropagator.Inject(Activity activity, Object carrier, PropagatorSetterCallback setter)\r\n   at NATS.Client.Core.Internal.Telemetry.AddTraceContextHeaders(Activity activity, NatsHeaders& headers) in H:\\Projects\\nats.net\\src\\NATS.Client.Core\\Internal\\Telemetry.cs:line 79\r\n   at NATS.Client.Core.NatsConnection.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsPubOpts opts, CancellationToken cancellationToken) in H:\\Projects\\nats.net\\src\\NATS.Client.Core\\NatsConnection.Publish.cs:line 40\r\n   at NATS.Client.Core.NatsConnection.CreateRequestSubAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken) in H:\\Projects\\nats.net\\src\\NATS.Client.Core\\NatsConnection.RequestSub.cs:line 23\r\n   at NATS.Client.JetStream.NatsJSContext.PublishAsync[T](String subject, T data, INatsSerialize`1 serializer, NatsJSPubOpts opts, NatsHeaders headers, CancellationToken cancellationToken) in H:\\Projects\\nats.net\\src\\NATS.Client.JetStream\\NatsJSContext.cs:line 123\r\n   at NATS.Client.ObjectStore.NatsObjStore.PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) in 
H:\\Projects\\nats.net\\src\\NATS.Client.ObjectStore\\NatsObjStore.cs:line 606\r\n   at NATS.Client.ObjectStore.NatsObjStore.PutAsync(ObjectMetadata meta, Stream stream, Boolean leaveOpen, CancellationToken cancellationToken) in H:\\Projects\\nats.net\\src\\NATS.Client.ObjectStore\\NatsObjStore.cs:line 279\r\n   at ITC.AppStore.Logic.Services.BlobsCache.BlobsCacheService.CacheBlobAsync(Guid blobId, CancellationToken cancellationToken) in H:\\Projects\\ITC\\ITC.AppStore\\applications\\ITC.AppStore.Logic\\Services\\BlobsCache\\BlobsCacheService.cs:line 85",
        "@tr": "e17438ca9e786d3209b9c2f4750313c7",
        "@sp": "597f1ea1abb4908c",
        "@st": "2024-11-09T08:52:00.3442235Z",
        "@sk": "Server",
        "AgentId": null,
        "Application": "ITC.AppStore.API",
        "Assembly": "ITC.AppStore",
        "AssemblyVersion": "2024.4.0.0",
        "BlobExternalId": "HN48N94H90WMNlq4g1wIw",
        "BlobId": "ab68e4f3-35f2-4f92-b126-d6715ea4b0e1",
        "CachedBlobId": "0ef96986-7fba-464f-8759-5394e9fe5aaa",
        "CollectionExternalId": "a3abf43b-2cf6-4afd-aa20-8b4f1c30e21c",
        "ConnectionId": "0HN80JEI5D3PS",
        "ConnectionRemoteIp": "::1",
        "Environment": "Development",
        "EventId": {
            "Id": 1,
            "Name": "UnhandledException"
        },
        "ExceptionDetail": {
            "HResult": -2146233079,
            "Message": "The response headers cannot be modified because the response has already started.",
            "Source": "NATS.Client.Core",
            "TargetSite": "Void ThrowIfReadOnly()",
            "Type": "System.InvalidOperationException"
        },
        "HostExternalId": null,
        "HostName": null,
        "LogicRequest": "ITC.AppStore.Logic.Features.UserTasks.Commands.CreateUserTaskCommand",
        "NATSClientId": 14141,
        "Operation": "Microsoft.AspNetCore.Hosting.HttpRequestIn",
        "RequestId": "0HN80JEI5D3PS:00000002",
        "RequestPath": "/userTasks",
        "SourceContext": "Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware",
        "UserId": null,
        "UserName": null
    }
]

Not sure if it's relevant, but in my case the server is accessed through a slow VPN connection, so an upload takes about 40 seconds.

uhfath commented Nov 9, 2024

I believe the issue is in this method:

private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken)

Specifically:

var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken);

It uses a statically defined NatsRollupHeaders:

private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };

When it is passed down to NatsConnection.PublishAsync through a series of calls, starting with:

private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken)

it gets marked as read-only.

On subsequent calls the same instance is reused, but by then it is already read-only, so we get an exception here:

private void ThrowIfReadOnly()

Am I correct in my assumptions?
If so, a simple fix could be to remove static from the NatsRollupHeaders definition, since it is not used anywhere else.
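
To make the suspected mechanism concrete, here is a minimal, self-contained sketch. FakeHeaders and FakePublisher are hypothetical stand-ins, not the real NATS.Client.Core types, and the "inject a trace header, then freeze" step is an assumption that models the behavior described above rather than the library's actual code. The header name and value are illustrative only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a header collection that can be frozen.
// This is NOT NATS.Client.Core.NatsHeaders, only an illustration.
sealed class FakeHeaders
{
    private readonly Dictionary<string, string> _values = new();
    private bool _readOnly;

    public string this[string key]
    {
        get => _values[key];
        set
        {
            if (_readOnly)
                throw new InvalidOperationException("Headers are read-only.");
            _values[key] = value;
        }
    }

    public void SetReadOnly() => _readOnly = true;
}

static class FakePublisher
{
    // Shared instance, analogous to the static NatsRollupHeaders field.
    private static readonly FakeHeaders SharedHeaders = new() { ["Nats-Rollup"] = "sub" };

    // Assumed behavior: a publish injects a trace header, then freezes the headers.
    private static void Publish(FakeHeaders headers)
    {
        headers["traceparent"] = Guid.NewGuid().ToString("N"); // throws if already frozen
        headers.SetReadOnly();
    }

    // Reuses the same instance on every call.
    public static void PublishWithSharedHeaders() => Publish(SharedHeaders);

    // Builds a new instance on every call.
    public static void PublishWithFreshHeaders() =>
        Publish(new FakeHeaders { ["Nats-Rollup"] = "sub" });
}

static class Program
{
    static void Main()
    {
        FakePublisher.PublishWithSharedHeaders(); // first call works
        try
        {
            FakePublisher.PublishWithSharedHeaders(); // second call hits the frozen instance
            Console.WriteLine("second shared publish succeeded");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine($"second shared publish failed: {ex.Message}");
        }

        FakePublisher.PublishWithFreshHeaders();
        FakePublisher.PublishWithFreshHeaders();
        Console.WriteLine("fresh headers: both publishes succeeded");
    }
}
```

If this model matches the real behavior, removing static would help only as long as each store instance publishes metadata once. Building a new headers object per publish call would avoid the reuse entirely.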

uhfath commented Nov 9, 2024

Here is a repro project.
https://github.com/uhfath/TestNATSObjectStore
