Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Datacleanup job to fix HasFrameData value #3097

Merged
merged 12 commits into from
Oct 19, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Health.Dicom.Core.Configs;
using Microsoft.Health.Dicom.Core.Features.Operations;
using Microsoft.Health.Dicom.Core.Models.Operations;
using Microsoft.Health.Operations;

namespace Microsoft.Health.Dicom.Api.Features.BackgroundServices;

public class StartInstanceDataCleanupBackgroundService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<StartInstanceDataCleanupBackgroundService> _logger;
private readonly InstanceDataCleanupConfiguration _instanceDataCleanupConfiguration;

public StartInstanceDataCleanupBackgroundService(
IServiceProvider serviceProvider,
IOptions<InstanceDataCleanupConfiguration> options,
ILogger<StartInstanceDataCleanupBackgroundService> logger)
{
_serviceProvider = EnsureArg.IsNotNull(serviceProvider, nameof(serviceProvider));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));
EnsureArg.IsNotNull(options, nameof(options));
_instanceDataCleanupConfiguration = options.Value;
}

[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Do not throw exceptions.")]
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
using IServiceScope scope = _serviceProvider.CreateScope();

IDicomOperationsClient operationsClient = scope.ServiceProvider.GetRequiredService<IDicomOperationsClient>();

// Get existing operation status
OperationCheckpointState<DicomOperation> existingInstance = await operationsClient.GetLastCheckpointAsync(_instanceDataCleanupConfiguration.OperationId, stoppingToken);

if (existingInstance == null)
{
_logger.LogInformation("No existing frame range fixing operation.");
}
else
{
_logger.LogInformation("Existing data cleanup operation is in status: '{Status}'", existingInstance.Status);
}

if (IsOperationInterruptedOrNull(existingInstance))
{
await operationsClient.StartInstanceDataCleanupOperationAsync(
_instanceDataCleanupConfiguration.OperationId,
_instanceDataCleanupConfiguration.StartTimeStamp,
_instanceDataCleanupConfiguration.EndTimeStamp,
stoppingToken);
}
else if (existingInstance.Status == OperationStatus.Succeeded)
{
_logger.LogInformation("Data cleanup operation with ID '{InstanceId}' has already completed successfully.", _instanceDataCleanupConfiguration.OperationId);
}
else
{
_logger.LogInformation("Data cleanup operation with ID '{InstanceId}' has already been started by another client.", _instanceDataCleanupConfiguration.OperationId);
}
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Unhandled exception while starting data cleanup operation.");
}
}

