Skip to content

Commit

Permalink
Merge branch 'hotfix-3.0.6'
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Mar 12, 2018
2 parents 7ef7a57 + eb26141 commit 14aad2d
Show file tree
Hide file tree
Showing 16 changed files with 157 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/AcceptanceTestsHolder/AcceptanceTestsHolder.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@
<Compile Include="App_Packages\NSB.AcceptanceTests\Tx\When_receiving_with_the_default_settings.cs" />
<Compile Include="App_Packages\NSB.AcceptanceTests\Tx\When_sending_within_an_ambient_transaction.cs" />
<Compile Include="App_Packages\NSB.AcceptanceTests\Versioning\When_multiple_versions_of_a_message_is_published.cs" />
<Compile Include="App_Packages\When_using_outbox_but_no_sagas.cs" />
<Compile Include="App_Packages\When_using_different_persistence.cs" />
<Compile Include="Partials\When_Deferring_a_message.cs" />
<Compile Include="AssemblyInfo.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Features;
using NUnit.Framework;

[TestFixture]
public class When_using_outbox_but_no_sagas : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_be_able_to_start_the_endpoint()
{
// The EndpointsStarted flag is set by acceptance framework
var context = await Scenario.Define<Context>()
.WithEndpoint<OutboxEndpointWithSagasDisabled>()
.Done(c => c.EndpointsStarted)
.Run()
.ConfigureAwait(false);

Assert.True(context.EndpointsStarted);
}

public class Context : ScenarioContext
{
}

public class OutboxEndpointWithSagasDisabled : EndpointConfigurationBuilder
{
public OutboxEndpointWithSagasDisabled()
{
EndpointSetup<DefaultServer>(c =>
{
c.DisableFeature<Sagas>();
c.EnableOutbox();
});
}
}

public class StartSagaMessage : IMessage
{
public string Property { get; set; }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ declare @dummy int;
merge [TheSchema].[TheTablePrefixSubscriptionData] with (holdlock, tablock) as target
using(select @Endpoint as Endpoint, @Subscriber as Subscriber, @MessageType as MessageType) as source
on target.Subscriber = source.Subscriber
and target.MessageType = source.MessageType
and ((target.Endpoint = source.Endpoint) or (target.Endpoint is null and source.endpoint is null))
and target.MessageType = source.MessageType
when matched and source.Endpoint is not null and (target.Endpoint is null or target.Endpoint <> source.Endpoint) then
update set Endpoint = @Endpoint, PersistenceVersion = @PersistenceVersion
when not matched then
insert
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ values
@PersistenceVersion
)
on duplicate key update
Endpoint = @Endpoint,
Endpoint = coalesce(@Endpoint, Endpoint),
PersistenceVersion = @PersistenceVersion
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ begin
);
commit;
exception
when DUP_VAL_ON_INDEX
then ROLLBACK;
when DUP_VAL_ON_INDEX then
if :Endpoint is not null then
update "THETABLEPREFIXSS" set
Endpoint = :Endpoint,
PersistenceVersion = :PersistenceVersion
where
MessageType = :MessageType
and Subscriber = :Subscriber;
else
ROLLBACK;
end if;
end;
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ values
@PersistenceVersion
)
on conflict ("Id") do update
set "Endpoint" = @Endpoint,
set "Endpoint" = coalesce(@Endpoint, "public"."TheTablePrefixSubscriptionData"."Endpoint"),
"PersistenceVersion" = @PersistenceVersion
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"TransportAddress": "e@machine1",
"Endpoint": "e2"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"TransportAddress": "e@machine1",
"Endpoint": "endpoint"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"TransportAddress": "e@machine1",
"Endpoint": "endpoint"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,54 @@ public void Subscribe_duplicate_add()
#endif
}

[Test]
public void Subscribe_version_migration()
{
var persister = Setup(schema);
var type1 = new MessageType("type1", new Version(0, 0, 0, 0));
//NSB 5.x: endpoint is null
persister.Subscribe(new Subscriber("e@machine1", null), type1, null).Await();
//NSB 6.x: same subscriber now mentions endpoint
persister.Subscribe(new Subscriber("e@machine1", "endpoint"), type1, null).Await();
var result = persister.GetSubscribers(type1).Result.ToList();
Assert.IsNotEmpty(result);
#if NET452
ObjectApprover.VerifyWithJson(result);
#endif
}

[Test]
public void Subscribe_different_endpoint_name()
{
var persister = Setup(schema);
var type1 = new MessageType("type1", new Version(0, 0, 0, 0));
//NSB 6.x: old endpoint value
persister.Subscribe(new Subscriber("e@machine1", "e1"), type1, null).Await();
//NSB 6.x: same address, new endpoint value
persister.Subscribe(new Subscriber("e@machine1", "e2"), type1, null).Await();
var result = persister.GetSubscribers(type1).Result.ToList();
Assert.IsNotEmpty(result);
#if NET452
ObjectApprover.VerifyWithJson(result);
#endif
}

