Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #609 Publication with columns #763

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- postgresql_publication - add possibility of creating publication with column list (https://github.com/ansible-collections/community.postgresql/pull/763).
206 changes: 179 additions & 27 deletions plugins/modules/postgresql_publication.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
the publication state will be changed.
aliases: [ login_db ]
type: str
columns:
description:
- List of tables and its columns to add to the publication.
- If no columns are passed for table, it will be published as a whole.
- Mutually exclusive with I(tables) and I(tables_in_schema).
type: dict
kacperk marked this conversation as resolved.
Show resolved Hide resolved
version_added: '3.8.0'
tables:
description:
- List of tables to add to the publication.
Expand All @@ -36,7 +43,7 @@
nothing will be changed.
- If you need to add all tables to the publication with the same name,
drop existent and create new without passing I(tables).
- Mutually exclusive with I(tables_in_schema).
- Mutually exclusive with I(tables_in_schema) and I(columns).
type: list
elements: str
tables_in_schema:
Expand All @@ -45,7 +52,7 @@
for all tables in those schemas.
- If you want to remove all schemas, explicitly pass an empty list C([]).
- Supported since PostgreSQL 15.
- Mutually exclusive with I(tables).
- Mutually exclusive with I(tables) and I(columns).
type: list
elements: str
version_added: '3.5.0'
Expand Down Expand Up @@ -131,6 +138,15 @@
- prices
- vehicles

- name: Create publication "acme" publishing only prices table and id and named from vehicles tables
community.postgresql.postgresql_publication:
name: acme
columns:
prices:
vehicles:
- id
- name

- name: Create a new publication "acme" for tables in schema "myschema"
community.postgresql.postgresql_publication:
db: test
Expand Down Expand Up @@ -210,19 +226,11 @@
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six import iteritems
from ansible_collections.community.postgresql.plugins.module_utils.database import (
check_input,
pg_quote_identifier,
)
check_input, pg_quote_identifier)
from ansible_collections.community.postgresql.plugins.module_utils.postgres import (
connect_to_db,
ensure_required_libs,
exec_sql,
get_conn_params,
get_server_version,
pg_cursor_args,
postgres_common_argument_spec,
set_comment,
)
connect_to_db, ensure_required_libs, exec_sql, get_conn_params,
get_server_version, pg_cursor_args, postgres_common_argument_spec,
set_comment)
Andersson007 marked this conversation as resolved.
Show resolved Hide resolved

SUPPORTED_PG_VERSION = 10000

Expand All @@ -231,6 +239,22 @@
# Module functions and classes #
################################

def normalize_table_name(table):
"""Add 'public.' to name of table where a schema identifier is absent
and add quotes to each element.

Args:
table (str): Table name.

Returns:
str: Normalized table name.
"""
if '.' not in table:
return pg_quote_identifier('public.%s' % table.strip(), 'table')
else:
return pg_quote_identifier(table.strip(), 'table')


def transform_tables_representation(tbl_list):
"""Add 'public.' to names of tables where a schema identifier is absent
and add quotes to each element.
Expand All @@ -242,14 +266,47 @@ def transform_tables_representation(tbl_list):
tbl_list (list): Changed list.
"""
for i, table in enumerate(tbl_list):
if '.' not in table:
tbl_list[i] = pg_quote_identifier('public.%s' % table.strip(), 'table')
else:
tbl_list[i] = pg_quote_identifier(table.strip(), 'table')
tbl_list[i] = normalize_table_name(table)

return tbl_list


def transform_columns_keys(columns):
"""Add quotes to each element of the columns list.

Args:
columns (dict): Dict with tables and columns.

Returns:
columns (dict): Changed dict.
"""
revmap_columns = {}
for table in columns:
revmap_columns[normalize_table_name(table)] = set(c.strip() for c in columns[table]) if columns[table] else None

return revmap_columns


def pg_quote_column_list(table, columns):
"""Convert a list of columns to a string.

Args:
table (str): Table name.
columns (list): List of columns.

Returns:
str: String with columns.
"""
table = normalize_table_name(table)

if not columns:
return table

