Skip to content

Commit

Permalink
Save operations in the database (#547)
Browse files Browse the repository at this point in the history
* Create operation migration and schema

* Implement operation functions

* Store operation progress in the database

* Adapt tests to initially pass

* Add and update tests to avoid implementation details usage

* Use operation enums

* Use enums in test factories

* Add required fields validation in operation schema

* Restore updated_at field

* Use missing enums properly

* Use jsonb specifially as operation schema type

* Add new database indexes for jsonb fields

* Update index migration using ecto and adding jsonb_path_ops
  • Loading branch information
arbulu89 authored Jan 16, 2025
1 parent 2e19bef commit e54977f
Show file tree
Hide file tree
Showing 12 changed files with 663 additions and 343 deletions.
66 changes: 66 additions & 0 deletions lib/wanda/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,70 @@ defmodule Wanda.Operations do
Operations are combined actions dispatched to different agents in order to apply
persistent changes on them.
"""

alias Wanda.Repo

alias Wanda.Operations.{
Operation,
OperationTarget,
StepReport
}

require Wanda.Operations.Enums.Result, as: Result
require Wanda.Operations.Enums.Status, as: Status

@doc """
Create a new operarion.
If the operation already exists, it will be returned.
"""
@spec create_operation!(String.t(), String.t(), [OperationTarget.t()]) :: Operation.t()
def create_operation!(operation_id, group_id, targets) do
%Operation{}
|> Operation.changeset(%{
operation_id: operation_id,
group_id: group_id,
status: Status.running(),
result: Result.not_executed(),
targets: Enum.map(targets, &Map.from_struct/1)
})
|> Repo.insert!(on_conflict: :nothing)
end

@doc """
Get an operation by operation_id.
"""
@spec get_operation!(String.t()) :: Operation.t()
def get_operation!(operation_id) do
Repo.get!(Operation, operation_id)
end

@doc """
Update agent_reports of an operation
"""
@spec update_agent_reports!(String.t(), [StepReport.t()]) ::
Operation.t()
def update_agent_reports!(operation_id, agent_reports) do
Operation
|> Repo.get!(operation_id)
|> Operation.changeset(%{
agent_reports: agent_reports
})
|> Repo.update!()
end

@doc """
Marks a previously started operation as completed
"""
@spec complete_operation!(String.t(), Result.t()) :: Operation.t()
def complete_operation!(operation_id, result) do
Operation
|> Repo.get!(operation_id)
|> Operation.changeset(%{
result: result,
status: Status.completed(),
completed_at: DateTime.utc_now()
})
|> Repo.update!()
end
end
1 change: 1 addition & 0 deletions lib/wanda/operations/agent_report.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Wanda.Operations.AgentReport do

require Wanda.Operations.Enums.Result, as: Result

@derive Jason.Encoder
defstruct [
:agent_id,
:result
Expand Down
2 changes: 1 addition & 1 deletion lib/wanda/operations/enums/result.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Wanda.Operations.Enums.Result do
@moduledoc """
Type that represents am operation result.
Type that represents an operation result.
"""

use Wanda.Support.Enum,
Expand Down
8 changes: 8 additions & 0 deletions lib/wanda/operations/enums/status.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Wanda.Operations.Enums.Status do
@moduledoc """
Type that represents an operation execution status.
"""

use Wanda.Support.Enum,
values: [:running, :completed]
end
55 changes: 55 additions & 0 deletions lib/wanda/operations/operation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Wanda.Operations.Operation do
@moduledoc """
Schema of a persisted operation.
"""

use Ecto.Schema

import Ecto.Changeset

require Wanda.Operations.Enums.Result, as: Result
require Wanda.Operations.Enums.Status, as: Status

@type t :: %__MODULE__{}

@fields ~w(operation_id group_id result status agent_reports started_at updated_at completed_at)a
@target_fields ~w(agent_id arguments)a

@required_fields ~w(operation_id group_id result status)a
@targets_required_fields ~w(agent_id)a

@derive {Jason.Encoder, [except: [:__meta__]]}
@primary_key false
schema "operations" do
field :operation_id, Ecto.UUID, primary_key: true
field :group_id, Ecto.UUID
field :result, Ecto.Enum, values: Result.values()
field :status, Ecto.Enum, values: Status.values()

embeds_many :targets, Target, primary_key: false do
@derive Jason.Encoder

field :agent_id, Ecto.UUID, primary_key: true
field :arguments, :map
end

field :agent_reports, {:array, :map}

field :completed_at, :utc_datetime_usec
timestamps(type: :utc_datetime_usec, inserted_at: :started_at)
end

@spec changeset(t() | Ecto.Changeset.t(), map) :: Ecto.Changeset.t()
def changeset(operation, params) do
operation
|> cast(params, @fields)
|> cast_embed(:targets, with: &target_changeset/2, required: true)
|> validate_required(@required_fields)
end

defp target_changeset(target, params) do
target
|> cast(params, @target_fields)
|> validate_required(@targets_required_fields)
end
end
33 changes: 31 additions & 2 deletions lib/wanda/operations/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Wanda.Operations.Server do

use GenServer, restart: :transient

alias Wanda.Operations

alias Wanda.Operations.{AgentReport, OperationTarget, State, StepReport, Supervisor}
alias Wanda.Operations.Catalog.{Operation, Step}

Expand Down Expand Up @@ -75,18 +77,25 @@ defmodule Wanda.Operations.Server do
@impl true
def handle_continue(
:start_operation,
%State{} = state
%State{
operation_id: operation_id,
group_id: group_id,
targets: targets
} = state
) do
engine = EvaluationEngine.new()
new_state = initialize_report_results(state)

Operations.create_operation!(operation_id, group_id, targets)

{:noreply, %State{new_state | engine: engine}, {:continue, :execute_step}}
end

@impl true
def handle_continue(
:execute_step,
%State{
operation_id: operation_id,
step_failed: true,
agent_reports: _agent_reports
} = state
Expand All @@ -95,7 +104,10 @@ defmodule Wanda.Operations.Server do

# Evaluate results using agent_reports
# Start rollback?
# Publish and store results
# Publish results

# Result is failed or rolledback, depending on the evaluation result
Operations.complete_operation!(operation_id, Result.failed())

{:stop, :normal, state}
end
Expand All @@ -122,6 +134,7 @@ defmodule Wanda.Operations.Server do
|> maybe_save_skipped_operation_state()
|> maybe_request_operation_execution(operator)
|> maybe_increase_current_step()
|> store_agent_reports()

if pending_targets == [] do
{:noreply, new_state, {:continue, :execute_step}}
Expand All @@ -134,6 +147,7 @@ defmodule Wanda.Operations.Server do
def handle_continue(
:execute_step,
%State{
operation_id: operation_id,
agent_reports: _agent_reports
} = state
) do
Expand All @@ -142,6 +156,9 @@ defmodule Wanda.Operations.Server do
# Evaluate results using agent_reports
# Publish and store results

# Result based on evaluation result
Operations.complete_operation!(operation_id, Result.updated())

{:stop, :normal, state}
end

Expand All @@ -166,6 +183,7 @@ defmodule Wanda.Operations.Server do
|> update_report_results(step_number, agent_id, operation_result)
|> maybe_set_step_failed(operation_result)
|> maybe_increase_current_step()
|> store_agent_reports()

if pending_targets == [] do
{:noreply, new_state, {:continue, :execute_step}}
Expand Down Expand Up @@ -339,6 +357,17 @@ defmodule Wanda.Operations.Server do
%State{state | agent_reports: updated_agent_reports}
end

defp store_agent_reports(
%State{
operation_id: operation_id,
agent_reports: agent_reports
} = state
) do
Operations.update_agent_reports!(operation_id, agent_reports)

state
end

defp maybe_set_step_failed(state, result)
when result in [Result.failed(), Result.rolled_back()],
do: %State{state | step_failed: true}
Expand Down
1 change: 1 addition & 0 deletions lib/wanda/operations/step_report.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Wanda.Operations.StepReport do

alias Wanda.Operations.AgentReport

@derive Jason.Encoder
defstruct [
:step_number,
:agents
Expand Down
19 changes: 19 additions & 0 deletions priv/repo/migrations/20250109154813_add_operation.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Wanda.Repo.Migrations.AddOperation do
use Ecto.Migration

def change do
create table(:operations, primary_key: false) do
add :operation_id, :uuid, primary_key: true
add :group_id, :uuid, null: false
add :result, :string, null: false
add :status, :string, null: false
add :targets, :jsonb, null: false, default: "[]"
add :agent_reports, :jsonb, null: false, default: "[]"
add :completed_at, :utc_datetime_usec
timestamps(type: :utc_datetime_usec, inserted_at: :started_at)
end

create index(:operations, [:group_id])
create unique_index(:operations, [:operation_id, :group_id])
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Wanda.Repo.Migrations.AddOperationJsonbIndexes do
use Ecto.Migration

def change do
create index(:operations, ["targets jsonb_path_ops"], using: "GIN")
create index(:operations, ["agent_reports jsonb_path_ops"], using: "GIN")
end
end
Loading

0 comments on commit e54977f

Please sign in to comment.