-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpg_follower_output.c
409 lines (343 loc) · 10.6 KB
/
pg_follower_output.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
/*-------------------------------------------------------------------------
*
* pg_follower_output.c
* logical decoding output plugin
*
* IDENTIFICATION
* pg_follower/pg_follower_output.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "replication/logical.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
/*
 * Support routines.
 *
 * Each one appends the SQL text for a single row change to `out'; they are
 * called from follower_change().
 */
static void output_insert(StringInfo out, Relation relation, char *schema_name,
ReorderBufferChange *change);
static void output_update(StringInfo out, Relation relation, char *schema_name,
ReorderBufferChange *change);
static void output_delete(StringInfo out, Relation relation, char *schema_name,
ReorderBufferChange *change);
/*
 * Callback routines.
 *
 * These are installed into the OutputPluginCallbacks table by
 * _PG_output_plugin_init().
 */
static void follower_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
static void follower_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
static void follower_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
static void follower_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void follower_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
static void follower_truncate(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/*
 * Plugin-private state. follower_startup() allocates one of these and stores
 * it in ctx->output_plugin_private.
 */
typedef struct
{
MemoryContext context; /* scratch context; follower_change() switches into it and resets it on exit */
} PgFollowerData;
/*
 * Append `outputstr', the textual form of a value of type `typid', to the
 * string buffer `s' as a literal.
 *
 * A handful of builtin types are written without quotes; bit strings are
 * wrapped as B'...'; booleans are spelled out as true/false.  Everything
 * else is enclosed in single quotes, with escaping done as if
 * standard_conforming_strings were enabled (characters selected by
 * SQL_STR_DOUBLE(ch, false) are emitted twice).
 *
 * XXX: ported from test_decoding.c
 */
static void
print_literal(StringInfo s, Oid typid, char *outputstr)
{
	const char *cursor;

	switch (typid)
	{
		case BOOLOID:
			/* Anything other than "t" is reported as false. */
			appendStringInfoString(s,
								   strcmp(outputstr, "t") == 0 ? "true" : "false");
			return;

		case BITOID:
		case VARBITOID:
			appendStringInfo(s, "B'%s'", outputstr);
			return;

		case INT2OID:
		case INT4OID:
		case INT8OID:
		case OIDOID:
		case FLOAT4OID:
		case FLOAT8OID:
		case NUMERICOID:
			/* NB: We don't care about Inf, NaN et al. */
			appendStringInfoString(s, outputstr);
			return;

		default:
			/* Fall out of the switch to the quoted form below. */
			break;
	}

	/* Generic case: quote the value, doubling characters as required. */
	appendStringInfoChar(s, '\'');
	cursor = outputstr;
	while (*cursor != '\0')
	{
		char		c = *cursor++;

		if (SQL_STR_DOUBLE(c, false))
			appendStringInfoChar(s, c);
		appendStringInfoChar(s, c);
	}
	appendStringInfoChar(s, '\'');
}
/*
 * Construct an INSERT query and append it to `out'.  Format is:
 *
 *	INSERT INTO $schema.$table ( $column1 [, $column2 ...] )
 *		VALUES ( $value1 [, $value2 ...] );
 *
 * or, when no attribute is left to write:
 *
 *	INSERT INTO $schema.$table DEFAULT VALUES;
 *
 * Dropped, generated and NULL attributes are omitted from both lists.
 * The schema, table and column names are quoted when necessary.
 *
 * `schema_name' stays owned by the caller; it must not be freed here, since
 * the caller releases it after this function returns.
 *
 * NOTE(review): omitting NULL attributes means the receiving side would
 * apply any column default instead of storing NULL -- confirm this is the
 * intended behaviour.
 */
static void
output_insert(StringInfo out, Relation relation, char *schema_name,
			  ReorderBufferChange *change)
{
	HeapTuple	new_tuple;
	TupleDesc	descriptor;
	bool		first_try = true;
	StringInfoData columns;
	StringInfoData values;
	const char *target;

	Assert(change->action == REORDER_BUFFER_CHANGE_INSERT);

	/* Extract information from arguments */
	new_tuple = change->data.tp.newtuple;
	descriptor = RelationGetDescr(relation);

	/* Quote the target name the same way column names are quoted below */
	target = quote_qualified_identifier(schema_name,
										RelationGetRelationName(relation));

	/*
	 * The column list and the value list are built side by side and only
	 * stitched into the query at the end, once we know whether there is
	 * anything to write at all.
	 */
	initStringInfo(&columns);
	initStringInfo(&values);

	/*
	 * Visit each attribute to gather its name and value.  Dropped, generated
	 * and NULL attributes are skipped.
	 */
	for (int atts = 0; atts < descriptor->natts; atts++)
	{
		Form_pg_attribute att = TupleDescAttr(descriptor, atts);
		bool		isnull;
		Datum		datum;
		Oid			typoutput;
		bool		typisvarlena;

		/* Skip if the attribute is dropped or generated */
		if (att->attisdropped || att->attgenerated)
			continue;

		/* Get the Datum representation of this value */
		datum = heap_getattr(new_tuple, atts + 1, descriptor, &isnull);

		/* Skip if the attribute is NULL */
		if (isnull)
			continue;

		/* Separate from the previously written attribute */
		if (!first_try)
		{
			appendStringInfoString(&columns, ", ");
			appendStringInfoString(&values, ", ");
		}

		/*
		 * Since some attributes might be skipped, every written attribute
		 * must be named explicitly in the column list.
		 */
		appendStringInfoString(&columns,
							   quote_identifier(NameStr(att->attname)));

		getTypeOutputInfo(att->atttypid, &typoutput, &typisvarlena);

		if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(datum))
		{
			/*
			 * NOTE(review): this placeholder is not a valid SQL literal;
			 * kept as-is from the original implementation.
			 */
			appendStringInfoString(&values, "unchanged-toast-datum");
		}
		else if (!typisvarlena)
			print_literal(&values, att->atttypid,
						  OidOutputFunctionCall(typoutput, datum));
		else
		{
			/* Detoast before handing the value to the output function */
			Datum		val;

			val = PointerGetDatum(PG_DETOAST_DATUM(datum));
			print_literal(&values, att->atttypid,
						  OidOutputFunctionCall(typoutput, val));
		}

		first_try = false;
	}

	/* An empty column/value list is not valid SQL; use DEFAULT VALUES */
	if (first_try)
		appendStringInfo(out, "INSERT INTO %s DEFAULT VALUES;", target);
	else
		appendStringInfo(out, "INSERT INTO %s ( %s ) VALUES ( %s );",
						 target, columns.data, values.data);

	pfree(columns.data);
	pfree(values.data);
}
/*
 * Construct an UPDATE query.  Not implemented yet: only the leading
 * "UPDATE $schema.$table SET " part is appended to `out'; the old and new
 * tuples carried by `change' are not examined.
 */
