[BUG] Elsa 3.2.1 - Workflow instances aren't persisted when starting #6067

Closed
valsov opened this issue Oct 29, 2024 · 5 comments
Labels
bug Something isn't working

@valsov

valsov commented Oct 29, 2024

Description

When starting a workflow instance, whether manually from Elsa Studio or automatically via a CRON expression (configured in Studio) with a Quartz scheduler configured, the instance isn't persisted to the database until it has completed execution.
This is concerning because if the server stops, a running workflow won't resume.

Steps to Reproduce

To help us identify the issue more quickly, please follow these guidelines:

  1. Detailed Steps:
    In Elsa Studio:
  • Create a workflow definition
  • Add a start activity
  • Add another activity, for example an HTTP request, and link it after the start activity
  • Publish the workflow definition
  • Execute the workflow using the button in Elsa Studio
  • Behavior:
    • the page loads, but only navigates to the instance view once the workflow execution ends
    • while the workflow instance is executing, it can't be found in the database or on Elsa Studio's instances page
  2. Code Snippets:
    Elsa server configuration code:
var dbConnectionString = configuration.GetConnectionString("WorkflowsDb");
var schedulerDbConnectionString = configuration.GetConnectionString("SchedulerDb");

var tokenSigningKey = configuration["BearerKey"];

var schedulerConfig = configuration.GetSection(SchedulerConfiguration.Configuration);
serviceCollection.Configure<SchedulerConfiguration>(schedulerConfig);

serviceCollection.AddElsa(elsa =>
{
    // Configure Management layer to use EF Core.
    elsa.UseWorkflowManagement(management =>
        management.UseEntityFrameworkCore(ef => ef.UseSqlServer(dbConnectionString))
    );

    // Configure Runtime layer to use EF Core.
    elsa.UseWorkflowRuntime(runtime =>
        runtime.UseEntityFrameworkCore(ef => ef.UseSqlServer(dbConnectionString))
    );

    // Identity features for authentication/authorization.
    elsa.UseIdentity(identity =>
    {
        identity.TokenOptions = options => options.SigningKey = tokenSigningKey;
    });

    // Configure ASP.NET authentication/authorization.
    elsa.UseDefaultAuthentication();

    // Expose Elsa API endpoints.
    elsa.UseWorkflowsApi();

    // Setup a SignalR hub for real-time updates from the server.
    elsa.UseRealTimeWorkflows();

    // Activities:
    elsa.UseExpressions();
    elsa.UseLiquid(x => x.FluidOptions = f => f.AllowConfigurationAccess = true);
    elsa.UseHttp();

    // Scheduling config.
    elsa.UseQuartz(x =>
    {
        x.ConfigureQuartz = config =>
        {
            var schedulerConfiguration = schedulerConfig.Get<SchedulerConfiguration>();
            config.SchedulerId = schedulerConfiguration.InstanceName;
            config.SchedulerName = schedulerConfiguration.InstanceName;
            config.UsePersistentStore(store => {
                store.UseNewtonsoftJsonSerializer();
                store.UseSqlServer(sql =>
                {
                    sql.ConnectionString = schedulerDbConnectionString;
                    sql.TablePrefix = schedulerConfiguration.TablePrefix;
                });
            });
            config.UseDefaultThreadPool(p => p.MaxConcurrency = 10);
        };
        x.ConfigureQuartzHostedService = config =>
        {
            config.WaitForJobsToComplete = true;
        };
    });
    elsa.UseScheduling(x => x.UseQuartzScheduler());

    elsa.AddSwagger();
});

serviceCollection.ConfigureAuthServices(configuration);
serviceCollection.AddControllers();
  3. Attachments:

  4. Reproduction Rate: every time, with both scheduled and manual execution from Elsa Studio

  5. Video/Screenshots: N/A

  6. Additional Configuration:

    • Quartz scheduler is configured
    • Persistence is configured to use SQL Server with entity framework

Expected Behavior

I would expect the workflow instance to be persisted to the database when its execution starts, so that I can view its progress in the instances view of the Elsa dashboard.

Actual Behavior

The instance is only persisted at the end of its execution. I checked IWorkflowInstanceStore, the workflow instances dashboard page, and the [WorkflowInstances] SQL table. The instance is nowhere to be found until it has finished executing.

Environment

  • Elsa Package Version: Elsa v3.2.1, from the NuGet package
  • Operating System: Windows 10
  • Browser and Version: N/A

Troubleshooting Attempts

I looked for documentation on the subject but didn't find any. I tried configuring the workflow engine with various options, such as manually configuring the workflow execution pipeline, but it didn't change the behavior.
It looks like BackgroundWorkflowDispatcher doesn't create a workflow instance before executing it. This is strange because the MassTransit implementation of IWorkflowDispatcher (MassTransitWorkflowDispatcher) does create one. Am I missing something?

Related Issues

I found this related issue, but I don't have access to the code that executes the workflows in my use cases:
#5832

@valsov valsov added the bug Something isn't working label Oct 29, 2024
@valsov valsov changed the title [BUG] Workflow instances aren't persisted when starting [BUG] Elsa 3.2.1 - Workflow instances aren't persisted when starting Oct 31, 2024
@valsov
Author

valsov commented Nov 7, 2024

Is no one else experiencing this issue? This looks like a serious problem affecting both workflow execution monitoring and the resumption of workflow instances in case of a server stop. There is simply no trace in the database of a workflow instance being executed until it finishes running.
Please let me know if I can provide any additional information.

@sfmskywalker
Member

sfmskywalker commented Nov 23, 2024

Hi @valsov, this is actually by design. The rationale is that not all scenarios demand immediate persistence of a workflow instance; in some, performance matters more than an up-to-date state in the DB.
Granted, there are likely more scenarios where an up-to-date DB is more important than raw performance, and the performance tradeoff may not be significant.

To that end, we have this parent issue: #4833
This will allow fine-grained control over the frequency with which the DB is updated with the state. This will include options to update the DB after every step, and options to control this per individual step. There will also be handlers the application or modules can implement to determine whether state should be committed using custom logic.

Until that feature is implemented, the workflow state is committed to the DB only when there are no more activities to execute (i.e. when the workflow gets suspended or is finished).
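To make the tradeoff concrete, here is a small self-contained C# simulation. It is not Elsa code. The policy names `commitAfterEveryStep` and `commitOnSuspendOrFinish` are hypothetical stand-ins for the current behaviour described above and for the kind of per-step option planned in #4833. The sketch counts how many DB writes each policy performs over a run of N activities, and shows what would be left in the DB if the server stopped partway through.

```csharp
using System;

// Hypothetical commit policies: given the index of the activity that just ran
// and whether there are no more activities to execute, decide whether to commit.
Func<int, bool, bool> commitAfterEveryStep = (step, isLast) => true;
Func<int, bool, bool> commitOnSuspendOrFinish = (step, isLast) => isLast;

// Simulates running `activityCount` activities, optionally stopping before activity `crashAfter`.
// Returns the number of commits and the last persisted activity index (-1 = nothing persisted).
(int Commits, int LastPersisted) Run(Func<int, bool, bool> policy, int activityCount, int? crashAfter = null)
{
    var commits = 0;
    var lastPersisted = -1;
    for (var step = 0; step < activityCount; step++)
    {
        if (crashAfter.HasValue && step == crashAfter.Value)
            break; // the server stops before this activity runs
        var isLast = step == activityCount - 1;
        if (policy(step, isLast))
        {
            commits++;
            lastPersisted = step;
        }
    }
    return (commits, lastPersisted);
}

// Full runs: fewer writes with the current behaviour, one write per step otherwise.
var fast = Run(commitOnSuspendOrFinish, 5);
var safe = Run(commitAfterEveryStep, 5);
Console.WriteLine($"On suspend/finish: {fast.Commits} commit(s); after every step: {safe.Commits} commit(s)");

// Server stops after three activities have run.
var crashedFast = Run(commitOnSuspendOrFinish, 5, crashAfter: 3);
var crashedSafe = Run(commitAfterEveryStep, 5, crashAfter: 3);
Console.WriteLine($"After crash: last persisted step {crashedFast.LastPersisted} vs {crashedSafe.LastPersisted}");
```

With the current behaviour, a stop mid-run leaves nothing in the DB. Committing after every step costs one write per activity but leaves a resumable checkpoint.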

Now, as you have observed, some trigger implementations create the workflow instance before executing it. This deviates from the current pattern and was done in order to store the workflow input with the workflow instance rather than sending that input via a service bus message, which can lead to message-size-exceeded exceptions when providing large inputs.
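The trigger behaviour described above amounts to a claim-check: the instance is persisted together with its input first, and only the instance ID travels over the bus. Here is a minimal C# sketch of that idea. An in-memory dictionary stands in for the workflow instance store, and `MaxMessageBytes` is an arbitrary, hypothetical broker limit. None of the names are Elsa or MassTransit APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical broker limit; real limits depend on the transport.
const int MaxMessageBytes = 256;

// In-memory stand-in for the workflow instance store.
var instanceStore = new Dictionary<string, string>();

// Option 1: ship the input inside the message (can exceed the broker limit).
string SendInputInMessage(string input)
{
    var size = Encoding.UTF8.GetByteCount(input);
    if (size > MaxMessageBytes)
        throw new InvalidOperationException($"Message size {size} exceeds {MaxMessageBytes} bytes");
    return input; // the message body is the input itself
}

// Option 2: create the instance with its input first, then send only the instance ID.
string CreateInstanceThenSendId(string input)
{
    var instanceId = Guid.NewGuid().ToString("N");
    instanceStore[instanceId] = input; // input persisted with the instance
    return instanceId;                 // the message body is a 32-character ID
}

var largeInput = new string('x', 10_000);
var message = CreateInstanceThenSendId(largeInput);
Console.WriteLine($"Message: {message} ({Encoding.UTF8.GetByteCount(message)} bytes)");

// The consumer loads the input from the store using the ID it received.
var loadedInput = instanceStore[message];
Console.WriteLine($"Loaded input length: {loadedInput.Length}");

// Sending the same input inline fails the size check.
try { SendInputInMessage(largeInput); }
catch (InvalidOperationException e) { Console.WriteLine(e.Message); }
```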

Until the aforementioned feature is implemented, you can try implementing a handler for the ActivityExecuted notification. In this handler, you can use IWorkflowInstanceManager to persist the current state (it has a method that accepts a WorkflowExecutionContext argument, which is essentially the current workflow state to be persisted).

@valsov
Author

valsov commented Nov 26, 2024

Hello @sfmskywalker, thank you for your input on this.
I implemented notification handlers to persist the workflow instance when the workflow starts and after each activity execution. The issue I'm now facing regarding recovery is that execution logs and instance variables aren't persisted when using IWorkflowInstanceManager.SaveAsync(workflowExecutionContext). They are correctly persisted at the end of an instance's execution.
I managed to restart workflows at their last execution step on startup using IWorkflowRunner.RunAsync(workflowGraph, workflowInstance.WorkflowState), but since variables aren't persisted, the resumed execution is inconsistent.
Do you know how variables could be persisted and used in this context ?

For reference, this is the code I'm using to restart a workflow instance:

var runner = serviceProvider.GetRequiredService<IWorkflowRunner>();
var definitionStore = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
var instanceStore = serviceProvider.GetRequiredService<IWorkflowInstanceStore>();
var instanceManager = serviceProvider.GetRequiredService<IWorkflowInstanceManager>();

// Retrieve state
var instance = await instanceStore.FindAsync(new Elsa.Workflows.Management.Filters.WorkflowInstanceFilter
{
    WorkflowStatus = Elsa.Workflows.WorkflowStatus.Running
});
var workflowGraph = await definitionStore.FindWorkflowGraphAsync(instance.DefinitionVersionId);

// Restart workflow
var endState = await runner.RunAsync(workflowGraph, instance.WorkflowState);

// Persist instance
await instanceManager.SaveAsync(endState.WorkflowState, CancellationToken.None);

I look forward to the workflow execution recovery feature you mentioned.
Thank you for your help, have a good day!

@sfmskywalker
Member

It's great to see that you got this working! In order to persist and reload workflow variables, try using the IVariablePersistenceManager service - it has methods to persist and load workflow variables using their associated storage driver. The ActivityExecuted notification carries an ActivityExecutionContext, which in turn provides you with the WorkflowExecutionContext that you might use when working with the variable persistence manager. If you only have access to (de)serialized WorkflowState, then you might have to first turn that into a WorkflowExecutionContext.
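A toy model helps explain why restoring `WorkflowState` alone produced inconsistent runs. Suppose variable values live in a separate store, as the reply above describes: each variable has an associated storage driver. In that case, saving the instance state does not save the variables, so a resumed run sees missing values. The following is a hypothetical C# simulation, not Elsa's implementation. `stateStore`, `variableStore`, `Checkpoint`, and `Resume` are invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy stores: one for workflow state (which activity to resume at), one for variable values.
var stateStore = new Dictionary<string, int>();
var variableStore = new Dictionary<string, Dictionary<string, object>>();

// Checkpoint after an activity: always save the resume position, optionally the variables too.
void Checkpoint(string instanceId, int nextActivity, Dictionary<string, object> variables, bool saveVariables)
{
    stateStore[instanceId] = nextActivity;
    if (saveVariables)
        variableStore[instanceId] = new Dictionary<string, object>(variables); // snapshot copy
}

// Resume after a restart: load the resume position and whatever variables were saved.
(int NextActivity, Dictionary<string, object> Variables) Resume(string instanceId)
{
    var next = stateStore[instanceId];
    var vars = variableStore.TryGetValue(instanceId, out var saved)
        ? new Dictionary<string, object>(saved)
        : new Dictionary<string, object>();
    return (next, vars);
}

// Activity 0 sets a variable, then the server stops before activity 1 runs.
var live = new Dictionary<string, object> { ["Counter"] = 42 };
Checkpoint("a", 1, live, saveVariables: false); // state only
Checkpoint("b", 1, live, saveVariables: true);  // state + variables

var resumedA = Resume("a");
var resumedB = Resume("b");
Console.WriteLine($"State only: resumes at {resumedA.NextActivity}, Counter present: {resumedA.Variables.ContainsKey("Counter")}");
Console.WriteLine($"State + variables: resumes at {resumedB.NextActivity}, Counter = {resumedB.Variables["Counter"]}");
```

Both instances resume at the right activity, but only the one whose variables were checkpointed alongside the state sees `Counter`. That is the gap `IVariablePersistenceManager` is suggested to close.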

@valsov
Author

valsov commented Nov 27, 2024

Thanks again. I managed to implement a simple recovery solution with your answers. Here is the code in case anyone needs it:

public async Task HandleAsync(ActivityExecuted notification, CancellationToken cancellationToken)
{
    try
    {
        var workflowInstanceManager = serviceProvider.GetRequiredService<IWorkflowInstanceManager>();
        await workflowInstanceManager.SaveAsync(notification.ActivityExecutionContext.WorkflowExecutionContext, cancellationToken);

        // Persist variable state when the activity sets a variable:
        // 1) SetVariable activity
        // 2) Set variable via output
        if (notification.ActivityExecutionContext.Activity is SetVariable ||
            notification.ActivityExecutionContext.Activity.GetOutputs().Any())
        {
            var variableService = serviceProvider.GetRequiredService<IVariablePersistenceManager>();
            await variableService.SaveVariablesAsync(notification.ActivityExecutionContext.WorkflowExecutionContext);
        }
    }
    catch (Exception e)
    {
        logger.LogError(e, "Workflow lifecycle notification handling failed ({LifecycleEvent})", nameof(ActivityExecuted));
    }
}
/// <summary>
/// Service running on startup to resume workflows that were executing before the server stop.
/// </summary>
/// <remarks>Monitor this issue to remove this custom code: <see href="https://github.com/elsa-workflows/elsa-core/issues/4833"/></remarks>
public class WorkflowsRecoveryBackgroundService(IServiceScopeFactory serviceScopeFactory, ILogger<WorkflowsRecoveryBackgroundService> logger) : BackgroundService
{
    /// <summary>
    /// Server startup time, should be set by the server configuration pipeline
    /// </summary>
    public static DateTime ServerStartupTime { get; set; }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var instanceStore = scope.ServiceProvider.GetRequiredService<IWorkflowInstanceStore>();
        try
        {
            // Find all workflow instances with a running status that started before the application started, thus needing recovery
            var instancesNeedingRecovery = (await instanceStore.FindManyAsync(new WorkflowInstanceFilter
            {
                WorkflowStatus = WorkflowStatus.Running,
                TimestampFilters =
                [
                    new TimestampFilter
                    {
                        Column = nameof(WorkflowInstance.UpdatedAt),
                        Operator = TimestampFilterOperator.LessThan,
                        Timestamp = new DateTimeOffset(ServerStartupTime)
                    }
                ]
            }, stoppingToken)).ToList();

            if (instancesNeedingRecovery.Count == 0)
            {
                logger.LogDebug("Workflows recovery process finished: no instances to recover");
                return;
            }

            logger.LogInformation("Workflows recovery process started for {WorkflowInstancesCount} workflow instances", instancesNeedingRecovery.Count);
            var tasks = new List<Task>();
            foreach (var instance in instancesNeedingRecovery)
            {
                tasks.Add(ResumeWorkflowExecution(instance, stoppingToken));
            }
            await Task.WhenAll(tasks);
            logger.LogInformation("Workflows recovery process finished");
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception e)
        {
            logger.LogError(e, "Workflows recovery process failed");
        }
    }

    private async Task ResumeWorkflowExecution(WorkflowInstance instance, CancellationToken stoppingToken)
    {
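        // Standard ILogger scope carrying the correlation ID (replaces a custom AddLogPropertyToScope extension)
        using var logScope = logger.BeginScope(new Dictionary<string, object?> { ["CorrelationId"] = instance.CorrelationId });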
        try
        {
            // Create a new service scope for each recovery: needed for a correct execution context
            using var serviceScope = serviceScopeFactory.CreateScope();
            var runner = serviceScope.ServiceProvider.GetRequiredService<IWorkflowRunner>();
            var definitionStore = serviceScope.ServiceProvider.GetRequiredService<IWorkflowDefinitionService>();
            var instanceManager = serviceScope.ServiceProvider.GetRequiredService<IWorkflowInstanceManager>();
            var variableService = serviceScope.ServiceProvider.GetRequiredService<IVariablePersistenceManager>();

            // Retrieve state
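            var workflowGraph = await definitionStore.FindWorkflowGraphAsync(instance.DefinitionVersionId);
            if (workflowGraph == null)
            {
                // The definition version may have been deleted; nothing to resume against
                logger.LogWarning("Workflow definition version {DefinitionVersionId} not found; cannot resume instance {WorkflowInstanceId}", instance.DefinitionVersionId, instance.Id);
                return;
            }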
            var executionContext = await WorkflowExecutionContext.CreateAsync(serviceScope.ServiceProvider, workflowGraph, instance.WorkflowState, instance.CorrelationId, instance.ParentWorkflowInstanceId, instance.WorkflowState.Input, instance.WorkflowState.Properties);
            await variableService.LoadVariablesAsync(executionContext);

            logger.LogInformation("Workflow instance execution resuming started: {Workflow}", workflowGraph.Workflow.WorkflowMetadata.Name);

            // Start workflow
            var endState = await runner.RunAsync(executionContext);

            // Persist workflow instance
            await instanceManager.SaveAsync(endState.WorkflowState, stoppingToken);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Failed to resume workflow instance execution with Id={WorkflowInstanceId}", instance.Id);
        }
    }
}
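The recovery criterion used above selects instances whose status is `Running` and whose `UpdatedAt` is earlier than the server startup time. That predicate can be checked in isolation. The minimal C# sketch below runs the same predicate in memory, with tuples standing in for Elsa's `WorkflowInstance` and `WorkflowInstanceFilter`. The instances and timestamps are made up. The sketch assumes a single server: if several nodes share the database, a `Running` instance still owned by another live node could match the same filter.

```csharp
using System;
using System.Linq;

var serverStartupTime = new DateTimeOffset(2024, 11, 27, 9, 0, 0, TimeSpan.Zero);

// Made-up stand-ins for persisted workflow instances: (Id, Status, UpdatedAt).
var instances = new[]
{
    (Id: "1", Status: "Running",   UpdatedAt: serverStartupTime.AddMinutes(-5)), // interrupted by the restart
    (Id: "2", Status: "Finished",  UpdatedAt: serverStartupTime.AddMinutes(-5)), // completed before the stop
    (Id: "3", Status: "Running",   UpdatedAt: serverStartupTime.AddMinutes(1)),  // already updated since startup
    (Id: "4", Status: "Suspended", UpdatedAt: serverStartupTime.AddHours(-1)),   // waiting on a bookmark, not selected
};

// Same predicate as the filter above: Running AND UpdatedAt < startup time.
var needingRecovery = instances
    .Where(i => i.Status == "Running" && i.UpdatedAt < serverStartupTime)
    .Select(i => i.Id)
    .ToList();

Console.WriteLine($"Instances to recover: {string.Join(", ", needingRecovery)}");
```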

@valsov valsov closed this as completed Nov 27, 2024