Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #609 Publication with columns #763

Merged
Merged
118 changes: 110 additions & 8 deletions plugins/modules/postgresql_publication.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
the publication state will be changed.
aliases: [ login_db ]
type: str
columns:
description:
- List of tables and its columns to add to the publication.
- If no columns are passed for table, it will be published as a whole.
- Mutually exclusive with I(tables) I(tables_in_schema).
kacperk marked this conversation as resolved.
Show resolved Hide resolved
type: dict
kacperk marked this conversation as resolved.
Show resolved Hide resolved
tables:
description:
- List of tables to add to the publication.
Expand All @@ -36,7 +42,7 @@
nothing will be changed.
- If you need to add all tables to the publication with the same name,
drop existent and create new without passing I(tables).
- Mutually exclusive with I(tables_in_schema).
- Mutually exclusive with I(tables_in_schema) I(columns).
kacperk marked this conversation as resolved.
Show resolved Hide resolved
type: list
elements: str
tables_in_schema:
Expand All @@ -45,7 +51,7 @@
for all tables in those schemas.
- If you want to remove all schemas, explicitly pass an empty list C([]).
- Supported since PostgreSQL 15.
- Mutually exclusive with I(tables).
- Mutually exclusive with I(tables) I(columns).
kacperk marked this conversation as resolved.
Show resolved Hide resolved
type: list
elements: str
version_added: '3.5.0'
Expand Down Expand Up @@ -130,6 +136,15 @@
tables:
- prices
- vehicles

- name: Create publication "acme" publishing only prices table and id and named from vehicles tables
community.postgresql.postgresql_publication:
name: acme
columns:
prices:
vehicles:
- id
- name

- name: Create a new publication "acme" for tables in schema "myschema"
community.postgresql.postgresql_publication:
Expand Down Expand Up @@ -250,6 +265,27 @@ def transform_tables_representation(tbl_list):
return tbl_list


def pg_quote_column_list(table, columns):
"""Convert a list of columns to a string.

Args:
table (str): Table name.
columns (list): List of columns.

Returns:
str: String with columns.
"""
if columns is None:
return pg_quote_identifier(table, 'table')

if len(columns) == 0:
return pg_quote_identifier(table, 'table')
kacperk marked this conversation as resolved.
Show resolved Hide resolved

quoted_columns = [pg_quote_identifier(c, 'column') for c in columns]
quoted_sql = "%s (%s)" % (pg_quote_identifier(table, 'table'), ', '.join(quoted_columns))
return quoted_sql


class PgPublication():
"""Class to work with PostgreSQL publication.

Expand Down Expand Up @@ -279,6 +315,7 @@ def __init__(self, module, cursor, name, pg_srv_ver):
'parameters': {},
'owner': '',
'schemas': [],
'columns': {}
}
self.exists = self.check_pub()

Expand Down Expand Up @@ -326,13 +363,15 @@ def check_pub(self):
# FOR TABLES IN SCHEMA statement is supported since PostgreSQL 15
if self.pg_srv_ver >= 150000:
self.attrs['schemas'] = self.__get_schema_pub_info()
if self.pg_srv_ver >= 150000:
self.attrs['columns'] = self.__get_columns_pub_info()
kacperk marked this conversation as resolved.
Show resolved Hide resolved
else:
self.attrs['alltables'] = True

# Publication exists:
return True

def create(self, tables, tables_in_schema, params, owner, comment, check_mode=True):
def create(self, tables, tables_in_schema, columns, params, owner, comment, check_mode=True):
"""Create the publication.

Args:
Expand All @@ -353,7 +392,12 @@ def create(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr

query_fragments = ["CREATE PUBLICATION %s" % pg_quote_identifier(self.name, 'publication')]

if tables:
if columns:
table_strings = []
for table, cols in columns:
table_strings.append(pg_quote_column_list(table, cols))
query_fragments.append("FOR TABLE %s" % ', '.join(table_strings))
elif tables:
query_fragments.append("FOR TABLE %s" % ', '.join(tables))
elif tables_in_schema:
tables_in_schema = [pg_quote_identifier(schema, 'schema') for schema in tables_in_schema]
Expand Down Expand Up @@ -383,7 +427,7 @@ def create(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr

return changed

def update(self, tables, tables_in_schema, params, owner, comment, check_mode=True):
def update(self, tables, tables_in_schema, columns, params, owner, comment, check_mode=True):
"""Update the publication.

Args:
Expand All @@ -403,6 +447,20 @@ def update(self, tables, tables_in_schema, params, owner, comment, check_mode=Tr
changed = False

# Add or drop tables from published tables suit:
if columns and not self.attrs['alltables']:
for table, cols in columns:
if table not in self.attrs['tables']:
changed = self.__pub_add_columns(table, cols, check_mode=check_mode)
else:
changed = self.__pub_set_columns(table, cols, check_mode=check_mode)

# Drop columns that are not in the passed columns:
for row in self.attrs['columns'].items():
if columns[row[0]] is None:
changed = self.__pub_drop_table(table, check_mode=check_mode)
elif columns and self.attrs['alltables']:
for table, cols in columns:
changed = self.__pub_set_columns(table, cols, check_mode=check_mode)
if tables and not self.attrs['alltables']:

# 1. If needs to add table to the publication:
Expand Down Expand Up @@ -537,6 +595,16 @@ def __get_tables_pub_info(self):
"FROM pg_publication_tables WHERE pubname = %(pname)s")
return exec_sql(self, query, query_params={'pname': self.name}, add_to_executed=False)

def __get_columns_pub_info(self):
"""Get and return columns that are published by the publication.

Returns:
List of dicts with published columns.
"""
query = ("SELECT schemaname || '.' || tablename as schema_dot_table, attnames as columns "
"FROM pg_publication_tables WHERE pubname = %(pname)s")
return exec_sql(self, query, query_params={'pname': self.name}, add_to_executed=False)

def __get_schema_pub_info(self):
"""Get and return schemas added to the publication.

Expand Down Expand Up @@ -607,6 +675,36 @@ def __pub_set_tables(self, tables, check_mode=False):
', '.join(quoted_tables)))
return self.__exec_sql(query, check_mode=check_mode)

