-- sync--0.1.sql: installation script for the "sync" PostgreSQL extension (version 0.1).
-- Guard: when this file is sourced directly in psql, print a hint and stop;
-- it is meant to be loaded through CREATE EXTENSION.
\echo Use "CREATE EXTENSION sync;" to load this file. \quit

-- Extension description (French): "PostgreSQL extension for synchronising
-- tables between several databases".
COMMENT ON EXTENSION sync IS
    'Extension PostgreSQL pour pouvoir synchroniser des tables entre plusieurs bases de données';
-- Identity of this database:
--   db_id   : UUID identifying the database (primary key)
--   is_main : TRUE when this database is the main one; defaults to FALSE
CREATE TABLE IF NOT EXISTS sync.db_id
(
    db_id   uuid    NOT NULL PRIMARY KEY,
    is_main boolean NOT NULL DEFAULT FALSE
);

-- Seed the table with one freshly generated identifier at install time.
INSERT INTO sync.db_id (db_id)
SELECT public.gen_random_uuid();
-- Trigger function that unconditionally aborts the triggering statement.
CREATE OR REPLACE FUNCTION sync.db_id_locker()
    RETURNS trigger
    LANGUAGE plpgsql
AS
$locker$
BEGIN
    RAISE EXCEPTION 'sync.db_id is locked!';
END;
$locker$;

-- Reject every row-level INSERT or DELETE on sync.db_id.
-- UPDATE is deliberately not listed, so is_main can still be changed.
CREATE TRIGGER db_id_lock
    BEFORE DELETE OR INSERT
    ON sync.db_id
    FOR EACH ROW
EXECUTE PROCEDURE sync.db_id_locker();
-- Marks this database as the main one.
-- The UPDATE has no WHERE clause: it sets is_main on every row of sync.db_id
-- (the table is seeded with one row and INSERT/DELETE are blocked by the
-- db_id_lock trigger).
CREATE OR REPLACE FUNCTION sync.set_database_as_main()
    RETURNS void
    LANGUAGE sql
AS
$main$
    UPDATE sync.db_id
    SET is_main = TRUE;
$main$;

COMMENT ON FUNCTION sync.set_database_as_main()
    IS 'Set the database as the main database';
-- Returns the UUID stored in sync.db_id.
CREATE OR REPLACE FUNCTION sync.db_id()
    RETURNS uuid
    LANGUAGE sql
    STABLE
AS
$id$
    SELECT d.db_id
    FROM sync.db_id AS d;
$id$;

COMMENT ON FUNCTION sync.db_id()
    IS 'Returns database ID';
-- Returns the is_main flag stored in sync.db_id.
CREATE OR REPLACE FUNCTION sync.is_main()
    RETURNS boolean
    LANGUAGE sql
    STABLE
AS
$flag$
    SELECT d.is_main
    FROM sync.db_id AS d;
$flag$;

COMMENT ON FUNCTION sync.is_main()
    IS 'Returns whether this database is the main database';
-- Same query as sync.is_main(): returns the is_main flag stored in sync.db_id.
CREATE OR REPLACE FUNCTION sync.is_server()
    RETURNS boolean
    LANGUAGE sql
    STABLE
AS
$flag$
    SELECT d.is_main
    FROM sync.db_id AS d;
$flag$;

COMMENT ON FUNCTION sync.is_server()
    IS 'Returns whether this database is the main database';
-- Returns the negation of the is_main flag stored in sync.db_id.
CREATE OR REPLACE FUNCTION sync.is_replica()
    RETURNS boolean
    LANGUAGE sql
    STABLE
AS
$flag$
    SELECT NOT d.is_main
    FROM sync.db_id AS d;
$flag$;

COMMENT ON FUNCTION sync.is_replica()
    IS 'Returns whether this database is a replica';
-- One row per traced table:
--   table_id  : the traced table (regclass, primary key)
--   synced_at : filled by install_tracer/update_metadata with max(pgs_synced_at) of the table
--   download  : flag stored by install_tracer from its _download argument (default TRUE)
--   upload    : flag stored by install_tracer from its _upload argument (default TRUE)
CREATE TABLE IF NOT EXISTS sync.metadata
(
    table_id  regclass    PRIMARY KEY,
    synced_at timestamptz,
    download  boolean     NOT NULL DEFAULT TRUE,
    upload    boolean     NOT NULL DEFAULT TRUE
);

-- Register the table as extension configuration data so that pg_dump
-- includes its rows (empty filter condition = all rows).
SELECT pg_catalog.pg_extension_config_dump('sync.metadata', '');
-- Refreshes sync.metadata.synced_at for every registered table with the
-- current max(pgs_synced_at) of that table.
--
-- Returns: void.
-- Raises:  whatever the dynamic UPDATE raises, e.g. when a registered table
--          has no pgs_synced_at column.
CREATE OR REPLACE FUNCTION sync.update_metadata()
    RETURNS void
    LANGUAGE plpgsql
AS
$BODY$
DECLARE
    _record record;
