diff --git a/cmd/temporalite/main.go b/cmd/temporalite/main.go index 4b879c91..a9414eb2 100644 --- a/cmd/temporalite/main.go +++ b/cmd/temporalite/main.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/urfave/cli/v2" + "go.temporal.io/server/common/config" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/temporal" @@ -34,16 +35,18 @@ var ( ) const ( - ephemeralFlag = "ephemeral" - dbPathFlag = "filename" - portFlag = "port" - uiPortFlag = "ui-port" - headlessFlag = "headless" - ipFlag = "ip" - logFormatFlag = "log-format" - logLevelFlag = "log-level" - namespaceFlag = "namespace" - pragmaFlag = "sqlite-pragma" + ephemeralFlag = "ephemeral" + dbPathFlag = "filename" + portFlag = "port" + metricsPortFlag = "metrics-port" + uiPortFlag = "ui-port" + headlessFlag = "headless" + ipFlag = "ip" + logFormatFlag = "log-format" + logLevelFlag = "log-level" + namespaceFlag = "namespace" + pragmaFlag = "sqlite-pragma" + configFlag = "config" ) func init() { @@ -91,6 +94,11 @@ func buildCLI() *cli.App { Usage: "port for the temporal-frontend GRPC service", Value: liteconfig.DefaultFrontendPort, }, + &cli.IntFlag{ + Name: metricsPortFlag, + Usage: "port for the metrics listener", + Value: liteconfig.DefaultMetricsPort, + }, &cli.IntFlag{ Name: uiPortFlag, Usage: "port for the temporal web UI", @@ -125,6 +133,13 @@ func buildCLI() *cli.App { EnvVars: nil, Value: nil, }, + &cli.StringFlag{ + Name: configFlag, + Aliases: []string{"c"}, + Usage: `config dir path`, + EnvVars: []string{config.EnvKeyConfigDir}, + Value: "", + }, }, Before: func(c *cli.Context) error { if c.Args().Len() > 0 { @@ -151,13 +166,21 @@ func buildCLI() *cli.App { return cli.Exit(fmt.Sprintf("bad value %q passed for flag %q", c.String(ipFlag), ipFlag), 1) } + if c.IsSet(configFlag) { + cfgPath := c.String(configFlag) + if _, err := os.Stat(cfgPath); os.IsNotExist(err) { + return cli.Exit(fmt.Sprintf("bad value %q passed for flag %q: file not found", c.String(configFlag), configFlag), 1) + } + } + return nil }, Action: func(c *cli.Context) error { var ( - ip = c.String(ipFlag) - serverPort = c.Int(portFlag) - uiPort = serverPort + 1000 + ip = c.String(ipFlag) + serverPort = c.Int(portFlag) + metricsPort = c.Int(metricsPortFlag) + uiPort = serverPort + 1000 ) if c.IsSet(uiPortFlag) { @@ -169,9 +192,18 @@ func buildCLI() *cli.App { return err } + baseConfig := &config.Config{} + if c.IsSet(configFlag) { + baseConfig, err = config.LoadConfig("temporalite", c.String(configFlag), "") + if err != nil { + return err + } + } + opts := []temporalite.ServerOption{ temporalite.WithDynamicPorts(), temporalite.WithFrontendPort(serverPort), + temporalite.WithMetricsPort(metricsPort), temporalite.WithFrontendIP(ip), temporalite.WithDatabaseFilePath(c.String(dbPathFlag)), temporalite.WithNamespaces(c.StringSlice(namespaceFlag)...), @@ -179,6 +211,7 @@ func buildCLI() *cli.App { temporalite.WithUpstreamOptions( temporal.InterruptOn(temporal.InterruptCh()), ), + temporalite.WithBaseConfig(baseConfig), } if !c.Bool(headlessFlag) { opt := newUIOption(fmt.Sprintf(":%d", c.Int(portFlag)), ip, uiPort) diff --git a/go.mod b/go.mod index 3a414332..b1bd8e83 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.18 require ( github.com/google/go-licenses v0.0.0-20210816172045-3099c18c36e1 github.com/google/licenseclassifier v0.0.0-20210722185704-3043a050f148 - github.com/temporalio/ui-server/v2 v2.1.0 - github.com/urfave/cli/v2 v2.10.2 + github.com/temporalio/ui-server/v2 v2.2.1 + github.com/urfave/cli/v2 v2.10.3 go.temporal.io/sdk v1.15.0 go.temporal.io/server v1.17.1 go.uber.org/zap v1.21.0 diff --git a/go.sum b/go.sum index ee44ef25..a741dc7b 100644 --- a/go.sum +++ b/go.sum @@ -493,8 +493,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/temporalio/ringpop-go v0.0.0-20211012191444-6f91b5915e95 h1:8G34e73qtrkG9sDFcSC8uot0np3e6BLoMZxleTcpUiA= github.com/temporalio/ringpop-go v0.0.0-20211012191444-6f91b5915e95/go.mod h1:Ek9J8CAfI1IwVSqHpTOgj7FjzRSJ5SM/ud52eCmkhsw= -github.com/temporalio/ui-server/v2 v2.1.0 h1:4FBewRV23ROZKrAUWjk6i7e2pSoYrbtRRjD+AaIyQ6g= -github.com/temporalio/ui-server/v2 v2.1.0/go.mod h1:jkfDOq8EhtyqkvYjdmM/IqQZZPHR5HZ4yhtMFLBYEPg= +github.com/temporalio/ui-server/v2 v2.2.1 h1:1zeDJu98vH9LPjUXcL1Qd3vWGMfvBmGHyO29i4hwJGs= +github.com/temporalio/ui-server/v2 v2.2.1/go.mod h1:4DY/eD5gKByMf4zdwM9wqb3eTDSG9rlhFU2ws0wCBso= github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= @@ -511,8 +511,8 @@ github.com/uber/tchannel-go v1.16.0/go.mod h1:Rrgz1eL8kMjW/nEzZos0t+Heq0O4LhnUJV github.com/uber/tchannel-go v1.22.3 h1:rmIIlBLM2gvel//NxNWvhxvtbtMSeu68v3T95KZELCs= github.com/uber/tchannel-go v1.22.3/go.mod h1:ef6HlYPRg9hZvajXmgPEHy7CtHKY9RgmAZxyNAD7N18= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/urfave/cli/v2 v2.10.2 h1:x3p8awjp/2arX+Nl/G2040AZpOCHS/eMJJ1/a+mye4Y= -github.com/urfave/cli/v2 v2.10.2/go.mod h1:f8iq5LtQ/bLxafbdBSLPPNsgaW0l/2fYYEHhAyPlwvo= +github.com/urfave/cli/v2 v2.10.3 h1:oi571Fxz5aHugfBAJd5nkwSk3fzATXtMlpxdLylSCMo= +github.com/urfave/cli/v2 v2.10.3/go.mod h1:f8iq5LtQ/bLxafbdBSLPPNsgaW0l/2fYYEHhAyPlwvo= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4= diff --git a/internal/liteconfig/config.go b/internal/liteconfig/config.go index 4b624577..42753ffd 100644 --- a/internal/liteconfig/config.go +++ b/internal/liteconfig/config.go @@ -24,6 +24,7 @@ const ( broadcastAddress = "127.0.0.1" PersistenceStoreName = "sqlite-default" DefaultFrontendPort = 7233 + DefaultMetricsPort = 0 ) // UIServer abstracts the github.com/temporalio/ui-server project to @@ -48,6 +49,7 @@ type Config struct { Ephemeral bool DatabaseFilePath string FrontendPort int + MetricsPort int DynamicPorts bool Namespaces []string SQLitePragmas map[string]string @@ -56,6 +58,7 @@ type Config struct { portProvider *portProvider FrontendIP string UIServer UIServer + BaseConfig *config.Config } var SupportedPragmas = map[string]struct{}{ @@ -82,6 +85,7 @@ func NewDefaultConfig() (*Config, error) { Ephemeral: false, DatabaseFilePath: filepath.Join(userConfigDir, "temporalite/db/default.db"), FrontendPort: 0, + MetricsPort: 0, UIServer: noopUIServer{}, DynamicPorts: false, Namespaces: nil, @@ -93,6 +97,7 @@ func NewDefaultConfig() (*Config, error) { })), portProvider: &portProvider{}, FrontendIP: "", + BaseConfig: &config.Config{}, }, nil } @@ -120,115 +125,117 @@ func Convert(cfg *Config) *config.Config { sqliteConfig.ConnectAttributes["_"+k] = v } - var metricsPort, pprofPort int + var pprofPort int if cfg.DynamicPorts { if cfg.FrontendPort == 0 { cfg.FrontendPort = cfg.portProvider.mustGetFreePort() } - metricsPort = cfg.portProvider.mustGetFreePort() + if cfg.MetricsPort == 0 { + cfg.MetricsPort = cfg.portProvider.mustGetFreePort() + } pprofPort = cfg.portProvider.mustGetFreePort() } else { if cfg.FrontendPort == 0 { cfg.FrontendPort = DefaultFrontendPort } - metricsPort = cfg.FrontendPort + 200 + if cfg.MetricsPort == 0 { + cfg.MetricsPort = cfg.FrontendPort + 200 + } pprofPort = cfg.FrontendPort + 201 } - return &config.Config{ - Global: config.Global{ - Membership: config.Membership{ - MaxJoinDuration: 30 * time.Second, - BroadcastAddress: broadcastAddress, - }, - Metrics: &metrics.Config{ - Prometheus: &metrics.PrometheusConfig{ - ListenAddress: fmt.Sprintf("%s:%d", broadcastAddress, metricsPort), - HandlerPath: "/metrics", - }, - }, - PProf: config.PProf{Port: pprofPort}, + baseConfig := cfg.BaseConfig + baseConfig.Global.Membership = config.Membership{ + MaxJoinDuration: 30 * time.Second, + BroadcastAddress: broadcastAddress, + } + baseConfig.Global.Metrics = &metrics.Config{ + Prometheus: &metrics.PrometheusConfig{ + ListenAddress: fmt.Sprintf("%s:%d", cfg.FrontendIP, cfg.MetricsPort), + HandlerPath: "/metrics", }, - Persistence: config.Persistence{ - DefaultStore: PersistenceStoreName, - VisibilityStore: PersistenceStoreName, - NumHistoryShards: 1, - DataStores: map[string]config.DataStore{ - PersistenceStoreName: {SQL: &sqliteConfig}, - }, + } + baseConfig.Global.PProf = config.PProf{Port: pprofPort} + baseConfig.Persistence = config.Persistence{ + DefaultStore: PersistenceStoreName, + VisibilityStore: PersistenceStoreName, + NumHistoryShards: 1, + DataStores: map[string]config.DataStore{ + PersistenceStoreName: {SQL: &sqliteConfig}, }, - ClusterMetadata: &cluster.Config{ - EnableGlobalNamespace: false, - FailoverVersionIncrement: 10, - MasterClusterName: "active", - CurrentClusterName: "active", - ClusterInformation: map[string]cluster.ClusterInformation{ - "active": { - Enabled: true, - InitialFailoverVersion: 1, - RPCAddress: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort), - }, + } + baseConfig.ClusterMetadata = &cluster.Config{ + EnableGlobalNamespace: false, + FailoverVersionIncrement: 10, + MasterClusterName: "active", + CurrentClusterName: "active", + ClusterInformation: map[string]cluster.ClusterInformation{ + "active": { + Enabled: true, + InitialFailoverVersion: 1, + RPCAddress: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort), }, }, - DCRedirectionPolicy: config.DCRedirectionPolicy{ - Policy: "noop", + } + baseConfig.DCRedirectionPolicy = config.DCRedirectionPolicy{ + Policy: "noop", + } + baseConfig.Services = map[string]config.Service{ + "frontend": cfg.mustGetService(0), + "history": cfg.mustGetService(1), + "matching": cfg.mustGetService(2), + "worker": cfg.mustGetService(3), + } + baseConfig.Archival = config.Archival{ + History: config.HistoryArchival{ + State: "disabled", + EnableRead: false, + Provider: nil, }, - Services: map[string]config.Service{ - "frontend": cfg.mustGetService(0), - "history": cfg.mustGetService(1), - "matching": cfg.mustGetService(2), - "worker": cfg.mustGetService(3), + Visibility: config.VisibilityArchival{ + State: "disabled", + EnableRead: false, + Provider: nil, }, - Archival: config.Archival{ - History: config.HistoryArchival{ - State: "disabled", - EnableRead: false, - Provider: nil, - }, - Visibility: config.VisibilityArchival{ - State: "disabled", - EnableRead: false, - Provider: nil, + } + baseConfig.PublicClient = config.PublicClient{ + HostPort: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort), + } + baseConfig.NamespaceDefaults = config.NamespaceDefaults{ + Archival: config.ArchivalNamespaceDefaults{ + History: config.HistoryArchivalNamespaceDefaults{ + State: "disabled", }, - }, - PublicClient: config.PublicClient{ - HostPort: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort), - }, - NamespaceDefaults: config.NamespaceDefaults{ - Archival: config.ArchivalNamespaceDefaults{ - History: config.HistoryArchivalNamespaceDefaults{ - State: "disabled", - }, - Visibility: config.VisibilityArchivalNamespaceDefaults{ - State: "disabled", - }, + Visibility: config.VisibilityArchivalNamespaceDefaults{ + State: "disabled", }, }, } + return baseConfig } -func (o *Config) mustGetService(frontendPortOffset int) config.Service { +func (cfg *Config) mustGetService(frontendPortOffset int) config.Service { svc := config.Service{ RPC: config.RPC{ - GRPCPort: o.FrontendPort + frontendPortOffset, - MembershipPort: o.FrontendPort + 100 + frontendPortOffset, + GRPCPort: cfg.FrontendPort + frontendPortOffset, + MembershipPort: cfg.FrontendPort + 100 + frontendPortOffset, BindOnLocalHost: true, BindOnIP: "", }, } // Assign any open port when configured to use dynamic ports - if o.DynamicPorts { + if cfg.DynamicPorts { if frontendPortOffset != 0 { - svc.RPC.GRPCPort = o.portProvider.mustGetFreePort() + svc.RPC.GRPCPort = cfg.portProvider.mustGetFreePort() } - svc.RPC.MembershipPort = o.portProvider.mustGetFreePort() + svc.RPC.MembershipPort = cfg.portProvider.mustGetFreePort() } // Optionally bind frontend to IPv4 address - if frontendPortOffset == 0 && o.FrontendIP != "" { + if frontendPortOffset == 0 && cfg.FrontendIP != "" { svc.RPC.BindOnLocalHost = false - svc.RPC.BindOnIP = o.FrontendIP + svc.RPC.BindOnIP = cfg.FrontendIP } return svc diff --git a/options.go b/options.go index d8b98cd5..0526daaf 100644 --- a/options.go +++ b/options.go @@ -5,6 +5,7 @@ package temporalite import ( + "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" "go.temporal.io/server/temporal" @@ -56,6 +57,15 @@ func WithFrontendPort(port int) ServerOption { }) } +// WithMetricsPort sets the listening port for metrics. +// +// When unspecified, the port will be system-chosen. +func WithMetricsPort(port int) ServerOption { + return newApplyFuncContainer(func(cfg *liteconfig.Config) { + cfg.MetricsPort = port + }) +} + // WithFrontendIP binds the temporal-frontend GRPC service to a specific IP (eg. `0.0.0.0`) // Check net.ParseIP for supported syntax; only IPv4 is supported. // @@ -99,6 +109,16 @@ func WithUpstreamOptions(options ...temporal.ServerOption) ServerOption { }) } +// WithBaseConfig sets the default Temporal server configuration. +// +// Storage and client configuration will always be overridden, however base config can be +// used to enable settings like TLS or authentication. +func WithBaseConfig(base *config.Config) ServerOption { + return newApplyFuncContainer(func(cfg *liteconfig.Config) { + cfg.BaseConfig = base + }) +} + type applyFuncContainer struct { applyInternal func(*liteconfig.Config) } diff --git a/temporaltest/options.go b/temporaltest/options.go index 30f63549..487b15e5 100644 --- a/temporaltest/options.go +++ b/temporaltest/options.go @@ -4,7 +4,13 @@ package temporaltest -import "testing" +import ( + "testing" + + "go.temporal.io/sdk/client" + + "github.com/DataDog/temporalite" +) type TestServerOption interface { apply(*TestServer) @@ -20,6 +26,20 @@ func WithT(t *testing.T) TestServerOption { }) } +// WithBaseClientOptions configures options for the default clients and workers connected to the test server. +func WithBaseClientOptions(o client.Options) TestServerOption { + return newApplyFuncContainer(func(server *TestServer) { + server.defaultClientOptions = o + }) +} + +// WithTemporaliteOptions provides the ability to use additional Temporalite options, including temporalite.WithUpstreamOptions. +func WithTemporaliteOptions(options ...temporalite.ServerOption) TestServerOption { + return newApplyFuncContainer(func(server *TestServer) { + server.serverOptions = append(server.serverOptions, options...) + }) +} + type applyFuncContainer struct { applyInternal func(*TestServer) } diff --git a/temporaltest/server.go b/temporaltest/server.go index b7f67512..9519326b 100644 --- a/temporaltest/server.go +++ b/temporaltest/server.go @@ -27,6 +27,8 @@ type TestServer struct { clients []client.Client workers []worker.Worker t *testing.T + defaultClientOptions client.Options + serverOptions []temporalite.ServerOption } func (ts *TestServer) fatal(err error) { @@ -56,7 +58,7 @@ func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker // be closed on TestServer.Stop. func (ts *TestServer) Client() client.Client { if ts.defaultClient == nil { - ts.defaultClient = ts.NewClientWithOptions(client.Options{}) + ts.defaultClient = ts.NewClientWithOptions(ts.defaultClientOptions) } return ts.defaultClient } @@ -119,12 +121,16 @@ func NewServer(opts ...TestServerOption) *TestServer { }) } - s, err := temporalite.NewServer( + // Order of these options matters. When there are conflicts, options later in the list take precedence. + // Always specify options that are required for temporaltest last to avoid accidental overrides. + ts.serverOptions = append(ts.serverOptions, temporalite.WithNamespaces(ts.defaultTestNamespace), temporalite.WithPersistenceDisabled(), temporalite.WithDynamicPorts(), temporalite.WithLogger(log.NewNoopLogger()), ) + + s, err := temporalite.NewServer(ts.serverOptions...) if err != nil { ts.fatal(fmt.Errorf("error creating server: %w", err)) }