Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADO.NET Streaming Provider #8974

Merged
merged 91 commits into from
May 23, 2024
Merged

Conversation

JorgeCandeias
Copy link
Contributor

@JorgeCandeias JorgeCandeias commented May 4, 2024

This PR adds an ADO.NET Streaming provider.

This provider is intended to be a first-class sibling to the other ADO.NET providers such as Clustering, Persistence and Reminders.

Big Note

Only the Microsoft SQL Server queries are defined as of now. This is causing some provider tests to fail as they expect all RDBMS scripts to be available. My personal experience lies in SQL Server far more than others so any help with them is welcome.

Update: MariaDB/MySQL queries now defined.

Update: PostegreSQL queries now defined.

Approach

The idea here is to have "Azure Queue" like behaviour on top of a database and little else. This provider does not attempt, nor is it capable of, attaining Eventhub-like performance or capability. It does attempt to attain the best possible performance one can have in a database by using regular objects by default and some custom sql magic.

The easy:

  • This design is going for queue-like behaviour, not rewindable-hub-like behaviour. This means message archiving and point-in-time-subscriptions need not be supported.

The not so easy:

  • Concurrent batch queries against the same table are prone to deadlocks. These cause error noise and severe delays, which can hinder the functionality of the provider to the point of uselessness. To avert this occurrence, resource locks (for rows in this context) must be acquired in the same order across all queries without exception. While this is easy to induce under the correct conditions, and this provider creates those conditions, such conditions are brittle to change. User interference with the queries may make them vulnerable to deadlocks again.
  • The alternative to the above is to acquire and handle only one resource at a time. This has the obvious downturn in performance, but is not too unlike what a cloud work queue does either. This behaviour can be induced by the user by reducing the batch size to 1, though ideally any user changes to the queries will keep the need for consistent ordering in mind.
  • Another alternative is to update the queries to always use table locks. Again, less perf but guaranteed safety.
  • A shared database is an expensive resource and must be handled with care to avoid accidental stampedes. Therefore, although the provider fully supports the concept of multiple "queues" (akin to the azure provider, which in turn creates more pulling agents, one per queue), the default partition count is set to 1 for safety. The user is free to raise this number as desired and it will have the desired effect.

Design Overview

  • The lower layer maintains three tables: OrleansStreamMessage, OrleansStreamDeadLetter and OrleansStreamControl.
  • OrleansStreamMessage holds all transient messages. New messages are added to this table upon "queueing", marked appropriately upon "dequeuing" and deleted upon successful confirmation.
  • OrleansStreamDeadLetter is where undelivered messages are evicted to, either through normal expiry or upon delivery failure.
  • OrleansStreamControl helps maintain the eviction schedule so it remains independent of cluster size.
  • The integration layer is by and large copied from the Azure Queue provider.
  • The lower layer also attempts to mimic a standard cloud queue albeit with restrictions of an RDBMS.
  • As with the other providers, this provider supports data segregation via mapping ClusterOptions.ServiceId to a ServiceId column. This allows multiple deployments to share the same database without overlapping.
  • The underlying tables support multiple named adonet providers sharing the same database via mapping the provider name to a ProviderId column.
  • In addition, the provider supports the use of multiple "queues" (therefore multiple pulling agents), via the automatic mapping of the underlying Orleans QueueId to an appropriate QueueId column.

Implementation Details

Tables:

OrleansStreamMessage

CREATE TABLE [OrleansStreamMessage]
(
	[ServiceId] NVARCHAR(150) NOT NULL,
        [ProviderId] NVARCHAR(150) NOT NULL,
	[QueueId] NVARCHAR(150) NOT NULL,
	[MessageId] BIGINT NOT NULL,

	[Dequeued] INT NOT NULL,
	[VisibleOn] DATETIME2(7) NOT NULL,
	[ExpiresOn] DATETIME2(7) NOT NULL,
	[CreatedOn] DATETIME2(7) NOT NULL,
	[ModifiedOn] DATETIME2(7) NOT NULL,
	[Payload] VARBINARY(MAX) NULL,

	CONSTRAINT [PK_OrleansStreamMessage] PRIMARY KEY CLUSTERED
	(
		[ServiceId] ASC,
                [ProviderId] ASC,
		[QueueId] ASC,
		[MessageId] ASC
	)
);