static void
output_update(StringInfo out, Relation relation, char *schema_name,
			  ReorderBufferChange *change)
{
	const char *table_name;

	Assert(change->action == REORDER_BUFFER_CHANGE_UPDATE);

	/* Emit the statement prefix for the target table */
	table_name = RelationGetRelationName(relation);
	appendStringInfo(out, "UPDATE %s.%s SET ", schema_name, table_name);
}
/*
 * Construct a DELETE query.  Not implemented yet: only the leading
 * "DELETE FROM $schema.$table " part is appended to `out'; the old tuple
 * carried by `change' is not examined.
 */
static void
output_delete(StringInfo out, Relation relation, char *schema_name,
			  ReorderBufferChange *change)
{
	const char *table_name;

	Assert(change->action == REORDER_BUFFER_CHANGE_DELETE);

	/* Emit the statement prefix for the target table */
	table_name = RelationGetRelationName(relation);
	appendStringInfo(out, "DELETE FROM %s.%s ", schema_name, table_name);
}
/* Callback routines */
/*
 * Startup callback.
 *
 * Allocates the plugin's private state, creates a dedicated memory context
 * underneath ctx->context for per-change allocations, and declares that the
 * plugin produces textual output.
 *
 * Neither `is_init' nor any output plugin options are examined, because
 * this module does not use them.
 */
static void
follower_startup(LogicalDecodingContext *ctx, OutputPluginOptions *options,
				 bool is_init)
{
	PgFollowerData *private_data;

	private_data = palloc(sizeof(PgFollowerData));

	/* Private allocations live in a child of the decoding context. */
	private_data->context = AllocSetContextCreate(ctx->context,
												  "pg_follower output context",
												  ALLOCSET_DEFAULT_SIZES);

	/* Only textual format is supported. */
	options->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;

	ctx->output_plugin_private = private_data;
}
/*
 * BEGIN callback, invoked when the start of a committed transaction has
 * been decoded.  Writes a single "BEGIN;" statement.
 */
static void
follower_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	static const char begin_stmt[] = "BEGIN;";

	OutputPluginPrepareWrite(ctx, true);
	appendStringInfoString(ctx->out, begin_stmt);
	OutputPluginWrite(ctx, true);
}
/*
 * Change callback, invoked for every individual row modification inside a
 * transaction.
 *
 * Looks up the schema name of `relation' and hands the change to the
 * matching output_insert/output_update/output_delete routine, writing the
 * result out as one message.  Any other action raises an ERROR.
 *
 * All work is done inside the plugin's private memory context, which is
 * reset before returning so that nothing leaks between changes.
 */
