Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync-service): Clean up publication filters #2154

Merged
merged 24 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wild-bugs-raise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Introduced `PublicationManager` process to create and clean up publication filters.
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ defmodule Electric.Connection.Manager do
Electric.Connection.Supervisor.start_shapes_supervisor(
stack_id: state.stack_id,
shape_cache_opts: shape_cache_opts,
pool_opts: state.pool_opts,
replication_opts: state.replication_opts,
stack_events_registry: state.stack_events_registry,
tweaks: state.tweaks
)
Expand Down
9 changes: 9 additions & 0 deletions packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,18 @@ defmodule Electric.Connection.Supervisor do
def start_shapes_supervisor(opts) do
stack_id = Keyword.fetch!(opts, :stack_id)
shape_cache_opts = Keyword.fetch!(opts, :shape_cache_opts)
db_pool_opts = Keyword.fetch!(opts, :pool_opts)
replication_opts = Keyword.fetch!(opts, :replication_opts)
inspector = Keyword.fetch!(shape_cache_opts, :inspector)

shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}

publication_manager_spec =
{Electric.Replication.PublicationManager,
stack_id: stack_id,
publication_name: Keyword.fetch!(replication_opts, :publication_name),
db_pool: Keyword.fetch!(db_pool_opts, :name)}

shape_log_collector_spec =
{Electric.Replication.ShapeLogCollector, stack_id: stack_id, inspector: inspector}

Expand All @@ -51,6 +59,7 @@ defmodule Electric.Connection.Supervisor do
Electric.Replication.Supervisor,
stack_id: stack_id,
shape_cache: shape_cache_spec,
publication_manager: publication_manager_spec,
log_collector: shape_log_collector_spec
},
restart: :temporary
Expand Down
247 changes: 156 additions & 91 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ defmodule Electric.Postgres.Configuration do
a provided connection.
"""
require Logger
alias Electric.Replication.PublicationManager.RelationFilter
alias Electric.Utils
alias Electric.Shapes.Shape

@type filter() :: String.t() | nil
@type filters() :: %{Electric.relation() => filter()}
@type filters() :: %{Electric.relation() => RelationFilter.t()}

@pg_15 150_000

Expand All @@ -25,34 +24,75 @@ defmodule Electric.Postgres.Configuration do
"""
@spec configure_tables_for_replication!(
Postgrex.conn(),
[Shape.table_with_where_clause()],
(-> String.t()),
filters(),
String.t(),
float()
) ::
{:ok, [:ok]}
def configure_tables_for_replication!(pool, relations, get_pg_version, publication_name) do
def configure_tables_for_replication!(pool, relation_filters, pg_version, publication_name) do
configure_tables_for_replication_internal!(
pool,
relations,
get_pg_version.(),
relation_filters,
pg_version,
publication_name
)
end

defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name)
@doc """
Fetches the numeric version of the Postgres server behind `conn`.

Runs `SELECT current_setting('server_version_num')` and converts the single
returned value to an integer.

Returns `{:ok, version}` on success, or `{:error, reason}` when the query fails.
"""
@spec get_pg_version(Postgrex.conn()) :: {:ok, non_neg_integer()} | {:error, term()}
def get_pg_version(conn) do
  version_query = "SELECT current_setting('server_version_num') server_version_num"

  case Postgrex.query(conn, version_query, []) do
    {:error, reason} ->
      {:error, reason}

    {:ok, response} when response.num_rows == 1 ->
      # The setting comes back as text in a single row with a single column.
      [[raw_version]] = response.rows
      {:ok, String.to_integer(raw_version)}
  end
end

defp configure_tables_for_replication_internal!(
pool,
relation_filters,
pg_version,
publication_name
)
when pg_version < @pg_15 do
Postgrex.transaction(pool, fn conn ->
for {relation, _} <- relations,
table = Utils.relation_to_sql(relation),
publication = Utils.quote_name(publication_name) do
publication = Utils.quote_name(publication_name)

relation_filters = filter_for_existing_relations(conn, relation_filters)

prev_published_tables =
get_publication_tables(conn, publication_name)
|> Enum.map(&Utils.relation_to_sql/1)
|> MapSet.new()

new_published_tables =
relation_filters
|> Map.keys()
|> Enum.map(&Utils.relation_to_sql/1)
|> MapSet.new()

alter_ops =
Enum.concat(
MapSet.difference(new_published_tables, prev_published_tables)
|> Enum.map(&{&1, "ADD"}),
MapSet.difference(prev_published_tables, new_published_tables)
|> Enum.map(&{&1, "DROP"})
)

for {table, op} <- alter_ops do
Postgrex.query!(conn, "SAVEPOINT before_publication", [])

# PG 14 and below do not support filters on tables of publications
case Postgrex.query(
conn,
"ALTER PUBLICATION #{publication} ADD TABLE #{table}",
[]
) do
case Postgrex.query(conn, "ALTER PUBLICATION #{publication} #{op} TABLE #{table}", []) do
{:ok, _} ->
Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", [])
:ok
Expand All @@ -68,41 +108,38 @@ defmodule Electric.Postgres.Configuration do
end
end

set_replica_identity!(conn, relations)
set_replica_identity!(conn, relation_filters)
end)
end

defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do
defp configure_tables_for_replication_internal!(
pool,
relation_filters,
_pg_version,
publication_name
) do
Postgrex.transaction(pool, fn conn ->
# We're using advisory locks to prevent race conditions when multiple
# processes try to read-then-update the publication configuration. We're not using `SELECT FOR UPDATE`
# because it doesn't read the value that was updated by other transaction holding the lock. This lock
# is thus acquired before reading the existing configuration, so the first read sees the latest value.
Postgrex.query!(conn, "SELECT pg_advisory_xact_lock($1)", [:erlang.phash2(publication_name)])
# Ensure that all tables are present in the publication
relation_filters = filter_for_existing_relations(conn, relation_filters)

filters = get_publication_filters(conn, publication_name)

# Get the existing filter for the table
# and extend it with the where clause for the table
# and update the table in the map with the new filter
filters =
Enum.reduce(relations, filters, fn {relation, clause}, acc ->
Map.update(acc, relation, clause, &extend_where_clause(&1, clause))
end)

Postgrex.query!(conn, make_alter_publication_query(publication_name, filters), [])
# Update the entire publication with the new filters
Postgrex.query!(
conn,
make_alter_publication_query(publication_name, relation_filters),
[]
)

# `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table,
# but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will
# deadlock if the order is reversed.
set_replica_identity!(conn, relations)
set_replica_identity!(conn, relation_filters)

[:ok]
end)
end

defp set_replica_identity!(conn, relations) do
for {relation, _} <- relations,
defp set_replica_identity!(conn, relation_filters) do
for %RelationFilter{relation: relation} <- Map.values(relation_filters),
table = Utils.relation_to_sql(relation) do
%Postgrex.Result{rows: [[correct_identity?]]} =
Postgrex.query!(
Expand All @@ -118,72 +155,100 @@ defmodule Electric.Postgres.Configuration do
end
end

# Returns the filters grouped by table for the given publication.
@spec get_publication_filters(Postgrex.conn(), String.t()) :: filters()
defp get_publication_filters(conn, publication) do
@spec get_publication_tables(Postgrex.conn(), String.t()) :: list(Electric.relation())
defp get_publication_tables(conn, publication) do
Postgrex.query!(
conn,
"SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1",
"SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = $1",
[publication]
)
|> Map.fetch!(:rows)
|> Enum.map(&{Enum.take(&1, 2) |> List.to_tuple(), Enum.at(&1, 2)})
|> Map.new()
end

@doc """
Drops all tables from the given publication.

Runs an anonymous PL/pgSQL block that walks `pg_publication_tables` for
`publication_name` and issues one `ALTER PUBLICATION ... DROP TABLE` statement
per listed table. Publication, schema and table names are quoted as identifiers
with `format('%I', ...)`, so names that need quoting (mixed case, special
characters) are handled.

Raises if the query fails, since it goes through `Postgrex.query!/3`.
"""
@spec drop_all_publication_tables(Postgrex.conn(), String.t()) :: Postgrex.Result.t()
def drop_all_publication_tables(conn, publication_name) do
  # The `DO` statement takes no bind parameters, so the publication name has to
  # be embedded as a string literal; double any single quotes to keep it intact.
  pubname_literal = String.replace(publication_name, "'", "''")

  Postgrex.query!(
    conn,
    """
    DO $$
    DECLARE
      r RECORD;
    BEGIN
      FOR r IN (
        SELECT pubname, schemaname, tablename
        FROM pg_publication_tables
        WHERE pubname = '#{pubname_literal}'
      )
      LOOP
        EXECUTE format(
          'ALTER PUBLICATION %I DROP TABLE %I.%I',
          r.pubname, r.schemaname, r.tablename
        );
      END LOOP;
    END $$;
    """,
    []
  )
end

# Joins the existing filter for the table with the where clause for the table.
# If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`.
@spec extend_where_clause(filter(), filter()) :: filter()
defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause) do
nil
end

defp extend_where_clause(filter, where_clause) do
"(#{filter} OR #{where_clause})"
|> Enum.map(&(Enum.take(&1, 2) |> List.to_tuple()))
end

