Skip to content

Commit

Permalink
feat: chunking indexing operation on passed items
Browse files Browse the repository at this point in the history
  • Loading branch information
aochmann committed Aug 11, 2020
1 parent a21bfeb commit ecd4ae2
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 29 deletions.
7 changes: 7 additions & 0 deletions Cogworks.AzureSearch/Extensions/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,12 @@ internal static class EnumerableExtensions
{
public static bool HasAny<T>(this IEnumerable<T> items)
=> items != null && items.Any();

public static IEnumerable<IEnumerable<T>> ChunkBy<T>(this IEnumerable<T> items, int chunkSize)
=> items
.Select((x, i) => new { Index = i, Value = x })
.GroupBy(x => x.Index / chunkSize)
.Select(x => x.Select(v => v.Value).ToList())
.ToList();
}
}
69 changes: 40 additions & 29 deletions Cogworks.AzureSearch/Repositories/AzureSearchRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class AzureSearchRepository<TAzureModel> : IAzureSearchRepository<TAzureM
private readonly ISearchIndexClient _searchIndex;
private readonly ISearchServiceClient _searchServiceClient;

private const int BatchOperationSize = 500;

public AzureSearchRepository(AzureIndexDefinition<TAzureModel> azureIndexDefinition, AzureSearchClientOption azureSearchClientOption)
{
_azureIndexDefinition = azureIndexDefinition;
Expand Down Expand Up @@ -128,26 +130,30 @@ public async Task<AzureBatchDocumentsOperationResult> AddOrUpdateDocumentsAsync(
};
}

var batchActions = models
var chunkedBatchActions = models
.Select(model => new IndexAction<TAzureModel>(model, IndexActionType.Upload))
.ChunkBy(BatchOperationSize)
.ToList();

var batch = IndexBatch.New(batchActions);

var indexResults = Enumerable.Empty<IndexingResult>();
var indexResults = new List<IndexingResult>();

try
foreach (var batchActions in chunkedBatchActions)
{
var result = await _searchIndex.Documents.IndexAsync(batch);
indexResults = result.Results;
}
catch (IndexBatchException indexBatchException)
{
indexResults = indexBatchException.IndexingResults;
}
catch (Exception exception)
{
// todo: handle it proper
var batch = IndexBatch.New(batchActions);

try
{
var result = await _searchIndex.Documents.IndexAsync(batch);
indexResults.AddRange(result.Results);
}
catch (IndexBatchException indexBatchException)
{
indexResults.AddRange(indexBatchException.IndexingResults);
}
catch (Exception exception)
{
// todo: handle it proper
}
}

return GetBatchOperationStatus(indexResults, "adding or updating");
Expand All @@ -173,25 +179,30 @@ public async Task<AzureBatchDocumentsOperationResult> TryRemoveDocumentsAsync(IE
};
}

var batchActions = models
var chunkedBatchActions = models
.Select(model => new IndexAction<TAzureModel>(model, IndexActionType.Delete))
.ChunkBy(BatchOperationSize)
.ToList();

var batch = IndexBatch.New(batchActions);
var indexResults = Enumerable.Empty<IndexingResult>();
var indexResults = new List<IndexingResult>();

try
{
var result = await _searchIndex.Documents.IndexAsync(batch);
indexResults = result.Results;
}
catch (IndexBatchException indexBatchException)
foreach (var batchActions in chunkedBatchActions)
{
indexResults = indexBatchException.IndexingResults;
}
catch (Exception exception)
{
// todo: handle it proper
var batch = IndexBatch.New(batchActions);

try
{
var result = await _searchIndex.Documents.IndexAsync(batch);
indexResults.AddRange(result.Results);
}
catch (IndexBatchException indexBatchException)
{
indexResults.AddRange(indexBatchException.IndexingResults);
}
catch (Exception exception)
{
// todo: handle it proper
}
}

return GetBatchOperationStatus(indexResults, "removing");
Expand Down

0 comments on commit ecd4ae2

Please sign in to comment.