# psql-streamer.toml
# HTTP interface for stats & health checking
http = "127.0.0.1:12080"
# Interval between stats logging from sources and sinks
# Set to 0 to disable
tickerInterval = "10s"
# Path to a BoltDB file containing the parameters used by the service.
# Currently it's only needed for the PostgreSQL source and can be omitted if that source isn't used.
boltdb = "/tmp/psql-streamer.bolt"
[source.db1]
type = "postgres"
# PostgreSQL connection DSN
dsn = "dbname=test sslmode=disable"
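# A fuller libpq-style DSN for illustration (the host/user/password values here are hypothetical):
# dsn = "host=localhost port=5432 user=replicator password=secret dbname=test sslmode=disable"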
# Name of the publication in PostgreSQL created with CREATE PUBLICATION
publication = "pub1"
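# The publication is created on the database side; a minimal sketch (table names are examples):
#   CREATE PUBLICATION pub1 FOR TABLE table1, table2;
# or, to publish all tables:
#   CREATE PUBLICATION pub1 FOR ALL TABLES;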
# A replication slot represents a unique replication client in the database.
# PostgreSQL stores the client's current WAL position in the slot (it's updated by the status messages from the client).
# Be aware that PostgreSQL has a configurable limit on the number of replication slots.
# Only one client can occupy a replication slot at any given time.
replicationSlot = "db1"
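# To inspect existing slots and their positions (standard PostgreSQL catalog view):
#   SELECT slot_name, active, restart_lsn FROM pg_replication_slots;
# The slot limit mentioned above is governed by the max_replication_slots setting.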
# How frequently to retry sending to sinks in case of an error
sendRetryInterval = "1s"
# How frequently to try to restart replication if it dies for some reason
startRetryInterval = "2s"
# How many messages to get from PostgreSQL before pushing them into sinks
batchSize = 400
# How long to wait for the batchSize to fill before flushing the buffer
batchFlushInterval = "1s"
# PostgreSQL connection timeout
timeout = "2s"
# If set to a value larger than zero, it overrides the position read from BoltDB.
# The source will ask PostgreSQL to start streaming from this position.
walPositionOverride = 0
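# One way to read the current WAL position as a plain number (a hedged example; whether this
# matches the encoding the service expects is an assumption):
#   SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0');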
# This affects how long PostgreSQL retains its WAL segments.
# The service confirms the current walPos minus walRetain bytes as flushed-to-disk to PostgreSQL.
# IMPORTANT: walRetain should be large enough to accommodate batchSize events,
# otherwise you might lose events after a crash (PostgreSQL will not allow going far enough back in the log).
walRetain = 1048576
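# Rough sizing sketch (the ~2 KiB average event size is an assumption, not a measurement):
#   400 events/batch * 2 KiB/event = 800 KiB, so the default 1 MiB (1048576) leaves some headroom.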
# Verbose mode (logs one line per event)
verbose = false
# Enable debug mode
debug = false
[source.kafka1]
type = "kafka"
# List of kafka brokers in host:port form
hosts = [ "kafka1:9092", "kafka2:9092", "kafka3:9092" ]
# List of Kafka topics to fetch events from
topics = [ "topic1", "topic2" ]
# When several instances run with the same groupID on the same Kafka cluster/topics,
# events are distributed evenly between the consumers.
groupID = "bar"
# How frequently to retry sending events to sinks in case of failure
sendRetryInterval = "100ms"
# How many messages to get from Kafka before pushing them into sinks.
# This also sets Kafka library internal queue size
batchSize = 400
# How long to wait for batchSize to fill before flushing the buffer
batchFlushInterval = "1s"
# Verbose mode (logs one line per event)
verbose = false
# Enable debug mode
debug = false
[sink.kafka1]
type = "kafka"
# List of sources this sink consumes events from
sources = [ "kafka1" ]
# List of kafka brokers in host:port form
hosts = [ "kafka1:9092", "kafka2:9092", "kafka3:9092" ]
# A list of handlers to enable (currently only "passthrough" is available)
handlers = [ "passthrough" ]
# Mapping between table name of event and Kafka topic
tableTopicMapping = { table1 = "topic1", table2 = "topic2" }
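# In standard TOML the same mapping can also be written as a subtable, though the
# [sink.kafka1.tableTopicMapping] header would then have to come after all other sink.kafka1 keys:
#   [sink.kafka1.tableTopicMapping]
#   table1 = "topic1"
#   table2 = "topic2"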
# Specifies the topic where messages for which no mapping was found will go.
# If it's empty or undefined, those messages will be discarded.
# At least one of 'tableTopicMapping' or 'topicFallback' should be specified.
topicFallback = "garbageBin"
# Timeout for Kafka message sending
timeout = "2s"
# Maximum number of delivery attempts that the Kafka client will make internally
maxAttempts = 3
# How frequently to poll cluster for partition changes
rebalanceInterval = "5s"
# -1 : to require all replicas to ACK
# 0 : no ACKs required
# >0 : wait for this number of replicas to ACK
requiredAcks = -1
# Enable asynchronous operation. The Kafka library will use batches to dispatch events more efficiently.
# Be careful: in this mode you can lose events, because the Kafka library does not report errors back,
# and we'll always report success to the source as if the messages were sent successfully.
async = false
# Number of messages to buffer before sending to Kafka
batchSize = 100
# How frequently to flush the queue if batchSize has not been reached.
# Be advised that this parameter also applies with 'async = false': messages are dispatched at most once per batchTimeout.
# Decreasing this parameter leads to more frequent polling of the queue, which increases CPU usage even when idle.
# At 5ms each sink uses around 10% of one core on a MacBook's 3.3 GHz Core i5-7287U.
batchTimeout = "10ms"
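# Back-of-the-envelope throughput ceiling for these values (an estimate, not a benchmark):
#   batchSize 100 per batchTimeout 10ms => up to ~10,000 messages/s per sink queue.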
# Verbose mode (logs one line per non-skipped event)
verbose = false
# Enable debug mode
debug = false