This table holds all the messages waiting to be delivered.

  • The unique identifier of a message is (ServiceId, ProviderId, QueueId, MessageId).
  • Due to RDMBS's lack of a partitioned sequence feature, the MessageId is implemented as a regular sequence. This technically makes the identifier above a non-candidate super-key (an undesirable anti-pattern) and allows for gaps in MessageId in each (ServiceId, ProviderId, QueueId) group. However, as the existence of gaps does not impact the implementation, there is no harm to this effect.
  • The table is clustered by the unique identifier to:
    • Allow quick identification and deletion of a message upon confirmation.
    • Cheapen deadlock free batch queries by allowing them to lock rows in a well-known already-persisted order.
  • Dequeued contains the number of times the message was dequeued. This also doubles a message pop receipt for confirmation/deletion. The deletion request is only successful if the caller is passing the same number it received.
  • VisibleOn is updated upon dequeing, to the time in the future at which the message will become dequeuable again.
    • When Dequeued reaches the MaxAttempts option and VisibleOn is also reached then the message is considered undequeuable and becomes eligible for eviction to dead letters.
  • ExpiresOn is set upon initial queueing to the time in the future upon which the message will no longer be dequeueable, regardless of Dequeued count. Messages where ExpiredOn is reached and VisibleOn is also reached, regardless of Dequeued become eligible for eviction to dead letters.
  • CreatedOn and ModifiedOn are for troubleshooting only and have no impact on logic.
  • Payload holds the binary serialized Orleans message container.

Important: To avoid deadlocks from concurrent batch queries against this table, all such queries are induced to lock data rows in the same order, and that order is the same order of the clustered index above.

OrleansStreamDeadLetter

CREATE TABLE [OrleansStreamDeadLetter]
(
	[ServiceId] NVARCHAR(150) NOT NULL,
        [ProviderId] NVARCHAR(150) NOT NULL,
	[QueueId] NVARCHAR(150) NOT NULL,
	[MessageId] BIGINT NOT NULL,
	[Dequeued] INT NOT NULL,
	[VisibleOn] DATETIME2(7) NOT NULL,
	[ExpiresOn] DATETIME2(7) NOT NULL,
	[CreatedOn] DATETIME2(7) NOT NULL,
	[ModifiedOn] DATETIME2(7) NOT NULL,
	[DeadOn] DATETIME2(7) NOT NULL,
	[RemoveOn] DATETIME2(7) NOT NULL,
	[Payload] VARBINARY(MAX) NULL,

	CONSTRAINT [PK_OrleansStreamDeadLetter] PRIMARY KEY CLUSTERED
	(
		[ServiceId] ASC,
                [ProviderId] ASC,
		[QueueId] ASC,
		[MessageId] ASC
	)
);

This table holds messages that failed to be successfully delivered, including:

  • Maximum number of attempts reached.
  • Expiration time reached.

Eviction happens in two occasions:

  • Single message eviction from OrleansStreamMessage to OrleansStreamDeadLetter is attempted by the stream failure component on a case-by-case basis.
  • Batch message eviction of any leftovers is opportunistic and performed in a timely fashion by the "dequeueing" query at regular intervals.

Columns are copied from the original row in OrleansStreamMessage as-is with two exceptions:

  • DeadOn holds the time at which the message was evicted to dead letters.
  • RemoveOn holds the time in the future at which the message will be deleted from the dead letters table itself.

OrleansStreamControl

CREATE TABLE [OrleansStreamControl]
(
	[ServiceId] NVARCHAR(150) NOT NULL,
        [ProviderId] NVARCHAR(150) NOT NULL,
	[QueueId] NVARCHAR(150) NOT NULL,
        [EvictOn] DATETIME2(7) NOT NULL,

	CONSTRAINT [PK_OrleansStreamControl] PRIMARY KEY CLUSTERED
	(
		[ServiceId] ASC,
                [ProviderId] ASC,
		[QueueId] ASC
	)
);

