Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7 from DataDog/jlegrone/test-logger
Browse files Browse the repository at this point in the history
Route temporaltest logs to testing.T logger
  • Loading branch information
jlegrone authored Sep 21, 2021
2 parents c9203bf + 76ee46d commit 3b71bff
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 120 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/urfave/cli/v2 v2.3.0
go.temporal.io/api v1.5.0
go.temporal.io/sdk v1.10.0
go.temporal.io/server v1.12.0
go.temporal.io/server v1.12.1-0.20210921161622-b20a256e8e4f
go.uber.org/zap v1.19.1
modernc.org/sqlite v1.13.0
)
186 changes: 117 additions & 69 deletions go.sum

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions internal/common/persistence/sql/sqlplugin/sqlite/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/api/enums/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
Expand All @@ -65,12 +66,20 @@ var (

type plugin struct {
mainDB *db
logger log.Logger
}

var _ sqlplugin.Plugin = (*plugin)(nil)

func RegisterPlugin(pluginName string) {
sql.RegisterPlugin(pluginName, &plugin{})
// Register registers the SQLite plugin with default configuration.
func Register() {
logger := log.NewCLILogger()
RegisterPluginWithLogger(PluginName, logger)
}

// RegisterPluginWithLogger registers the SQLite plugin with a custom logger and name.
func RegisterPluginWithLogger(pluginName string, logger log.Logger) {
sql.RegisterPlugin(pluginName, &plugin{logger: logger})
}

// CreateDB initialize the db object
Expand Down Expand Up @@ -175,6 +184,7 @@ func (p *plugin) createDBConnection(dbKind sqlplugin.DbKind, cfg *config.SQL, _
InitialVersion: "1.0",
Overwrite: false,
DisableVersioning: false,
Logger: p.logger,
}, conn)
}(cfg.DatabaseName); err != nil {
// Ignore error from running migrations twice against the same db.
Expand Down
2 changes: 1 addition & 1 deletion internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Convert(cfg *Config) *config.Config {
if cfg.Ephemeral {
pluginName = fmt.Sprintf("%s_%d", pluginName, rand.Uint32())
}
sqlite.RegisterPlugin(pluginName)
sqlite.RegisterPluginWithLogger(pluginName, cfg.Logger)

sqliteConfig := config.SQL{
PluginName: pluginName,
Expand Down
34 changes: 34 additions & 0 deletions temporaltest/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package temporaltest

import (
"testing"
)

type testLogger struct {
t *testing.T
}

func (tl *testLogger) logLevel(lvl, msg string, keyvals ...interface{}) {
if tl.t == nil {
return
}
args := []interface{}{lvl, msg}
args = append(args, keyvals...)
tl.t.Log(args...)
}

func (tl *testLogger) Debug(msg string, keyvals ...interface{}) {
tl.logLevel("DEBUG", msg, keyvals)
}

func (tl *testLogger) Info(msg string, keyvals ...interface{}) {
tl.logLevel("INFO ", msg, keyvals)
}

func (tl *testLogger) Warn(msg string, keyvals ...interface{}) {
tl.logLevel("WARN ", msg, keyvals)
}

func (tl *testLogger) Error(msg string, keyvals ...interface{}) {
tl.logLevel("ERROR", msg, keyvals)
}
32 changes: 32 additions & 0 deletions temporaltest/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2021 Datadog, Inc.

package temporaltest

import "testing"

type TestServerOption interface {
apply(*TestServer)
}

// WithT directs all worker and client logs to the test logger.
func WithT(t *testing.T) TestServerOption {
return newApplyFuncContainer(func(server *TestServer) {
server.t = t
})
}

type applyFuncContainer struct {
applyInternal func(*TestServer)
}

func (fso *applyFuncContainer) apply(ts *TestServer) {
fso.applyInternal(ts)
}

func newApplyFuncContainer(apply func(*TestServer)) *applyFuncContainer {
return &applyFuncContainer{
applyInternal: apply,
}
}
60 changes: 49 additions & 11 deletions temporaltest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/server/common/log"
"go.uber.org/zap"

"github.com/DataDog/temporalite"
)
Expand All @@ -24,6 +25,30 @@ type TestServer struct {
defaultTestNamespace string
defaultClient client.Client
clients []client.Client
workers []worker.Worker
t *testing.T
}

func (ts *TestServer) fatal(err error) {
if ts.t == nil {
panic(err)
}
ts.t.Fatal(err)
}

// Worker registers and starts a Temporal worker on the specified task queue.
func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker.Registry)) worker.Worker {
w := worker.New(ts.Client(), taskQueue, worker.Options{
WorkflowPanicPolicy: worker.FailWorkflow,
})
registerFunc(w)
ts.workers = append(ts.workers, w)

if err := w.Start(); err != nil {
ts.fatal(err)
}

return w
}

