Operations orchestration skeleton #543
Thanks for this. What are your thoughts on the pros and cons of using persistent, DB-backed worker processes for operations orchestration/execution? As I understand it, something like Oban gives us better transactional support, robustness, and error recovery/retries out of the box, whereas with more ephemeral processes that are not DB-backed these aspects are up to our own implementation.
In this case I guess our business logic is not a single background async task by itself. Anyway, I have not used |
Beautiful LGTM!
Regarding using a persistent database: I was one of the first to work on Wanda a long time ago. We had a look at Oban back then, and I have used Oban myself in the past.
We decided it was not the right tool for the nature of the task that Wanda performs: in the past that was check execution, and now it is operation orchestration.
It is a dynamically spawned process and basically a state machine, which fits the use case of dynamically supervised GenServers perfectly.
Using Oban or another job processing tool means that we would have to change the nature and structure of Wanda itself, and the benefits are not tangible, because we would need to change the architecture and the way we think about Wanda processes.
If it were a recurring job, a queue, or anything similarly simple, Oban would be perfect, but I don't think it suits the use case we have right now.
If something happens to a process, it can be re-triggered and retried. That's just my two cents, of course.
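To make the "dynamically supervised GenServers" idea concrete, here is a minimal sketch, not the PR's actual code. All names under `Sketch.` are hypothetical; the assumption is that each running operation is one GenServer registered under its `group_id` in a unique `Registry`, so that a second start for the same group is rejected while the first is still alive.

```elixir
defmodule Sketch.OperationServer do
  # Hypothetical per-operation process. `restart: :temporary` means the
  # supervisor does not restart it automatically after it exits.
  use GenServer, restart: :temporary

  def start_link(group_id) do
    GenServer.start_link(__MODULE__, group_id, name: via(group_id))
  end

  # Name the process after its group_id through a unique Registry.
  def via(group_id), do: {:via, Registry, {Sketch.Registry, group_id}}

  @impl true
  def init(group_id), do: {:ok, %{group_id: group_id, current_step_index: 0}}
end

defmodule Sketch.Operations do
  # Starts one supervised process per group_id. The unique Registry makes a
  # second start for the same group fail with :already_started.
  def start_operation(group_id) do
    child = {Sketch.OperationServer, group_id}

    case DynamicSupervisor.start_child(Sketch.Supervisor, child) do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> {:error, :already_running}
      {:error, reason} -> {:error, reason}
    end
  end
end

# In an application these two would live in the supervision tree.
{:ok, _} = Registry.start_link(keys: :unique, name: Sketch.Registry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: Sketch.Supervisor)
```

This only deduplicates operations that are running at the same time in one node; it keeps no history across crashes, which is the trade-off discussed in this thread.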
@CDimonaco: I have some questions to help improve my understanding, in case you have already evaluated Oban for Wanda:
- How do we ensure that any submitted long-running requests are idempotent? Or is idempotence not needed here?
- Main app crashes are one thing, but what about crashes in the parent process of the operations orchestration? The app is still up, but the parent process has crashed and has no way of regaining its old state (it was all in memory). How would the restarted parent process know about the previous history of partial execution? Or is this again not a concern?
- You mention retries, but when operations run dynamically, without a db backing the execution request/orchestration state, my understanding is that execution request deduplication/uniqueness would be difficult to implement. How do we avoid the impact of partial failures/(partially) duplicated executions in this case? Or do you have reasons to believe that this is somehow not a concern?
- I believe I would benefit from a conversation about the architectural/"nature and structure of wanda" reasons. Let's talk about this, perhaps?
- (Comment) There are a few ways to do state machines backed by the db, with or without Oban, but I guess that's a topic for a later conversation.
- Ways of replicating this using a db-backed solution are also a topic for a sync/later conversation.
Great work @arbulu89!
The overall workflow sounds good to me. I left several comments with nitpicks and questions to better understand the code.
I see this is a skeleton, and we might stop here. Anyway, there is something we might want to include in the skeleton, too:
- Test the workflow using receive_operation_reports, too
- Some docs on the state struct would be helpful
UPDATE: I took a closer look at the tests. I see most of our tests (9 out of 14) call lifecycle functions (handle_continue and handle_cast) directly and make assertions on the state shape. That makes it hard to refactor the implementation when needed (we're testing the implementation, not the result).
As we are laying the groundwork for the work to come, I think it's worth the investment to have a stronger test suite.
%Step{predicate: predicate, operator: operator} = Enum.at(steps, current_step_index)

state =
suggestion(immutability): use newState to avoid reassigning a variable
  end
end

defp maybe_save_skipped_operation_state(
question: this function sets the report to :skipped for all the agents that don't satisfy the predicate, right?
       %State{} = state,
       _operator
     ) do
  # publish operation execution to agents
question: this is supposed to be a message dispatched to RabbitMQ, right?
pending_targets = List.delete(targets, agent_id)

state =
suggestion(immutability): use newState to avoid reassigning a variable
assert :ok ==
         Server.receive_operation_reports(operation_id, group_id, 1, UUID.uuid4(), :updated)
question: How can this test fail? Under what conditions do we not get :ok?
  end
end

test "should not start opeartion if it is already running for that group_id" do
typo
- test "should not start opeartion if it is already running for that group_id" do
+ test "should not start operation if it is already running for that group_id" do
defstruct [
  :engine,
  :operation_id,
  :group_id,
  :operation,
  :timeout,
  targets: [],
  pending_targets_on_step: [],
  current_step_index: 0,
  agent_reports: %{},
  step_failed: false
]
suggestion: a description of the fields and what to expect from them would be helpful. For example, I didn't get what group_id is and how it relates to the other fields.
test "should finish operation if all steps are completed" do
  state = %State{
    current_step_index: 1,
    agent_reports: [
      %StepReport{
        step_number: 0,
        agents: [%AgentReport{agent_id: UUID.uuid4(), result: :updated}]
      }
    ]
  }

  assert {:stop, :normal, ^state} =
           Server.handle_continue(
             :execute_step,
             state
           )
end
thought: Alongside calling handle_continue directly, it would be helpful to use the Server API, i.e. calling receive_operation_reports on a GenServer that has the desired internal state.
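As an illustration of what testing through the public API could look like, here is a minimal sketch under stated assumptions. `Sketch.ReportServer` and `receive_report/3` are hypothetical stand-ins, not the PR's `Server` or its `receive_operation_reports` signature. The check at the end only observes externally visible behavior (the process stopping with reason `:normal`) and never inspects the state struct.

```elixir
defmodule Sketch.ReportServer do
  # Hypothetical stand-in: waits for one report per pending agent and stops
  # normally once no agent is pending.
  use GenServer

  def start_link(pending_agents) do
    GenServer.start_link(__MODULE__, pending_agents)
  end

  # Public API: the only entry point the check below uses.
  def receive_report(server, agent_id, result) do
    GenServer.cast(server, {:report, agent_id, result})
  end

  @impl true
  def init(pending_agents), do: {:ok, %{pending: pending_agents, reports: %{}}}

  @impl true
  def handle_cast({:report, agent_id, result}, state) do
    new_state = %{
      state
      | pending: List.delete(state.pending, agent_id),
        reports: Map.put(state.reports, agent_id, result)
    }

    if new_state.pending == [] do
      {:stop, :normal, new_state}
    else
      {:noreply, new_state}
    end
  end
end

# Black-box check: drive the server only through its API and watch it stop.
{:ok, pid} = Sketch.ReportServer.start_link(["agent-1", "agent-2"])
ref = Process.monitor(pid)
:ok = Sketch.ReportServer.receive_report(pid, "agent-1", :updated)
:ok = Sketch.ReportServer.receive_report(pid, "agent-2", :updated)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> :normal = reason
after
  1_000 -> raise "server did not stop after the last report"
end
```

In an ExUnit test the final `receive` would typically become `assert_receive {:DOWN, ^ref, :process, ^pid, :normal}`. A check like this keeps passing if the internal state shape is refactored.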
%State{pending_targets_on_step: pending_targets} =
  state
  |> predicate_targets_execution(predicate)
  |> maybe_save_skipped_operation_state()
  |> maybe_request_operation_execution(operator)
  |> maybe_increase_current_step()

if pending_targets == [] do
  {:noreply, state, {:continue, :execute_step}}
else
  {:noreply, state}
thought: I understand that this code does 3 things:
- select target agents and dispatch the job to them
- set a :skipped report for agents that do not satisfy the predicate
- "shortcut" the workflow if no agent satisfies the predicate
I think we can make it simpler with some refactoring. I don't know if it's worth it, given that this PR is a skeleton.
The idea is that, when dispatching the job to all the agents, we can do:
if predicate_is_met(agent)
  dispatch on queue
else
  receive_operation_reports(agent.id, :skipped)
Have you considered that already? If you think it's worth it, we can refactor in this PR; otherwise we can refactor later.
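The pseudocode above could be sketched in Elixir roughly as follows. This is an illustration only: `Sketch.Dispatch`, the agent map shape, and the `dispatch` callback (standing in for publishing to the queue) are assumptions, and the skipped result is recorded directly in the returned map instead of going through receive_operation_reports.

```elixir
defmodule Sketch.Dispatch do
  # For each agent, either dispatch the step or record a :skipped report.
  # `predicate` is a one-arity function on an agent map; `dispatch` is a
  # hypothetical callback standing in for the queue publish.
  def dispatch_step(agents, predicate, dispatch) do
    agents
    |> Enum.reduce(%{pending: [], reports: %{}}, fn agent, acc ->
      if predicate.(agent) do
        dispatch.(agent)
        %{acc | pending: [agent.id | acc.pending]}
      else
        %{acc | reports: Map.put(acc.reports, agent.id, :skipped)}
      end
    end)
    # Agents were prepended, so restore the original order.
    |> Map.update!(:pending, &Enum.reverse/1)
  end
end
```

With this shape, an empty `pending` list after dispatching corresponds to the "shortcut" case where no agent satisfies the predicate.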
Description
Implement the operation orchestration skeleton.
Operations are defined using actual Elixir code. They are basically composed of a number of steps, which are executed sequentially on all the target agents. Each step consists of an operator execution (the same way we do with gatherers).
The orchestration works in the following way:
The steps are executed sequentially, but each step is executed on all related hosts in parallel.
I have excluded from this PR everything related to messaging, storing operation results in the database, and results evaluation.
It only includes the fundamental part of the orchestration.
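The "sequential steps, parallel hosts" behavior described above can be sketched as follows. This is a simplified illustration, not the PR's implementation: it uses `Task.async_stream` in place of the message-based agent reports, `Sketch.Orchestrator` is a hypothetical name, steps are plain one-arity functions standing in for operators, and halting after a failed step is an assumption.

```elixir
defmodule Sketch.Orchestrator do
  # Runs steps one after another; within a step, all agents run concurrently.
  # Each step is a function from agent id to :updated or :failed.
  def run(steps, agent_ids) do
    steps
    |> Enum.with_index()
    |> Enum.reduce_while([], fn {step, index}, reports ->
      # Task.async_stream preserves input order by default.
      results =
        agent_ids
        |> Task.async_stream(fn agent_id -> {agent_id, step.(agent_id)} end)
        |> Enum.map(fn {:ok, result} -> result end)

      reports = reports ++ [%{step_number: index, agents: results}]

      # Assumption: a failure on any agent stops the remaining steps.
      if Enum.any?(results, fn {_id, result} -> result == :failed end) do
        {:halt, reports}
      else
        {:cont, reports}
      end
    end)
  end
end
```

The next step only starts once every agent has reported for the current one, which mirrors the role of `pending_targets_on_step` in the state struct.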
The code for a solution could look something like this:
How was this tested?
Unit tests added.