-
Notifications
You must be signed in to change notification settings - Fork 32
/
server.go
162 lines (135 loc) · 4.55 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package api
import (
"context"
"embed"
"fmt"
"github.com/gin-gonic/gin"
"github.com/ipfs/go-log"
"github.com/soheilhy/cmux"
"github.com/synapsecns/sanguine/core"
"github.com/synapsecns/sanguine/core/ginhelper"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/services/scribe/db"
"github.com/synapsecns/sanguine/services/scribe/db/datastore/sql/mysql"
"github.com/synapsecns/sanguine/services/scribe/db/datastore/sql/sqlite"
gqlServer "github.com/synapsecns/sanguine/services/scribe/graphql/server"
"github.com/synapsecns/sanguine/services/scribe/grpc/server"
"golang.org/x/sync/errgroup"
"net"
"net/http"
"os"
)
//go:embed static
var static embed.FS
// Config contains the config for the api.
type Config struct {
// Port is the http port for the api.
Port uint16
// Database is the database type.
// TODO: should be enum
Database string
// Path is the path to the database or db connection.
// TODO: should be renamed
Path string
// OmniRPCURL is the url of the omnirpc service.
OmniRPCURL string
// SkipMigrations skips the database migrations.
SkipMigrations bool
}
var logger = log.Logger("scribe-api")
// Start starts the api server.
func Start(ctx context.Context, cfg Config, handler metrics.Handler) error {
logger.Warnf("starting api server")
router := ginhelper.New(logger)
// wrap gin with metrics
router.GET(ginhelper.MetricsEndpoint, gin.WrapH(handler.Handler()))
eventDB, err := InitDB(ctx, cfg.Database, cfg.Path, handler, cfg.SkipMigrations)
if err != nil {
return fmt.Errorf("could not initialize database: %w", err)
}
router.Use(handler.Gin()...)
gqlServer.EnableGraphql(router, eventDB, cfg.OmniRPCURL, handler)
grpcServer, err := server.SetupGRPCServer(ctx, router, eventDB, handler)
if err != nil {
return fmt.Errorf("could not create grpc server: %w", err)
}
router.GET("static", gin.WrapH(http.FileServer(http.FS(static))))
fmt.Printf("started graphiql gqlServer on port: http://localhost:%d/graphiql\n", cfg.Port)
g, ctx := errgroup.WithContext(ctx)
var lc net.ListenConfig
listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%d", cfg.Port))
if err != nil {
return fmt.Errorf("could not listen on port %d", cfg.Port)
}
m := cmux.New(listener)
httpListener := m.Match(cmux.HTTP1Fast())
// fallback to grpc
grpcListener := m.Match(cmux.Any())
g.Go(func() error {
//nolint: gosec
// TODO: consider setting timeouts here: https://ieftimov.com/posts/make-resilient-golang-net-http-servers-using-timeouts-deadlines-context-cancellation/
err := http.Serve(httpListener, router)
if err != nil {
return fmt.Errorf("could not serve http: %w", err)
}
return nil
})
g.Go(func() error {
err = grpcServer.Serve(grpcListener)
if err != nil {
return fmt.Errorf("could not start grpc server: %w", err)
}
return nil
})
g.Go(func() error {
err := m.Serve()
if err != nil {
return fmt.Errorf("could not start server: %w", err)
}
return nil
})
g.Go(func() error {
<-ctx.Done()
grpcServer.Stop()
m.Close()
logger.Errorf("grpc server stopped")
return nil
})
err = g.Wait()
if err != nil {
return fmt.Errorf("server error: %w", err)
}
return nil
}
// InitDB initializes a database given a database type and path.
// TODO: use enum for database type.
func InitDB(ctx context.Context, databaseType string, path string, metrics metrics.Handler, skipMigrations bool) (db.EventDB, error) {
logger.Warnf("Starting database connection from api")
switch {
case databaseType == "sqlite":
sqliteStore, err := sqlite.NewSqliteStore(ctx, path, metrics, skipMigrations)
if err != nil {
return nil, fmt.Errorf("failed to create sqlite store: %w", err)
}
metrics.AddGormCallbacks(sqliteStore.DB())
return sqliteStore, nil
case databaseType == "mysql":
if os.Getenv("OVERRIDE_MYSQL") != "" {
dbname := os.Getenv("MYSQL_DATABASE")
connString := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", core.GetEnv("MYSQL_USER", "root"), os.Getenv("MYSQL_PASSWORD"), core.GetEnv("MYSQL_HOST", "127.0.0.1"), core.GetEnvInt("MYSQL_PORT", 3306), dbname)
mysqlStore, err := mysql.NewMysqlStore(ctx, connString, metrics, skipMigrations)
if err != nil {
return nil, fmt.Errorf("failed to create mysql store: %w", err)
}
metrics.AddGormCallbacks(mysqlStore.DB())
return mysqlStore, nil
}
mysqlStore, err := mysql.NewMysqlStore(ctx, path, metrics, skipMigrations)
if err != nil {
return nil, fmt.Errorf("failed to create mysql store: %w", err)
}
return mysqlStore, nil
default:
return nil, fmt.Errorf("invalid databaseType type: %s", databaseType)
}
}