private static bool IsOperationInterruptedOrNull(OperationCheckpointState<DicomOperation> operation)
{
return operation == null
|| operation.Status == OperationStatus.Canceled
|| operation.Status == OperationStatus.Failed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public static IDicomServerBuilder AddBackgroundWorkers(this IDicomServerBuilder
EnsureArg.IsNotNull(serverBuilder, nameof(serverBuilder));
serverBuilder.Services.AddScoped<DeletedInstanceCleanupWorker>();
serverBuilder.Services.AddHostedService<DeletedInstanceCleanupBackgroundService>();
serverBuilder.Services.AddHostedService<StartInstanceDataCleanupBackgroundService>();

serverBuilder.Services
.AddCustomerKeyValidationBackgroundService(options => configuration
Expand Down Expand Up @@ -112,6 +113,7 @@ public static IDicomServerBuilder AddDicomServer(
services.AddSingleton(Options.Create(dicomServerConfiguration.Services.InstanceMetadataCacheConfiguration));
services.AddSingleton(Options.Create(dicomServerConfiguration.Services.FramesRangeCacheConfiguration));
services.AddSingleton(Options.Create(dicomServerConfiguration.Services.UpdateServiceSettings));
services.AddSingleton(Options.Create(dicomServerConfiguration.Services.DataCleanupConfiguration));

services.RegisterAssemblyModules(Assembly.GetExecutingAssembly(), dicomServerConfiguration);
services.RegisterAssemblyModules(typeof(InitializationModule).Assembly, dicomServerConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ public async Task<IReadOnlyDictionary<int, FrameRange>> GetInstanceFramesRangeAs
}
}

/// <inheritdoc />
public async Task<bool> DoesFrameRangeExistAsync(long version, CancellationToken cancellationToken)
{
BlockBlobClient blobClient = GetInstanceFramesRangeBlobClient(version);

return await ExecuteAsync(async t =>
{
Response<bool> response = await blobClient.ExistsAsync(cancellationToken);
return response.Value;
}, cancellationToken);
}

private BlockBlobClient GetInstanceFramesRangeBlobClient(long version, bool fallBackClient = false)
{
var blobName = fallBackClient ? _nameWithPrefix.GetInstanceFramesRangeFileNameWithSpace(version) : _nameWithPrefix.GetInstanceFramesRangeFileName(version);
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.Health.Dicom.Client/Models/DicomOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public enum DicomOperation
/// </summary>
Unknown,

/// <summary>
/// Specifies an data cleanup operation that cleans up instance data.
/// </summary>
DataCleanup,

/// <summary>
/// Specifies a reindexing operation that updates the indicies for previously added data based on new tags.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;

namespace Microsoft.Health.Dicom.Core.Configs;

public class InstanceDataCleanupConfiguration
{
/// <summary>
/// Gets or sets the operation id
/// </summary>
public Guid OperationId { get; set; } = Guid.Parse("f0a54b2a-eeca-4c45-af90-52ac15f6d486");

/// <summary>
/// Gets or sets the start time stamp for clean up
/// </summary>
public DateTimeOffset StartTimeStamp { get; set; } = new DateTimeOffset(new DateTime(2023, 06, 01, 0, 0, 0), TimeSpan.Zero);

/// <summary>
/// Gets or sets the end time stamp for clean up
/// </summary>
public DateTimeOffset EndTimeStamp { get; set; } = new DateTimeOffset(new DateTime(2023, 09, 30, 0, 0, 0), TimeSpan.Zero);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public class ServicesConfiguration
public FramesRangeCacheConfiguration FramesRangeCacheConfiguration { get; } = new FramesRangeCacheConfiguration();

public UpdateConfiguration UpdateServiceSettings { get; } = new UpdateConfiguration();

public InstanceDataCleanupConfiguration DataCleanupConfiguration { get; } = new InstanceDataCleanupConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,12 @@ Task<IReadOnlyDictionary<int, FrameRange>> GetInstanceFramesRangeAsync(
Task DeleteInstanceFramesRangeAsync(
long version,
CancellationToken cancellationToken = default);

/// <summary>
/// Returns true if the frame range exist for the given version
/// </summary>
/// <param name="version"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<bool> DoesFrameRangeExistAsync(long version, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,17 @@ public interface IDicomOperationsClient
/// </exception>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was canceled.</exception>
Task<OperationReference> StartUpdateOperationAsync(Guid operationId, UpdateSpecification updateSpecification, Partition partition, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously begins the clean up of instance data.
/// </summary>
/// <param name="operationId">The desired ID for the cleanup operation.</param>
/// <param name="startFilterTimeStamp">Start timestamp to filter instances.</param>
/// <param name="endFilterTimeStamp">End timestamp to filter instances.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns>
/// A task representing the <see cref="StartInstanceDataCleanupOperationAsync"/> operation.
/// </returns>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was canceled.</exception>
Task StartInstanceDataCleanupOperationAsync(Guid operationId, DateTimeOffset startFilterTimeStamp, DateTimeOffset endFilterTimeStamp, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,34 @@ Task<IReadOnlyList<InstanceMetadata>> GetInstanceIdentifierWithPropertiesAsync(
string seriesInstanceUid = null,
string sopInstanceUid = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously retrieves the specified number of instance batches filtered by timestamp.
/// </summary>
/// <param name="batchSize">The desired size of each batch.</param>
/// <param name="batchCount">The maximum number of batches.</param>
/// <param name="indexStatus">The index status</param>
/// <param name="startTimeStamp">Start filterstamp</param>
/// <param name="endTimeStamp">End filterstamp</param>
/// <param name="maxWatermark">An optional maximum watermark to consider.</param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>
/// A task representing the asynchronous get operation. The value of its <see cref="Task{TResult}.Result"/>
/// property contains a list of batches as defined by their smallest and largest watermark.
/// The size of the collection is at most the value of the <paramref name="batchCount"/> parameter.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="batchSize"/> or <paramref name="batchCount"/> is less than <c>1</c>.
/// </exception>
/// <exception cref="OperationCanceledException">The <paramref name="cancellationToken"/> was canceled.</exception>
Task<IReadOnlyList<WatermarkRange>> GetInstanceBatchesByTimeStampAsync(
int batchSize,
int batchCount,
IndexStatus indexStatus,
DateTimeOffset startTimeStamp,
DateTimeOffset endTimeStamp,
long? maxWatermark = null,
CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,14 @@ public interface IIndexDataStore
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous add operation.</returns>
Task EndUpdateInstanceAsync(int partitionKey, string studyInstanceUid, DicomDataset dicomDataset, IReadOnlyList<InstanceMetadata> instanceMetadataList, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously updates DICOM instance HasFrameMetadata to 1
/// </summary>
/// <param name="partitionKey">The partition key.</param>
/// <param name="versions">List of instances watermark to update</param>
/// <param name="hasFrameMetadata">Has additional frame range metadata stores.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that with list of instance metadata with new watermark.</returns>
Task UpdateFrameDataAsync(int partitionKey, IEnumerable<long> versions, bool hasFrameMetadata, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ public enum DicomOperation
/// </summary>
Unknown,

/// <summary>
/// Specifies an data cleanup operation that cleans up instance data.
/// </summary>
[IgnoreEnum]
DataCleanup,

/// <summary>
/// Specifies a reindexing operation that updates the indicies for previously added data based on new tags.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Microsoft.Health.Dicom.Core.Features.Model;
using Microsoft.Health.Operations.Functions.DurableTask;
using Newtonsoft.Json.Linq;

namespace Microsoft.Health.Dicom.Functions.DataCleanup;

public class DataCleanupCheckPoint : DataCleanupInput, IOrchestrationCheckpoint
{
public WatermarkRange? Completed { get; set; }

public DateTime? CreatedTime { get; set; }

public int? PercentComplete
{
get
{
if (Completed.HasValue)
{
WatermarkRange range = Completed.GetValueOrDefault();
return range.End == 1 ? 100 : (int)((double)(range.End - range.Start + 1) / range.End * 100);
}
return 0;
}
}
public IReadOnlyCollection<string> ResourceIds => null;

public object GetResults(JToken output) => null;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;

namespace Microsoft.Health.Dicom.Functions.DataCleanup;

public class DataCleanupInput
{
public BatchingOptions Batching { get; set; }

public DateTimeOffset StartFilterTimeStamp { get; set; }

public DateTimeOffset EndFilterTimeStamp { get; set; }
}
8 changes: 8 additions & 0 deletions src/Microsoft.Health.Dicom.Functions.App/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
}
},
"DicomFunctions": {
"DataCleanup": {
"MaxParallelThreads": 5,
"RetryOptions": {
"BackoffCoefficient": 3,
"FirstRetryInterval": "00:01:00",
"MaxNumberOfAttempts": 4
}
},
"Export": {
"BatchSize": 100,
"MaxParallelThreads": 5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Microsoft.Health.Dicom.Core.Models.Operations;
using Microsoft.Health.Dicom.Core.Models.Update;
using Microsoft.Health.Dicom.Functions.Client.Extensions;
using Microsoft.Health.Dicom.Functions.DataCleanup;
using Microsoft.Health.Dicom.Functions.Export;
using Microsoft.Health.Dicom.Functions.Indexing;
using Microsoft.Health.Dicom.Functions.Update;
Expand Down Expand Up @@ -204,6 +205,26 @@ public async Task<OperationReference> StartUpdateOperationAsync(Guid operationId
return new OperationReference(operationId, _urlResolver.ResolveOperationStatusUri(operationId));
}

/// <inheritdoc/>
public async Task StartInstanceDataCleanupOperationAsync(Guid operationId, DateTimeOffset startFilterTimeStamp, DateTimeOffset endFilterTimeStamp, CancellationToken cancellationToken = default)
{
EnsureArg.IsGt(endFilterTimeStamp, startFilterTimeStamp, nameof(endFilterTimeStamp));

cancellationToken.ThrowIfCancellationRequested();

string instanceId = await _durableClient.StartNewAsync(
_options.DataCleanup.Name,
operationId.ToString(OperationId.FormatSpecifier),
new DataCleanupCheckPoint
{
Batching = _options.DataCleanup.Batching,
StartFilterTimeStamp = startFilterTimeStamp,
EndFilterTimeStamp = endFilterTimeStamp,
});

_logger.LogInformation("Successfully started data cleanup operation with ID '{InstanceId}'.", instanceId);
}

private async Task<T> GetStateAsync<T>(
Guid operationId,
Func<DicomOperation, DurableOrchestrationStatus, IOrchestrationCheckpoint, CancellationToken, Task<T>> factory,
Expand Down Expand Up @@ -242,6 +263,7 @@ private async Task<IReadOnlyCollection<Uri>> GetResourceUrlsAsync(
switch (type)
{
case DicomOperation.Export:
case DicomOperation.DataCleanup:
return null;
case DicomOperation.Reindex:
IReadOnlyList<Uri> tagPaths = Array.Empty<Uri>();
Expand All @@ -268,6 +290,7 @@ private async Task<IReadOnlyCollection<Uri>> GetResourceUrlsAsync(
private static IOrchestrationCheckpoint ParseCheckpoint(DicomOperation type, DurableOrchestrationStatus status)
=> type switch
{
DicomOperation.DataCleanup => status.Input?.ToObject<DataCleanupCheckPoint>() ?? new DataCleanupCheckPoint(),
DicomOperation.Export => status.Input?.ToObject<ExportCheckpoint>() ?? new ExportCheckpoint(),
DicomOperation.Reindex => status.Input?.ToObject<ReindexCheckpoint>() ?? new ReindexCheckpoint(),
DicomOperation.Update => status.Input?.ToObject<UpdateCheckpoint>() ?? new UpdateCheckpoint(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ internal class DicomFunctionOptions
[Required]
public DurableClientOptions DurableTask { get; set; }

[Required]
public FanOutFunctionOptions DataCleanup { get; set; }

[Required]
public FanOutFunctionOptions Export { get; set; }

Expand Down
Loading