[Test]
public void Subscribe_should_not_downgrade()
{
var persister = Setup(schema);
var type1 = new MessageType("type1", new Version(0, 0, 0, 0));
//NSB 6.x: subscriber contains endpoint
persister.Subscribe(new Subscriber("e@machine1", "endpoint"), type1, null).Await();
//NSB 5.x: endpoint is null, don't want to remove endpoint value from table though
persister.Subscribe(new Subscriber("e@machine1", null), type1, null).Await();
var result = persister.GetSubscribers(type1).Result.ToList();
Assert.IsNotEmpty(result);
#if NET452
ObjectApprover.VerifyWithJson(result);
#endif
}

[Test]
public void Unsubscribe()
{
Expand Down
6 changes: 6 additions & 0 deletions src/SqlPersistence/SqlPersistenceStorageSessionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ public static Task<TSagaData> GetSagaData<TSagaData>(this SynchronizedStorageSes
Guard.AgainstNull(nameof(context), context);
Guard.AgainstNull(nameof(appendParameters), appendParameters);
Guard.AgainstNullAndEmpty(nameof(whereClause), whereClause);

var writableContextBag = (ContextBag)context;
var sqlSession = session.GetSqlStorageSession();

if (sqlSession.InfoCache == null)
{
throw new Exception("Cannot load saga data because the Sagas feature is disabled in the endpoint.");
}
return SagaPersister.GetByWhereClause<TSagaData>(whereClause, session, writableContextBag, appendParameters, sqlSession.InfoCache);
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/SqlPersistence/Subscription/SqlDialect_MsSqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ internal override string GetSubscriptionSubscribeCommand(string tableName)
merge {tableName} with (holdlock, tablock) as target
using(select @Endpoint as Endpoint, @Subscriber as Subscriber, @MessageType as MessageType) as source
on target.Subscriber = source.Subscriber
and target.MessageType = source.MessageType
and ((target.Endpoint = source.Endpoint) or (target.Endpoint is null and source.endpoint is null))
and target.MessageType = source.MessageType
when matched and source.Endpoint is not null and (target.Endpoint is null or target.Endpoint <> source.Endpoint) then
update set Endpoint = @Endpoint, PersistenceVersion = @PersistenceVersion
when not matched then
insert
(
Expand Down
2 changes: 1 addition & 1 deletion src/SqlPersistence/Subscription/SqlDialect_MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ insert into {tableName}
@PersistenceVersion
)
on duplicate key update
Endpoint = @Endpoint,
Endpoint = coalesce(@Endpoint, Endpoint),
PersistenceVersion = @PersistenceVersion
";
}
Expand Down
13 changes: 11 additions & 2 deletions src/SqlPersistence/Subscription/SqlDialect_Oracle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,17 @@ insert into {tableName}
);
commit;
exception
when DUP_VAL_ON_INDEX
then ROLLBACK;
when DUP_VAL_ON_INDEX then
if :Endpoint is not null then
update {tableName} set
Endpoint = :Endpoint,
PersistenceVersion = :PersistenceVersion
where
MessageType = :MessageType
and Subscriber = :Subscriber;
else
ROLLBACK;
end if;
end;
";
}
Expand Down
2 changes: 1 addition & 1 deletion src/SqlPersistence/Subscription/SqlDialect_PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ insert into {tableName}
@PersistenceVersion
)
on conflict (""Id"") do update
set ""Endpoint"" = @Endpoint,
set ""Endpoint"" = coalesce(@Endpoint, {tableName}.""Endpoint""),
""PersistenceVersion"" = @PersistenceVersion
";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ protected override void Setup(FeatureConfigurationContext context)
var isOutboxEnabledForSqlPersistence = settings.IsFeatureActive(typeof(SqlOutboxFeature));

SagaInfoCache infoCache = null;
if (isOutboxEnabledForSqlPersistence || isSagasEnabledForSqlPersistence)
if (isSagasEnabledForSqlPersistence)
{
infoCache = BuildSagaInfoCache(sqlDialect, settings);
}

if (isOutboxEnabledForSqlPersistence || isSagasEnabledForSqlPersistence)
{
//Info cache can be null if Outbox is enabled but Sagas are disabled.
container.ConfigureComponent(() => new SynchronizedStorage(connectionBuilder, infoCache), DependencyLifecycle.SingleInstance);
container.ConfigureComponent(() => new StorageAdapter(connectionBuilder, infoCache, sqlDialect), DependencyLifecycle.SingleInstance);
}
Expand Down

0 comments on commit 14aad2d

Please sign in to comment.