This table is designed to hold synchronization variables at queue level to help ensure any opportunistic scheduled task remains stable, regardless of cluster size. For now, the only such task is message eviction.

EvictOn: The time in the future after which the next opportunistic eviction task will run.

At runtime, the dequeuing queries will check this table, and, when EvictOn is reached, will attempt to win a race to update the old value to the next schedule in the future. The query that wins that race also gets to run the eviction query. Hence, the process is opportunistic, and guaranteed to run eventually, without the need for extra background workers.

Queries / Stored Procedures

QueueStreamMessage

CREATE PROCEDURE [QueueStreamMessage]
	@ServiceId NVARCHAR(150),
        @ProviderId NVARCHAR(150),
	@QueueId NVARCHAR(150),
	@Payload VARBINARY(MAX),
	@ExpiryTimeout INT

This query adds a new message to the OrleansStreamMessage table with the following behaviour:

  • Dequeued is set to zero.
  • VisibleOn, CreatedOn and ModifiedOn are set to "now".
  • ExpiresOn is set to "now" + @ExpiryTimeout (seconds).

GetStreamMessages

CREATE PROCEDURE [GetStreamMessages]
	@ServiceId NVARCHAR(150),
        @ProviderId NVARCHAR(150),
	@QueueId NVARCHAR(150),
        @MaxCount INT,
	@MaxAttempts INT,
	@VisibilityTimeout INT,
        @RemovalTimeout INT,
        @EvictionInterval INT,
        @EvictionBatchSize INT

This query performs a number of steps:

  • Performs opportunistic eviction as necessary.
  • Gets a batch of messages from OrleansStreamMessage where:
    • Dequeued is under MaxAttempts (options).
    • VisibleOn has been reached.
    • ExpiresOn has not been reached.
  • Marks the batch:
    • Increases Dequeued by 1.
    • Sets VisibleOn to now + @VisibilityTimeout (seconds)
    • Sets ModifiedOn to now.

ConfirmStreamMessages

CREATE PROCEDURE [ConfirmStreamMessages]
	@ServiceId NVARCHAR(150),
        @ProviderId NVARCHAR(150),
	@QueueId NVARCHAR(150),
        @Items NVARCHAR(MAX)

This query deletes specific messages from the OrleansStreamMessage.
A list of messages and their pop receipts (taken from the Dequeued column at the time of dequeueing) is passed in @Items in the form 1:2|3:4|5:6, where the first number is the message identifier and the second number is the dequeue count.
Row deletion only occurs if both numbers match in the table. Otherwise, the row is assumed to have been dequeued again and therefore left alone.

FailStreamMessage

CREATE PROCEDURE [FailStreamMessage]
	@ServiceId NVARCHAR(150),
        @ProviderId NVARCHAR(150),
	@QueueId NVARCHAR(150),
        @MessageId BIGINT,
	@MaxAttempts INT,
	@RemovalTimeout INT

This query applies failure logic to a single message in OrleansStreamMessage. It is called by the stream failure handler component upon message or subscription failure. The logic is:

  • If the message dequeue counter has not reached MaxAttempts (options) then the message is made visible again.
  • Otherwise the message is moved immediately to dead letters.

EvictStreamMessages

CREATE PROCEDURE [EvictStreamMessages]
	@ServiceId NVARCHAR(150),
        @ProviderId NVARCHAR(150),
	@QueueId NVARCHAR(150),
	@BatchSize INT,
	@MaxAttempts INT,
	@RemovalTimeout INT

This query performs opportunistic eviction of a batch of messages from OrleansStreamMessage to OrleansStreamDeadLetter if the eviction policy applies to them.
This query is called by GetStreamMessages at regular intervals.

EvictStreamDeadLetters

CREATE PROCEDURE [EvictStreamDeadLetters]
	@ServiceId NVARCHAR(150),
        @ProviderId NVARCHAR(150),
	@QueueId NVARCHAR(150),
	@BatchSize INT