static void
follower_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				Relation relation, ReorderBufferChange *change)
{
	PgFollowerData *private_data = (PgFollowerData *) ctx->output_plugin_private;
	MemoryContext caller_context;
	char	   *nspname;

	/* Run in our own context so everything can be discarded at the end */
	caller_context = MemoryContextSwitchTo(private_data->context);

	nspname = get_namespace_name(RelationGetNamespace(relation));

	/* Dispatch on the kind of row change */
	if (change->action == REORDER_BUFFER_CHANGE_INSERT)
	{
		OutputPluginPrepareWrite(ctx, true);
		output_insert(ctx->out, relation, nspname, change);
		OutputPluginWrite(ctx, true);
	}
	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
	{
		OutputPluginPrepareWrite(ctx, true);
		output_update(ctx->out, relation, nspname, change);
		OutputPluginWrite(ctx, true);
	}
	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
	{
		OutputPluginPrepareWrite(ctx, true);
		output_delete(ctx->out, relation, nspname, change);
		OutputPluginWrite(ctx, true);
	}
	else
		elog(ERROR, "unknown change");

	pfree(nspname);

	/* Return to the caller's context and drop whatever we allocated */
	MemoryContextSwitchTo(caller_context);
	MemoryContextReset(private_data->context);
}
/*
 * COMMIT callback, invoked when a transaction commit has been decoded.
 * Writes a single "COMMIT;" statement; `commit_lsn' is not used.
 */
static void
follower_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				XLogRecPtr commit_lsn)
{
	static const char commit_stmt[] = "COMMIT;";

	OutputPluginPrepareWrite(ctx, true);
	appendStringInfoString(ctx->out, commit_stmt);
	OutputPluginWrite(ctx, true);
}
/*
 * Message callback, invoked when a logical decoding message has been
 * decoded.
 *
 * In terms of pg_follower, DDL commands are recorded as logical messages.
 * Only messages whose prefix is exactly "pg_follower" are handled; their
 * payload (`message_size' bytes of `message') is written out unchanged.
 * Messages with any other prefix are ignored.
 */
static void
follower_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				 XLogRecPtr message_lsn, bool transactional,
				 const char *prefix, Size message_size, const char *message)
{
	bool		is_ours = (strcmp(prefix, "pg_follower") == 0);

	if (is_ours)
	{
		/* DDL command must be transported as transactional message */
		Assert(transactional);

		/* Replicate the given message as-is */
		OutputPluginPrepareWrite(ctx, true);
		appendBinaryStringInfo(ctx->out, message, message_size);
		OutputPluginWrite(ctx, true);
	}
}
/*
 * TRUNCATE callback, invoked when a truncate command has been decoded.
 *
 * Writes one statement of the form:
 *
 *	TRUNCATE $schema.$table [, ...] [ RESTART IDENTITY ] [ CASCADE ];
 *
 * Relation names are schema-qualified and quoted when necessary.  The
 * RESTART IDENTITY and CASCADE options apply to the statement as a whole,
 * so they are appended once, after the complete relation list.
 *
 * Name lookup and quoting allocate memory, so the work is done in the
 * plugin's private memory context, which is reset before returning.
 */
static void
follower_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  int nrelations, Relation relations[],
				  ReorderBufferChange *change)
{
	PgFollowerData *data = (PgFollowerData *) ctx->output_plugin_private;
	MemoryContext old;

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	OutputPluginPrepareWrite(ctx, true);

	appendStringInfoString(ctx->out, "TRUNCATE ");

	for (int i = 0; i < nrelations; i++)
	{
		Form_pg_class entry = relations[i]->rd_rel;
		char	   *schema_name = get_namespace_name(entry->relnamespace);

		if (i > 0)
			appendStringInfoString(ctx->out, ", ");

		appendStringInfoString(ctx->out,
							   quote_qualified_identifier(schema_name,
														  NameStr(entry->relname)));
	}

	/* Statement-level options follow the whole relation list */
	if (change->data.truncate.restart_seqs)
		appendStringInfoString(ctx->out, " RESTART IDENTITY");
	if (change->data.truncate.cascade)
		appendStringInfoString(ctx->out, " CASCADE");

	appendStringInfoString(ctx->out, ";");

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);

	OutputPluginWrite(ctx, true);
}
/*
 * Output plugin entry point: fill in the callback table `cb' with this
 * module's handlers.
 */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);

	/* Plugin start */
	cb->startup_cb = follower_startup;

	/* Transaction boundaries */
	cb->begin_cb = follower_begin;
	cb->commit_cb = follower_commit;

	/* Transaction contents */
	cb->change_cb = follower_change;
	cb->truncate_cb = follower_truncate;
	cb->message_cb = follower_message;
}