Skip to content

Commit

Permalink
Only modify existing relations
Browse files Browse the repository at this point in the history
  • Loading branch information
msfstef committed Dec 12, 2024
1 parent d97e749 commit be3538e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
41 changes: 40 additions & 1 deletion packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule Electric.Postgres.Configuration do
@doc """
Get Postgres server version
"""
@spec get_pg_version(Postgrex.conn()) :: non_neg_integer()
@spec get_pg_version(Postgrex.conn()) :: {:ok, non_neg_integer()} | {:error, term()}
def get_pg_version(conn) do
case Postgrex.query(
conn,
Expand All @@ -67,6 +67,8 @@ defmodule Electric.Postgres.Configuration do
Postgrex.transaction(pool, fn conn ->
publication = Utils.quote_name(publication_name)

relation_filters = filter_for_existing_relations(conn, relation_filters)

prev_published_tables =
get_publication_tables(conn, publication_name)
|> Enum.map(&Utils.relation_to_sql/1)
Expand Down Expand Up @@ -116,6 +118,9 @@ defmodule Electric.Postgres.Configuration do
publication_name
) do
Postgrex.transaction(pool, fn conn ->
# Ensure that all tables are present in the publication
relation_filters = filter_for_existing_relations(conn, relation_filters)

# Update the entire publication with the new filters
Postgrex.query!(conn, make_alter_publication_query(publication_name, relation_filters), [])

Expand Down Expand Up @@ -184,6 +189,40 @@ defmodule Electric.Postgres.Configuration do
base_sql <> tables
end

@spec filter_for_existing_relations(Postgrex.conn(), filters()) :: filters()
defp filter_for_existing_relations(conn, filters) do
query = "
WITH input_relations AS (
SELECT
UNNEST($1::text[]) AS schemaname,
UNNEST($2::text[]) AS tablename
)
SELECT ir.schemaname, ir.tablename
FROM input_relations ir
JOIN pg_class pc ON pc.relname = ir.tablename
JOIN pg_namespace pn ON pn.oid = pc.relnamespace
WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r';
"

relation_filter_map =
filters |> Enum.map(&{&1.relation, &1}) |> Map.new()

relations = Map.keys(relation_filter_map)

Postgrex.query!(conn, query, [
Enum.map(relations, &elem(&1, 0)),
Enum.map(relations, &elem(&1, 1))
])
|> Map.fetch!(:rows)
|> Enum.map(&List.to_tuple/1)
|> Enum.reduce([], fn rel, filters ->
case Map.get(relation_filter_map, rel) do
nil -> filters
filter -> [filter | filters]
end
end)
end

@spec make_table_clause(RelationFilter.t()) :: String.t()
defp make_table_clause(%RelationFilter{
relation: relation,
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ defmodule Electric.Shapes.Consumer do
} = state

shape_status.remove_shape(shape_status_state, state.shape_handle)
ShapeCache.Storage.cleanup!(state.storage)
publication_manager.remove_shape(state.shape, publication_manager_opts)
ShapeCache.Storage.cleanup!(state.storage)
end

defp reply_to_snapshot_waiters(_reply, %{awaiting_snapshot_start: []} = state) do
Expand Down

0 comments on commit be3538e

Please sign in to comment.