This query performs opportunistic removal of dead letters from OrleansStreamDeadLetter.
This query is called by GetStreamMessages at regular intervals.

Integration Artefacts

Middleware streaming artefacts are by and large copied from the Azure Queue and SQS implementations.

  • AdoNetBatchContainer: The adonet flavour of IBatchContainer, same as other ones.
  • AdoNetQueueAdapter: The adonet IQueueAdapter implementation, which simply forwards requests to the RDBMS queries.
  • AdoNetQueueAdapterFactory: The adonet factory of IQueueAdapter instances. Due to the lack async lifetime, this class has some logic to avoid creating more than one instance of the relational queries object.
  • AdoNetQueueAdapterReceiver: The adonet implementation of IQueueAdapterReceiver, which simply forwards requests to the RDBMS queries.
  • AdoNetStreamFailureHandler: The adonet implementation of IStreamFailureHandler, which forwards failure notifications to the FailStreamMessage query.
  • AdoNetStreamQueueMapper: Maps Orleans StreamId and QueueId values to the appropriate QueueId column in the message table.

Benchmarks (SQL Server)

To be updated as tweaks are made.

QueueStreamMessage

Message "queueing" translates to inserting a new row at the end of a clustered table. This approaches an O(1) operation in principle, however with the expected inefficiencies:

  1. Latency correlated to payload size.
  2. Concurrent operations competing for writing into the transaction log.

The "queue id" abstraction appears to make no difference, given 2).


BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3593/23H2/2023Update/SunValley3)
12th Gen Intel Core i5-1240P, 1 CPU, 16 logical and 12 physical cores
.NET SDK 8.0.205
  [Host] : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2

Job=InProcess  Toolchain=InProcessEmitToolchain  InvocationCount=1  
IterationCount=3  UnrollFactor=1  WarmupCount=1  

Method QueueCount PayloadSize Concurrency Mean Error StdDev
QueueStreamMessage 1 1000 1 2.427 ms 11.8353 ms 0.6487 ms
QueueStreamMessage 1 1000 4 3.631 ms 4.0146 ms 0.2201 ms
QueueStreamMessage 1 1000 8 5.039 ms 8.1941 ms 0.4491 ms
QueueStreamMessage 1 10000 1 2.546 ms 10.7113 ms 0.5871 ms
QueueStreamMessage 1 10000 4 4.217 ms 7.0977 ms 0.3890 ms
QueueStreamMessage 1 10000 8 5.857 ms 3.1351 ms 0.1718 ms
QueueStreamMessage 1 100000 1 4.417 ms 13.0434 ms 0.7150 ms
QueueStreamMessage 1 100000 4 9.071 ms 0.6015 ms 0.0330 ms
QueueStreamMessage 1 100000 8 14.487 ms 7.7452 ms 0.4245 ms
QueueStreamMessage 4 1000 1 2.241 ms 9.5473 ms 0.5233 ms
QueueStreamMessage 4 1000 4 3.536 ms 10.6093 ms 0.5815 ms
QueueStreamMessage 4 1000 8 4.871 ms 6.6343 ms 0.3636 ms
QueueStreamMessage 4 10000 1 2.383 ms 5.7473 ms 0.3150 ms
QueueStreamMessage 4 10000 4 4.093 ms 7.9349 ms 0.4349 ms
QueueStreamMessage 4 10000 8 5.884 ms 5.1694 ms 0.2834 ms
QueueStreamMessage 4 100000 1 4.871 ms 9.6464 ms 0.5288 ms
QueueStreamMessage 4 100000 4 9.088 ms 1.0694 ms 0.0586 ms
QueueStreamMessage 4 100000 8 14.302 ms 2.6907 ms 0.1475 ms
QueueStreamMessage 8 1000 1 2.343 ms 10.0013 ms 0.5482 ms
QueueStreamMessage 8 1000 4 3.597 ms 6.4206 ms 0.3519 ms
QueueStreamMessage 8 1000 8 4.999 ms 6.3607 ms 0.3487 ms
QueueStreamMessage 8 10000 1 2.472 ms 8.3953 ms 0.4602 ms
QueueStreamMessage 8 10000 4 4.517 ms 11.1712 ms 0.6123 ms
QueueStreamMessage 8 10000 8 5.934 ms 6.7816 ms 0.3717 ms
QueueStreamMessage 8 100000 1 4.611 ms 9.1420 ms 0.5011 ms
QueueStreamMessage 8 100000 4 8.902 ms 1.9354 ms 0.1061 ms
QueueStreamMessage 8 100000 8 14.486 ms 5.8374 ms 0.3200 ms