quoted_columns = [pg_quote_identifier(col, 'column') for col in columns]
quoted_sql = "%s (%s)" % (table, ', '.join(quoted_columns))
return quoted_sql


class PgPublication():
"""Class to work with PostgreSQL publication.

Expand Down Expand Up @@ -279,6 +336,7 @@ def __init__(self, module, cursor, name, pg_srv_ver):
'parameters': {},
'owner': '',
'schemas': [],
'columns': {}
}
self.exists = self.check_pub()

Expand Down Expand Up @@ -326,13 +384,18 @@ def check_pub(self):
# FOR TABLES IN SCHEMA statement is supported since PostgreSQL 15
if self.pg_srv_ver >= 150000:
self.attrs['schemas'] = self.__get_schema_pub_info()
column_info = self.__get_columns_pub_info()
columns = {}
for row in column_info:
columns[normalize_table_name(row["schema_dot_table"])] = set(row['columns'])
self.attrs['columns'] = columns
else:
self.attrs['alltables'] = True

# Publication exists:
return True

def create(self, tables, tables_in_schema, params, owner, comment, check_mode=True):
def create(self, tables, tables_in_schema, columns, params, owner, comment, check_mode=True):
"""Create the publication.

Args:
Expand All @@ -353,7 +416,12 @@ def create(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr

query_fragments = ["CREATE PUBLICATION %s" % pg_quote_identifier(self.name, 'publication')]

if tables:
if columns:
table_strings = []
for table in columns:
table_strings.append(pg_quote_column_list(table, columns[table]))
query_fragments.append("FOR TABLE %s" % ', '.join(table_strings))
elif tables:
query_fragments.append("FOR TABLE %s" % ', '.join(tables))
elif tables_in_schema:
tables_in_schema = [pg_quote_identifier(schema, 'schema') for schema in tables_in_schema]
Expand Down Expand Up @@ -383,7 +451,7 @@ def create(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr

return changed

def update(self, tables, tables_in_schema, params, owner, comment, check_mode=True):
def update(self, tables, tables_in_schema, columns, params, owner, comment, check_mode=True):
"""Update the publication.

Args:
Expand All @@ -403,6 +471,34 @@ def update(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
changed = False

# Add or drop tables from published tables suit:
if columns and not self.attrs['alltables']:
need_set_columns = False
for table in columns:
if table not in self.attrs['tables']:
continue
elif not columns[table]:
all_columns = self.__get_table_columns(table)
if all_columns != self.attrs['columns'][table]:
need_set_columns = True
break
elif self.attrs['columns'][table] != columns[table]:
need_set_columns = True
break

if need_set_columns:
changed = self.__pub_set_columns(columns, check_mode=check_mode)
else:
# Add new tables to the publication:
for table in columns:
if table not in self.attrs['tables']:
changed = self.__pub_add_columns(table, columns[table], check_mode=check_mode)

# Drop redundant tables from the publication:
for table in self.attrs['columns']:
if table not in columns.keys():
changed = self.__pub_drop_table(table, check_mode=check_mode)
elif columns and self.attrs['alltables']:
changed = self.__pub_set_columns(columns, check_mode=check_mode)
if tables and not self.attrs['alltables']:

# 1. If needs to add table to the publication:
Expand Down Expand Up @@ -465,9 +561,8 @@ def update(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
changed = self.__pub_set_param(key, val, check_mode=check_mode)

# Update pub owner:
if owner:
if owner != self.attrs['owner']:
changed = self.__pub_set_owner(owner, check_mode=check_mode)
if owner and owner != self.attrs['owner']:
changed = self.__pub_set_owner(owner, check_mode=check_mode)

if comment is not None and comment != self.attrs['comment']:
changed = set_comment(self.cursor, comment, 'publication',
Expand Down Expand Up @@ -537,6 +632,16 @@ def __get_tables_pub_info(self):
"FROM pg_publication_tables WHERE pubname = %(pname)s")
return exec_sql(self, query, query_params={'pname': self.name}, add_to_executed=False)

def __get_columns_pub_info(self):
"""Get and return columns that are published by the publication.

Returns:
List of dicts with published columns.
"""
query = ("SELECT schemaname || '.' || tablename as schema_dot_table, attnames as columns "
"FROM pg_publication_tables WHERE pubname = %(pname)s")
return exec_sql(self, query, query_params={'pname': self.name}, add_to_executed=False)

def __get_schema_pub_info(self):
"""Get and return schemas added to the publication.

Expand All @@ -555,6 +660,17 @@ def __get_schema_pub_info(self):
list_of_schemas.extend(d.values())
return list_of_schemas

def __get_table_columns(self, table):
"""Get and return columns names of the table.

Returns:
Set of columns.
"""
query = ("SELECT attname as column_name FROM pg_attribute "
"WHERE attrelid = %(table)s::regclass and attnum > 0 AND NOT attisdropped;")
result = exec_sql(self, query, query_params={'table': table}, add_to_executed=False)
return set([row['column_name'] for row in result])

def __pub_add_table(self, table, check_mode=False):
"""Add a table to the publication.

Expand Down Expand Up @@ -607,6 +723,35 @@ def __pub_set_tables(self, tables, check_mode=False):
', '.join(quoted_tables)))
return self.__exec_sql(query, check_mode=check_mode)

def __pub_add_columns(self, table, columns, check_mode=False):
""" Add table with specific columns to the publication.
Args:
table (str): Table name.
columns (list): List of columns.
Kwargs:
check_mode (bool): If True, don't actually change anything,
just make SQL, add it to ``self.executed_queries`` and return True.
Returns:
True if successful, False otherwise.
"""
query = ("ALTER PUBLICATION %s ADD TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
pg_quote_column_list(table, columns)))
return self.__exec_sql(query, check_mode=check_mode)

def __pub_set_columns(self, columns_map, check_mode=False):
"""Set columns that need to be published by the publication.
Args:
columns_map (dict): Dictionary of all tables and list of columns.
Kwargs:
check_mode (bool): If True, don't actually change anything,
just make SQL, add it to ``self.executed_queries`` and return True.
Returns:
True if successful, False otherwise.
"""
table_list = [pg_quote_column_list(table, columns_map[table]) for table in columns_map]
query = ("ALTER PUBLICATION %s SET TABLE %s" % (pg_quote_identifier(self.name, 'publication'), ', '.join(table_list)))
return self.__exec_sql(query, check_mode=check_mode)

def __pub_add_schema(self, schema, check_mode=False):
"""Add a schema to the publication.

Expand Down Expand Up @@ -720,11 +865,12 @@ def main():
trust_input=dict(type='bool', default=True),
comment=dict(type='str', default=None),
tables_in_schema=dict(type='list', elements='str', default=None),
columns=dict(type='dict', default=None),
)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
mutually_exclusive=[('tables', 'tables_in_schema')],
mutually_exclusive=[('tables', 'tables_in_schema', "columns")],
)