BEGIN
    FOR _record IN (SELECT table_id FROM sync.metadata)
    LOOP
        -- The table name has to be spliced into the statement text. A regclass
        -- already renders as a correctly quoted (and, when needed,
        -- schema-qualified) name, so it must go through %s: %I would quote the
        -- whole text again and turn e.g. other.tbl into the single identifier
        -- "other.tbl". The key value is bound as a parameter instead of being
        -- formatted into the text.
        EXECUTE format(
            $sql$
            UPDATE sync.metadata
            SET synced_at = (SELECT max(pgs_synced_at) FROM %s)
            WHERE table_id = $1
            $sql$,
            _record.table_id
        ) USING _record.table_id;
    END LOOP;
END;
$BODY$;

-- Run once at install time (no-op while sync.metadata is empty).
SELECT sync.update_metadata();
-- Installs change tracking on a table.
--
-- Steps performed on _table:
--   1. add the columns pgs_is_active, pgs_changed_at and pgs_synced_at
--      (a NOTICE is raised for each column that already exists);
--   2. (re)create the row-level trigger pgs_trace_changes, passing the primary
--      key column names as trigger arguments;
--   3. (re)create the index <table>_pgs_synced_at_idx on pgs_synced_at;
--   4. upsert the table's row in sync.metadata.
--
-- Parameters:
--   _table    : table to trace
--   _download : value stored in sync.metadata.download (default TRUE)
--   _upload   : value stored in sync.metadata.upload (default TRUE)
--
-- A WARNING is raised when the table has no primary key; the trigger is then
-- created without arguments and sync.trace_changes() will reject deletes.
CREATE OR REPLACE FUNCTION sync.install_tracer(_table REGCLASS, _download BOOLEAN DEFAULT TRUE, _upload BOOLEAN DEFAULT TRUE)
    RETURNS void
    LANGUAGE plpgsql
AS
$BODY$
DECLARE
    -- Comma-separated, literal-quoted primary key column names;
    -- NULL when the table has no primary key.
    _primary_keys TEXT = (
        SELECT string_agg(quote_literal(a.attname), ',')
        FROM pg_catalog.pg_index AS i
        INNER JOIN pg_catalog.pg_attribute AS a
            ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = _table AND i.indisprimary
    );
    -- Schema of the table; the index lives in the same schema.
    _schema_name TEXT = (
        SELECT n.nspname
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE c.oid = _table
    );
    _index_name TEXT = (SELECT c.relname FROM pg_catalog.pg_class AS c WHERE c.oid = _table) || '_pgs_synced_at_idx';
BEGIN
    IF _primary_keys IS NULL THEN
        RAISE WARNING 'no primary key detected, delete operations will not be available';
    END IF;

    -- Each column is added in its own sub-block so that an already existing
    -- column only produces a NOTICE instead of aborting the installation.
    BEGIN
        EXECUTE format('ALTER TABLE %s ADD COLUMN pgs_is_active BOOLEAN DEFAULT TRUE;', _table);
    EXCEPTION WHEN duplicate_column THEN
        RAISE NOTICE 'pgs_is_active already exists';
    END;

    BEGIN
        EXECUTE format('ALTER TABLE %s ADD COLUMN pgs_changed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT statement_timestamp();', _table);
    EXCEPTION WHEN duplicate_column THEN
        RAISE NOTICE 'pgs_changed_at already exists';
    END;

    BEGIN
        EXECUTE format('ALTER TABLE %s ADD COLUMN pgs_synced_at TIMESTAMP WITH TIME ZONE DEFAULT statement_timestamp();', _table);
    EXCEPTION WHEN duplicate_column THEN
        RAISE NOTICE 'pgs_synced_at already exists';
    END;

    -- Recreate the trigger so that its arguments reflect the current primary key.
    -- A NULL _primary_keys renders as an empty argument list.
    EXECUTE format('DROP TRIGGER IF EXISTS pgs_trace_changes ON %s;', _table);
    EXECUTE format(
        $$
        CREATE TRIGGER pgs_trace_changes
            BEFORE INSERT OR UPDATE OR DELETE
            ON %s
            FOR EACH ROW
        EXECUTE PROCEDURE sync.trace_changes(%s)
        $$,
        _table,
        _primary_keys
    );

    -- The index name must be schema-qualified here: an unqualified DROP INDEX
    -- is resolved through search_path, so it could miss this table's index
    -- (making the CREATE INDEX below fail on re-install) or drop a same-named
    -- index that belongs to a table in another schema.
    EXECUTE format('DROP INDEX IF EXISTS %I.%I;', _schema_name, _index_name);
    EXECUTE format(
        'CREATE INDEX %I ON %s (pgs_synced_at DESC NULLS FIRST)',
        _index_name,
        _table
    );

    -- Register the table (or refresh its registration) in sync.metadata.
    EXECUTE format(
        $$
        INSERT INTO sync.metadata(table_id, synced_at, download, upload)
        SELECT %L::regclass, max(pgs_synced_at), %L::boolean, %L::boolean
        FROM %s
        ON CONFLICT (table_id) DO UPDATE
        SET synced_at = EXCLUDED.synced_at,
            download = EXCLUDED.download,
            upload = EXCLUDED.upload;
        $$,
        _table,
        _download,
        _upload,
        _table
    );
