Skip to content

Commit

Permalink
Fix #464 - add lock lifetime for all
Browse files Browse the repository at this point in the history
* Add HTTP timeout
* Make adjustable through options
* Will need to delete all locks from MongoDB - otherwise will endlessly loop for startup
* Fix some ci-e2e issues

Only use locking when accessing SMT model

Fix unit tests

Update to latest version of Machine

Fix bug where wrong id is used when starting a build

Remove reference to Serval.Shared in Serval.Machine.Shared
  • Loading branch information
johnml1135 committed Sep 10, 2024
1 parent e56cdf7 commit ffe2da5
Show file tree
Hide file tree
Showing 57 changed files with 1,189 additions and 1,225 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/ci-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ jobs:
- name: Install regctl
uses: iarekylew00t/regctl-installer@v1

- name: Getr Version of Machine.py
run: echo "MACHINE_PY_IMAGE=ghcr.io/sillsdev/machine.py:$(regctl image config ghcr.io/sillsdev/machine.py | jq -r ".config.Labels[\"org.opencontainers.image.version\"]")" >> $GITHUB_ENV
- name: Set proper version of Machine.py
run: |
export MACHINE_PY_IMAGE=ghcr.io/sillsdev/machine.py:$(regctl image config ghcr.io/sillsdev/machine.py | jq -r ".config.Labels[\"org.opencontainers.image.version\"]") && \
echo "MACHINE_PY_IMAGE=$MACHINE_PY_IMAGE" >> $GITHUB_ENV && \
echo "MACHINE_PY_CPU_IMAGE=$MACHINE_PY_IMAGE.cpu_only" >> $GITHUB_ENV
- name: Confirm proper version of Machine.py
run: |
echo $MACHINE_PY_IMAGE $MACHINE_PY_CPU_IMAGE
- name: Setup .NET
uses: actions/setup-dotnet@v3
Expand All @@ -50,6 +57,9 @@ jobs:
- name: Test
run: dotnet test --no-build --verbosity normal --filter "TestCategory!=slow&TestCategory=E2E" --collect:"Xplat Code Coverage"

- name: Debug network again
run: docker ps -a && docker logs --since 10m serval_cntr && docker logs --since 10m echo_cntr && docker logs --since 10m machine-engine-cntr && docker logs --since 10m serval-mongo-1 && docker logs --since 10m machine-job-cntr

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
Expand Down
2 changes: 1 addition & 1 deletion src/Machine/src/Serval.Machine.JobServer/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
{
"TranslationEngineType": "SmtTransfer",
"ModelType": "thot",
"Queue": "jobs_backlog",
"Queue": "jobs_backlog.cpu_only",
"DockerImage": "ghcr.io/sillsdev/machine.py:latest"
}
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Serval.Machine.Shared.Configuration;