GetStreamMessages

"Dequeing" messages is a multi-step process in a SQL query plan:

  1. Identify the correct leaf list given the (ServiceId, ProviderId, QueueId) tuple in the clustered table (In a binary tree, this approaches O(log(K)) where K is the number of distinct keys)
  2. From the linked list identified by 1), scan through the list, skipping non-visible rows, until enough rows are identified, marking rows non-visible as they are found, up to the batch size.

The cost of 2) is comprised of O(S) where S is the number of skipped rows, plus O(B) where B is the batch size.
At best case, the cost is only O(B) when no rows are skipped.
At worst case O(S) = O(N) where N is the size of leaf list itself (or the table itself if only one queue is used).

In a healthy scenario, where B rows are marked and then deleted in a stable loop, this implementation will approach the best case of O(log(K)) + O(B).

In an unhealthy scenario, where rows are marked but not deleted, this implementation will march towards O(N) where N is the count of rows for the given tuple.

Given the effect above, having multiple "queue id" values may help improve dequeuing performance in a cluster demanding high throughput regardless of poison pills or consumer stability, by partitioning the leaf lists and reducing overhead from skipped rows.

Note that QueueCount also means Concurrency in the dequeuing benchmark below.


BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3593/23H2/2023Update/SunValley3)
12th Gen Intel Core i5-1240P, 1 CPU, 16 logical and 12 physical cores
.NET SDK 8.0.205
  [Host]     : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2
  Job-FHPUWH : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2

InvocationCount=1  IterationCount=3  UnrollFactor=1  
WarmupCount=1  

Method QueueCount PayloadSize BatchSize FullnessRatio Mean Error StdDev
GetStreamMessages 1 10000 1 0 879.2 μs 1,463.9 μs 80.24 μs
GetStreamMessages 1 10000 1 0.5 1,903.1 μs 4,762.8 μs 261.07 μs
GetStreamMessages 1 10000 1 1 2,931.8 μs 11,783.7 μs 645.91 μs
GetStreamMessages 1 10000 16 0 1,169.5 μs 5,274.0 μs 289.09 μs
GetStreamMessages 1 10000 16 0.5 7,068.0 μs 5,357.2 μs 293.65 μs
GetStreamMessages 1 10000 16 1 10,318.7 μs 1,287.4 μs 70.57 μs
GetStreamMessages 1 10000 32 0 755.6 μs 407.3 μs 22.33 μs
GetStreamMessages 1 10000 32 0.5 11,937.8 μs 1,197.6 μs 65.64 μs
GetStreamMessages 1 10000 32 1 18,236.6 μs 6,146.3 μs 336.90 μs
GetStreamMessages 4 10000 1 0 1,584.6 μs 7,606.1 μs 416.91 μs
GetStreamMessages 4 10000 1 0.5 3,019.9 μs 4,734.3 μs 259.50 μs
GetStreamMessages 4 10000 1 1 4,535.6 μs 6,575.8 μs 360.44 μs
GetStreamMessages 4 10000 16 0 1,299.5 μs 413.8 μs 22.68 μs
GetStreamMessages 4 10000 16 0.5 10,913.4 μs 7,305.4 μs 400.43 μs
GetStreamMessages 4 10000 16 1 16,899.9 μs 1,671.9 μs 91.64 μs
GetStreamMessages 4 10000 32 0 1,665.1 μs 6,011.5 μs 329.51 μs
GetStreamMessages 4 10000 32 0.5 18,987.9 μs 1,642.4 μs 90.03 μs
GetStreamMessages 4 10000 32 1 29,302.9 μs 4,583.8 μs 251.25 μs
GetStreamMessages 8 10000 1 0 2,044.6 μs 3,070.2 μs 168.29 μs
GetStreamMessages 8 10000 1 0.5 4,284.7 μs 4,326.1 μs 237.13 μs
GetStreamMessages 8 10000 1 1 6,006.3 μs 3,535.4 μs 193.79 μs
GetStreamMessages 8 10000 16 0 2,134.3 μs 2,776.8 μs 152.20 μs
GetStreamMessages 8 10000 16 0.5 16,383.5 μs 6,648.7 μs 364.44 μs
GetStreamMessages 8 10000 16 1 24,770.0 μs 15,440.0 μs 846.32 μs
GetStreamMessages 8 10000 32 0 2,135.7 μs 2,950.9 μs 161.75 μs
GetStreamMessages 8 10000 32 0.5 28,778.3 μs 11,489.8 μs 629.79 μs
GetStreamMessages 8 10000 32 1 45,012.4 μs 29,096.1 μs 1,594.86 μs
Microsoft Reviewers: Open in CodeFlow

