-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconvertseq.go
326 lines (290 loc) · 11 KB
/
convertseq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
package main
import (
"database/sql"
"flag"
"fmt"
"log"
"os"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
)
var (
mode = flag.String("mode", "", "Mode of operation: sync or restore")
syncUser = flag.String("syncUser", "root", "Sync user")
syncIP = flag.String("syncIP", "127.0.0.1", "Sync IP address")
syncPort = flag.Int("syncPort", 4000, "Sync port")
syncPasswd = flag.String("syncPasswd", "", "Sync password")
syncInterval = flag.Duration("syncInterval", 5*time.Second, "Sync interval")
restoreUser = flag.String("restoreUser", "root", "Restore user")
restoreIP = flag.String("restoreIP", "127.0.0.1", "Restore IP address")
restorePort = flag.Int("restorePort", 4000, "Restore port")
restorePasswd = flag.String("restorePasswd", "", "Restore password")
restoreWorkers = flag.Int("restoreWorkers", 5, "Number of workers for restore operation")
//added Schema parameter by Swee
syncSchema = flag.String("syncSchema", "test", "Sync Schema")
restoreSchema = flag.String("restoreSchema", "test", "Restore Schema")
logFilePath = flag.String("logFilePath", "error.log", "Path to error log file")
)
const sequenceQuery = `
SELECT SEQUENCE_SCHEMA, SEQUENCE_NAME,
CASE
WHEN CACHE = 0 AND CYCLE = 0 THEN CONCAT('CREATE SEQUENCE ', SEQUENCE_SCHEMA, '.', SEQUENCE_NAME, ' START WITH ', START, ' MINVALUE ', MIN_VALUE, ' MAXVALUE ', MAX_VALUE, ' INCREMENT BY ', INCREMENT, ' NOCACHE NOCYCLE;')
WHEN CACHE = 1 AND CYCLE = 0 THEN CONCAT('CREATE SEQUENCE ', SEQUENCE_SCHEMA, '.', SEQUENCE_NAME, ' START WITH ', START, ' MINVALUE ', MIN_VALUE, ' MAXVALUE ', MAX_VALUE, ' INCREMENT BY ', INCREMENT, ' CACHE ', CACHE_VALUE, ' NOCYCLE;')
WHEN CACHE = 0 AND CYCLE = 1 THEN CONCAT('CREATE SEQUENCE ', SEQUENCE_SCHEMA, '.', SEQUENCE_NAME, ' START WITH ', START, ' MINVALUE ', MIN_VALUE, ' MAXVALUE ', MAX_VALUE, ' INCREMENT BY ', INCREMENT, ' NOCACHE CYCLE;')
WHEN CACHE = 1 AND CYCLE = 1 THEN CONCAT('CREATE SEQUENCE ', SEQUENCE_SCHEMA, '.', SEQUENCE_NAME, ' START WITH ', START, ' MINVALUE ', MIN_VALUE, ' MAXVALUE ', MAX_VALUE, ' INCREMENT BY ', INCREMENT, ' CACHE ', CACHE_VALUE, ' CYCLE;')
END AS create_sql
FROM information_schema.sequences;
`
func main() {
flag.Parse()
// Added by Swee: Set the log output to the file
logFile, err := os.OpenFile(*logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
os.Exit(1)
}
defer logFile.Close()
log.SetOutput(logFile)
if *mode != "sync" && *mode != "restore" {
fmt.Println("Usage: go run main.go -mode=<sync|restore>")
os.Exit(1)
}
var dsn string
switch *mode {
case "sync":
//added Schema parameter by Swee
dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/", *syncUser, *syncPasswd, *syncIP, *syncPort)
case "restore":
//added Schema parameter by Swee
dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/", *restoreUser, *restorePasswd, *restoreIP, *restorePort)
}
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
switch *mode {
case "sync":
syncSeq(db, *syncSchema)
case "restore":
restoreSeq(db, *restoreSchema)
}
}
func syncSeq(db *sql.DB, schema string) {
//add CREATE DATABASE by Muhaira
_, err1 := db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", schema))
if err1 != nil {
log.Fatalf("Failed to create database: %v", err1)
}
// Create table if not exists
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.sequence_sync (
schema_name varchar(64) NOT NULL,
sequence_name varchar(64) NOT NULL,
current_value BIGINT UNSIGNED NULL,
create_sql varchar(300) NULL,
update_time DATETIME NULL,
PRIMARY KEY (schema_name, sequence_name)
);`)
if err != nil {
log.Fatalf("Failed to create table: %v", err)
}
for {
// Insert or update sequence information
trx, err := db.Begin()
if err != nil {
log.Fatalf("Failed to execute begin statement: %v", err)
}
_, err = trx.Exec(`REPLACE INTO ` + schema + `.sequence_sync (schema_name, sequence_name, create_sql) ` + sequenceQuery)
if err != nil {
log.Fatalf("Failed to insert or update sequence information: %v", err)
}
// Read data from sequence_sync to show table next_row_id and filter only the type is sequence
rows, err := db.Query("SELECT schema_name, sequence_name FROM " + schema + ".sequence_sync")
if err != nil {
log.Fatalf("Failed to query sequence_sync: %v", err)
}
existingSequences := make(map[string]bool)
for rows.Next() {
var schemaName, sequenceName string
if err := rows.Scan(&schemaName, &sequenceName); err != nil {
log.Fatalf("Failed to scan row: %v", err)
}
query := fmt.Sprintf("SHOW TABLE `%s`.`%s` NEXT_ROW_ID", schemaName, sequenceName)
results, err := db.Query(query)
if err != nil {
log.Printf("Failed to show next_row_id for sequence %s.%s: %v", schemaName, sequenceName, err)
continue
}
var nextNotCachedValue int64
for results.Next() {
var dbName, tableName, columnName, nextGlobalRowID, idType string
if err := results.Scan(&dbName, &tableName, &columnName, &nextGlobalRowID, &idType); err != nil {
log.Fatalf("Failed to scan result: %v", err)
}
if idType == "SEQUENCE" {
nextNotCachedValue, _ = strconv.ParseInt(nextGlobalRowID, 10, 64)
}
}
if err := results.Err(); err != nil {
log.Fatalf("Error iterating over results: %v", err)
}
results.Close()
// Mark sequence as existing
existingSequences[schemaName+"."+sequenceName] = true
// Directly execute the update statement
updateStatement := fmt.Sprintf("UPDATE "+schema+".sequence_sync SET current_value=%d, update_time=NOW() WHERE schema_name='%s' AND sequence_name='%s';", nextNotCachedValue, schemaName, sequenceName)
_, err = trx.Exec(updateStatement)
if err != nil {
log.Fatalf("Failed to update sequence value: %v", err)
}
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over rows: %v", err)
}
rows.Close()
// Remove sequences from sequence_sync that no longer exist in the database
rows, err = db.Query("SELECT schema_name, sequence_name FROM " + schema + ".sequence_sync")
if err != nil {
log.Fatalf("Failed to query sequence_sync: %v", err)
}
for rows.Next() {
var schemaName, sequenceName string
if err := rows.Scan(&schemaName, &sequenceName); err != nil {
log.Fatalf("Failed to scan row: %v", err)
}
if !existingSequences[schemaName+"."+sequenceName] {
_, err = trx.Exec("DELETE FROM "+schema+".sequence_sync WHERE schema_name = ? AND sequence_name = ?", schemaName, sequenceName)
if err != nil {
log.Fatalf("Failed to delete sequence from sequence_sync: %v", err)
}
}
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over rows: %v", err)
}
rows.Close()
trx.Commit()
fmt.Printf("All sequences updated at %s.\n", time.Now().Format("2006-01-02 15:04:05"))
time.Sleep(*syncInterval)
}
}
func restoreSeq(db *sql.DB, schema string) {
// Retrieve desired sequences from sequence_sync
desiredSequences := make(map[string]string)
rows, err := db.Query("SELECT CONCAT(schema_name, '.', sequence_name) AS seq_name, create_sql FROM " + schema + ".sequence_sync;")
if err != nil {
log.Fatalf("Failed to execute query: %v", err)
}
defer rows.Close()
for rows.Next() {
var seqName, createSQL string
if err := rows.Scan(&seqName, &createSQL); err != nil {
log.Fatalf("Failed to scan result: %v", err)
}
desiredSequences[seqName] = createSQL
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over results: %v", err)
}
// Retrieve existing sequences from information_schema.sequences
existingSequences := make(map[string]string)
rows, err = db.Query(sequenceQuery)
if err != nil {
log.Fatalf("Failed to execute query: %v", err)
}
defer rows.Close()
for rows.Next() {
var seqSchema, seqName, createSQL string
if err := rows.Scan(&seqSchema, &seqName, &createSQL); err != nil {
log.Fatalf("Failed to scan result: %v", err)
}
fullSeqName := seqSchema + "." + seqName
existingSequences[fullSeqName] = createSQL
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over results: %v", err)
}
// Compare and update sequences
var desiredCreateSQLs []string
var existingDropSQLs []string
var existingCreateSQLs []string
for seqName, desiredCreateSQL := range desiredSequences {
existingCreateSQL, exists := existingSequences[seqName]
if !exists {
// Sequence doesn't exist, create it
desiredCreateSQLs = append(desiredCreateSQLs, desiredCreateSQL)
} else if existingCreateSQL != desiredCreateSQL {
// Sequence exists but createSQL is different, drop and recreate it
existingDropSQLs = append(existingDropSQLs, "DROP SEQUENCE "+seqName+";")
existingCreateSQLs = append(existingCreateSQLs, desiredCreateSQL)
}
}
// Optionally, drop sequences not in desiredSequences
var dropSQLs []string
for seqName := range existingSequences {
if _, exists := desiredSequences[seqName]; !exists {
dropSQLs = append(dropSQLs, "DROP SEQUENCE "+seqName+";")
}
}
// Execute DDL statements
fmt.Println("Creating new sequences...")
executeSQLStatements(db, desiredCreateSQLs)
fmt.Println("Recreating changed sequences...")
executeSQLStatements(db, existingDropSQLs)
executeSQLStatements(db, existingCreateSQLs)
fmt.Println("Dropping no longer needed sequences...")
executeSQLStatements(db, dropSQLs)
// Set current values for sequences with current_value
var setvalStatements []string
rows, err = db.Query("SELECT CONCAT('SELECT setval(', schema_name, '.', sequence_name, ', ', current_value, ');') FROM " + schema + ".sequence_sync WHERE current_value IS NOT NULL;")
if err != nil {
log.Fatalf("Failed to execute query: %v", err)
}
defer rows.Close()
for rows.Next() {
var stmt string
if err := rows.Scan(&stmt); err != nil {
log.Fatalf("Failed to scan result: %v", err)
}
setvalStatements = append(setvalStatements, stmt)
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over results: %v", err)
}
// Execute setval statements
executeSQLStatements(db, setvalStatements)
}
// executeSQLStatements executes SQL statements concurrently using multiple workers
func executeSQLStatements(db *sql.DB, statements []string) {
type sqlExecResult struct {
sql string
err error
}
numWorkers := *restoreWorkers
jobs := make(chan string, len(statements))
results := make(chan sqlExecResult, len(statements))
// Worker function
worker := func(jobs <-chan string, results chan<- sqlExecResult) {
for sqlStmt := range jobs {
_, err := db.Exec(sqlStmt)
results <- sqlExecResult{sql: sqlStmt, err: err}
}
}
// Start workers
for w := 0; w < numWorkers; w++ {
go worker(jobs, results)
}
// Send jobs to workers
for _, sqlStmt := range statements {
jobs <- sqlStmt
}
close(jobs)
// Collect results
for i := 0; i < len(statements); i++ {
result := <-results
if result.err != nil {
log.Fatalf("Failed to execute SQL statement '%s': %v", result.sql, result.err)
}
}
}