diff --git a/body_jetstream/body_jetstream_test.go b/body_jetstream/body_jetstream_test.go index f294f58..8ec54eb 100644 --- a/body_jetstream/body_jetstream_test.go +++ b/body_jetstream/body_jetstream_test.go @@ -60,7 +60,7 @@ func TestPublishRequestToNatsWithBodyJetstream(t *testing.T) { } // Read from X-Large-Body-Id js, err := nc.JetStream() - integrationtest.FailOnErr("Error getting JetStream Client: %s", err, t) + integrationtest.FailOnErr("Error getting JetStream ClientConn: %s", err, t) os, err := js.ObjectStore(bucket) integrationtest.FailOnErr("Error getting ObjectStore "+bucket+": %s", err, t) resBytes, err := os.GetBytes(id) @@ -111,13 +111,13 @@ func TestPublishRequestToNatsWithBodyJetstream(t *testing.T) { } // we share the same NATS Server and Caddy Server for all testcases - _, nc := integrationtest.StartTestNats(t) + tn := integrationtest.StartTestNats(t) caddyTester := integrationtest.NewCaddyTester(t) for _, testcase := range cases { t.Run(testcase.description, func(t *testing.T) { - subscription, err := nc.SubscribeSync("greet.>") + subscription, err := tn.ClientConn.SubscribeSync("greet.>") defer subscription.Unsubscribe() integrationtest.FailOnErr("error subscribing to greet.>: %w", err, t) @@ -137,7 +137,7 @@ func TestPublishRequestToNatsWithBodyJetstream(t *testing.T) { } else { t.Logf("Received message: %+v", msg) } - testcase.assertNatsMessage(msg, nc, t) + testcase.assertNatsMessage(msg, tn.ClientConn, t) }) } } diff --git a/dev.sh b/dev.sh index 02fa14a..7d4fe85 100755 --- a/dev.sh +++ b/dev.sh @@ -15,7 +15,7 @@ set -e function run-tests() { go mod tidy # we cannot run in parallel right now, because the tests all boot up caddy servers and NATS servers on fixed ports. - go test -p 1 -v github.com/sandstorm/caddy-nats-bridge/... + go test -count=1 -p 1 -v github.com/sandstorm/caddy-nats-bridge/... } diff --git a/go.mod b/go.mod index 1e24d73..399017b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ toolchain go1.23.3 require ( github.com/caddyserver/caddy/v2 v2.8.4 - github.com/nats-io/nats.go v1.33.1 + github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nuid v1.0.1 go.uber.org/zap v1.27.0 ) diff --git a/go.sum b/go.sum index 29c4cde..50e85f1 100644 --- a/go.sum +++ b/go.sum @@ -326,8 +326,8 @@ github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28g github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= -github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= -github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= diff --git a/integrationtest/caddy_nats_test.go b/integrationtest/caddy_nats_test.go index a4ac27c..3edc043 100644 --- a/integrationtest/caddy_nats_test.go +++ b/integrationtest/caddy_nats_test.go @@ -6,11 +6,11 @@ import ( ) // TestNats tries to experiment with NATS request / reply in a testcase. -func TestNats(t *testing.T) { - _, nc := StartTestNats(t) +func TestNatsExample(t *testing.T) { + tn := StartTestNats(t) - sub, _ := nc.SubscribeSync("greet.*") - nc.Publish("greet.joe", []byte("hello")) + sub, _ := tn.ClientConn.SubscribeSync("greet.*") + tn.ClientConn.Publish("greet.joe", []byte("hello")) msg, err := sub.NextMsg(10 * time.Millisecond) if err != nil { diff --git a/integrationtest/natshelper.go b/integrationtest/natshelper.go index fa006be..1c8c677 100644 --- a/integrationtest/natshelper.go +++ b/integrationtest/natshelper.go @@ -10,11 +10,21 @@ import ( const TEST_PORT = 8369 -func StartTestNats(t *testing.T) (*server.Server, *nats.Conn) { - natsServer := RunServerOnPort(TEST_PORT) - t.Cleanup(func() { - natsServer.Shutdown() - }) +type TestNats struct { + Server *server.Server + ClientConn *nats.Conn +} + +func (tn *TestNats) RestartServer(t *testing.T) { + t.Logf("Shutting down NATS Server") + tn.Server.Shutdown() + t.Logf("Starting NATS Server") + tn.Server = runServerOnPort(TEST_PORT) + t.Logf("Started NATS Server") +} + +func StartTestNats(t *testing.T) TestNats { + natsServer := runServerOnPort(TEST_PORT) serverUrl := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT) natsClient, err := nats.Connect(serverUrl) @@ -25,19 +35,25 @@ func StartTestNats(t *testing.T) (*server.Server, *nats.Conn) { natsClient.Drain() }) - return natsServer, natsClient + tn := TestNats{ + Server: natsServer, + ClientConn: natsClient, + } + + t.Cleanup(func() { + tn.Server.Shutdown() + }) + + return tn } -func RunServerOnPort(port int) *server.Server { +func runServerOnPort(port int) *server.Server { opts := natsserver.DefaultTestOptions opts.Port = port opts.JetStream = true opts.Debug = true opts.Trace = true opts.NoLog = false - return RunServerWithOptions(&opts) -} -func RunServerWithOptions(opts *server.Options) *server.Server { - return natsserver.RunServer(opts) + return natsserver.RunServer(&opts) } diff --git a/logoutput/logoutput_test.go b/logoutput/logoutput_test.go index 2fee4ab..a49476d 100644 --- a/logoutput/logoutput_test.go +++ b/logoutput/logoutput_test.go @@ -2,7 +2,10 @@ package logoutput_test import ( "encoding/json" + "github.com/caddyserver/caddy/v2/caddytest" "github.com/nats-io/nats.go" + "io" + "strings" ) import ( @@ -39,17 +42,122 @@ type logMsgExample struct { func TestLogRequestToNats(t *testing.T) { type testCase struct { - description string - sendHttpRequestAndAssertResponse func() error - handleNatsMessage func(msg *nats.Msg, nc *nats.Conn) error - CaddyfileSnippet string + description string + sendHttpRequestAndAssertResponse func(t *testing.T, tn *integrationtest.TestNats) error + handleNatsMessage func(msg *nats.Msg, nc *nats.Conn) error + CaddyfileSnippet string + shouldReloadCaddyBeforeExecutingTest bool } // Testcases cases := []testCase{ { description: "HTTP request logging to NATS", - sendHttpRequestAndAssertResponse: func() error { + sendHttpRequestAndAssertResponse: func(t *testing.T, tn *integrationtest.TestNats) error { + // 1) send initial HTTP request (will be validated on the NATS handler side) + req, err := http.NewRequest("GET", "http://localhost:8889/test/hi", nil) + if err != nil { + return err + } + _, err = http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("HTTP request failed: %w", err) + } + + return nil + }, + CaddyfileSnippet: ` + log { + output nats my.subject + } + route /test/* { + respond 200 + } + `, + handleNatsMessage: func(msg *nats.Msg, nc *nats.Conn) error { + // 2) validate incoming NATS request (converted from HTTP) + if msg.Subject != "my.subject" { + t.Fatalf("Subject not correct, expected 'my.subject', actual: %s", msg.Subject) + } + // {"level":"info","ts":1709835557.872107,"logger":"http.log.access.log0","msg":"handled request","request":{"remote_ip":"127.0.0.1","remote_port":"50174","client_ip":"127.0.0.1","proto":"HTTP/1.1","method":"GET","host":"localhost:8889","uri":"/test/hi","headers":{"User-Agent":["Go-http-client/1.1"],"Accept-Encoding":["gzip"]}},"bytes_read":0,"user_id":"","duration":0.000015458,"size":0,"status":200,"resp_headers":{"Server":["Caddy"],"Content-Type":[]}} + var logMsg logMsgExample + err := json.Unmarshal(msg.Data, &logMsg) + if err != nil { + return err + } + + if logMsg.Level != "info" { + t.Fatalf("msg.level not correct, actual: %s", logMsg.Level) + } + if logMsg.Msg != "handled request" { + t.Fatalf("msg.msg not correct, actual: %s", logMsg.Msg) + } + if logMsg.Status != 200 { + t.Fatalf("msg.status not correct, actual: %d", logMsg.Status) + } + + return nil + }, + }, + + { + description: "HTTP request logging to NATS should also work after reload of caddy", + sendHttpRequestAndAssertResponse: func(t *testing.T, tn *integrationtest.TestNats) error { + forceReloadCaddy(t) + + // 1) send initial HTTP request (will be validated on the NATS handler side) + req, err := http.NewRequest("GET", "http://localhost:8889/test/hi", nil) + if err != nil { + return err + } + _, err = http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("HTTP request failed: %w", err) + } + + return nil + }, + CaddyfileSnippet: ` + log { + output nats my.subject + } + route /test/* { + respond 200 + } + `, + handleNatsMessage: func(msg *nats.Msg, nc *nats.Conn) error { + // 2) validate incoming NATS request (converted from HTTP) + if msg.Subject != "my.subject" { + t.Fatalf("Subject not correct, expected 'my.subject', actual: %s", msg.Subject) + } + // {"level":"info","ts":1709835557.872107,"logger":"http.log.access.log0","msg":"handled request","request":{"remote_ip":"127.0.0.1","remote_port":"50174","client_ip":"127.0.0.1","proto":"HTTP/1.1","method":"GET","host":"localhost:8889","uri":"/test/hi","headers":{"User-Agent":["Go-http-client/1.1"],"Accept-Encoding":["gzip"]}},"bytes_read":0,"user_id":"","duration":0.000015458,"size":0,"status":200,"resp_headers":{"Server":["Caddy"],"Content-Type":[]}} + var logMsg logMsgExample + err := json.Unmarshal(msg.Data, &logMsg) + if err != nil { + return err + } + + if logMsg.Level != "info" { + t.Fatalf("msg.level not correct, actual: %s", logMsg.Level) + } + if logMsg.Msg != "handled request" { + t.Fatalf("msg.msg not correct, actual: %s", logMsg.Msg) + } + if logMsg.Status != 200 { + t.Fatalf("msg.status not correct, actual: %d", logMsg.Status) + } + + return nil + }, + }, + + { + description: "!!! UNSTABLE TEST !!! HTTP request logging to NATS should also work after restart of NATS", + sendHttpRequestAndAssertResponse: func(t *testing.T, tn *integrationtest.TestNats) error { + tn.RestartServer(t) + // not nice ;) + time.Sleep(1 * time.Second) + // 1) send initial HTTP request (will be validated on the NATS handler side) req, err := http.NewRequest("GET", "http://localhost:8889/test/hi", nil) if err != nil { @@ -98,13 +206,13 @@ func TestLogRequestToNats(t *testing.T) { } // we share the same NATS Server and Caddy Server for all testcases - _, nc := integrationtest.StartTestNats(t) + testNats := integrationtest.StartTestNats(t) caddyTester := integrationtest.NewCaddyTester(t) for _, testcase := range cases { t.Run(testcase.description, func(t *testing.T) { - subscription, err := nc.SubscribeSync(">") + subscription, err := testNats.ClientConn.SubscribeSync(">") defer subscription.Unsubscribe() integrationtest.FailOnErr("error subscribing to >: %w", err, t) @@ -117,17 +225,17 @@ func TestLogRequestToNats(t *testing.T) { // HTTP Request and assertion Goroutine httpResultChan := make(chan error) go func() { - httpResultChan <- testcase.sendHttpRequestAndAssertResponse() + httpResultChan <- testcase.sendHttpRequestAndAssertResponse(t, &testNats) }() // handle NATS message and generate response. - msg, err := subscription.NextMsg(10 * time.Millisecond) + msg, err := subscription.NextMsg(10000 * time.Millisecond) if err != nil { t.Fatalf("message not received: %v", err) } else { t.Logf("Received message: %+v", msg) } - err = testcase.handleNatsMessage(msg, nc) + err = testcase.handleNatsMessage(msg, testNats.ClientConn) if err != nil { t.Fatalf("error with NATS message: %s", err) } @@ -140,3 +248,35 @@ func TestLogRequestToNats(t *testing.T) { }) } } + +func forceReloadCaddy(t *testing.T) { + res, err := http.Get(fmt.Sprintf("http://localhost:%d/config/", caddytest.Default.AdminPort)) + if err != nil { + t.Logf("Error reading config: %s", err) + t.FailNow() + return + } + defer res.Body.Close() + body, _ := io.ReadAll(res.Body) + + client := &http.Client{ + Timeout: caddytest.Default.LoadRequestTimeout, + } + req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%d/load", caddytest.Default.AdminPort), strings.NewReader(string(body))) + if err != nil { + t.Logf("failed to create request: %s", err) + t.FailNow() + return + } + + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Cache-Control", "must-revalidate") + + res, err = client.Do(req) + if err != nil { + t.Logf("unable to contact caddy server: %s", err) + t.FailNow() + return + } + return +} diff --git a/natsbridge/nats_bridge_app.go b/natsbridge/nats_bridge_app.go index 1af2991..0ac3be1 100644 --- a/natsbridge/nats_bridge_app.go +++ b/natsbridge/nats_bridge_app.go @@ -99,6 +99,16 @@ func (app *NatsBridgeApp) Start() error { opts = append(opts, opt) } + // we retry forever + opts = append(opts, nats.MaxReconnects(-1)) + opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { + app.logger.Info("NATS disconnected") + })) + opts = append(opts, nats.RetryOnFailedConnect(true)) + opts = append(opts, nats.ReconnectHandler(func(conn *nats.Conn) { + app.logger.Info("NATS reconnected") + })) + server.Conn, err = nats.Connect(server.NatsUrl, opts...) if err != nil { return fmt.Errorf("could not connect to %s : %w", server.NatsUrl, err) @@ -118,12 +128,8 @@ func (app *NatsBridgeApp) Start() error { } func (app *NatsBridgeApp) Stop() error { - defer func() { - for _, server := range app.Servers { - app.logger.Info("closing NATS connection", zap.String("url", server.Conn.ConnectedUrlRedacted())) - server.Conn.Close() - } - }() + // we do NOT close the connections from NATS server, as otherwise we'd disconnect + // on a reload of the Caddy server. app.logger.Info("stopping all NATS subscriptions") for _, server := range app.Servers { diff --git a/publish/publish_test.go b/publish/publish_test.go index 3a4dea6..a03b6ef 100644 --- a/publish/publish_test.go +++ b/publish/publish_test.go @@ -136,13 +136,13 @@ func TestPublishToNats(t *testing.T) { } // we share the same NATS Server and Caddy Server for all testcases - _, nc := integrationtest.StartTestNats(t) + tn := integrationtest.StartTestNats(t) caddyTester := integrationtest.NewCaddyTester(t) for _, testcase := range cases { t.Run(testcase.description, func(t *testing.T) { - subscription, err := nc.SubscribeSync("greet.>") + subscription, err := tn.ClientConn.SubscribeSync("greet.>") defer subscription.Unsubscribe() integrationtest.FailOnErr("error subscribing to greet.>: %w", err, t) @@ -162,7 +162,7 @@ func TestPublishToNats(t *testing.T) { } else { t.Logf("Received message: %+v", msg) } - testcase.assertNatsMessage(msg, nc, t) + testcase.assertNatsMessage(msg, tn.ClientConn, t) }) } } diff --git a/request/request_test.go b/request/request_test.go index 34e327e..b306288 100644 --- a/request/request_test.go +++ b/request/request_test.go @@ -127,13 +127,13 @@ func TestRequestToNats(t *testing.T) { } // we share the same NATS Server and Caddy Server for all testcases - _, nc := integrationtest.StartTestNats(t) + tn := integrationtest.StartTestNats(t) caddyTester := integrationtest.NewCaddyTester(t) for _, testcase := range cases { t.Run(testcase.description, func(t *testing.T) { - subscription, err := nc.SubscribeSync("greet.>") + subscription, err := tn.ClientConn.SubscribeSync("greet.>") defer subscription.Unsubscribe() integrationtest.FailOnErr("error subscribing to greet.>: %w", err, t) @@ -156,7 +156,7 @@ func TestRequestToNats(t *testing.T) { } else { t.Logf("Received message: %+v", msg) } - err = testcase.handleNatsMessage(msg, nc) + err = testcase.handleNatsMessage(msg, tn.ClientConn) if err != nil { t.Fatalf("error with NATS message: %s", err) } diff --git a/subscribe/subscribe_test.go b/subscribe/subscribe_test.go index 537d177..18eb4d1 100644 --- a/subscribe/subscribe_test.go +++ b/subscribe/subscribe_test.go @@ -221,7 +221,7 @@ func TestSubscribeRequestToNats(t *testing.T) { } // we share the same NATS Server and Caddy Server for all testcases - _, nc := integrationtest.StartTestNats(t) + tn := integrationtest.StartTestNats(t) caddyTester := integrationtest.NewCaddyTester(t) for _, testcase := range cases { @@ -247,7 +247,7 @@ func TestSubscribeRequestToNats(t *testing.T) { // send the actual NATS request natsResultChan := make(chan error) go func() { - natsResultChan <- testcase.sendNatsRequest(nc) + natsResultChan <- testcase.sendNatsRequest(tn.ClientConn) }() // wait until both NATS and HTTP goroutines are finished;