From b54068ff30c6281c143cbd7b937c8c39d1fbe78d Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Mon, 20 Sep 2021 14:07:09 -0400 Subject: [PATCH] remove namespace waiting logic --- server.go | 87 ++----------------------------------------------------- 1 file changed, 2 insertions(+), 85 deletions(-) diff --git a/server.go b/server.go index 36a1c917..e0acfaa7 100644 --- a/server.go +++ b/server.go @@ -7,19 +7,14 @@ package temporalite import ( "context" "fmt" - "sync" "time" - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/workflowservice/v1" + "github.com/DataDog/temporalite/internal/liteconfig" "go.temporal.io/sdk/client" "go.temporal.io/server/common/authorization" "go.temporal.io/server/common/config" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/temporal" - "google.golang.org/grpc" - - "github.com/DataDog/temporalite/internal/liteconfig" ) // Server wraps a temporal.Server. @@ -27,7 +22,6 @@ type Server struct { internal *temporal.Server frontendHostPort string config *liteconfig.Config - setupWaitGroup sync.WaitGroup } type ServerOption interface { @@ -70,60 +64,12 @@ func NewServer(opts ...ServerOption) (*Server, error) { frontendHostPort: cfg.PublicClient.HostPort, config: c, } - s.setupWaitGroup.Add(1) return s, nil } // Start temporal server. func (s *Server) Start() error { - if len(s.config.Namespaces) > 0 { - go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - nsClient, err := s.newNamespaceClient(ctx) - if err != nil { - panic(err) - } - defer nsClient.Close() - - // Wait for each namespace to be ready - for _, ns := range s.config.Namespaces { - c, err := s.newClient(ctx, client.Options{Namespace: ns}) - if err != nil { - panic(err) - } - - // Wait up to 1 minute (20ms backoff x 3000 attempts) - var ( - maxAttempts = 3000 - backoff = 20 * time.Millisecond - ) - for i := 0; i < maxAttempts; i++ { - _, err = c.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{ - Namespace: ns, - }) - if err == nil { - if _, err := c.DescribeTaskQueue(ctx, "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil { - fmt.Println(err) - break - } - } - time.Sleep(backoff) - } - if err != nil { - panic(fmt.Sprintf("could not connect to namespace %q: %s", ns, err)) - } - - c.Close() - } - - s.setupWaitGroup.Done() - }() - } else { - s.setupWaitGroup.Done() - } - return s.internal.Start() } @@ -135,7 +81,7 @@ func (s *Server) Stop() { // NewClient initializes a client ready to communicate with the Temporal // server in the target namespace. func (s *Server) NewClient(ctx context.Context, namespace string) (client.Client, error) { - return s.newClientBlocking(ctx, client.Options{Namespace: namespace}) + return s.NewClientWithOptions(ctx, client.Options{Namespace: namespace}) } // NewClientWithOptions is the same as NewClient but allows further customization. @@ -144,15 +90,6 @@ func (s *Server) NewClient(ctx context.Context, namespace string) (client.Client // // Note that the HostPort and ConnectionOptions fields of client.Options will always be overridden. func (s *Server) NewClientWithOptions(ctx context.Context, options client.Options) (client.Client, error) { - return s.newClientBlocking(ctx, options) -} - -func (s *Server) newClientBlocking(ctx context.Context, options client.Options) (client.Client, error) { - s.setupWaitGroup.Wait() - return s.newClient(ctx, options) -} - -func (s *Server) newClient(ctx context.Context, options client.Options) (client.Client, error) { options.HostPort = s.frontendHostPort options.ConnectionOptions = client.ConnectionOptions{ DisableHealthCheck: false, @@ -161,29 +98,9 @@ func (s *Server) newClient(ctx context.Context, options client.Options) (client. return client.NewClient(options) } -func (s *Server) newNamespaceClient(ctx context.Context) (client.NamespaceClient, error) { - if err := s.healthCheckFrontend(ctx); err != nil { - return nil, err - } - return client.NewNamespaceClient(client.Options{ - HostPort: s.frontendHostPort, - ConnectionOptions: client.ConnectionOptions{ - DisableHealthCheck: false, - HealthCheckTimeout: timeoutFromContext(ctx, time.Minute), - }, - }) -} - func timeoutFromContext(ctx context.Context, defaultTimeout time.Duration) time.Duration { if deadline, ok := ctx.Deadline(); ok { return deadline.Sub(time.Now()) } return defaultTimeout } - -func (s *Server) healthCheckFrontend(ctx context.Context) error { - if _, err := grpc.DialContext(ctx, s.frontendHostPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil { - return fmt.Errorf("health check failed: %w", err) - } - return nil -}