# Makes an SQL query that alters the given publication with the given tables and filters.
@spec make_alter_publication_query(String.t(), filters()) :: String.t()
defp make_alter_publication_query(publication_name, filters) do
base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE "

tables =
filters
|> Enum.map(&make_table_clause/1)
|> Enum.join(", ")

base_sql <> tables
case Map.values(filters) do
[] ->
"""
DO $$
DECLARE
tables TEXT;
BEGIN
SELECT string_agg(format('%I.%I', schemaname, tablename), ', ')
INTO tables
FROM pg_publication_tables
WHERE pubname = '#{publication_name}' ;

IF tables IS NOT NULL THEN
EXECUTE format('ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE %s', tables);
END IF;
END $$;
"""

filters ->
base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE "

tables =
filters
|> Enum.map(&make_table_clause/1)
|> Enum.join(", ")

base_sql <> tables
end
end

@spec make_table_clause({Electric.relation(), filter()}) :: String.t()
defp make_table_clause({{schema, tbl}, nil}) do
Utils.relation_to_sql({schema, tbl})
# Narrows `filters` down to the relations that currently exist as ordinary
# tables (`pg_class.relkind = 'r'`) in the database behind `conn`.
# Entries whose stored filter is `nil` are left out of the result as well.
@spec filter_for_existing_relations(Postgrex.conn(), filters()) :: filters()
defp filter_for_existing_relations(conn, filters) do
  existence_query = """
  WITH input_relations AS (
  SELECT
  UNNEST($1::text[]) AS schemaname,
  UNNEST($2::text[]) AS tablename
  )
  SELECT ir.schemaname, ir.tablename
  FROM input_relations ir
  JOIN pg_class pc ON pc.relname = ir.tablename
  JOIN pg_namespace pn ON pn.oid = pc.relnamespace
  WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r';
  """

  requested = Map.keys(filters)
  schema_names = Enum.map(requested, fn rel -> elem(rel, 0) end)
  table_names = Enum.map(requested, fn rel -> elem(rel, 1) end)

  # Each returned row is `[schema, table]`; turn it back into a relation tuple
  # so it can be compared against the keys of `filters`.
  existing_relations =
    conn
    |> Postgrex.query!(existence_query, [schema_names, table_names])
    |> Map.fetch!(:rows)
    |> MapSet.new(&List.to_tuple/1)

  for {relation, filter} <- filters,
      filter != nil,
      MapSet.member?(existing_relations, relation),
      into: %{} do
    {relation, filter}
  end
end

defp make_table_clause({{schema, tbl}, where}) do
table = Utils.relation_to_sql({schema, tbl})
table <> " WHERE " <> where
# Renders one table entry for `ALTER PUBLICATION ... SET TABLE`: the quoted
# relation name, followed by a ` WHERE (...)` fragment that ORs together the
# `query` of every where clause. A `nil` list of where clauses produces no
# `WHERE` fragment at all.
@spec make_table_clause(RelationFilter.t()) :: String.t()
defp make_table_clause(%RelationFilter{relation: relation, where_clauses: where_clauses}) do
  table_sql = Utils.relation_to_sql(relation)

  # NOTE: cannot filter on columns with REPLICA IDENTITY FULL, so no column
  # list is emitted here (the `selected_columns` field is intentionally unused).
  where_sql =
    case where_clauses do
      nil -> ""
      clauses -> " WHERE (" <> Enum.map_join(clauses, " OR ", & &1.query) <> ")"
    end

  table_sql <> where_sql
end
end
22 changes: 22 additions & 0 deletions packages/sync-service/lib/electric/replication/eval/expr.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,26 @@ defmodule Electric.Replication.Eval.Expr do
used_refs: used_refs(),
returns: Env.pg_type()
}

@doc """
Returns a flat list of the refs used in the expression that point to the
current table, i.e. those whose path consists of a single segment.

## Examples

    iex> used_refs = %{["id"] => :int8, ["created_at"] => :timestamp}
    iex> unqualified_refs(%Expr{query: "id = 1", used_refs: used_refs})
    ["created_at", "id"]

    iex> used_refs = %{["id"] => :int8, ["potato", "created_at"] => :timestamp}
    iex> unqualified_refs(%Expr{query: "id = 1", used_refs: used_refs, returns: :int8})
    ["id"]
"""
@spec unqualified_refs(t()) :: [String.t()]
def unqualified_refs(%__MODULE__{used_refs: used_refs}) do
  # A single-element path is a ref into the current table; longer paths are
  # skipped because the generator pattern does not match them.
  for {[column], _type} <- used_refs, do: column
end
end
Loading
Loading