From e59a5bf667f94eef57f704b3bb8ca10b756c4db0 Mon Sep 17 00:00:00 2001 From: Carmine Di Monaco Date: Thu, 13 Jul 2023 14:48:35 +0200 Subject: [PATCH] Projectors refactor (#1624) --- .../projectors/cluster_projector.ex | 25 ++++-- .../projectors/database_projector.ex | 64 +++++++------- .../application/projectors/host_projector.ex | 86 +++++++++---------- .../projectors/sap_system_projector.ex | 45 +++++----- 4 files changed, 110 insertions(+), 110 deletions(-) diff --git a/lib/trento/application/projectors/cluster_projector.ex b/lib/trento/application/projectors/cluster_projector.ex index 7a94d76d28..4aaf4180bd 100644 --- a/lib/trento/application/projectors/cluster_projector.ex +++ b/lib/trento/application/projectors/cluster_projector.ex @@ -64,7 +64,9 @@ defmodule Trento.ClusterProjector do }, fn multi -> changeset = - ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{ + ClusterReadModel + |> Repo.get!(cluster_id) + |> ClusterReadModel.changeset(%{ deregistered_at: deregistered_at }) @@ -77,10 +79,10 @@ defmodule Trento.ClusterProjector do cluster_id: cluster_id }, fn multi -> - cluster = Repo.get!(ClusterReadModel, cluster_id) - changeset = - ClusterReadModel.changeset(cluster, %{ + ClusterReadModel + |> Repo.get!(cluster_id) + |> ClusterReadModel.changeset(%{ deregistered_at: nil }) @@ -102,7 +104,9 @@ defmodule Trento.ClusterProjector do }, fn multi -> changeset = - ClusterReadModel.changeset(%ClusterReadModel{id: id}, %{ + ClusterReadModel + |> Repo.get!(id) + |> ClusterReadModel.changeset(%{ name: name, sid: sid, additional_sids: additional_sids, @@ -135,7 +139,10 @@ defmodule Trento.ClusterProjector do ) project(%ClusterHealthChanged{cluster_id: cluster_id, health: health}, fn multi -> - changeset = ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{health: health}) + changeset = + ClusterReadModel + |> Repo.get!(cluster_id) + |> ClusterReadModel.changeset(%{health: health}) Ecto.Multi.update(multi, :cluster, changeset) end) @@ -185,9 +192,9 @@ defmodule Trento.ClusterProjector do end @impl true - def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, _) do - %ClusterReadModel{name: name} = Repo.get!(ClusterReadModel, cluster_id) - + def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, %{ + cluster: %ClusterReadModel{name: name} + }) do TrentoWeb.Endpoint.broadcast("monitoring:clusters", "cluster_deregistered", %{ id: cluster_id, name: name diff --git a/lib/trento/application/projectors/database_projector.ex b/lib/trento/application/projectors/database_projector.ex index c0214bf4ce..fe840fb8e3 100644 --- a/lib/trento/application/projectors/database_projector.ex +++ b/lib/trento/application/projectors/database_projector.ex @@ -51,7 +51,9 @@ defmodule Trento.DatabaseProjector do }, fn multi -> changeset = - DatabaseReadModel.changeset(%DatabaseReadModel{id: sap_system_id}, %{health: health}) + DatabaseReadModel + |> Repo.get!(sap_system_id) + |> DatabaseReadModel.changeset(%{health: health}) Ecto.Multi.update(multi, :database, changeset) end @@ -104,14 +106,13 @@ defmodule Trento.DatabaseProjector do }, fn multi -> changeset = - DatabaseInstanceReadModel.changeset( - %DatabaseInstanceReadModel{ - sap_system_id: sap_system_id, - host_id: host_id, - instance_number: instance_number - }, - %{health: health} + DatabaseInstanceReadModel + |> Repo.get_by( + sap_system_id: sap_system_id, + instance_number: instance_number, + host_id: host_id ) + |> DatabaseInstanceReadModel.changeset(%{health: health}) Ecto.Multi.update(multi, :database_instance, changeset) end @@ -127,17 +128,16 @@ defmodule Trento.DatabaseProjector do }, fn multi -> changeset = - DatabaseInstanceReadModel.changeset( - %DatabaseInstanceReadModel{ - sap_system_id: sap_system_id, - host_id: host_id, - instance_number: instance_number - }, - %{ - system_replication: system_replication, - system_replication_status: system_replication_status - } + DatabaseInstanceReadModel + |> Repo.get_by( + sap_system_id: sap_system_id, + instance_number: instance_number, + host_id: host_id ) + |> DatabaseInstanceReadModel.changeset(%{ + system_replication: system_replication, + system_replication_status: system_replication_status + }) Ecto.Multi.update(multi, :database_instance, changeset) end @@ -150,12 +150,9 @@ defmodule Trento.DatabaseProjector do }, fn multi -> changeset = - DatabaseReadModel.changeset( - %DatabaseReadModel{ - id: sap_system_id - }, - %{deregistered_at: deregistered_at} - ) + DatabaseReadModel + |> Repo.get!(sap_system_id) + |> DatabaseReadModel.changeset(%{deregistered_at: deregistered_at}) Ecto.Multi.update(multi, :database, changeset) end @@ -167,13 +164,10 @@ defmodule Trento.DatabaseProjector do health: health }, fn multi -> - db = Repo.get!(DatabaseReadModel, sap_system_id) - changeset = - DatabaseReadModel.changeset( - db, - %{deregistered_at: nil, health: health} - ) + DatabaseReadModel + |> Repo.get!(sap_system_id) + |> DatabaseReadModel.changeset(%{deregistered_at: nil, health: health}) Ecto.Multi.update(multi, :database, changeset) end @@ -323,12 +317,12 @@ defmodule Trento.DatabaseProjector do sap_system_id: sap_system_id }, _, - _ + %{ + database: %DatabaseReadModel{ + sid: sid + } + } ) do - %DatabaseReadModel{ - sid: sid - } = Repo.get(DatabaseReadModel, sap_system_id) - TrentoWeb.Endpoint.broadcast( @databases_topic, "database_deregistered", diff --git a/lib/trento/application/projectors/host_projector.ex b/lib/trento/application/projectors/host_projector.ex index 7892b8d585..d582edf304 100644 --- a/lib/trento/application/projectors/host_projector.ex +++ b/lib/trento/application/projectors/host_projector.ex @@ -58,7 +58,9 @@ defmodule Trento.HostProjector do }, fn multi -> changeset = - HostReadModel.changeset(%HostReadModel{id: id}, %{ + HostReadModel + |> Repo.get!(id) + |> HostReadModel.changeset(%{ deregistered_at: deregistered_at }) @@ -71,10 +73,10 @@ defmodule Trento.HostProjector do host_id: id }, fn multi -> - host = Repo.get!(HostReadModel, id) - changeset = - HostReadModel.changeset(host, %{ + HostReadModel + |> Repo.get!(id) + |> HostReadModel.changeset(%{ deregistered_at: nil }) @@ -95,7 +97,8 @@ defmodule Trento.HostProjector do Ecto.Multi.insert(multi, :host, changeset, on_conflict: {:replace, [:cluster_id]}, - conflict_target: [:id] + conflict_target: [:id], + returning: true ) end ) @@ -132,7 +135,9 @@ defmodule Trento.HostProjector do }, fn multi -> changeset = - HostReadModel.changeset(%HostReadModel{id: id}, %{ + HostReadModel + |> Repo.get!(id) + |> HostReadModel.changeset(%{ hostname: hostname, ip_addresses: ip_addresses, agent_version: agent_version @@ -163,7 +168,9 @@ defmodule Trento.HostProjector do %HeartbeatSucceded{host_id: id}, fn multi -> changeset = - HostReadModel.changeset(%HostReadModel{id: id}, %{ + HostReadModel + |> Repo.get!(id) + |> HostReadModel.changeset(%{ heartbeat: :passing }) @@ -175,7 +182,9 @@ defmodule Trento.HostProjector do %HeartbeatFailed{host_id: id}, fn multi -> changeset = - HostReadModel.changeset(%HostReadModel{id: id}, %{ + HostReadModel + |> Repo.get!(id) + |> HostReadModel.changeset(%{ heartbeat: :critical }) @@ -187,7 +196,9 @@ defmodule Trento.HostProjector do %ProviderUpdated{host_id: id, provider: provider, provider_data: provider_data}, fn multi -> changeset = - HostReadModel.changeset(%HostReadModel{id: id}, %{ + HostReadModel + |> Repo.get!(id) + |> HostReadModel.changeset(%{ provider: provider, provider_data: handle_provider_data(provider_data) }) @@ -205,13 +216,10 @@ defmodule Trento.HostProjector do @impl true @spec after_update(any, any, any) :: :ok | {:error, any} def after_update( - %HostRegistered{host_id: id}, + %HostRegistered{}, _, - _ + %{host: %HostReadModel{} = host} ) do - # We need to hit the database to get the cluster_id - host = Repo.get!(HostReadModel, id) - TrentoWeb.Endpoint.broadcast( "monitoring:hosts", "host_registered", @@ -239,10 +247,8 @@ defmodule Trento.HostProjector do def after_update( %HostDeregistered{host_id: id}, _, - _ + %{host: %HostReadModel{hostname: hostname}} ) do - %HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id) - TrentoWeb.Endpoint.broadcast( "monitoring:hosts", "host_deregistered", @@ -253,32 +259,30 @@ defmodule Trento.HostProjector do ) end + def after_update(%HostAddedToCluster{}, _, %{ + host: %HostReadModel{hostname: nil} + }), + do: :ok + def after_update( %HostAddedToCluster{host_id: id, cluster_id: cluster_id}, _, _ ) do - case Repo.get!(HostReadModel, id) do - # In case the host was not registered yet, we don't want to broadcast - %HostReadModel{hostname: nil} -> - :ok - - %HostReadModel{} -> - TrentoWeb.Endpoint.broadcast( - "monitoring:hosts", - "host_details_updated", - %{ - id: id, - cluster_id: cluster_id - } - ) - end + TrentoWeb.Endpoint.broadcast( + "monitoring:hosts", + "host_details_updated", + %{ + id: id, + cluster_id: cluster_id + } + ) end def after_update( %HostRemovedFromCluster{host_id: host_id}, _, - %{host: %Trento.HostReadModel{cluster_id: nil}} + %{host: %HostReadModel{cluster_id: nil}} ) do TrentoWeb.Endpoint.broadcast("monitoring:hosts", "host_details_updated", %{ id: host_id, @@ -289,7 +293,7 @@ defmodule Trento.HostProjector do def after_update( %HostDetailsUpdated{}, _, - %{host: host} + %{host: %HostReadModel{} = host} ) do TrentoWeb.Endpoint.broadcast( "monitoring:hosts", @@ -301,10 +305,8 @@ defmodule Trento.HostProjector do def after_update( %HeartbeatSucceded{host_id: id}, _, - _ + %{host: %HostReadModel{hostname: hostname}} ) do - %HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id) - TrentoWeb.Endpoint.broadcast( "monitoring:hosts", "heartbeat_succeded", @@ -320,10 +322,8 @@ defmodule Trento.HostProjector do def after_update( %HeartbeatFailed{host_id: id}, _, - _ + %{host: %HostReadModel{hostname: hostname}} ) do - %HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id) - TrentoWeb.Endpoint.broadcast( "monitoring:hosts", "heartbeat_failed", @@ -349,12 +349,10 @@ defmodule Trento.HostProjector do end def after_update( - %HostChecksSelected{host_id: host_id, checks: checks}, + %HostChecksSelected{checks: checks}, _, - _ + %{host: %HostReadModel{selected_checks: checks} = host} ) do - host = %HostReadModel{id: host_id, selected_checks: checks} - message = HostView.render( "host_details_updated.json", diff --git a/lib/trento/application/projectors/sap_system_projector.ex b/lib/trento/application/projectors/sap_system_projector.ex index e47168f9c1..3fa921d143 100644 --- a/lib/trento/application/projectors/sap_system_projector.ex +++ b/lib/trento/application/projectors/sap_system_projector.ex @@ -60,7 +60,9 @@ defmodule Trento.SapSystemProjector do }, fn multi -> changeset = - SapSystemReadModel.changeset(%SapSystemReadModel{id: sap_system_id}, %{health: health}) + SapSystemReadModel + |> Repo.get!(sap_system_id) + |> SapSystemReadModel.changeset(%{health: health}) Ecto.Multi.update(multi, :sap_system, changeset) end @@ -106,14 +108,15 @@ defmodule Trento.SapSystemProjector do new_host_id: new_host_id }, fn multi -> - instance = - Repo.get_by(ApplicationInstanceReadModel, + changeset = + ApplicationInstanceReadModel + |> Repo.get_by( sap_system_id: sap_system_id, instance_number: instance_number, host_id: old_host_id ) + |> ApplicationInstanceReadModel.changeset(%{host_id: new_host_id}) - changeset = ApplicationInstanceReadModel.changeset(instance, %{host_id: new_host_id}) Ecto.Multi.update(multi, :application_instance, changeset) end ) @@ -127,14 +130,13 @@ defmodule Trento.SapSystemProjector do }, fn multi -> changeset = - ApplicationInstanceReadModel.changeset( - %ApplicationInstanceReadModel{ - sap_system_id: sap_system_id, - host_id: host_id, - instance_number: instance_number - }, - %{health: health} + ApplicationInstanceReadModel + |> Repo.get_by( + sap_system_id: sap_system_id, + instance_number: instance_number, + host_id: host_id ) + |> ApplicationInstanceReadModel.changeset(%{health: health}) Ecto.Multi.update(multi, :application_instance, changeset) end @@ -147,10 +149,9 @@ defmodule Trento.SapSystemProjector do }, fn multi -> changeset = - SapSystemReadModel.changeset( - %SapSystemReadModel{id: sap_system_id}, - %{deregistered_at: deregistered_at} - ) + SapSystemReadModel + |> Repo.get!(sap_system_id) + |> SapSystemReadModel.changeset(%{deregistered_at: deregistered_at}) Ecto.Multi.update(multi, :sap_system, changeset) end @@ -164,10 +165,10 @@ defmodule Trento.SapSystemProjector do health: health }, fn multi -> - sap_system = Repo.get!(SapSystemReadModel, sap_system_id) - changeset = - SapSystemReadModel.changeset(sap_system, %{ + SapSystemReadModel + |> Repo.get!(sap_system_id) + |> SapSystemReadModel.changeset(%{ tenant: tenant, db_host: db_host, health: health, @@ -203,7 +204,9 @@ defmodule Trento.SapSystemProjector do }, fn multi -> changeset = - SapSystemReadModel.changeset(%SapSystemReadModel{id: sap_system_id}, %{ + SapSystemReadModel + |> Repo.get!(sap_system_id) + |> SapSystemReadModel.changeset(%{ ensa_version: ensa_version }) @@ -317,10 +320,8 @@ defmodule Trento.SapSystemProjector do def after_update( %SapSystemDeregistered{sap_system_id: sap_system_id}, _, - _ + %{sap_system: %SapSystemReadModel{sid: sid}} ) do - %SapSystemReadModel{sid: sid} = Repo.get!(SapSystemReadModel, sap_system_id) - TrentoWeb.Endpoint.broadcast( @sap_systems_topic, "sap_system_deregistered",