feat: Enable MongoDB collection sharding (#760)
# Motivation

Following aneoconsulting/ArmoniK.Infra#168, this
PR aims to enable the deployment of a sharded MongoDB as ArmoniK's
database.

A more complete motivation is provided in AEP 004
(aneoconsulting/ArmoniK.Community#48). This PR
is meant to address a foreseeable bottleneck on ArmoniK's database by
enabling a sharded architecture for it.

For now, when a sharded database is specified, the following collections
are sharded:
- Result
- TaskData
- SessionData

# Description

This PR adds:
- A method `ShardCollectionAsync` to the `IMongoDataModelMapping`
interface.
- Two new `MongoOptions`: a string `AuthSource` and a boolean `Sharding`.
- A new class `ShardingExt` containing an extension method
`shardCollection` for the `IClientSessionHandle` interface.

When the `MongoOption` `Sharding` is true, the `MongoCollectionProvider`
calls `ShardCollectionAsync`. The implementation then depends on whether
the collection is meant to be sharded:
- If the collection has to be sharded, the `shardCollection` extension
method is called.
- If the collection is not meant to be sharded, the method directly
returns a completed `Task`.
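
The dispatch described above can be sketched without any MongoDB dependency. This is only an illustration under stated assumptions: `IShardableMapping`, `ShardedMapping`, `UnshardedMapping`, `ProviderSketch` and the `"Other"` collection name are hypothetical stand-ins, and the delegate replaces the real call made through `IClientSessionHandle`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for IMongoDataModelMapping<T>; the delegate plays the
// role of the shardCollection extension method.
public interface IShardableMapping
{
  string CollectionName { get; }
  Task ShardCollectionAsync(Func<string, Task> shardCollection);
}

// A collection that has to be sharded forwards its name to the sharding call.
public sealed class ShardedMapping : IShardableMapping
{
  public ShardedMapping(string collectionName) => CollectionName = collectionName;
  public string CollectionName { get; }

  public Task ShardCollectionAsync(Func<string, Task> shardCollection)
    => shardCollection(CollectionName);
}

// A collection that stays unsharded returns an already completed task.
public sealed class UnshardedMapping : IShardableMapping
{
  public UnshardedMapping(string collectionName) => CollectionName = collectionName;
  public string CollectionName { get; }

  public Task ShardCollectionAsync(Func<string, Task> shardCollection)
    => Task.CompletedTask;
}

public static class ProviderSketch
{
  // Nothing is sharded unless the Sharding option is true.
  // Returns the names of the collections for which the sharding call was made.
  public static async Task<List<string>> InitializeAsync(bool sharding,
                                                         IEnumerable<IShardableMapping> mappings)
  {
    var sharded = new List<string>();
    if (!sharding)
    {
      return sharded;
    }

    foreach (var mapping in mappings)
    {
      await mapping.ShardCollectionAsync(name =>
                                         {
                                           sharded.Add(name);
                                           return Task.CompletedTask;
                                         })
                   .ConfigureAwait(false);
    }

    return sharded;
  }
}
```

Keeping the decision inside each mapping means the provider does not need to know which collections are sharded: it calls the same method on every mapping.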

The new `MongoOption` `AuthSource` is required because the `MongoClient`
has to authenticate as an administrator to be able to shard a
collection.
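
As a rough illustration of how `AuthSource` ends up in the connection string, the sketch below appends it as the `authSource` option. `AuthSourceSketch.WithAuthSource` is a hypothetical helper, not the code of this PR, which appends `?authSource=...` directly. The variation shown here also handles a connection string that already carries options, and it assumes the string already contains the `/` that precedes the options.

```csharp
using System;

public static class AuthSourceSketch
{
  // Hypothetical helper: returns the connection string with authSource appended,
  // or the unchanged string when no authSource is configured.
  public static string WithAuthSource(string connectionString,
                                      string authSource)
  {
    if (string.IsNullOrEmpty(authSource))
    {
      return connectionString;
    }

    // Use '&' when options are already present, '?' otherwise.
    var separator = connectionString.IndexOf('?') >= 0 ? "&" : "?";
    return $"{connectionString}{separator}authSource={Uri.EscapeDataString(authSource)}";
  }
}
```

For example, `AuthSourceSketch.WithAuthSource("mongodb://host:27017/database", "admin")` returns `mongodb://host:27017/database?authSource=admin` (the host and database names are placeholders).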

# Testing

It has been tested manually. Since these developments are tightly coupled
with ArmoniK's database, it is difficult to write unit tests. However, it
is still possible to have automated integration tests that would first
verify that a deployment of ArmoniK using these developments succeeds,
and then that sharding is indeed enabled on the right collections.
Workflows testing a deployment of ArmoniK with a sharded MongoDB are
currently being studied.

# Impact

In the end, it is expected to enhance ArmoniK's strong scalability.

# Additional Information

To better understand how MongoDB sharding works, see [MongoDB's
documentation](https://www.mongodb.com/docs/manual/sharding/).

To understand why the new `AuthSource` MongoOption is required, see MongoDB's
documentation on the [shardCollection database
command](https://www.mongodb.com/docs/manual/reference/command/shardCollection/#mongodb-dbcommand-dbcmd.shardCollection)
and on the [connection string's authSource
option](https://www.mongodb.com/docs/manual/reference/connection-string-options/#mongodb-urioption-urioption.authSource).

# Checklist

- [X] My code adheres to the coding and style guidelines of the project.
- [X] I have performed a self-review of my code.
- [X] I have commented my code, particularly in hard-to-understand
areas.
- [ ] I have made corresponding changes to the documentation.
- [X] I have thoroughly tested my modifications and added tests when
necessary.
- [X] Tests pass locally and in the CI.
- [ ] I have assessed the performance impact of my modifications.
aneojgurhem authored Oct 1, 2024
2 parents b074b61 + 89eb732 commit 2cb6681
Showing 14 changed files with 134 additions and 2 deletions.
3 changes: 3 additions & 0 deletions Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs
@@ -28,4 +28,7 @@ public interface IMongoDataModelMapping<T>
Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<T> collection,
Options.MongoDB options);

Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options);
}
27 changes: 26 additions & 1 deletion Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs
@@ -167,9 +167,34 @@ await Task.Delay(1000 * indexRetry,
}
}

if (options.Sharding)
{
for (var indexRetry = 1; indexRetry < options.MaxRetries; indexRetry++)
{
lastException = null;
try
{
await model.ShardCollectionAsync(session,
options)
.ConfigureAwait(false);
break;
}
catch (Exception ex)
{
lastException = ex;
logger.LogDebug(ex,
"Retrying to shard {CollectionName} collection",
model.CollectionName);
await Task.Delay(1000 * indexRetry,
cancellationToken)
.ConfigureAwait(false);
}
}
}

if (lastException is not null)
{
- throw new TimeoutException($"Init Indexes for {model.CollectionName}: Max retries reached",
+ throw new TimeoutException($"Init Index or shard for {model.CollectionName}: Max retries reached",
lastException);
}

4 changes: 4 additions & 0 deletions Adaptors/MongoDB/src/Object/ObjectDataModelMapping.cs
@@ -78,4 +78,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;
}
7 changes: 6 additions & 1 deletion Adaptors/MongoDB/src/Options/MongoDB.cs
@@ -60,6 +60,11 @@ public class MongoDB

public QueueStorage QueueStorage { get; set; } = new();

- public int MaxConnectionPoolSize { get; set; } = 500;
+ public int MaxConnectionPoolSize { get; set; } = 500;

public TimeSpan ServerSelectionTimeout { get; set; } = TimeSpan.FromMinutes(2);

public bool Sharding { get; set; }

public string AuthSource { get; set; } = "";
}
5 changes: 5 additions & 0 deletions Adaptors/MongoDB/src/ServiceCollectionExt.cs
@@ -184,6 +184,11 @@ public static IServiceCollection AddMongoClient(this IServiceCollection services
mongoOptions.DatabaseName);
}

if (!string.IsNullOrEmpty(mongoOptions.AuthSource))
{
connectionString = $"{connectionString}?authSource={mongoOptions.AuthSource}";
}

var settings = MongoClientSettings.FromUrl(new MongoUrl(connectionString));
settings.AllowInsecureTls = mongoOptions.AllowInsecureTls;
settings.UseTls = mongoOptions.Tls;
@@ -77,4 +77,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;
}
@@ -73,4 +73,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;
}
@@ -73,4 +73,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;
}
@@ -90,4 +90,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> Task.CompletedTask;
}
@@ -92,4 +92,10 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> await sessionHandle.shardCollection(options,
CollectionName)
.ConfigureAwait(false);
}
@@ -137,4 +137,10 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> await sessionHandle.shardCollection(options,
CollectionName)
.ConfigureAwait(false);
}
52 changes: 52 additions & 0 deletions Adaptors/MongoDB/src/Table/DataModel/ShardingExt.cs
@@ -0,0 +1,52 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System.Collections.Generic;
using System.Threading.Tasks;

using MongoDB.Bson;
using MongoDB.Driver;

namespace ArmoniK.Core.Adapters.MongoDB.Table.DataModel;

public static class ShardingExt
{
public static async Task shardCollection(this IClientSessionHandle sessionHandle,
Options.MongoDB options,
string collectionName)
{
var adminDB = sessionHandle.Client.GetDatabase("admin");
var shardingCommandDict = new Dictionary<string, object>
{
{
"shardCollection", $"{options.DatabaseName}.{collectionName}"
},
{
"key", new Dictionary<string, object>
{
{
"_id", "hashed"
},
}
},
};

var shardingCommand = new BsonDocumentCommand<BsonDocument>(new BsonDocument(shardingCommandDict));
await adminDB.RunCommandAsync(shardingCommand)
.ConfigureAwait(false);
}
}
6 changes: 6 additions & 0 deletions Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs
@@ -210,4 +210,10 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
=> await sessionHandle.shardCollection(options,
CollectionName)
.ConfigureAwait(false);
}
@@ -110,4 +110,8 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Adapters.MongoDB.Options.MongoDB options)
=> Task.CompletedTask;
}
