-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
85 lines (72 loc) · 2.41 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main
import (
"context"
"fmt"
"log"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/spf13/viper"
)
// updateTables runs updateTable once for every entry in the package-level
// tables collection, using the database named by the CLICKHOUSE_EU_DATABASE
// viper setting.
//
// Each call is made with truncate=false and dryRun=true, so updateTable only
// prints the insert statement instead of executing it. The first error
// returned by updateTable aborts the run with a panic.
func updateTables(connEU, connCloud driver.Conn) {
	ctx := context.Background()
	database := viper.GetString("CLICKHOUSE_EU_DATABASE")

	const (
		truncate = false
		dryRun   = true
	)

	for _, name := range tables {
		err := updateTable(ctx, connEU, connCloud, database, name, truncate, dryRun)
		if err != nil {
			panic(err)
		}
	}
}
// updateTable copies one table from the EU ClickHouse instance to ClickHouse
// Cloud and then logs the row count found on the Cloud side.
//
// Steps, in order:
//  1. If truncate is set, build a TRUNCATE statement and, unless dryRun is
//     set, execute it on connCloud.
//  2. Build an INSERT INTO FUNCTION remoteSecure(...) statement from the
//     CLICKHOUSE_CLOUD_* viper settings and, unless dryRun is set, execute it
//     on connEU.
//  3. Run a count query on connCloud and log every returned value. This query
//     is executed even when dryRun is true.
//
// Every statement is printed to stdout before it is (or would be) executed.
// The first failure is returned to the caller, wrapped with the step that
// failed; the function never terminates the process itself.
func updateTable(ctx context.Context, connEU, connCloud driver.Conn, database, table string, truncate, dryRun bool) error {
	if truncate {
		truncateStatement := createTruncateStatement(database, table)
		fmt.Printf("on CH Cloud Truncating table %s.%s\nTruncate statement:\n%s\n", database, table, truncateStatement)
		if !dryRun {
			if err := connCloud.Exec(ctx, truncateStatement); err != nil {
				return fmt.Errorf("truncating %s.%s on CH Cloud: %w", database, table, err)
			}
		}
	}

	insertStatement := createInsertStatement(
		viper.GetString("CLICKHOUSE_CLOUD_HOSTNAME"),
		viper.GetInt("CLICKHOUSE_CLOUD_PORT"),
		database,
		table,
		viper.GetString("CLICKHOUSE_CLOUD_USERNAME"),
		viper.GetString("CLICKHOUSE_CLOUD_PASSWORD"))
	// NOTE(review): the printed statement embeds CLICKHOUSE_CLOUD_PASSWORD in
	// clear text. Kept as-is so dry-run output stays copy-pasteable; consider
	// redacting it if this output ends up in shared logs.
	fmt.Printf("on EU CH inserting into table %s.%s\nInsert statement:\n%s\n", database, table, insertStatement)
	if !dryRun {
		if err := connEU.Exec(ctx, insertStatement); err != nil {
			return fmt.Errorf("inserting %s.%s from EU CH into CH Cloud: %w", database, table, err)
		}
	}

	selectStatement := createSelectStatement(database, table)
	fmt.Printf("on CH Cloud selecting from table %s.%s\nSelect statement:\n%s\n", database, table, selectStatement)
	rows, err := connCloud.Query(ctx, selectStatement)
	if err != nil {
		return fmt.Errorf("counting rows of %s.%s on CH Cloud: %w", database, table, err)
	}
	// Release the result set on every exit path; without this the rows (and
	// the connection backing them) are leaked once per table.
	defer rows.Close()

	for rows.Next() {
		var tableRows uint64
		// Report a scan failure to the caller instead of killing the process:
		// this function already has an error return that the caller handles.
		if err := rows.Scan(&tableRows); err != nil {
			return fmt.Errorf("scanning row count of %s.%s: %w", database, table, err)
		}
		log.Printf("Rows in table: %d", tableRows)
	}
	// Next returns false both at end of data and on failure; Err tells the
	// two apart so a broken read is not reported as success.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("reading row count of %s.%s: %w", database, table, err)
	}
	return nil
}
// createSelectStatement returns a query that counts the rows of
// database.table and exposes the count under the alias table_rows.
//
// Both names are interpolated verbatim; no quoting or escaping is applied.
func createSelectStatement(database, table string) string {
	qualifiedName := database + "." + table
	return fmt.Sprintf("SELECT count(1) table_rows FROM %s", qualifiedName)
}
// createTruncateStatement returns a TRUNCATE TABLE statement for
// database.table with the max_table_size_to_drop setting appended.
//
// Both names are interpolated verbatim; no quoting or escaping is applied.
func createTruncateStatement(database, table string) string {
	// Setting clause attached to every truncate statement built here.
	const dropSizeSetting = "max_table_size_to_drop = '9999999999999999999'"

	return fmt.Sprintf("TRUNCATE TABLE %s.%s settings %s", database, table, dropSizeSetting)
}
// createInsertStatement returns an INSERT INTO FUNCTION remoteSecure(...)
// statement that selects every row of the local database.table and writes it
// to the same database.table on the remote server at host:port,
// authenticating with username and password.
//
// The statement carries a max_execution_time setting of 999999999.
//
// All arguments are interpolated verbatim into the statement (host, table
// path, username and password inside single-quoted literals); no escaping is
// performed, so values containing a single quote produce a broken statement.
func createInsertStatement(host string, port int, database, table, username, password string) string {
	// The source table must be followed directly by the settings clause: a
	// trailing "." after the table name leaves a dangling qualifier and
	// corrupts the statement.
	const insertQuery = `INSERT INTO FUNCTION
remoteSecure('%s:%d', '%s.%s', '%s', '%s')
SELECT * from %s.%s
settings max_execution_time = 999999999`
	return fmt.Sprintf(insertQuery, host, port, database, table, username, password, database, table)
}