@benjaminpetit
Copy link
Member

We have the same issue with #8931 (I think). I think it would be safer for now to revert to 8.0.33 for the time being. Thought @ReubenBond ?

@ReubenBond
Copy link
Member

@benjaminpetit agreed, but we can't stop end users from upgrading unless we set an upper bound. We will need to investigate further and fix

@JorgeCandeias
Copy link
Contributor Author

Lacking control over driver versions, we could wrap all SQL calls with .WaitAsync(token), as a protective measure, to guarantee responsiveness to the caller. Or even WaitAsync(timeout, token) if command timeouts also become buggy, who knows. We could log this event to let users know that their drivers are faulty.

@JorgeCandeias
Copy link
Contributor Author

After further testing, here are some funny findings:

  • I observed that running against the latest version of MariaDB (11.3.2) in a docker desktop instance (as opposed to a locally installed instance, also latest) still caused deadlocks even with v8.4 drivers. Tested x3.
  • I proceeded to pull MariaDB 10.6, as configured in the repository workflow file. Retested x3. No deadlocks.
  • I proceeded to sync the branch to downgrade the MySQL driver back to 8.0.31. Retested x3. No deadlocks... from the driver. However, these showed up on the engine:
image

The above queries stayed waiting for a schema lock forever in order to drop the database. The commands eventually time out on the client side and therefore newer tests start attempting to drop the database again. What was holding these, who knows, probably each other.

I suspect this happens due to a combination of buggy pooled connection recovery plus dropping the database all the time. So to reduce the likelihood of this happening I made the following code run before all tests:

image

Tested again x3. No deadlocks, all green.

I also observed in passing that a deadlock will happen quickly once the pool limit is reached but rarely before. Therefore I suspect the concurrency limit I added to the more aggressive tests may be keeping it at bay. And I also suspect the other tests never reach this concurrency to begin with, at least I never observed it. However that limit didn't seem to help the v8.4 drivers against MariaDB 11.3.2 in docker desktop. Neither did it help with the lack of stable connection recovery after a database drop. Only clearing the pool did that.

Anyway, it appears whatever regression there was in the driver, engine, or the interaction between both, doesn't appear to be fully fixed.

I'm not invested enough in MySQL to go down this rabbit hole any further so I'm happy with leaving the old versions as-is.

@veikkoeeva
Copy link
Contributor

veikkoeeva commented May 21, 2024

As discussed in dev channel, I too finally managed to take a look. Looks good, it was a good discussion too.

As for MySQL/MariaDB observations, indeed the RelationalStorage implementation has handling on the official Oracle connector that it just deadlocks on all versions. It might have been sensible to make it apply to any connector and be version based, but it wasn't possible back then. One option would be to ignore such things and tell "tough luck" for developers. But the aim was to make it a core piece of a software-intensive, dependable system, so seem like near zero price to pay to see if such issues could be fixed. SQL Server (the old one) and Postgres have had some intermittent deadlock issues, Oracle MySQL one seem to be persistent..?