END;
$BODY$;
-- Row-level trigger function behind the pgs_trace_changes trigger.
--
-- INSERT / UPDATE:
--   stamps NEW.pgs_changed_at with statement_timestamp(); NEW.pgs_synced_at
--   gets the same timestamp when sync.is_main() is true, NULL otherwise.
--
-- DELETE:
--   * OLD.pgs_is_active IS NULL -> the row is really deleted;
--   * otherwise the DELETE is cancelled (RETURN NULL) and, when the row is
--     still active, it is flagged pgs_is_active = FALSE instead.
--   The primary key column names are expected as trigger arguments; an
--   exception is raised when none were supplied.
CREATE OR REPLACE FUNCTION sync.trace_changes()
    RETURNS trigger
    LANGUAGE plpgsql
AS
$trace$
DECLARE
    _key_columns text;  -- quoted key column list:     "a","b"
    _key_values  text;  -- matching values from OLD:   $1."a",$1."b"
BEGIN
    -- INSERT or UPDATE: only stamp the row.
    IF TG_OP <> 'DELETE' THEN
        NEW.pgs_changed_at := statement_timestamp();
        NEW.pgs_synced_at := CASE WHEN sync.is_main() THEN statement_timestamp() END;
        RETURN NEW;
    END IF;

    -- From here on the operation is a DELETE.
    IF OLD.pgs_is_active IS NULL THEN
        RETURN OLD;
    END IF;

    IF TG_NARGS = 0 THEN
        RAISE EXCEPTION 'primary key is missing as trigger args';
    END IF;

    IF OLD.pgs_is_active THEN
        -- Both lists are built from the same scan, so columns and values pair up.
        SELECT string_agg(quote_ident(c), ','),
               string_agg('$1.' || quote_ident(c), ',')
        INTO _key_columns, _key_values
        FROM unnest(TG_ARGV) AS t(c);

        EXECUTE format(
            'UPDATE %I.%I SET pgs_is_active = FALSE WHERE (%s) = (%s)',
            TG_TABLE_SCHEMA,
            TG_TABLE_NAME,
            _key_columns,
            _key_values
        ) USING OLD;
    END IF;

    -- Cancel the physical delete.
    RETURN NULL;
END;
$trace$;
-- Removes change tracking from a table: drops the pgs_* columns, the
-- pgs_trace_changes trigger and the <table>_pgs_synced_at_idx index, then
-- deletes the table's row from sync.metadata. Every step uses IF EXISTS, so
-- calling it on a table that is not (or only partially) traced is not an error.
--
-- Parameters:
--   _table : table to stop tracing
CREATE OR REPLACE FUNCTION sync.uninstall_tracer(_table REGCLASS)
    RETURNS void
    LANGUAGE plpgsql
AS
$BODY$
DECLARE
    -- Schema of the table; the index lives in the same schema.
    _schema_name TEXT = (
        SELECT n.nspname
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE c.oid = _table
    );
    _index_name TEXT = (SELECT c.relname FROM pg_catalog.pg_class AS c WHERE c.oid = _table) || '_pgs_synced_at_idx';
BEGIN
    EXECUTE format('ALTER TABLE %s DROP COLUMN IF EXISTS pgs_is_active;', _table);
    EXECUTE format('ALTER TABLE %s DROP COLUMN IF EXISTS pgs_changed_at;', _table);
    EXECUTE format('ALTER TABLE %s DROP COLUMN IF EXISTS pgs_synced_at;', _table);
    EXECUTE format('DROP TRIGGER IF EXISTS pgs_trace_changes ON %s;', _table);

    -- Schema-qualified on purpose: an unqualified name is resolved through
    -- search_path and could drop a same-named index that belongs to a table
    -- in another schema.
    EXECUTE format('DROP INDEX IF EXISTS %I.%I;', _schema_name, _index_name);

    -- Nothing dynamic here, so a plain statement is enough.
    DELETE FROM sync.metadata WHERE table_id = _table;
END;
$BODY$;
-- Privileges granted to every role.

-- Schema access.
GRANT USAGE ON SCHEMA sync TO PUBLIC;

-- sync.db_id is read-only; sync.metadata is fully writable.
GRANT SELECT ON TABLE sync.db_id TO PUBLIC;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE sync.metadata TO PUBLIC;

-- Functions callable by everyone.
GRANT EXECUTE ON FUNCTION
    sync.db_id(),
    sync.is_main(),
    sync.is_server(),
    sync.is_replica(),
    sync.install_tracer(REGCLASS, BOOLEAN, BOOLEAN),
    sync.uninstall_tracer(REGCLASS),
    sync.trace_changes()
TO PUBLIC;