// Client returns a Temporal client configured for making requests to the server.
Expand All @@ -43,13 +68,16 @@ func (ts *TestServer) NewClientWithOptions(opts client.Options) client.Client {
if opts.Namespace == "" {
opts.Namespace = ts.defaultTestNamespace
}
if opts.Logger == nil {
opts.Logger = &testLogger{ts.t}
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

c, err := ts.server.NewClientWithOptions(ctx, opts)
if err != nil {
panic(fmt.Errorf("error creating client: %w", err))
ts.fatal(fmt.Errorf("error creating client: %w", err))
}

ts.clients = append(ts.clients, c)
Expand All @@ -59,6 +87,9 @@ func (ts *TestServer) NewClientWithOptions(opts client.Options) client.Client {

// Stop closes test clients and shuts down the server.
func (ts *TestServer) Stop() {
for _, w := range ts.workers {
w.Stop()
}
for _, c := range ts.clients {
c.Close()
}
Expand All @@ -67,28 +98,35 @@ func (ts *TestServer) Stop() {

// NewServer starts and returns a new TestServer. The caller should call Stop
// when finished, to shut it down.
func NewServer() *TestServer {
func NewServer(opts ...TestServerOption) *TestServer {
rand.Seed(time.Now().UnixNano())
testNamespace := fmt.Sprintf("temporaltest-%d", rand.Intn(999999))

ts := TestServer{
defaultTestNamespace: testNamespace,
}

// Apply options
for _, opt := range opts {
opt.apply(&ts)
}

s, err := temporalite.NewServer(
temporalite.WithNamespaces(testNamespace),
temporalite.WithNamespaces(ts.defaultTestNamespace),
temporalite.WithPersistenceDisabled(),
temporalite.WithDynamicPorts(),
temporalite.WithLogger(log.NewZapLogger(zap.NewNop())),
temporalite.WithLogger(log.NewNoopLogger()),
)
if err != nil {
panic(fmt.Errorf("error creating server: %w", err))
ts.fatal(fmt.Errorf("error creating server: %w", err))
}
ts.server = s

go func() {
if err := s.Start(); err != nil {
panic(fmt.Errorf("error starting server: %w", err))
ts.fatal(fmt.Errorf("error starting server: %w", err))
}
}()

return &TestServer{
server: s,
defaultTestNamespace: testNamespace,
}
return &ts
}
59 changes: 23 additions & 36 deletions temporaltest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,32 @@ package temporaltest_test

import (
"context"
"fmt"
"testing"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/DataDog/temporalite/internal/examples/helloworld"
"github.com/DataDog/temporalite/temporaltest"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

// to be used in example code
var t *testing.T

func ExampleNewServer_testWorker() {
// Create test Temporal server and client
ts := temporaltest.NewServer()
c := ts.Client()
ts := temporaltest.NewServer(temporaltest.WithT(t))
// Stop server and close clients when tests complete
defer ts.Stop()

// Register a new worker on the `hello_world` task queue
w := worker.New(c, "hello_world", worker.Options{})
helloworld.RegisterWorkflowsAndActivities(w)
// Start worker
if err := w.Start(); err != nil {
t.Fatal(err)
}
// Stop worker when tests complete
defer w.Stop()
ts.Worker("hello_world", func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
})

// Create a test client
c := ts.Client()

// Start test workflow
wfr, err := c.ExecuteWorkflow(
Expand All @@ -53,29 +50,23 @@ func ExampleNewServer_testWorker() {
t.Fatal(err)
}

// Fail if result has unexpected value
if result != "Hello world" {
t.Fatalf("unexpected result: %q", result)
}
// Print result
fmt.Println(result)
// Output: Hello world
}

func TestNewServer(t *testing.T) {
ts := temporaltest.NewServer()
c := ts.Client()
ts := temporaltest.NewServer(temporaltest.WithT(t))
defer ts.Stop()

w := worker.New(c, "hello_world", worker.Options{})
helloworld.RegisterWorkflowsAndActivities(w)

if err := w.Start(); err != nil {
t.Fatal(err)
}
defer w.Stop()
ts.Worker("hello_world", func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

wfr, err := c.ExecuteWorkflow(
wfr, err := ts.Client().ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{TaskQueue: "hello_world"},
helloworld.Greet,
Expand All @@ -97,20 +88,16 @@ func TestNewServer(t *testing.T) {

func BenchmarkRunWorkflow(b *testing.B) {
ts := temporaltest.NewServer()
c := ts.Client()
defer ts.Stop()

w := worker.New(c, "hello_world", worker.Options{})
helloworld.RegisterWorkflowsAndActivities(w)

if err := w.Start(); err != nil {
panic(err)
}
defer w.Stop()
ts.Worker("hello_world", func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
})
c := ts.Client()

for i := 0; i < b.N; i++ {
func(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

wfr, err := c.ExecuteWorkflow(
Expand Down

0 comments on commit 3b71bff

Please sign in to comment.