Skip to content

Commit

Permalink
Fix initial copy with different order of columns
Browse files Browse the repository at this point in the history
  • Loading branch information
PJMODOS committed Oct 8, 2016
1 parent af77ddc commit 88ff7ff
Show file tree
Hide file tree
Showing 14 changed files with 484 additions and 62 deletions.
39 changes: 21 additions & 18 deletions expected/add_table.out
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
SELECT * FROM pglogical_regress_variables()
\gset
\c :provider_dsn
CREATE TABLE public.test_publicschema(id serial primary key, data text);
\c :subscriber_dsn
CREATE TABLE public.test_publicschema(data text, id serial primary key);
\c :provider_dsn
SELECT pglogical.replicate_ddl_command($$
CREATE SCHEMA "strange.schema-IS";
CREATE TABLE public.test_publicschema(id serial primary key, data text);
CREATE TABLE public.test_nosync(id serial primary key, data text);
CREATE TABLE "strange.schema-IS".test_strangeschema(id serial primary key);
CREATE TABLE "strange.schema-IS".test_diff_repset(id serial primary key, data text DEFAULT '');
Expand Down Expand Up @@ -102,8 +105,8 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);

\c :subscriber_dsn
SELECT * FROM public.test_publicschema;
id | data
----+------
data | id
------+----
(0 rows)

\c :provider_dsn
Expand Down Expand Up @@ -177,12 +180,12 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);

\c :subscriber_dsn
SELECT * FROM public.test_publicschema;
id | data
----+------
1 | a
2 | b
3 | c
4 | d
data | id
------+----
a | 1
b | 2
c | 3
d | 4
(4 rows)

SELECT * FROM "strange.schema-IS".test_strangeschema;
Expand Down Expand Up @@ -229,9 +232,9 @@ SELECT * FROM public.test_nosync;

DELETE FROM public.test_publicschema WHERE id > 1;
SELECT * FROM public.test_publicschema;
id | data
----+------
1 | a
data | id
------+----
a | 1
(1 row)

SELECT * FROM pglogical.alter_subscription_resynchronize_table('test_subscription', 'test_publicschema');
Expand Down Expand Up @@ -261,12 +264,12 @@ SELECT sync_kind, sync_subid, sync_nspname, sync_relname, sync_status FROM pglog
(4 rows)

SELECT * FROM public.test_publicschema;
id | data
----+------
1 | a
2 | b
3 | c
4 | d
data | id
------+----
a | 1
b | 2
c | 3
d | 4
(4 rows)

\x
Expand Down
9 changes: 6 additions & 3 deletions expected/replication_set.out
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), pid) FROM pg_stat_r
(1 row)

-- show initial replication sets
SELECT * FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
nspname | relname | set_name
-------------------+--------------------+----------
normalschema | test_normalschema |
Expand Down Expand Up @@ -113,7 +114,8 @@ ERROR: replication set repset_replicate_instrunc cannot be altered to replicate
SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_delete := true);
ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY
-- check the replication sets
SELECT * FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
nspname | relname | set_name
-------------------+--------------------+---------------------------
normalschema | test_normalschema | repset_replicate_instrunc
Expand All @@ -129,7 +131,8 @@ SELECT * FROM pglogical.replication_set_add_all_tables('default_insert_only', '{
t
(1 row)

SELECT * FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
SELECT nspname, relname, set_name FROM pglogical.tables
WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3;
nspname | relname | set_name
-------------------+--------------------+---------------------------
normalschema | test_normalschema | repset_replicate_instrunc
Expand Down
37 changes: 37 additions & 0 deletions pglogical--1.2.0--1.3.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,40 @@ CREATE FUNCTION pglogical.create_subscription(subscription_name name, provider_d
replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = false,
synchronize_data boolean = true, forward_origins text[] = '{all}', apply_delay interval DEFAULT '0')
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_subscription';

DROP VIEW pglogical.TABLES;
CREATE VIEW pglogical.TABLES AS
WITH set_relations AS (
SELECT s.set_name, r.set_reloid
FROM pglogical.replication_set_table r,
pglogical.replication_set s,
pglogical.local_node n
WHERE s.set_nodeid = n.node_id
AND s.set_id = r.set_id
),
user_tables AS (
SELECT r.oid, n.nspname, r.relname, r.relreplident
FROM pg_catalog.pg_class r,
pg_catalog.pg_namespace n
WHERE r.relkind = 'r'
AND r.relpersistence = 'p'
AND n.oid = r.relnamespace
AND n.nspname !~ '^pg_'
AND n.nspname != 'information_schema'
AND n.nspname != 'pglogical'
)
SELECT r.oid AS relid, n.nspname, r.relname, s.set_name
FROM pg_catalog.pg_namespace n,
pg_catalog.pg_class r,
set_relations s
WHERE r.relkind = 'r'
AND n.oid = r.relnamespace
AND r.oid = s.set_reloid
UNION
SELECT t.oid AS relid, t.nspname, t.relname, NULL
FROM user_tables t
WHERE t.oid NOT IN (SELECT set_reloid FROM set_relations);

CREATE FUNCTION pglogical.show_repset_table_info(relation regclass, repsets text[], OUT relid oid, OUT nspname text,
OUT relname text, OUT att_filter text[], OUT has_row_filter boolean)
RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_repset_table_info';
11 changes: 8 additions & 3 deletions pglogical--1.3.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ CREATE VIEW pglogical.TABLES AS
AND n.nspname != 'information_schema'
AND n.nspname != 'pglogical'
)
SELECT n.nspname, r.relname, s.set_name
SELECT r.oid AS relid, n.nspname, r.relname, s.set_name
FROM pg_catalog.pg_namespace n,
pg_catalog.pg_class r,
set_relations s
WHERE r.relkind = 'r'
AND n.oid = r.relnamespace
AND r.oid = s.set_reloid
UNION
SELECT t.nspname, t.relname, NULL
SELECT t.oid AS relid, t.nspname, t.relname, NULL
FROM user_tables t
WHERE t.oid NOT IN (SELECT set_reloid FROM set_relations);

