Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

add sqlite pragma support #30

Merged
merged 11 commits into from
Dec 31, 2021
29 changes: 28 additions & 1 deletion cmd/temporalite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
goLog "log"
"net"
"os"
"strings"

"github.com/urfave/cli/v2"
"go.temporal.io/server/common/headers"
Expand All @@ -35,6 +36,7 @@ const (
ipFlag = "ip"
logFormatFlag = "log-format"
namespaceFlag = "namespace"
pragmaFLag = "sqlite-pragma"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
pragmaFLag = "sqlite-pragma"
pragmaFlag = "sqlite-pragma"

)

func init() {
Expand All @@ -52,7 +54,6 @@ func buildCLI() *cli.App {
app.Name = "temporal"
app.Usage = "Temporal server"
app.Version = headers.ServerVersion

app.Commands = []*cli.Command{
{
Name: "start",
Expand All @@ -64,6 +65,13 @@ func buildCLI() *cli.App {
Value: defaultCfg.Ephemeral,
Usage: "enable the in-memory storage driver **data will be lost on restart**",
},
&cli.StringSliceFlag{
Name: pragmaFLag,
Aliases: []string{"sp"},
Usage: fmt.Sprintf("specify sqlite pragma statements in pragma=value format. allowed pragmas: %v", liteconfig.GetAllowedPragmas()),
EnvVars: nil,
Value: nil,
},
&cli.StringFlag{
Name: dbPathFlag,
Aliases: []string{"f"},
Expand Down Expand Up @@ -103,6 +111,7 @@ func buildCLI() *cli.App {
if c.IsSet(ephemeralFlag) && c.IsSet(dbPathFlag) {
return cli.Exit(fmt.Sprintf("ERROR: only one of %q or %q flags may be passed at a time", ephemeralFlag, dbPathFlag), 1)
}

switch c.String(logFormatFlag) {
case "json", "pretty":
default:
Expand All @@ -117,10 +126,16 @@ func buildCLI() *cli.App {
return nil
},
Action: func(c *cli.Context) error {
pragmas, err := getPragmaMap(c.StringSlice(pragmaFLag))
if err != nil {
return err
}

opts := []temporalite.ServerOption{
temporalite.WithFrontendPort(c.Int(portFlag)),
temporalite.WithDatabaseFilePath(c.String(dbPathFlag)),
temporalite.WithNamespaces(c.StringSlice(namespaceFlag)...),
temporalite.WithSQLitePragmas(pragmas),
temporalite.WithUpstreamOptions(
temporal.InterruptOn(temporal.InterruptCh()),
),
Expand Down Expand Up @@ -161,3 +176,15 @@ func buildCLI() *cli.App {

return app
}

func getPragmaMap(input []string) (map[string]string, error) {
result := make(map[string]string)
for _, pragma := range input {
vals := strings.Split(pragma, "=")
if len(vals) != 2 {
return nil, fmt.Errorf("ERROR: pragma statements must be in KEY=VALUE format, got %q", pragma)
}
result[vals[0]] = vals[1]
}
return result, nil
}
21 changes: 21 additions & 0 deletions internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"time"

"go.temporal.io/server/common/cluster"
Expand All @@ -31,12 +32,27 @@ type Config struct {
FrontendPort int
DynamicPorts bool
Namespaces []string
SQLitePragmas map[string]string
Logger log.Logger
UpstreamOptions []temporal.ServerOption
portProvider *portProvider
FrontendIP string
}

var SupportedPragmas = map[string]struct{}{
"journal_mode": {},
"synchronous": {},
}

func GetAllowedPragmas() []string {
var allowedPragmaList []string
for k := range SupportedPragmas {
allowedPragmaList = append(allowedPragmaList, k)
}
sort.Strings(allowedPragmaList)
return allowedPragmaList
}

func NewDefaultConfig() (*Config, error) {
userConfigDir, err := os.UserConfigDir()
if err != nil {
Expand All @@ -49,6 +65,7 @@ func NewDefaultConfig() (*Config, error) {
FrontendPort: 0,
DynamicPorts: false,
Namespaces: nil,
SQLitePragmas: nil,
Logger: log.NewZapLogger(log.BuildZapLogger(log.Config{
Stdout: true,
Level: "debug",
Expand Down Expand Up @@ -79,6 +96,10 @@ func Convert(cfg *Config) *config.Config {
sqliteConfig.ConnectAttributes["mode"] = "rwc"
}

for k, v := range cfg.SQLitePragmas {
sqliteConfig.ConnectAttributes["_"+k] = v
}

var metricsPort, pprofPort int
if cfg.DynamicPorts {
if cfg.FrontendPort == 0 {
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ func WithNamespaces(namespaces ...string) ServerOption {
})
}

// WithSQLitePragmas applies pragma statements to SQLite on Temporal start.
func WithSQLitePragmas(pragmas map[string]string) ServerOption {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
if cfg.SQLitePragmas == nil {
cfg.SQLitePragmas = make(map[string]string)
}
for k, v := range pragmas {
cfg.SQLitePragmas[k] = v
}
})
}

// WithUpstreamOptions registers Temporal server options.
func WithUpstreamOptions(options ...temporal.ServerOption) ServerOption {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
Expand Down
8 changes: 8 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/DataDog/temporalite/internal/liteconfig"
Expand Down Expand Up @@ -39,6 +40,13 @@ func NewServer(opts ...ServerOption) (*Server, error) {
for _, opt := range opts {
opt.apply(c)
}

for pragma := range c.SQLitePragmas {
if _, ok := liteconfig.SupportedPragmas[strings.ToLower(pragma)]; !ok {
return nil, fmt.Errorf("ERROR: unsupported pragma %q, %v allowed", pragma, liteconfig.GetAllowedPragmas())
}
}

cfg := liteconfig.Convert(c)
sqlConfig := cfg.Persistence.DataStores[liteconfig.PersistenceStoreName].SQL

Expand Down