public class DistributedReaderWriterLockOptions
{
public const string Key = "DistributedReaderWriterLock";

public TimeSpan DefaultLifetime { get; set; } = TimeSpan.FromSeconds(56); // must be less than DefaultHttpRequestTimeout
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ public static IMachineBuilder AddClearMLOptions(this IMachineBuilder builder, IC
return builder;
}

public static IMachineBuilder AddDistributedReaderWriterLockOptions(
this IMachineBuilder build,
Action<DistributedReaderWriterLockOptions> configureOptions
)
{
build.Services.Configure(configureOptions);
return build;
}

public static IMachineBuilder AddDistributedReaderWriterLockOptions(
this IMachineBuilder build,
IConfiguration config
)
{
build.Services.Configure<DistributedReaderWriterLockOptions>(config);
return build;
}

public static IMachineBuilder AddMessageOutboxOptions(
this IMachineBuilder builder,
Action<MessageOutboxOptions> configureOptions
Expand Down Expand Up @@ -360,6 +378,7 @@ public static IMachineBuilder AddServalTranslationEngineService(
{
options.Interceptors.Add<CancellationInterceptor>();
options.Interceptors.Add<UnimplementedInterceptor>();
options.Interceptors.Add<TimeoutInterceptor>();
});
builder.AddServalPlatformService(connectionString);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSharedFileOptions(o => { });
builder.AddSmtTransferEngineOptions(o => { });
builder.AddClearMLOptions(o => { });
builder.AddDistributedReaderWriterLockOptions(o => { });
builder.AddBuildJobOptions(o => { });
builder.AddMessageOutboxOptions(o => { });
}
Expand All @@ -37,6 +38,9 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSharedFileOptions(configuration.GetSection(SharedFileOptions.Key));
builder.AddSmtTransferEngineOptions(configuration.GetSection(SmtTransferEngineOptions.Key));
builder.AddClearMLOptions(configuration.GetSection(ClearMLOptions.Key));
builder.AddDistributedReaderWriterLockOptions(
configuration.GetSection(DistributedReaderWriterLockOptions.Key)
);
builder.AddBuildJobOptions(configuration.GetSection(BuildJobOptions.Key));
builder.AddMessageOutboxOptions(configuration.GetSection(MessageOutboxOptions.Key));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public class SmtTransferEngineOptions
public string EnginesDir { get; set; } = "translation_engines";
public TimeSpan EngineCommitFrequency { get; set; } = TimeSpan.FromMinutes(5);
public TimeSpan InactiveEngineTimeout { get; set; } = TimeSpan.FromMinutes(10);
public TimeSpan SaveModelTimeout { get; set; } = TimeSpan.FromMinutes(5);
public TimeSpan EngineCommitTimeout { get; set; } = TimeSpan.FromMinutes(2);
}
3 changes: 2 additions & 1 deletion src/Machine/src/Serval.Machine.Shared/Models/Lock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public record Lock
{
public required string Id { get; init; }
public DateTime? ExpiresAt { get; init; }

public DateTime ExpiresAt { get; init; }
public required string HostId { get; init; }
}
7 changes: 3 additions & 4 deletions src/Machine/src/Serval.Machine.Shared/Models/RWLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ public record RWLock : IEntity
public bool IsAvailableForReading()
{
var now = DateTime.UtcNow;
return (WriterLock is null || WriterLock.ExpiresAt is not null && WriterLock.ExpiresAt <= now)
&& WriterQueue.Count == 0;
return (WriterLock is null || WriterLock.ExpiresAt <= now) && WriterQueue.Count == 0;
}

