-
Notifications
You must be signed in to change notification settings - Fork 56
/
baskets_sql.go
543 lines (456 loc) · 16.4 KB
/
baskets_sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"regexp"
"strings"
"time"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
)
// DbTypeSQL is the name that identifies the SQL database storage type.
const DbTypeSQL = "sql"
// sqlSchema lists the SQL statements that create the database schema for baskets.
// createSchema executes them one by one in slice order, so tables must be declared
// before the objects that reference them. The last statement records schema version 1.
var sqlSchema = []string{
// rb_baskets: one row per basket holding its token, configuration and the
// counter of all requests ever collected (requests_count).
`CREATE TABLE rb_baskets (
basket_name varchar(250) PRIMARY KEY,
token varchar(100) NOT NULL,
capacity integer NOT NULL,
forward_url text NOT NULL,
proxy_response boolean NOT NULL,
insecure_tls boolean NOT NULL,
expand_path boolean NOT NULL,
requests_count integer NOT NULL DEFAULT 0,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
// rb_responses: at most one serialized response per (basket, HTTP method);
// rows are removed together with the owning basket (ON DELETE CASCADE).
`CREATE TABLE rb_responses (
basket_name varchar(250) NOT NULL,
http_method varchar(20) NOT NULL,
response text NOT NULL,
PRIMARY KEY (basket_name, http_method),
FOREIGN KEY (basket_name) REFERENCES rb_baskets (basket_name) ON DELETE CASCADE
)`,
// rb_requests: serialized collected requests; created_at keeps 3 fractional
// digits of seconds and is used for ordering. Rows are removed together with
// the owning basket (ON DELETE CASCADE).
`CREATE TABLE rb_requests (
basket_name varchar(250) NOT NULL,
request text NOT NULL,
created_at timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
FOREIGN KEY (basket_name) REFERENCES rb_baskets (basket_name) ON DELETE CASCADE
)`,
// index backing the per-basket queries that order requests by creation time
`CREATE INDEX rb_requests_name_time_index ON rb_requests (basket_name, created_at)`,
// rb_version: stores the schema version read by getSchemaVersion
`CREATE TABLE rb_version (
version integer NOT NULL
)`,
`INSERT INTO rb_version (version) VALUES (1)`}
// Basket interface //
// sqlBasket is a basket whose configuration, responses and collected requests
// are stored in the SQL database and addressed by the basket name.
//
// NOTE: values of this type are built with positional composite literals in
// this file, so the order of the fields must not change.
type sqlBasket struct {
db *sql.DB // database handle used for every query of this basket
dbType string // SQL driver name (e.g. "postgres", "mysql"); selects the statement dialect
name string // basket name, bound as the basket_name parameter in queries
}
// getInt executes a query that yields a single integer for this basket.
//
// The statement must take exactly one parameter, which is bound to the basket
// name. If the query or the scan fails, the error is logged and defaultValue is
// returned instead.
func (basket *sqlBasket) getInt(sql string, defaultValue int) int {
	statement := unifySQL(basket.dbType, sql)
	row := basket.db.QueryRow(statement, basket.name)

	var result int
	err := row.Scan(&result)
	if err != nil {
		log.Printf("[error] failed to get counter info about basket: %s - %s", basket.name, err)
		return defaultValue
	}

	return result
}
// applyLimit trims the collected requests of this basket down to capacity by
// deleting the oldest entries (ordered by created_at). Nothing happens when the
// basket does not exceed the capacity. Failures are logged, not returned.
func (basket *sqlBasket) applyLimit(capacity int) {
	size := basket.Size()
	if size <= capacity {
		// already within the limit
		return
	}

	// generic form (MySQL style, see https://stackoverflow.com/questions/5170546)
	cleanupSQL := "DELETE FROM rb_requests WHERE basket_name = ? ORDER BY created_at LIMIT ?"
	if basket.dbType == "postgres" {
		// PostgreSQL has no DELETE ... LIMIT; 'ctid' is PostgreSQL specific
		cleanupSQL = "DELETE FROM rb_requests WHERE ctid IN (SELECT ctid FROM rb_requests WHERE basket_name = $1 ORDER BY created_at LIMIT $2)"
	}

	excess := size - capacity
	_, err := basket.db.Exec(cleanupSQL, basket.name, excess)
	if err != nil {
		log.Printf("[error] failed to shrink collected requests: %s - %s", basket.name, err)
	}
}
// getTotalRequestsCount reads the requests_count column of this basket, i.e.
// the counter that Add increments for every stored request. It returns 0 when
// the value cannot be read.
func (basket *sqlBasket) getTotalRequestsCount() int {
	const counterSQL = "SELECT requests_count FROM rb_baskets WHERE basket_name = $1"
	return basket.getInt(counterSQL, 0)
}
// getLastRequestDate returns the creation time of the most recent request in
// this basket as UnixNano divided by toMs (presumably milliseconds since the
// epoch; toMs is declared elsewhere). It returns 0 and logs the error when the
// value cannot be read.
func (basket *sqlBasket) getLastRequestDate() int64 {
	statement := unifySQL(basket.dbType,
		"SELECT MAX(created_at) FROM rb_requests WHERE basket_name = $1")

	var latest time.Time
	err := basket.db.QueryRow(statement, basket.name).Scan(&latest)
	if err != nil {
		log.Printf("[error] failed to get last request date of basket: %s - %s", basket.name, err)
		return 0
	}

	return latest.UnixNano() / toMs
}
// Config loads the configuration of this basket from the rb_baskets table.
// When the row cannot be read the error is logged and the configuration
// collected so far (zero values by default) is returned.
func (basket *sqlBasket) Config() BasketConfig {
	statement := unifySQL(basket.dbType,
		"SELECT capacity, forward_url, proxy_response, insecure_tls, expand_path FROM rb_baskets WHERE basket_name = $1")

	var cfg BasketConfig
	row := basket.db.QueryRow(statement, basket.name)
	if err := row.Scan(&cfg.Capacity, &cfg.ForwardURL, &cfg.ProxyResponse, &cfg.InsecureTLS, &cfg.ExpandPath); err != nil {
		log.Printf("[error] failed to get basket config: %s - %s", basket.name, err)
	}

	return cfg
}
// Update stores a new configuration for this basket and, when the update
// succeeds, trims the collected requests to the new capacity. A failed update
// is logged and leaves the collected requests untouched.
func (basket *sqlBasket) Update(config BasketConfig) {
	statement := unifySQL(basket.dbType,
		"UPDATE rb_baskets SET capacity = $1, forward_url = $2, proxy_response = $3, insecure_tls = $4, expand_path = $5 WHERE basket_name = $6")

	if _, err := basket.db.Exec(statement,
		config.Capacity, config.ForwardURL, config.ProxyResponse, config.InsecureTLS, config.ExpandPath, basket.name); err != nil {
		log.Printf("[error] failed to update basket config: %s - %s", basket.name, err)
		return
	}

	// the capacity may have been lowered: enforce the new limit
	basket.applyLimit(config.Capacity)
}
// Authorize reports whether token matches the token stored for this basket.
// The check is done by counting rows with the given name and token; a query
// failure is logged and treated as "not authorized".
func (basket *sqlBasket) Authorize(token string) bool {
	statement := unifySQL(basket.dbType, "SELECT COUNT(*) FROM rb_baskets WHERE basket_name = $1 AND token = $2")

	var matches int
	if err := basket.db.QueryRow(statement, basket.name, token).Scan(&matches); err != nil {
		log.Printf("[error] failed authorize access to basket: %s - %s", basket.name, err)
		return false
	}

	return matches > 0
}
// GetResponse returns the response configured for the given HTTP method of
// this basket. It returns nil when no response is configured, when the query
// fails, or when the stored JSON cannot be parsed; the latter two cases are
// logged.
func (basket *sqlBasket) GetResponse(method string) *ResponseConfig {
	statement := unifySQL(basket.dbType, "SELECT response FROM rb_responses WHERE basket_name = $1 AND http_method = $2")

	var serialized string
	err := basket.db.QueryRow(statement, basket.name, method).Scan(&serialized)
	switch {
	case err == sql.ErrNoRows:
		// nothing is configured for this basket + HTTP method
		return nil
	case err != nil:
		log.Printf("[error] failed to get response for HTTP %s method of basket: %s - %s", method, basket.name, err)
		return nil
	}

	response := &ResponseConfig{}
	err = json.Unmarshal([]byte(serialized), response)
	if err != nil {
		log.Printf("[error] failed to parse response for HTTP %s method of basket: %s - %s", method, basket.name, err)
		return nil
	}

	return response
}
// SetResponse stores the response to be served for the given HTTP method of
// this basket, replacing any response configured before.
//
// The replacement (DELETE followed by INSERT) runs in a single transaction, so
// a failure leaves the previously configured response in place instead of
// removing it. All failures, including serialization errors, are logged; the
// method has no return value to report them.
func (basket *sqlBasket) SetResponse(method string, response ResponseConfig) {
	respb, err := json.Marshal(response)
	if err != nil {
		log.Printf("[error] failed to serialize response for HTTP %s method of basket: %s - %s", method, basket.name, err)
		return
	}

	tx, err := basket.db.Begin()
	if err != nil {
		log.Printf("[error] failed to update response for HTTP %s method of basket: %s - %s", method, basket.name, err)
		return
	}

	// delete existing if present
	_, err = tx.Exec(
		unifySQL(basket.dbType, "DELETE FROM rb_responses WHERE basket_name = $1 AND http_method = $2"),
		basket.name, method)
	if err == nil {
		// insert new response
		_, err = tx.Exec(
			unifySQL(basket.dbType, "INSERT INTO rb_responses (basket_name, http_method, response) VALUES ($1, $2, $3)"),
			basket.name, method, string(respb))
	}

	if err != nil {
		log.Printf("[error] failed to update response for HTTP %s method of basket: %s - %s", method, basket.name, err)
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Printf("[error] failed to rollback response update for HTTP %s method of basket: %s - %s", method, basket.name, rbErr)
		}
		return
	}

	if err = tx.Commit(); err != nil {
		log.Printf("[error] failed to update response for HTTP %s method of basket: %s - %s", method, basket.name, err)
	}
}
// Add converts the incoming HTTP request with ToRequestData, stores it in this
// basket and returns the converted data.
//
// After a successful insert the total requests counter of the basket is
// incremented and the basket is trimmed to its configured capacity. Failures
// (including a serialization failure, which previously went unnoticed) are
// logged; the converted request data is returned in every case.
func (basket *sqlBasket) Add(req *http.Request) *RequestData {
	data := ToRequestData(req)

	datab, err := json.Marshal(data)
	if err != nil {
		log.Printf("[error] failed to serialize incoming HTTP request for basket: %s - %s", basket.name, err)
		return data
	}

	_, err = basket.db.Exec(
		unifySQL(basket.dbType, "INSERT INTO rb_requests (basket_name, request) VALUES ($1, $2)"), basket.name, string(datab))
	if err != nil {
		log.Printf("[error] failed to collect incoming HTTP request in basket: %s - %s", basket.name, err)
		return data
	}

	// update global counter
	_, err = basket.db.Exec(
		unifySQL(basket.dbType, "UPDATE rb_baskets SET requests_count = requests_count + 1 WHERE basket_name = $1"), basket.name)
	if err != nil {
		log.Printf("[error] failed to update requests counter of basket: %s - %s", basket.name, err)
	}

	// apply limit if necessary; 200 is the fallback used when the capacity cannot be read
	// TODO: replace 200 with serverConfig.InitCapacity
	basket.applyLimit(basket.getInt("SELECT capacity FROM rb_baskets WHERE basket_name = $1", 200))

	return data
}
// Clear deletes every collected request of this basket. The basket itself and
// its requests_count column are not touched. A failure is logged.
func (basket *sqlBasket) Clear() {
	statement := unifySQL(basket.dbType, "DELETE FROM rb_requests WHERE basket_name = $1")

	_, err := basket.db.Exec(statement, basket.name)
	if err != nil {
		log.Printf("[error] failed to delete collected requests in basket: %s - %s", basket.name, err)
	}
}
// Size returns the number of requests currently stored for this basket, or 0
// when the count cannot be read.
func (basket *sqlBasket) Size() int {
	const countSQL = "SELECT COUNT(*) FROM rb_requests WHERE basket_name = $1"
	return basket.getInt(countSQL, 0)
}
// GetRequests returns a page of requests collected by this basket, newest
// first.
//
// max is the maximum number of requests to return and skip is the number of
// newest requests to omit. The page also carries the current basket size and
// the total requests counter. When max is zero or negative no requests are
// loaded and HasMore only tells whether the basket holds more than skip
// requests.
//
// One extra row (max+1) is requested from the database so that HasMore can be
// determined without a second query. Rows that cannot be read or parsed are
// logged and skipped.
func (basket *sqlBasket) GetRequests(max int, skip int) RequestsPage {
	// a negative capacity would make the allocation below panic
	capacity := max
	if capacity < 0 {
		capacity = 0
	}

	page := RequestsPage{make([]*RequestData, 0, capacity), basket.Size(), basket.getTotalRequestsCount(), false}
	if max <= 0 {
		page.HasMore = page.Count > skip
		return page
	}

	requests, err := basket.db.Query(
		unifySQL(basket.dbType, "SELECT request FROM rb_requests WHERE basket_name = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3"),
		basket.name, max+1, skip)
	if err != nil {
		log.Printf("[error] failed to get requests of basket: %s - %s", basket.name, err)
		return page
	}
	defer requests.Close()

	for len(page.Requests) < max && requests.Next() {
		var req string
		if err = requests.Scan(&req); err != nil {
			log.Printf("[error] failed to read HTTP request data in basket: %s - %s", basket.name, err)
			continue
		}

		request := new(RequestData)
		if err = json.Unmarshal([]byte(req), request); err != nil {
			log.Printf("[error] failed to parse HTTP request data in basket: %s - %s", basket.name, err)
			continue
		}

		page.Requests = append(page.Requests, request)
	}
	page.HasMore = requests.Next()

	// Next() returns false both at the end of the result set and on failure
	if err = requests.Err(); err != nil {
		log.Printf("[error] failed to get requests of basket: %s - %s", basket.name, err)
	}

	return page
}
// FindRequests searches the requests collected by this basket, newest first,
// and returns a page of those for which request.Matches(query, in) is true.
//
// max is the maximum number of matching requests to return and skip is the
// number of newest matches to omit. Filtering happens in the application: rows
// are read one by one until the page is full. HasMore reports whether unread
// rows remain after the page is filled, which does not guarantee that any of
// them matches. When max is zero or negative nothing is searched and HasMore is
// set to true.
//
// Rows that cannot be read or parsed are logged and skipped.
func (basket *sqlBasket) FindRequests(query string, in string, max int, skip int) RequestsQueryPage {
	// a negative capacity would make the allocation below panic
	capacity := max
	if capacity < 0 {
		capacity = 0
	}

	page := RequestsQueryPage{make([]*RequestData, 0, capacity), false}
	if max <= 0 {
		page.HasMore = true
		return page
	}

	requests, err := basket.db.Query(
		unifySQL(basket.dbType, "SELECT request FROM rb_requests WHERE basket_name = $1 ORDER BY created_at DESC"), basket.name)
	if err != nil {
		log.Printf("[error] failed to find requests of basket: %s - %s", basket.name, err)
		return page
	}
	defer requests.Close()

	skipped := 0
	for len(page.Requests) < max && requests.Next() {
		var req string
		if err = requests.Scan(&req); err != nil {
			log.Printf("[error] failed to read HTTP request data in basket: %s - %s", basket.name, err)
			continue
		}

		request := new(RequestData)
		if err = json.Unmarshal([]byte(req), request); err != nil {
			log.Printf("[error] failed to parse HTTP request data in basket: %s - %s", basket.name, err)
			continue
		}

		// filter
		if !request.Matches(query, in) {
			continue
		}
		if skipped < skip {
			skipped++
			continue
		}
		page.Requests = append(page.Requests, request)
	}
	page.HasMore = requests.Next()

	// Next() returns false both at the end of the result set and on failure
	if err = requests.Err(); err != nil {
		log.Printf("[error] failed to find requests of basket: %s - %s", basket.name, err)
	}

	return page
}
/// BasketsDatabase interface ///
// sqlDatabase is the baskets database backed by an SQL database; it is created
// by NewSQLDatabase and hands out sqlBasket values that share its handle.
//
// NOTE: values of this type are built with a positional composite literal in
// this file, so the order of the fields must not change.
type sqlDatabase struct {
db *sql.DB // database handle, closed by Release
dbType string // SQL driver name (e.g. "postgres", "mysql"); selects the statement dialect
}
// getInt executes a parameterless query that yields a single integer. The
// statement is sent to the database as is (no placeholder conversion). If the
// query or the scan fails, the error is logged together with the statement and
// defaultValue is returned.
func (sdb *sqlDatabase) getInt(sql string, defaultValue int) int {
	var result int

	row := sdb.db.QueryRow(sql)
	err := row.Scan(&result)
	if err != nil {
		log.Printf("[error] failed to query for int result, query: %s - %s", sql, err)
		return defaultValue
	}

	return result
}
// getTopBaskets runs a query that selects basket names and returns summary
// information about every basket found.
//
// sql must select a single basket name column and accept one parameter, which
// is bound to max (the queries in this file use it as LIMIT). For each name the
// current size, the total requests counter and, for non-empty baskets, the date
// of the last request are collected.
//
// The names are read completely and the cursor is closed before the per-basket
// queries run, so the lookup never needs more than one connection at a time.
// Failures are logged; whatever was collected so far is returned.
func (sdb *sqlDatabase) getTopBaskets(sql string, max int) []*BasketInfo {
	// a negative capacity would make the allocations below panic
	capacity := max
	if capacity < 0 {
		capacity = 0
	}

	top := make([]*BasketInfo, 0, capacity)

	rows, err := sdb.db.Query(unifySQL(sdb.dbType, sql), max)
	if err != nil {
		log.Printf("[error] failed to find top baskets: %s", err)
		return top
	}

	names := make([]string, 0, capacity)
	for rows.Next() {
		var name string
		if err = rows.Scan(&name); err != nil {
			log.Printf("[error] failed to read name of top basket: %s", err)
			continue
		}
		names = append(names, name)
	}
	// Next() returns false both at the end of the result set and on failure
	if err = rows.Err(); err != nil {
		log.Printf("[error] failed to find top baskets: %s", err)
	}
	if err = rows.Close(); err != nil {
		log.Printf("[error] failed to find top baskets: %s", err)
	}

	for _, name := range names {
		basket := &sqlBasket{db: sdb.db, dbType: sdb.dbType, name: name}
		reqCount := basket.Size()

		// the last request date is only meaningful for non-empty baskets
		var lastRequestDate int64
		if reqCount > 0 {
			lastRequestDate = basket.getLastRequestDate()
		}

		top = append(top, &BasketInfo{
			Name:               name,
			RequestsCount:      reqCount,
			RequestsTotalCount: basket.getTotalRequestsCount(),
			LastRequestDate:    lastRequestDate})
	}

	return top
}
// Create registers a new basket with the given name and configuration and
// returns the access token generated for it.
//
// An error is returned when the token cannot be generated, when the insert
// fails (the basket name is the primary key of rb_baskets) or when the driver
// fails to report the affected rows. On error the returned BasketAuth is empty.
func (sdb *sqlDatabase) Create(name string, config BasketConfig) (BasketAuth, error) {
	token, err := GenerateToken()
	if err != nil {
		return BasketAuth{}, fmt.Errorf("failed to generate token: %s", err)
	}

	statement := unifySQL(sdb.dbType, "INSERT INTO rb_baskets (basket_name, token, capacity, forward_url, proxy_response, insecure_tls, expand_path) VALUES($1, $2, $3, $4, $5, $6, $7)")
	result, err := sdb.db.Exec(statement,
		name, token, config.Capacity, config.ForwardURL, config.ProxyResponse, config.InsecureTLS, config.ExpandPath)
	if err != nil {
		return BasketAuth{}, fmt.Errorf("failed to create basket: %s - %s", name, err)
	}

	_, err = result.RowsAffected()
	if err != nil {
		return BasketAuth{}, err
	}

	return BasketAuth{Token: token}, nil
}
// Get looks up the basket with the given name. It returns nil when no such
// basket exists (logged as a warning) or when the lookup fails (logged as an
// error).
func (sdb *sqlDatabase) Get(name string) Basket {
	statement := unifySQL(sdb.dbType, "SELECT basket_name FROM rb_baskets WHERE basket_name = $1")

	var found string
	switch err := sdb.db.QueryRow(statement, name).Scan(&found); {
	case err == sql.ErrNoRows:
		log.Printf("[warn] no basket found: %s", name)
		return nil
	case err != nil:
		log.Printf("[error] failed to get basket: %s - %s", name, err)
		return nil
	}

	return &sqlBasket{sdb.db, sdb.dbType, name}
}
// Delete removes the basket with the given name from rb_baskets. A failure is
// logged.
func (sdb *sqlDatabase) Delete(name string) {
	statement := unifySQL(sdb.dbType, "DELETE FROM rb_baskets WHERE basket_name = $1")

	_, err := sdb.db.Exec(statement, name)
	if err != nil {
		log.Printf("[error] failed to delete basket: %s - %s", name, err)
	}
}
// Size returns the number of baskets stored in the database, or 0 when the
// count cannot be read.
func (sdb *sqlDatabase) Size() int {
	const countSQL = "SELECT COUNT(*) FROM rb_baskets"
	return sdb.getInt(countSQL, 0)
}
// GetNames returns a page of basket names in alphabetical order.
//
// max is the maximum number of names to return and skip is the number of names
// to omit from the beginning. The page also carries the total number of
// baskets. One extra row (max+1) is requested from the database so that HasMore
// can be determined without a second query.
//
// Failures are logged; the names collected so far are returned.
func (sdb *sqlDatabase) GetNames(max int, skip int) BasketNamesPage {
	// a negative capacity would make the allocation below panic
	capacity := max
	if capacity < 0 {
		capacity = 0
	}

	page := BasketNamesPage{make([]string, 0, capacity), sdb.Size(), false}

	names, err := sdb.db.Query(unifySQL(sdb.dbType, "SELECT basket_name FROM rb_baskets ORDER BY basket_name LIMIT $1 OFFSET $2"), max+1, skip)
	if err != nil {
		log.Printf("[error] failed to get basket names: %s", err)
		return page
	}
	defer names.Close()

	for len(page.Names) < max && names.Next() {
		var name string
		if err = names.Scan(&name); err != nil {
			log.Printf("[error] failed to read basket name: %s", err)
			continue
		}
		page.Names = append(page.Names, name)
	}
	page.HasMore = names.Next()

	// Next() returns false both at the end of the result set and on failure
	if err = names.Err(); err != nil {
		log.Printf("[error] failed to get basket names: %s", err)
	}

	return page
}
// FindNames returns a page of basket names that contain query, in alphabetical
// order.
//
// max is the maximum number of names to return and skip is the number of
// matching names to omit from the beginning. One extra row (max+1) is requested
// from the database so that HasMore can be determined without a second query.
//
// Failures are logged; the names collected so far are returned.
func (sdb *sqlDatabase) FindNames(query string, max int, skip int) BasketNamesQueryPage {
	// a negative capacity would make the allocation below panic
	capacity := max
	if capacity < 0 {
		capacity = 0
	}

	page := BasketNamesQueryPage{make([]string, 0, capacity), false}

	// NOTE(review): query is bound as a parameter (no SQL injection), but LIKE
	// wildcards inside it ('%' and '_') are not escaped, so e.g. "a_b" also
	// matches "axb". Escaping rules differ between databases - confirm before changing.
	names, err := sdb.db.Query(
		unifySQL(sdb.dbType, "SELECT basket_name FROM rb_baskets WHERE basket_name LIKE $1 ORDER BY basket_name LIMIT $2 OFFSET $3"),
		"%"+query+"%", max+1, skip)
	if err != nil {
		log.Printf("[error] failed to find basket names: %s", err)
		return page
	}
	defer names.Close()

	for len(page.Names) < max && names.Next() {
		var name string
		if err = names.Scan(&name); err != nil {
			log.Printf("[error] failed to read basket name: %s", err)
			continue
		}
		page.Names = append(page.Names, name)
	}
	page.HasMore = names.Next()

	// Next() returns false both at the end of the result set and on failure
	if err = names.Err(); err != nil {
		log.Printf("[error] failed to find basket names: %s", err)
	}

	return page
}
// GetStats collects statistics about the whole database: basket and request
// counters plus two top lists of at most max baskets each, one ordered by the
// total requests counter and one by the date of the most recent request.
// UpdateAvarage is invoked on the result before it is returned.
func (sdb *sqlDatabase) GetStats(max int) DatabaseStats {
	// the fields are evaluated in the order they are listed
	summary := DatabaseStats{
		BasketsCount:       sdb.getInt("SELECT COUNT(*) FROM rb_baskets", 0),
		EmptyBasketsCount:  sdb.getInt("SELECT COUNT(*) FROM rb_baskets WHERE requests_count = 0", 0),
		RequestsCount:      sdb.getInt("SELECT COUNT(*) FROM rb_requests", 0),
		RequestsTotalCount: sdb.getInt("SELECT COALESCE(SUM(requests_count), 0) FROM rb_baskets", 0),
		MaxBasketSize:      sdb.getInt("SELECT COALESCE(MAX(requests_count), 0) FROM rb_baskets", 0),
		TopBasketsBySize:   sdb.getTopBaskets("SELECT basket_name FROM rb_baskets ORDER BY requests_count DESC LIMIT $1", max),
		TopBasketsByDate:   sdb.getTopBaskets("SELECT basket_name FROM rb_requests GROUP BY basket_name ORDER BY MAX(created_at) DESC LIMIT $1", max),
	}
	summary.UpdateAvarage()

	return summary
}
// Release closes the underlying SQL database handle. A failure to close is
// logged.
func (sdb *sqlDatabase) Release() {
	log.Printf("[info] closing SQL database, releasing any open resources")
	if err := sdb.db.Close(); err != nil {
		log.Printf("[error] failed to close SQL database: %s", err)
	}
}
// NewSQLDatabase creates an instance of Baskets Database backed with SQL DB.
//
// connection has the form "<driver>://<source>" and is interpreted by
// parseConnection. The database is pinged and the schema is created when it
// does not exist yet. nil is returned when the driver cannot be detected, the
// connection cannot be established or the schema cannot be initialized.
//
// The connection string is never written to the log because it may contain
// credentials; log messages refer to the driver name only.
func NewSQLDatabase(connection string) BasketsDatabase {
	log.Print("[info] using SQL database to store baskets")

	driver, source := parseConnection(connection)
	if len(driver) == 0 {
		return nil
	}
	log.Printf("[info] SQL database type: %s", driver)

	db, err := sql.Open(driver, source)
	if err != nil {
		log.Printf("[error] failed to open %s database connection: %s", driver, err)
		return nil
	}

	if err = db.Ping(); err != nil {
		log.Printf("[error] %s database connection is not alive: %s", driver, err)
	} else if err = initSchema(db); err != nil {
		log.Printf("[error] failed to initialize SQL schema: %s", err)
	}

	if err != nil {
		// the handle is unusable: release it before giving up
		if closeErr := db.Close(); closeErr != nil {
			log.Printf("[error] failed to close %s database connection: %s", driver, closeErr)
		}
		return nil
	}

	return &sqlDatabase{db: db, dbType: driver}
}
// pgParams matches PostgreSQL style positional parameters: $1, $2, ...
var pgParams = regexp.MustCompile(`\$\d+`)

// unifySQL adapts a statement written with PostgreSQL style parameters ($n) to
// the dialect of the given driver. For "mysql" and "sqlite3" every $n is
// replaced with "?"; for any other driver (e.g. "postgres", "sqlserver") the
// statement is returned unchanged.
func unifySQL(dbType string, sql string) string {
	usesQuestionMarks := dbType == "mysql" || dbType == "sqlite3"
	if !usesQuestionMarks {
		// statements are already designed to work with postgresql
		return sql
	}

	return pgParams.ReplaceAllString(sql, "?")
}
// parseConnection splits a connection string of the form "<driver>://<source>"
// and returns the driver name together with the data source to pass to
// sql.Open.
//
// For "mysql" the data source is the part after "://" with parseTime=true
// added to its parameters, unless a parseTime parameter is already present.
// For every other driver (including "postgres") the complete connection string
// is used as the data source. Only the first "://" separates the driver from
// the source, so the source itself may contain that sequence.
//
// When the connection string has no "://" an error is logged and an empty
// driver name is returned along with the unchanged connection string. The
// connection string is not included in the log message because it may contain
// credentials.
func parseConnection(connection string) (string, string) {
	parts := strings.SplitN(connection, "://", 2)
	if len(parts) < 2 {
		log.Printf("[error] failed to detect database type from connection, expected format: <driver>://<source>")
		return "", connection
	}

	driver, source := parts[0], parts[1]
	if driver != "mysql" {
		// case "postgres", "sqlite3", etc.: the driver gets the connection string as is
		return driver, connection
	}

	// In a MySQL DSN the parameters follow the first '?' after the last '/'
	// (the password or the address part may contain both characters).
	tail := source
	if slash := strings.LastIndex(source, "/"); slash >= 0 {
		tail = source[slash:]
	}

	question := strings.Index(tail, "?")
	if question < 0 {
		return driver, source + "?parseTime=true"
	}

	params := tail[question+1:]
	switch {
	case strings.Contains(params, "parseTime="):
		// respect the explicitly configured value
		return driver, source
	case len(params) == 0:
		return driver, source + "parseTime=true"
	default:
		return driver, source + "&parseTime=true"
	}
}
// initSchema makes sure the database schema exists. Version 0 (reported by
// getSchemaVersion when the version cannot be read) triggers schema creation,
// version 1 is accepted as is and any other version results in an error.
func initSchema(db *sql.DB) error {
	version := getSchemaVersion(db)

	if version == 0 {
		return createSchema(db)
	}
	if version == 1 {
		log.Printf("[info] database schema already exists, version: %v", version)
		return nil
	}

	return fmt.Errorf("unknown database schema version: %v", version)
}
// getSchemaVersion reads the schema version from the rb_version table. Any
// failure to read it (for example because the table does not exist) is reported
// as version 0.
func getSchemaVersion(db *sql.DB) int {
	var current int

	row := db.QueryRow("SELECT version FROM rb_version")
	if scanErr := row.Scan(&current); scanErr != nil {
		return 0
	}

	return current
}
// createSchema executes the statements of sqlSchema in order and stops at the
// first failure, returning an error that carries the zero-based index of the
// failed statement. Statements executed before the failure are not undone.
func createSchema(db *sql.DB) error {
	log.Printf("[info] creating database schema")

	for idx := 0; idx < len(sqlSchema); idx++ {
		_, err := db.Exec(sqlSchema[idx])
		if err != nil {
			return fmt.Errorf("error in SQL statement #%v - %s", idx, err)
		}
	}

	version := getSchemaVersion(db)
	log.Printf("[info] database is created, version: %v", version)
	return nil
}