The storage tests are capped regaring concurrency also, but due to exhausting the ThreadPool in the CI during high-concurrency banging the DB in tests more than other reasons.

I'll cross-reference #634 as this long-standing dream is becoming real. :)

@veikkoeeva
Copy link
Contributor

Lacking control over driver versions, we could wrap all SQL calls with .WaitAsync(token), as a protective measure, to guarantee responsiveness to the caller. Or even WaitAsync(timeout, token) if command timeouts also become buggy, who knows. We could log this event to let users know that their drivers are faulty.

Or we could surface a configuration option to do that for the developer (see my previous). Not my call to make, but I provide the rational for why it was done it was done in https://github.com/dotnet/orleans/blob/main/src/AdoNet/Shared/Storage/RelationalStorage.cs#L273.

@JorgeCandeias
Copy link
Contributor Author

JorgeCandeias commented May 21, 2024

@veikkoeeva Thanks a lot for reviewing. Indeed I'm not sure what do to with MySQL either. I'll leave it to @ReubenBond to decide. It seems stable enough for now at least, with enough band aids around it. Yet I do hope no one has the unfortunate idea of running this provider on a MySQL database regardless of deadlocks. The implementation is quite inefficient due to the limitations of the engine and is just asking for trouble either way.

Not necessarily for this PR, but I was looking at a way to make this provider rewindable.
This would improve its ability to support projections with less fuss required in the Orleans code.

From the point of view of SQL artefacts, it looks straightforward:

  1. Have another table OrleansStreamArchive (or *Delivered, or *History, etc, naming things is hard).
  2. Update FailStreamMessage to optionally move the delivered message to that table instead of deleting it.
  3. Update GetStreamMessages to take an @AfterMessageId parameter with the MessageId after which to get messages from. If the parameter is specified then all include all appropriate messages from the archive table.

One issue here arises from the StreamIds being distributed across the QueueIds to support a fixed set of pulling agents. I understand why this is required yet it gets in the way of efficient rewinding. Even if you know what QueueId to target, you may still potentially touch far more foreign messages than the ones that belong to that particular StreamId you're rewinding.

A solution here is to also store the StreamIds as part of the clustered key. This would not change cardinality, nor affect the queue-based pulling agent, and would allow rewinding a particular StreamId without touching anything else.

However I'm having trouble identifying the correct bits in the streaming middleware to plug into. The only rewindable provider I see is EventHubs and its implementation looks tightly coupled to how EventHubs itself works, including having its own cache.

Do you have any suggestions on how to proceed with this without causing too much havoc? @ReubenBond maybe?

@JorgeCandeias
Copy link
Contributor Author

As noted on discord, this PR is now ready for formal review.

@ReubenBond ReubenBond changed the title Adonet Streaming Provider ADO.NET Streaming Provider May 22, 2024
@ReubenBond
Copy link
Member

@benjaminpetit is taking a look

@benjaminpetit benjaminpetit merged commit fd89211 into dotnet:main May 23, 2024
18 checks passed
@benjaminpetit
Copy link
Member

Many thanks!

@JorgeCandeias
Copy link
Contributor Author

@benjaminpetit Wow did you just merge this without comment? You're more confident than I am! 😅

@JorgeCandeias JorgeCandeias deleted the adonet-streaming branch May 23, 2024 19:24
@benjaminpetit
Copy link
Member

benjaminpetit commented May 23, 2024

Well... that's embarrassing. I merged the wrong PR 😓 But I started to review it few days ago.

I still think it's a worthy addition for a preview, so let's keep it.

I will open a PR to mark this new package "prerelease" or "preview", if that's ok with you?

@JorgeCandeias
Copy link
Contributor Author

That's totally fine. Cheers.

@github-actions github-actions bot locked and limited conversation to collaborators Jun 23, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants