From 011f82049caa0de8ac3389e8f74e3b962806ebf9 Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Wed, 4 Aug 2021 17:52:44 -0400 Subject: [PATCH] pre-create namespaces during database initialization --- .../sql/sqlplugin/sqlite/plugin.go | 54 ++++++++++++++++++- internal/liteconfig/config.go | 5 ++ server/server.go | 21 ++------ 3 files changed, 62 insertions(+), 18 deletions(-) diff --git a/internal/common/persistence/sql/sqlplugin/sqlite/plugin.go b/internal/common/persistence/sql/sqlplugin/sqlite/plugin.go index aa4f2d01..f8edfe66 100644 --- a/internal/common/persistence/sql/sqlplugin/sqlite/plugin.go +++ b/internal/common/persistence/sql/sqlplugin/sqlite/plugin.go @@ -25,6 +25,7 @@ package sqlite import ( + "context" gosql "database/sql" _ "embed" "errors" @@ -34,13 +35,18 @@ import ( "strings" "github.com/iancoleman/strcase" - "github.com/DataDog/temporalite/internal/common/persistence/sql/sqlplugin/tools" "github.com/jmoiron/sqlx" + "go.temporal.io/api/enums/v1" + "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/config" + "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/sql" "go.temporal.io/server/common/persistence/sql/sqlplugin" + "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/resolver" "go.temporal.io/server/tools/common/schema" + + "github.com/DataDog/temporalite/internal/common/persistence/sql/sqlplugin/tools" ) const ( @@ -78,6 +84,14 @@ func (p *plugin) CreateDB( return nil, err } p.mainDB = newDB(dbKind, cfg.DatabaseName, conn, nil) + + // Ensure namespaces exist + namespaces := strings.Split(cfg.ConnectAttributes["preCreateNamespaces"], ",") + for _, ns := range namespaces { + if err := createNamespaceIfNotExists(p.mainDB, ns); err != nil { + return nil, fmt.Errorf("error ensuring namespace exists: %w", err) + } + } } return p.mainDB, nil } @@ -168,3 +182,41 @@ func (p *plugin) createDBConnection(dbKind sqlplugin.DbKind, cfg *config.SQL, _ return db, nil } + +func createNamespaceIfNotExists(mainDB *db, namespace string) error { + // Return early if namespace already exists + rows, err := mainDB.SelectFromNamespace(context.Background(), sqlplugin.NamespaceFilter{ + Name: &namespace, + }) + if err == nil && len(rows) > 0 { + return nil + } + + nsID := primitives.NewUUID() + + d, err := serialization.NamespaceDetailToBlob(&persistence.NamespaceDetail{ + Info: &persistence.NamespaceInfo{ + Id: nsID.String(), + State: enums.NAMESPACE_STATE_REGISTERED, + Name: namespace, + }, + Config: &persistence.NamespaceConfig{}, + ReplicationConfig: &persistence.NamespaceReplicationConfig{}, + }) + if err != nil { + return err + } + + if _, err := mainDB.InsertIntoNamespace(context.Background(), &sqlplugin.NamespaceRow{ + ID: nsID, + Name: namespace, + Data: d.GetData(), + DataEncoding: d.GetEncodingType().String(), + IsGlobal: false, + NotificationVersion: 0, + }); err != nil { + return err + } + + return nil +} diff --git a/internal/liteconfig/config.go b/internal/liteconfig/config.go index 2fb3aad1..bb8334a1 100644 --- a/internal/liteconfig/config.go +++ b/internal/liteconfig/config.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "time" "github.com/DataDog/temporalite/internal/common/persistence/sql/sqlplugin/sqlite" @@ -71,6 +72,10 @@ func Convert(cfg *Config) *config.Config { sqliteConfig.DatabaseName = cfg.DatabaseFilePath } + if len(cfg.Namespaces) > 0 { + sqliteConfig.ConnectAttributes["preCreateNamespaces"] = strings.Join(cfg.Namespaces, ",") + } + var ( metricsPort = cfg.FrontendPort + 200 pprofPort = cfg.FrontendPort + 201 diff --git a/server/server.go b/server/server.go index fd76998e..12d4cce5 100644 --- a/server/server.go +++ b/server/server.go @@ -2,14 +2,12 @@ package server import ( "context" - "errors" "fmt" "sync" "time" "github.com/DataDog/temporalite/internal/liteconfig" enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/server/common/authorization" @@ -73,7 +71,7 @@ func New(opts ...Option) (*Server, error) { func (s *Server) Start() error { if len(s.config.Namespaces) > 0 { go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() nsClient, err := s.newNamespaceClient(ctx) if err != nil { @@ -81,20 +79,9 @@ func (s *Server) Start() error { } defer nsClient.Close() - // Create namespaces - var errNamespaceExists *serviceerror.NamespaceAlreadyExists - for _, ns := range s.config.Namespaces { - if err := nsClient.Register(ctx, &workflowservice.RegisterNamespaceRequest{ - Namespace: ns, - WorkflowExecutionRetentionPeriod: &s.config.DefaultNamespaceRetentionPeriod, - }); err != nil && !errors.As(err, &errNamespaceExists) { - panic(err) - } - } - // Wait for each namespace to be ready for _, ns := range s.config.Namespaces { - c, err := s.newClient(context.Background(), client.Options{Namespace: ns}) + c, err := s.newClient(ctx, client.Options{Namespace: ns}) if err != nil { panic(err) } @@ -105,11 +92,11 @@ func (s *Server) Start() error { backoff = 20 * time.Millisecond ) for i := 0; i < maxAttempts; i++ { - _, err = c.ListOpenWorkflow(context.Background(), &workflowservice.ListOpenWorkflowExecutionsRequest{ + _, err = c.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{ Namespace: ns, }) if err == nil { - if _, err := c.DescribeTaskQueue(context.Background(), "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil { + if _, err := c.DescribeTaskQueue(ctx, "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil { fmt.Println(err) break }