From be3538ec08e79e07a5e59e31db9da0a03437fffa Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 12 Dec 2024 14:35:39 +0200 Subject: [PATCH] Only modify existing relations --- .../lib/electric/postgres/configuration.ex | 41 ++++++++++++++++++- .../lib/electric/shapes/consumer.ex | 2 +- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index daba5ea65f..98962947f2 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -41,7 +41,7 @@ defmodule Electric.Postgres.Configuration do @doc """ Get Postgres server version """ - @spec get_pg_version(Postgrex.conn()) :: non_neg_integer() + @spec get_pg_version(Postgrex.conn()) :: {:ok, non_neg_integer()} | {:error, term()} def get_pg_version(conn) do case Postgrex.query( conn, @@ -67,6 +67,8 @@ defmodule Electric.Postgres.Configuration do Postgrex.transaction(pool, fn conn -> publication = Utils.quote_name(publication_name) + relation_filters = filter_for_existing_relations(conn, relation_filters) + prev_published_tables = get_publication_tables(conn, publication_name) |> Enum.map(&Utils.relation_to_sql/1) @@ -116,6 +118,9 @@ defmodule Electric.Postgres.Configuration do publication_name ) do Postgrex.transaction(pool, fn conn -> + # Ensure that all tables are present in the publication + relation_filters = filter_for_existing_relations(conn, relation_filters) + # Update the entire publication with the new filters Postgrex.query!(conn, make_alter_publication_query(publication_name, relation_filters), []) @@ -184,6 +189,40 @@ defmodule Electric.Postgres.Configuration do base_sql <> tables end + @spec filter_for_existing_relations(Postgrex.conn(), filters()) :: filters() + defp filter_for_existing_relations(conn, filters) do + query = " + WITH input_relations AS ( + SELECT + UNNEST($1::text[]) AS schemaname, + UNNEST($2::text[]) AS tablename + ) + SELECT ir.schemaname, ir.tablename + FROM input_relations ir + JOIN pg_class pc ON pc.relname = ir.tablename + JOIN pg_namespace pn ON pn.oid = pc.relnamespace + WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r'; + " + + relation_filter_map = + filters |> Enum.map(&{&1.relation, &1}) |> Map.new() + + relations = Map.keys(relation_filter_map) + + Postgrex.query!(conn, query, [ + Enum.map(relations, &elem(&1, 0)), + Enum.map(relations, &elem(&1, 1)) + ]) + |> Map.fetch!(:rows) + |> Enum.map(&List.to_tuple/1) + |> Enum.reduce([], fn rel, filters -> + case Map.get(relation_filter_map, rel) do + nil -> filters + filter -> [filter | filters] + end + end) + end + @spec make_table_clause(RelationFilter.t()) :: String.t() defp make_table_clause(%RelationFilter{ relation: relation, diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 6629878527..1362200913 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -376,8 +376,8 @@ defmodule Electric.Shapes.Consumer do } = state shape_status.remove_shape(shape_status_state, state.shape_handle) - ShapeCache.Storage.cleanup!(state.storage) publication_manager.remove_shape(state.shape, publication_manager_opts) + ShapeCache.Storage.cleanup!(state.storage) end defp reply_to_snapshot_waiters(_reply, %{awaiting_snapshot_start: []} = state) do