def __pub_add_columns(self, table, columns, check_mode=False):
""" Add table with specific columns to the publication.
Args:
table (str): Table name.
columns (list): List of columns.
Kwargs:
check_mode (bool): If True, don't actually change anything,
just make SQL, add it to ``self.executed_queries`` and return True.
Returns:
True if successful, False otherwise.
"""
query = ("ALTER PUBLICATION %s ADD TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
pg_quote_column_list(table, columns)))
return self.__exec_sql(query, check_mode=check_mode)

def __pub_set_columns(self, table, columns, check_mode=False):
"""Set columns that need to be published by the publication.
Args:
table (str): Table name.
columns (list): List of columns.
Kwargs:
check_mode (bool): If True, don't actually change anything,
just make SQL, add it to ``self.executed_queries`` and return True.
Returns:
True if successful, False otherwise.
"""
query = ("ALTER PUBLICATION %s SET TABLE %s" % (pg_quote_identifier(self.name, 'publication'),
pg_quote_column_list(table, columns)))
return self.__exec_sql(query, check_mode=check_mode)

def __pub_add_schema(self, schema, check_mode=False):
"""Add a schema to the publication.

Expand Down Expand Up @@ -720,11 +818,12 @@ def main():
trust_input=dict(type='bool', default=True),
comment=dict(type='str', default=None),
tables_in_schema=dict(type='list', elements='str', default=None),
columns=dict(type='dict', default=None),
)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
mutually_exclusive=[('tables', 'tables_in_schema')],
mutually_exclusive=[('tables', 'tables_in_schema', "columns")],
)

# Parameters handling:
Expand All @@ -738,6 +837,7 @@ def main():
trust_input = module.params['trust_input']
comment = module.params['comment']
tables_in_schema = module.params['tables_in_schema']
columns = module.params['columns']

if not trust_input:
# Check input for potentially dangerous elements:
Expand Down Expand Up @@ -775,6 +875,8 @@ def main():

if tables_in_schema is not None and pg_srv_ver < 150000:
module.fail_json(msg="Publication of tables in schema is supported by PostgreSQL 15 or greater")
if columns is not None and pg_srv_ver < 150000:
module.fail_json(msg="Publication of columns is supported by PostgreSQL 15 or greater")
kacperk marked this conversation as resolved.
Show resolved Hide resolved

# Nothing was changed by default:
changed = False
Expand All @@ -789,11 +891,11 @@ def main():
# If module.check_mode=True, nothing will be changed:
if state == 'present':
if not publication.exists:
changed = publication.create(tables, tables_in_schema, params, owner, comment,
changed = publication.create(tables, tables_in_schema, columns, params, owner, comment,
check_mode=module.check_mode)

else:
changed = publication.update(tables, tables_in_schema, params, owner, comment,
changed = publication.update(tables, tables_in_schema, columns, params, owner, comment,
check_mode=module.check_mode)

elif state == 'absent':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
name: '{{ item }}'
columns:
- id int
- name text
loop:
- '{{ test_table1 }}'
- '{{ test_schema }}.{{ test_table2 }}'
Expand Down Expand Up @@ -496,6 +497,27 @@
that:
- result.rowcount == 1

# Test
- name: postgresql_publication - add set columns for table in publication
when: postgres_version_resp.stdout is version('15', '>=')

<<: *task_parameters
postgresql_publication:
<<: *pg_parameters
name: '{{ test_pub }}'
columns:
'{{ test_table1 }}': []
'{{ test_schema }}.{{ test_table2 }}': ['name']
'{{ test_table3 }}': ['id']
trust_input: false
check_mode: true
kacperk marked this conversation as resolved.
Show resolved Hide resolved

- assert:
that:
- result is changed
- result.queries == ["ALTER PUBLICATION \"{{ test_pub }}\" SET TABLE \"{{ test_schema }}\".\"{{ test_table3 }}\" ("name")"]
- result.tables == ["\"public\".\"{{ test_table1 }}\"", "\"{{ test_schema }}\".\"{{ test_table2 }}\"", "\"public\".\"{{ test_table3 }}\""]

# https://github.com/ansible-collections/community.postgresql/pull/718
- name: Create publication for schema in check mode
when: postgres_version_resp.stdout is version('15', '>=')
Expand Down
Loading