public bool IsAvailableForWriting(string? lockId = null)
{
var now = DateTime.UtcNow;
return (WriterLock is null || WriterLock.ExpiresAt is not null && WriterLock.ExpiresAt <= now)
&& !ReaderLocks.Any(l => l.ExpiresAt is null || l.ExpiresAt > now)
return (WriterLock is null || WriterLock.ExpiresAt <= now)
&& !ReaderLocks.Any(l => l.ExpiresAt > now)
&& (lockId is null || WriterQueue.Count > 0 && WriterQueue[0].Id == lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public record TranslationEngine : IEntity
public required bool IsModelPersisted { get; init; }
public int BuildRevision { get; init; }
public Build? CurrentBuild { get; init; }
public bool? CollectTrainSegmentPairs { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
<PackageReference Include="Hangfire.Mongo" Version="1.9.10" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="6.0.16" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="6.0.14" />
<PackageReference Include="SIL.Machine" Version="3.2.6" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj')" />
<PackageReference Include="SIL.Machine.Morphology.HermitCrab" Version="3.2.6" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj')" />
<PackageReference Include="SIL.Machine.Translation.Thot" Version="3.2.6" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj')" />
<PackageReference Include="SIL.Machine" Version="3.2.7" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj')" />
<PackageReference Include="SIL.Machine.Morphology.HermitCrab" Version="3.2.7" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj')" />
<PackageReference Include="SIL.Machine.Translation.Thot" Version="3.2.7" Condition="!Exists('..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj')" />
<PackageReference Include="SIL.WritingSystems" Version="14.1.1" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
Expand Down
91 changes: 45 additions & 46 deletions src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public async Task DeleteEngineAsync(string engineId, CancellationToken cancellat

public async Task<bool> StartBuildJobAsync(
BuildJobRunnerType runnerType,
TranslationEngineType engineType,
string engineId,
string buildId,
BuildStage stage,
Expand All @@ -67,18 +68,9 @@ public async Task<bool> StartBuildJobAsync(
CancellationToken cancellationToken = default
)
{
TranslationEngine? engine = await _engines.GetAsync(
e =>
e.EngineId == engineId
&& (e.CurrentBuild == null || e.CurrentBuild.JobState != BuildJobState.Canceling),
cancellationToken
);
if (engine is null)
return false;

IBuildJobRunner runner = _runners[runnerType];
string jobId = await runner.CreateJobAsync(
engine.Type,
engineType,
engineId,
buildId,
stage,
Expand All @@ -88,8 +80,17 @@ public async Task<bool> StartBuildJobAsync(
);
try
{
await _engines.UpdateAsync(
e => e.EngineId == engineId,
TranslationEngine? engine = await _engines.UpdateAsync(
e =>
e.EngineId == engineId
&& (
(stage == BuildStage.Preprocess && e.CurrentBuild == null)
|| (
stage != BuildStage.Preprocess
&& e.CurrentBuild != null
&& e.CurrentBuild.JobState != BuildJobState.Canceling
)
),
u =>
u.Set(
e => e.CurrentBuild,
Expand All @@ -105,6 +106,11 @@ await _engines.UpdateAsync(
),
cancellationToken: cancellationToken
);
if (engine is null)
{
await runner.DeleteJobAsync(jobId, CancellationToken.None);
return false;
}
await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken);
return true;
}
Expand All @@ -120,44 +126,36 @@ await _engines.UpdateAsync(
CancellationToken cancellationToken = default
)
{
TranslationEngine? engine = await _engines.GetAsync(
e => e.EngineId == engineId && e.CurrentBuild != null,
cancellationToken
// cancel a job that hasn't started yet
TranslationEngine? engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Pending,
u =>
{
u.Unset(b => b.CurrentBuild);
u.Set(e => e.CollectTrainSegmentPairs, false);
},
returnOriginal: true,
cancellationToken: cancellationToken
);
if (engine is null || engine.CurrentBuild is null)
return (null, BuildJobState.None);

IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];

if (engine.CurrentBuild.JobState is BuildJobState.Pending)
if (engine is not null && engine.CurrentBuild is not null)
{
// cancel a job that hasn't started yet
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null,
u => u.Unset(b => b.CurrentBuild),
returnOriginal: true,
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
// job will be deleted from the queue
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.None);
}
// job will be deleted from the queue
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.None);
}
else if (engine.CurrentBuild.JobState is BuildJobState.Active)

// cancel a job that is already running
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Active,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling),
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
// cancel a job that is already running
engine = await _engines.UpdateAsync(
e => e.EngineId == engineId && e.CurrentBuild != null,
u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling),
cancellationToken: cancellationToken
);
if (engine is not null && engine.CurrentBuild is not null)
{
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.Canceling);
}
IBuildJobRunner runner = _runners[engine.CurrentBuild.BuildJobRunner];
await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None);
return (engine.CurrentBuild.BuildId, BuildJobState.Canceling);
}

return (null, BuildJobState.None);
Expand Down Expand Up @@ -193,6 +191,7 @@ public Task BuildJobFinishedAsync(
u =>
{
u.Unset(e => e.CurrentBuild);
u.Set(e => e.CollectTrainSegmentPairs, false);
if (buildComplete)
u.Inc(e => e.BuildRevision);
},
Expand Down
Loading

0 comments on commit ffe2da5

Please sign in to comment.