# Parameters handling:
Expand All @@ -738,6 +884,7 @@ def main():
trust_input = module.params['trust_input']
comment = module.params['comment']
tables_in_schema = module.params['tables_in_schema']
columns = module.params['columns']

if not trust_input:
# Check input for potentially dangerous elements:
Expand Down Expand Up @@ -775,6 +922,8 @@ def main():

if tables_in_schema is not None and pg_srv_ver < 150000:
module.fail_json(msg="Publication of tables in schema is supported by PostgreSQL 15 or greater")
if columns and pg_srv_ver < 150000:
module.fail_json(msg="Publication of columns is supported by PostgreSQL 15 or greater")

# Nothing was changed by default:
changed = False
Expand All @@ -786,14 +935,17 @@ def main():
if tables:
tables = transform_tables_representation(tables)

if columns:
columns = transform_columns_keys(columns)

# If module.check_mode=True, nothing will be changed:
if state == 'present':
if not publication.exists:
changed = publication.create(tables, tables_in_schema, params, owner, comment,
changed = publication.create(tables, tables_in_schema, columns, params, owner, comment,
check_mode=module.check_mode)

else:
changed = publication.update(tables, tables_in_schema, params, owner, comment,
changed = publication.update(tables, tables_in_schema, columns, params, owner, comment,
check_mode=module.check_mode)

elif state == 'absent':
Expand Down
Loading