-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
93 lines (80 loc) · 2.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"github.com/jackc/pgconn"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgproto3/v2"
)
var pgurl = flag.String("pgurl", "", "pgurl")
var slotName = flag.String("slot_name", "test_slot_1", "slot name")
var startLSN = flag.String("start_lsn", "A0/A000000", "start lsn")
func main() {
flag.Parse()
const outputPlugin = "test_decoding"
conn, err := pgconn.Connect(context.Background(), *pgurl)
if err != nil {
log.Fatalln("failed to connect to PostgreSQL server:", err)
}
defer conn.Close(context.Background())
clientXLogPos, err := pglogrepl.ParseLSN(*startLSN)
if err != nil {
log.Panic(err)
}
err = pglogrepl.StartReplication(context.Background(), conn, *slotName, clientXLogPos, pglogrepl.StartReplicationOptions{PluginArgs: []string{}})
if err != nil {
log.Fatalln("StartReplication failed:", err)
}
log.Println("Logical replication started on slot", *slotName)
standbyMessageTimeout := time.Second * 10
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
for {
if time.Now().After(nextStandbyMessageDeadline) {
err = pglogrepl.SendStandbyStatusUpdate(context.Background(), conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos})
if err != nil {
log.Fatalln("SendStandbyStatusUpdate failed:", err)
}
log.Println("Sent Standby status message")
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
ctx, cancel := context.WithDeadline(context.Background(), nextStandbyMessageDeadline)
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil {
if pgconn.Timeout(err) {
continue
}
log.Fatalln("ReceiveMessage failed:", err)
}
if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
log.Fatalf("received Postgres WAL error: %+v", errMsg)
}
msg, ok := rawMsg.(*pgproto3.CopyData)
if !ok {
log.Printf("Received unexpected message: %T\n", rawMsg)
continue
}
switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
if err != nil {
log.Fatalln("ParsePrimaryKeepaliveMessage failed:", err)
}
log.Println("Primary Keepalive Message =>", "ServerWALEnd:", pkm.ServerWALEnd, "ServerTime:", pkm.ServerTime, "ReplyRequested:", pkm.ReplyRequested)
if pkm.ReplyRequested {
nextStandbyMessageDeadline = time.Time{}
}
case pglogrepl.XLogDataByteID:
xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
if err != nil {
log.Fatalln("ParseXLogData failed:", err)
}
log.Printf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)
fmt.Printf("data: %s\n", xld.WALData)
clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
}
}
}