Expand All @@ -155,7 +155,8 @@ RETURNS oid CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglo
CREATE FUNCTION pglogical.drop_replication_set(set_name name, ifexists boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_replication_set';

CREATE FUNCTION pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean DEFAULT false, columns text[] DEFAULT NULL, row_filter pg_node_tree DEFAULT NULL)
CREATE FUNCTION pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean DEFAULT false,
att_filter text[] DEFAULT NULL, row_filter pg_node_tree DEFAULT NULL)
RETURNS boolean CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_table';
CREATE FUNCTION pglogical.replication_set_add_all_tables(set_name name, schema_names text[], synchronize_data boolean DEFAULT false)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_all_tables';
Expand All @@ -178,6 +179,10 @@ RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alte
CREATE FUNCTION pglogical.synchronize_sequence(relation regclass)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_synchronize_sequence';

CREATE FUNCTION pglogical.show_repset_table_info(relation regclass, repsets text[], OUT relid oid, OUT nspname text,
OUT relname text, OUT att_filter text[], OUT has_row_filter boolean)
RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_repset_table_info';

CREATE FUNCTION pglogical.show_subscription_table(subscription_name name, relation regclass, OUT nspname text, OUT relname text, OUT status text)
RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_table';

Expand Down
82 changes: 82 additions & 0 deletions pglogical.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,88 @@ textarray_to_list(ArrayType *textarray)
return res;
}

/*
* Deconstruct the text representation of a 1-dimensional Postgres array
* into individual items.
*
* On success, returns true and sets *itemarray and *nitems to describe
* an array of individual strings. On parse failure, returns false;
* *itemarray may exist or be NULL.
*
* NOTE: free'ing itemarray is sufficient to deallocate the working storage.
*/
bool
parsePGArray(const char *atext, char ***itemarray, int *nitems)
{
int inputlen;
char **items;
char *strings;
int curitem;

/*
* We expect input in the form of "{item,item,item}" where any item is
* either raw data, or surrounded by double quotes (in which case embedded
* characters including backslashes and quotes are backslashed).
*
* We build the result as an array of pointers followed by the actual
* string data, all in one malloc block for convenience of deallocation.
* The worst-case storage need is not more than one pointer and one
* character for each input character (consider "{,,,,,,,,,,}").
*/
*itemarray = NULL;
*nitems = 0;
inputlen = strlen(atext);
if (inputlen < 2 || atext[0] != '{' || atext[inputlen - 1] != '}')
return false; /* bad input */
items = (char **) malloc(inputlen * (sizeof(char *) + sizeof(char)));
if (items == NULL)
return false; /* out of memory */
*itemarray = items;
strings = (char *) (items + inputlen);

atext++; /* advance over initial '{' */
curitem = 0;
while (*atext != '}')
{
if (*atext == '\0')
return false; /* premature end of string */
items[curitem] = strings;
while (*atext != '}' && *atext != ',')
{
if (*atext == '\0')
return false; /* premature end of string */
if (*atext != '"')
*strings++ = *atext++; /* copy unquoted data */
else
{
/* process quoted substring */
atext++;
while (*atext != '"')
{
if (*atext == '\0')
return false; /* premature end of string */
if (*atext == '\\')
{
atext++;
if (*atext == '\0')
return false; /* premature end of string */
}
*strings++ = *atext++; /* copy quoted data */
}
atext++;
}
}
*strings++ = '\0';
if (*atext == ',')
atext++;
curitem++;
}
if (atext[1] != '\0')
return false; /* bogus syntax (embedded '}') */
*nitems = curitem;
return true;
}

/*
* Get oid of our queue table.
*/
Expand Down
1 change: 1 addition & 0 deletions pglogical.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extern char *pglogical_extra_connection_options;
extern char *shorten_hash(const char *str, int maxlen);

extern List *textarray_to_list(ArrayType *textarray);
extern bool parsePGArray(const char *atext, char ***itemarray, int *nitems);

extern Oid get_pglogical_table_oid(const char *table);

Expand Down
Loading

0 comments on commit 88ff7ff

Please sign in to comment.