diff --git a/js.go b/js.go index 362d75117..a249266e3 100644 --- a/js.go +++ b/js.go @@ -1528,13 +1528,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } // Find the stream mapped to the subject if not bound to a stream already. - if o.stream == _EMPTY_ { + if stream == _EMPTY_ { stream, err = js.StreamNameBySubject(subj) if err != nil { return nil, err } - } else { - stream = o.stream } // With an explicit durable name, we can lookup the consumer first diff --git a/micro/example_package_test.go b/micro/example_package_test.go new file mode 100644 index 000000000..113e8825d --- /dev/null +++ b/micro/example_package_test.go @@ -0,0 +1,84 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "fmt" + "log" + "strconv" + "time" + + "github.com/nats-io/nats.go" +) + +func Example() { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + // Service handler is a function which takes Service.Request as argument. + // req.Respond or req.Error should be used to respond to the request. + incrementHandler := func(req *Request) error { + val, err := strconv.Atoi(string(req.Data)) + if err != nil { + req.Error("400", "request data should be a number", nil) + return nil + } + + responseData := val + 1 + req.Respond([]byte(strconv.Itoa(responseData))) + return nil + } + + config := Config{ + Name: "IncrementService", + Version: "0.1.0", + Description: "Increment numbers", + Endpoint: Endpoint{ + // service handler + Handler: incrementHandler, + // a unique subject serving as a service endpoint + Subject: "numbers.increment", + }, + } + // Multiple instances of the servcice with the same name can be created. + // Requests to a service with the same name will be load-balanced. + for i := 0; i < 5; i++ { + svc, err := AddService(nc, config) + if err != nil { + log.Fatal(err) + } + defer svc.Stop() + } + + // send a request to a service + resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second) + if err != nil { + log.Fatal(err) + } + responseVal, err := strconv.Atoi(string(resp.Data)) + if err != nil { + log.Fatal(err) + } + fmt.Println(responseVal) + + // + // Output: 4 + // +} diff --git a/micro/example_test.go b/micro/example_test.go new file mode 100644 index 000000000..3e50497ac --- /dev/null +++ b/micro/example_test.go @@ -0,0 +1,267 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "fmt" + "log" + "reflect" + + "github.com/nats-io/nats.go" +) + +func ExampleAddService() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req *Request) error { + req.Respond(req.Data) + return nil + } + + config := Config{ + Name: "EchoService", + Version: "v1.0.0", + Description: "Send back what you receive", + Endpoint: Endpoint{ + Subject: "echo", + Handler: echoHandler, + }, + + // DoneHandler can be set to customize behavior on stopping a service. + DoneHandler: func(srv Service) { + info := srv.Info() + fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID) + }, + + // ErrorHandler can be used to customize behavior on service execution error. + ErrorHandler: func(srv Service, err *NATSError) { + info := srv.Info() + fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description) + }, + } + + srv, err := AddService(nc, config) + if err != nil { + log.Fatal(err) + } + defer srv.Stop() +} + +func ExampleService_Info() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) error { return nil }, + }, + } + + srv, _ := AddService(nc, config) + + // service info + info := srv.Info() + + fmt.Println(info.ID) + fmt.Println(info.Name) + fmt.Println(info.Description) + fmt.Println(info.Version) + fmt.Println(info.Subject) +} + +func ExampleService_Stats() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) error { return nil }, + }, + } + + srv, _ := AddService(nc, config) + + // stats of a service instance + stats := srv.Stats() + + fmt.Println(stats.AverageProcessingTime) + fmt.Println(stats.ProcessingTime) + +} + +func ExampleService_Stop() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) error { return nil }, + }, + } + + srv, _ := AddService(nc, config) + + // stop a service + err = srv.Stop() + if err != nil { + log.Fatal(err) + } + + // stop is idempotent so multiple executions will not return an error + err = srv.Stop() + if err != nil { + log.Fatal(err) + } +} + +func ExampleService_Stopped() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) error { return nil }, + }, + } + + srv, _ := AddService(nc, config) + + // stop a service + err = srv.Stop() + if err != nil { + log.Fatal(err) + } + + if srv.Stopped() { + fmt.Println("service stopped") + } +} + +func ExampleService_Reset() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + config := Config{ + Name: "EchoService", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "echo", + Handler: func(*Request) error { return nil }, + }, + } + + srv, _ := AddService(nc, config) + + // reset endpoint stats on this service + srv.Reset() + + empty := Stats{ + ServiceIdentity: srv.Info().ServiceIdentity, + } + if !reflect.DeepEqual(srv.Stats(), empty) { + log.Fatal("Expected endpoint stats to be empty") + } +} + +func ExampleControlSubject() { + + // subject used to get PING from all services + subjectPINGAll, _ := ControlSubject(PingVerb, "", "") + fmt.Println(subjectPINGAll) + + // subject used to get PING from services with provided name + subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "") + fmt.Println(subjectPINGName) + + // subject used to get PING from a service with provided name and ID + subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123") + fmt.Println(subjectPINGInstance) + + // Output: + // $SRV.PING + // $SRV.PING.COOLSERVICE + // $SRV.PING.COOLSERVICE.123 +} + +func ExampleRequest_Respond() { + handler := func(req *Request) { + // respond to the request + if err := req.Respond(req.Data); err != nil { + log.Fatal(err) + } + } + + fmt.Printf("%T", handler) +} + +func ExampleRequest_RespondJSON() { + type Point struct { + X int `json:"x"` + Y int `json:"y"` + } + + handler := func(req *Request) { + resp := Point{5, 10} + // respond to the request + // response will be serialized to {"x":5,"y":10} + if err := req.RespondJSON(resp); err != nil { + log.Fatal(err) + } + } + + fmt.Printf("%T", handler) +} + +func ExampleRequest_Error() { + handler := func(req *Request) error { + // respond with an error + // Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response + if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil { + return err + } + return nil + } + + fmt.Printf("%T", handler) +} diff --git a/micro/request.go b/micro/request.go new file mode 100644 index 000000000..f7cc4a580 --- /dev/null +++ b/micro/request.go @@ -0,0 +1,79 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/nats-io/nats.go" +) + +type ( + Request struct { + *nats.Msg + } + + // RequestHandler is a function used as a Handler for a service. + // It takes a request, which contains the data (payload and headers) of the request, + // as well as exposes methods to respond to the request. + // + // RequestHandler returns an error - if returned, the request will be accounted form in stats (in num_requests), + // and last_error will be set with the value. + RequestHandler func(*Request) error +) + +var ( + ErrRespond = errors.New("NATS error when sending response") + ErrMarshalResponse = errors.New("marshaling response") + ErrArgRequired = errors.New("argument required") +) + +func (r *Request) Respond(response []byte) error { + if err := r.Msg.Respond(response); err != nil { + return fmt.Errorf("%w: %s", ErrRespond, err) + } + + return nil +} + +func (r *Request) RespondJSON(response interface{}) error { + resp, err := json.Marshal(response) + if err != nil { + return ErrMarshalResponse + } + + return r.Respond(resp) +} + +// Error prepares and publishes error response from a handler. +// A response error should be set containing an error code and description. +// Optionally, data can be set as response payload. +func (r *Request) Error(code, description string, data []byte) error { + if code == "" { + return fmt.Errorf("%w: error code", ErrArgRequired) + } + if description == "" { + return fmt.Errorf("%w: description", ErrArgRequired) + } + response := &nats.Msg{ + Header: nats.Header{ + ErrorHeader: []string{description}, + ErrorCodeHeader: []string{code}, + }, + } + response.Data = data + return r.RespondMsg(response) +} diff --git a/micro/service.go b/micro/service.go new file mode 100644 index 000000000..6943f170b --- /dev/null +++ b/micro/service.go @@ -0,0 +1,592 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "encoding/json" + "errors" + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. + +type ( + Service interface { + // Info returns the service info. + Info() Info + + // Stats returns statisctics for the service endpoint and all monitoring endpoints. + Stats() Stats + + // Reset resets all statistics on a service instance. + Reset() + + // Stop drains the endpoint subscriptions and marks the service as stopped. + Stop() error + + // Stopped informs whether [Stop] was executed on the service. + Stopped() bool + } + + // ErrHandler is a function used to configure a custom error handler for a service, + ErrHandler func(Service, *NATSError) + + // DoneHandler is a function used to configure a custom done handler for a service. + DoneHandler func(Service) + + // StatsHandleris a function used to configure a custom STATS endpoint. + // It should return a value which can be serialized to JSON. + StatsHandler func(Endpoint) interface{} + + // ServiceIdentity contains fields helping to identidy a service instance. + ServiceIdentity struct { + Name string `json:"name"` + ID string `json:"id"` + Version string `json:"version"` + } + + // Stats is the type returned by STATS monitoring endpoint. + // It contains stats for a specific endpoint (either request handler or monitoring enpoints). + Stats struct { + ServiceIdentity + NumRequests int `json:"num_requests"` + NumErrors int `json:"num_errors"` + LastError string `json:"last_error"` + ProcessingTime time.Duration `json:"processing_time"` + AverageProcessingTime time.Duration `json:"average_processing_time"` + Started string `json:"started"` + Data json.RawMessage `json:"data,omitempty"` + } + + // Ping is the response type for PING monitoring endpoint. + Ping ServiceIdentity + + // Info is the basic information about a service type. + Info struct { + ServiceIdentity + Description string `json:"description"` + Subject string `json:"subject"` + } + + // SchemaResp is the response value for SCHEMA requests. + SchemaResp struct { + ServiceIdentity + Schema Schema `json:"schema"` + } + + // Schema can be used to configure a schema for a service. + // It is olso returned by the SCHEMA monitoring service (if set). + Schema struct { + Request string `json:"request"` + Response string `json:"response"` + } + + // Endpoint is used to configure a subject and handler for a service. + Endpoint struct { + Subject string `json:"subject"` + Handler RequestHandler + } + + // Verb represents a name of the monitoring service. + Verb int64 + + // Config is a configuration of a service. + Config struct { + Name string `json:"name"` + Version string `json:"version"` + Description string `json:"description"` + Schema Schema `json:"schema"` + Endpoint Endpoint `json:"endpoint"` + StatsHandler StatsHandler + DoneHandler DoneHandler + ErrorHandler ErrHandler + } + + // NATSError represents an error returned by a NATS Subscription. + // It contains a subject on which the subscription failed, so that + // it can be linked with a specific service endpoint. + NATSError struct { + Subject string + Description string + } + + // service represents a configured NATS service. + // It should be created using [Add] in order to configure the appropriate NATS subscriptions + // for request handler and monitoring. + service struct { + // Config contains a configuration of the service + Config + + m sync.Mutex + id string + reqSub *nats.Subscription + verbSubs map[string]*nats.Subscription + stats *Stats + conn *nats.Conn + natsHandlers handlers + stopped bool + + asyncDispatcher asyncCallbacksHandler + } + + handlers struct { + closed nats.ConnHandler + asyncErr nats.ErrHandler + } + + asyncCallbacksHandler struct { + cbQueue chan func() + } +) + +const ( + // Queue Group name used across all services + QG = "q" + + // APIPrefix is the root of all control subjects + APIPrefix = "$SRV" +) + +// Service Error headers +const ( + ErrorHeader = "Nats-Service-Error" + ErrorCodeHeader = "Nats-Service-Error-Code" +) + +// Verbs being used to set up a specific control subject. +const ( + PingVerb Verb = iota + StatsVerb + InfoVerb + SchemaVerb +) + +var ( + // this regular expression is suggested regexp for semver validation: https://semver.org/ + semVerRegexp = regexp.MustCompile(`^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`) + serviceNameRegexp = regexp.MustCompile(`^[A-Za-z0-9\-_]+$`) +) + +// Common errors returned by the Service framework. +var ( + // ErrConfigValidation is returned when service configuration is invalid + ErrConfigValidation = errors.New("validation") + + // ErrVerbNotSupported is returned when invalid [Verb] is used (PING, SCHEMA, INFO, STATS) + ErrVerbNotSupported = errors.New("unsupported verb") + + // ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name + ErrServiceNameRequired = errors.New("service name is required to generate ID control subject") +) + +func (s Verb) String() string { + switch s { + case PingVerb: + return "PING" + case StatsVerb: + return "STATS" + case InfoVerb: + return "INFO" + case SchemaVerb: + return "SCHEMA" + default: + return "" + } +} + +// AddService adds a microservice. +// It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as +// the actual service handler on the subject provided in config.Endpoint +// A service name, version and Endpoint configuration are required to add a service. +// AddService returns a [Service] interface, allowing service menagement. +// Each service is assigned a unique ID. +func AddService(nc *nats.Conn, config Config) (Service, error) { + if err := config.valid(); err != nil { + return nil, err + } + + id := nuid.Next() + svc := &service{ + Config: config, + conn: nc, + id: id, + asyncDispatcher: asyncCallbacksHandler{ + cbQueue: make(chan func(), 100), + }, + } + svcIdentity := ServiceIdentity{ + Name: config.Name, + ID: id, + Version: config.Version, + } + svc.verbSubs = make(map[string]*nats.Subscription) + svc.stats = &Stats{ + ServiceIdentity: svcIdentity, + } + + svc.setupAsyncCallbacks() + + go svc.asyncDispatcher.asyncCBDispatcher() + + // Setup internal subscriptions. + var err error + + svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { + svc.reqHandler(&Request{Msg: m}) + }) + if err != nil { + svc.asyncDispatcher.close() + return nil, err + } + + ping := Ping(svcIdentity) + + infoHandler := func(req *Request) error { + response, _ := json.Marshal(svc.Info()) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling INFO request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) }) + } + } + return nil + } + + pingHandler := func(req *Request) error { + response, _ := json.Marshal(ping) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling PING request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) }) + } + } + return nil + } + + statsHandler := func(req *Request) error { + response, _ := json.Marshal(svc.Stats()) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling STATS request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) }) + } + } + return nil + } + + schema := SchemaResp{ + ServiceIdentity: svcIdentity, + Schema: config.Schema, + } + schemaHandler := func(req *Request) error { + response, _ := json.Marshal(schema) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil { + svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) }) + } + } + return nil + } + + if err := svc.verbHandlers(nc, InfoVerb, infoHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + if err := svc.verbHandlers(nc, PingVerb, pingHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + if err := svc.verbHandlers(nc, StatsVerb, statsHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + + if err := svc.verbHandlers(nc, SchemaVerb, schemaHandler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + svc.stats.Started = time.Now().Format(time.RFC3339) + + return svc, nil +} + +// dispatch is responsible for calling any async callbacks +func (ac *asyncCallbacksHandler) asyncCBDispatcher() { + for { + f := <-ac.cbQueue + if f == nil { + return + } + f() + } +} + +// dispatch is responsible for calling any async callbacks +func (ac *asyncCallbacksHandler) push(f func()) { + ac.cbQueue <- f +} + +func (ac *asyncCallbacksHandler) close() { + close(ac.cbQueue) +} + +func (s *Config) valid() error { + if !serviceNameRegexp.MatchString(s.Name) { + return fmt.Errorf("%w: service name: name should not be empty and should consist of alphanumerical charactest, dashes and underscores", ErrConfigValidation) + } + if !semVerRegexp.MatchString(s.Version) { + return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation) + } + return s.Endpoint.valid() +} + +func (e *Endpoint) valid() error { + if e.Subject == "" { + return fmt.Errorf("%w: endpoint: subject is required", ErrConfigValidation) + } + if e.Handler == nil { + return fmt.Errorf("%w: endpoint: handler is required", ErrConfigValidation) + } + return nil +} + +func (svc *service) setupAsyncCallbacks() { + svc.natsHandlers.closed = svc.conn.ClosedHandler() + if svc.natsHandlers.closed != nil { + svc.conn.SetClosedHandler(func(c *nats.Conn) { + svc.Stop() + svc.natsHandlers.closed(c) + }) + } else { + svc.conn.SetClosedHandler(func(c *nats.Conn) { + svc.Stop() + }) + } + + svc.natsHandlers.asyncErr = svc.conn.ErrorHandler() + if svc.natsHandlers.asyncErr != nil { + svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if !svc.matchSubscriptionSubject(s.Subject) { + svc.natsHandlers.asyncErr(c, s, err) + } + if svc.Config.ErrorHandler != nil { + svc.Config.ErrorHandler(svc, &NATSError{ + Subject: s.Subject, + Description: err.Error(), + }) + } + svc.Stop() + svc.natsHandlers.asyncErr(c, s, err) + }) + } else { + svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if !svc.matchSubscriptionSubject(s.Subject) { + return + } + if svc.Config.ErrorHandler != nil { + svc.Config.ErrorHandler(svc, &NATSError{ + Subject: s.Subject, + Description: err.Error(), + }) + } + svc.Stop() + }) + } +} + +func (svc *service) matchSubscriptionSubject(subj string) bool { + if svc.reqSub.Subject == subj { + return true + } + for _, verbSub := range svc.verbSubs { + if verbSub.Subject == subj { + return true + } + } + return false +} + +// verbHandlers generates control handlers for a specific verb. +// Each request generates 3 subscriptions, one for the general verb +// affecting all services written with the framework, one that handles +// all services of a particular kind, and finally a specific service instance. +func (svc *service) verbHandlers(nc *nats.Conn, verb Verb, handler RequestHandler) error { + name := fmt.Sprintf("%s-all", verb.String()) + if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { + return err + } + name = fmt.Sprintf("%s-kind", verb.String()) + if err := svc.addInternalHandler(nc, verb, svc.Config.Name, "", name, handler); err != nil { + return err + } + return svc.addInternalHandler(nc, verb, svc.Config.Name, svc.id, verb.String(), handler) +} + +// addInternalHandler registers a control subject handler. +func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name string, handler RequestHandler) error { + subj, err := ControlSubject(verb, kind, id) + if err != nil { + s.Stop() + return err + } + + s.verbSubs[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { + handler(&Request{Msg: msg}) + }) + if err != nil { + s.Stop() + return err + } + return nil +} + +// reqHandler itself +func (s *service) reqHandler(req *Request) { + start := time.Now() + err := s.Endpoint.Handler(req) + s.m.Lock() + s.stats.NumRequests++ + s.stats.ProcessingTime += time.Since(start) + avgProcessingTime := s.stats.ProcessingTime.Nanoseconds() / int64(s.stats.NumRequests) + s.stats.AverageProcessingTime = time.Duration(avgProcessingTime) + + if err != nil { + s.stats.NumErrors++ + s.stats.LastError = err.Error() + } + s.m.Unlock() +} + +// Stop drains the endpoint subscriptions and marks the service as stopped. +func (s *service) Stop() error { + s.m.Lock() + if s.stopped { + return nil + } + defer s.m.Unlock() + if s.reqSub != nil { + if err := s.reqSub.Drain(); err != nil { + return fmt.Errorf("draining subscription for request handler: %w", err) + } + s.reqSub = nil + } + var keys []string + for key, sub := range s.verbSubs { + keys = append(keys, key) + if err := sub.Drain(); err != nil { + return fmt.Errorf("draining subscription for subject %q: %w", sub.Subject, err) + } + } + for _, key := range keys { + delete(s.verbSubs, key) + } + restoreAsyncHandlers(s.conn, s.natsHandlers) + s.stopped = true + if s.DoneHandler != nil { + s.asyncDispatcher.push(func() { s.DoneHandler(s) }) + s.asyncDispatcher.close() + } + return nil +} + +func restoreAsyncHandlers(nc *nats.Conn, handlers handlers) { + nc.SetClosedHandler(handlers.closed) + nc.SetErrorHandler(handlers.asyncErr) +} + +// ID returns the service instance's unique ID. +func (s *service) Info() Info { + return Info{ + ServiceIdentity: ServiceIdentity{ + Name: s.Config.Name, + ID: s.id, + Version: s.Config.Version, + }, + Description: s.Config.Description, + Subject: s.Config.Endpoint.Subject, + } +} + +// Stats returns statisctics for the service endpoint and all monitoring endpoints. +func (s *service) Stats() Stats { + s.m.Lock() + defer s.m.Unlock() + if s.StatsHandler != nil { + s.stats.Data, _ = json.Marshal(s.StatsHandler(s.Endpoint)) + } + info := s.Info() + return Stats{ + ServiceIdentity: ServiceIdentity{ + Name: info.Name, + ID: info.ID, + Version: info.Version, + }, + NumRequests: s.stats.NumRequests, + NumErrors: s.stats.NumErrors, + ProcessingTime: s.stats.ProcessingTime, + AverageProcessingTime: s.stats.AverageProcessingTime, + Started: s.stats.Started, + Data: s.stats.Data, + } +} + +// Reset resets all statistics on a service instance. +func (s *service) Reset() { + s.m.Lock() + s.stats = &Stats{ + ServiceIdentity: s.Info().ServiceIdentity, + } + s.m.Unlock() +} + +// Stopped informs whether [Stop] was executed on the service. +func (s *service) Stopped() bool { + s.m.Lock() + defer s.m.Unlock() + return s.stopped +} + +// ControlSubject returns monitoring subjects used by the Service. +// Providing a verb is mandatory (it should be one of Ping, Schema, Info or Stats). +// Depending on whether kind and id are provided, ControlSubject will return one of the following: +// - verb only: subject used to monitor all available services +// - verb and kind: subject used to monitor services with the provided name +// - verb, name and id: subject used to monitor an instance of a service with the provided ID +func ControlSubject(verb Verb, name, id string) (string, error) { + verbStr := verb.String() + if verbStr == "" { + return "", fmt.Errorf("%w: %q", ErrVerbNotSupported, verbStr) + } + if name == "" && id != "" { + return "", ErrServiceNameRequired + } + name = strings.ToUpper(name) + if name == "" && id == "" { + return fmt.Sprintf("%s.%s", APIPrefix, verbStr), nil + } + if id == "" { + return fmt.Sprintf("%s.%s.%s", APIPrefix, verbStr, name), nil + } + return fmt.Sprintf("%s.%s.%s.%s", APIPrefix, verbStr, name, id), nil +} + +func (e *NATSError) Error() string { + return fmt.Sprintf("%q: %s", e.Subject, e.Description) +} diff --git a/micro/service_test.go b/micro/service_test.go new file mode 100644 index 000000000..4cf4ccd0c --- /dev/null +++ b/micro/service_test.go @@ -0,0 +1,1122 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package micro + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "math/rand" + "reflect" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" +) + +func TestServiceBasics(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + doAdd := func(req *Request) error { + if rand.Intn(10) == 0 { + if err := req.Error("400", "client error!", nil); err != nil { + t.Fatalf("Unexpected error when sending error response: %v", err) + } + + // for client-side errors, return nil to avoid tracking the errors in stats + return nil + } + // Happy Path. + // Random delay between 5-10ms + time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) + if err := req.Respond([]byte("42")); err != nil { + if err := req.Error("500", "Unexpected error!", nil); err != nil { + t.Fatalf("Unexpected error when sending error response: %v", err) + } + return err + } + return nil + } + + var svcs []Service + + // Create 5 service responders. + config := Config{ + Name: "CoolAddService", + Version: "0.1.0", + Description: "Add things together", + Endpoint: Endpoint{ + Subject: "svc.add", + Handler: doAdd, + }, + Schema: Schema{Request: "", Response: ""}, + } + + for i := 0; i < 5; i++ { + svc, err := AddService(nc, config) + if err != nil { + t.Fatalf("Expected to create Service, got %v", err) + } + defer svc.Stop() + svcs = append(svcs, svc) + } + + // Now send 50 requests. + for i := 0; i < 50; i++ { + _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + } + + for _, svc := range svcs { + info := svc.Info() + if info.Name != "CoolAddService" { + t.Fatalf("Expected %q, got %q", "CoolAddService", info.Name) + } + if len(info.Description) == 0 || len(info.Version) == 0 { + t.Fatalf("Expected non empty description and version") + } + } + + // Make sure we can request info, 1 response. + // This could be exported as well as main ServiceImpl. + subj, err := ControlSubject(InfoVerb, "CoolAddService", "") + if err != nil { + t.Fatalf("Failed to building info subject %v", err) + } + info, err := nc.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + var inf Info + if err := json.Unmarshal(info.Data, &inf); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if inf.Subject != "svc.add" { + t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) + } + + // Ping all services. Multiple responses. + inbox := nats.NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + pingSubject, err := ControlSubject(PingVerb, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var pingCount int + for { + _, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + pingCount++ + } + if pingCount != 5 { + t.Fatalf("Expected 5 ping responses, got: %d", pingCount) + } + + // Get stats from all services + statsInbox := nats.NewInbox() + sub, err = nc.SubscribeSync(statsInbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + statsSubject, err := ControlSubject(StatsVerb, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + stats := make([]Stats, 0) + var requestsNum int + for { + resp, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + var srvStats Stats + if err := json.Unmarshal(resp.Data, &srvStats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + requestsNum += srvStats.NumRequests + stats = append(stats, srvStats) + } + if len(stats) != 5 { + t.Fatalf("Expected stats for 5 services, got: %d", len(stats)) + } + + // Services should process 50 requests total + if requestsNum != 50 { + t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum) + } + // Reset stats for a service + svcs[0].Reset() + emptyStats := Stats{ + ServiceIdentity: svcs[0].Info().ServiceIdentity, + } + + if !reflect.DeepEqual(svcs[0].Stats(), emptyStats) { + t.Fatalf("Expected empty stats after reset; got: %+v", svcs[0].Stats()) + } + +} + +func TestAddService(t *testing.T) { + testHandler := func(*Request) error { return nil } + errNats := make(chan struct{}) + errService := make(chan struct{}) + closedNats := make(chan struct{}) + doneService := make(chan struct{}) + + tests := []struct { + name string + givenConfig Config + natsClosedHandler nats.ConnHandler + natsErrorHandler nats.ErrHandler + asyncErrorSubject string + expectedPing Ping + withError error + }{ + { + name: "minimal config", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + }, + { + name: "with done handler, no handlers on nats connection", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + }, + { + name: "with error handler, no handlers on nats connection", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + ErrorHandler: func(Service, *NATSError) { + errService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "test.sub", + }, + { + name: "with error handler, no handlers on nats connection, error on monitoring subject", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + ErrorHandler: func(Service, *NATSError) { + errService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + }, + { + name: "with done handler, append to nats handlers", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "test.sub", + }, + { + name: "with error handler, append to nats handlers", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + }, + { + name: "with error handler, append to nats handlers, error on monitoring subject", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + Version: "0.1.0", + }, + asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + }, + { + name: "validation error, invalid service name", + givenConfig: Config{ + Name: "test_service!", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + }, + withError: ErrConfigValidation, + }, + { + name: "validation error, invalid version", + givenConfig: Config{ + Name: "test_service!", + Version: "abc", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + }, + withError: ErrConfigValidation, + }, + { + name: "validation error, empty subject", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "", + Handler: testHandler, + }, + }, + withError: ErrConfigValidation, + }, + { + name: "validation error, no handler", + givenConfig: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test_subject", + Handler: nil, + }, + }, + withError: ErrConfigValidation, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL(), + nats.ErrorHandler(test.natsErrorHandler), + nats.ClosedHandler(test.natsClosedHandler), + ) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := AddService(nc, test.givenConfig) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + info := srv.Info() + pingSubject, err := ControlSubject(PingVerb, info.Name, info.ID) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + pingResp, err := nc.Request(pingSubject, nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var ping Ping + if err := json.Unmarshal(pingResp.Data, &ping); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + test.expectedPing.ID = info.ID + if test.expectedPing != ping { + t.Fatalf("Invalid ping response; want: %+v; got: %+v", test.expectedPing, ping) + } + + if test.givenConfig.DoneHandler != nil { + go nc.Opts.ClosedCB(nc) + select { + case <-doneService: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on DoneHandler") + } + if test.natsClosedHandler != nil { + select { + case <-closedNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on ClosedHandler") + } + } + } + + if test.givenConfig.ErrorHandler != nil { + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops")) + select { + case <-errService: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on ErrorHandler") + } + if test.natsErrorHandler != nil { + select { + case <-errNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on AsyncErrHandler") + } + } + } + + if err := srv.Stop(); err != nil { + t.Fatalf("Unexpected error when stopping the service: %v", err) + } + if test.natsClosedHandler != nil { + go nc.Opts.ClosedCB(nc) + select { + case <-doneService: + t.Fatalf("Expected to restore nats closed handler") + case <-time.After(50 * time.Millisecond): + } + select { + case <-closedNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on ClosedHandler") + } + } + if test.natsErrorHandler != nil { + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops")) + select { + case <-errService: + t.Fatalf("Expected to restore nats error handler") + case <-time.After(50 * time.Millisecond): + } + select { + case <-errNats: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout on AsyncErrHandler") + } + } + }) + } +} + +func TestMonitoringHandlers(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + asyncErr := make(chan struct{}) + errHandler := func(s Service, n *NATSError) { + asyncErr <- struct{}{} + } + + config := Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) error { return nil }, + }, + Schema: Schema{ + Request: "some_schema", + }, + ErrorHandler: errHandler, + } + srv, err := AddService(nc, config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer func() { + srv.Stop() + if !srv.Stopped() { + t.Fatalf("Expected service to be stopped") + } + }() + + info := srv.Info() + + tests := []struct { + name string + subject string + withError bool + expectedResponse interface{} + }{ + { + name: "PING all", + subject: "$SRV.PING", + expectedResponse: Ping{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + }, + { + name: "PING name", + subject: "$SRV.PING.TEST_SERVICE", + expectedResponse: Ping{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + }, + { + name: "PING ID", + subject: fmt.Sprintf("$SRV.PING.TEST_SERVICE.%s", info.ID), + expectedResponse: Ping{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + }, + { + name: "INFO all", + subject: "$SRV.INFO", + expectedResponse: Info{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Subject: "test.sub", + }, + }, + { + name: "INFO name", + subject: "$SRV.INFO.TEST_SERVICE", + expectedResponse: Info{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Subject: "test.sub", + }, + }, + { + name: "INFO ID", + subject: fmt.Sprintf("$SRV.INFO.TEST_SERVICE.%s", info.ID), + expectedResponse: Info{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Subject: "test.sub", + }, + }, + { + name: "SCHEMA all", + subject: "$SRV.SCHEMA", + expectedResponse: SchemaResp{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "SCHEMA name", + subject: "$SRV.SCHEMA.TEST_SERVICE", + expectedResponse: SchemaResp{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "SCHEMA ID", + subject: fmt.Sprintf("$SRV.SCHEMA.TEST_SERVICE.%s", info.ID), + expectedResponse: SchemaResp{ + ServiceIdentity: ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + ID: info.ID, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "PING error", + subject: "$SRV.PING", + withError: true, + }, + { + name: "INFO error", + subject: "$SRV.INFO", + withError: true, + }, + { + name: "STATS error", + subject: "$SRV.STATS", + withError: true, + }, + { + name: "SCHEMA error", + subject: "$SRV.SCHEMA", + withError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.withError { + // use publish instead of request, so Respond will fail inside the handler + if err := nc.Publish(test.subject, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case <-asyncErr: + return + case <-time.After(1 * time.Second): + t.Fatalf("Timeout waiting for async error") + } + return + } + + resp, err := nc.Request(test.subject, nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + respMap := make(map[string]interface{}) + if err := json.Unmarshal(resp.Data, &respMap); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedResponseJSON, err := json.Marshal(test.expectedResponse) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedRespMap := make(map[string]interface{}) + if err := json.Unmarshal(expectedResponseJSON, &expectedRespMap); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(respMap, expectedRespMap) { + t.Fatalf("Invalid response; want: %+v; got: %+v", expectedRespMap, respMap) + } + }) + } +} + +func TestServiceStats(t *testing.T) { + handler := func(r *Request) error { + if bytes.Equal(r.Data, []byte("err")) { + r.Error("500", "oops", nil) + return fmt.Errorf("oops") + } + + // client errors (validation etc.) should not be accounted for in stats + if bytes.Equal(r.Data, []byte("client_err")) { + r.Error("400", "bad request", nil) + return nil + } + r.Respond([]byte("ok")) + return nil + } + tests := []struct { + name string + config Config + expectedStats map[string]interface{} + }{ + { + name: "without schema or stats handler", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + }, + }, + { + name: "with stats handler", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + StatsHandler: func(e Endpoint) interface{} { + return map[string]interface{}{ + "key": "val", + } + }, + }, + expectedStats: map[string]interface{}{ + "key": "val", + }, + }, + { + name: "with schema", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + }, + { + name: "with schema and stats handler", + config: Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: handler, + }, + Schema: Schema{ + Request: "some_schema", + }, + StatsHandler: func(e Endpoint) interface{} { + return map[string]interface{}{ + "key": "val", + } + }, + }, + expectedStats: map[string]interface{}{ + "key": "val", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := AddService(nc, test.config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer srv.Stop() + for i := 0; i < 10; i++ { + if _, err := nc.Request(srv.Info().Subject, []byte("msg"), time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + if _, err := nc.Request(srv.Info().Subject, []byte("client_err"), time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := nc.Request(srv.Info().Subject, []byte("err"), time.Second); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + info := srv.Info() + resp, err := nc.Request(fmt.Sprintf("$SRV.STATS.TEST_SERVICE.%s", info.ID), nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var stats Stats + if err := json.Unmarshal(resp.Data, &stats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if stats.Name != info.Name { + t.Errorf("Unexpected service name; want: %s; got: %s", info.Name, stats.Name) + } + if stats.ID != info.ID { + t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID) + } + if stats.NumRequests != 12 { + t.Errorf("Unexpected num_requests; want: 12; got: %d", stats.NumRequests) + } + if stats.NumErrors != 1 { + t.Errorf("Unexpected num_requests; want: 1; got: %d", stats.NumErrors) + } + if test.expectedStats != nil { + var data map[string]interface{} + if err := json.Unmarshal(stats.Data, &data); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(data, test.expectedStats) { + t.Fatalf("Invalid data from stats handler; want: %v; got: %v", test.expectedStats, data) + } + } + }) + } +} + +func TestRequestRespond(t *testing.T) { + type x struct { + A string `json:"a"` + B int `json:"b"` + } + + tests := []struct { + name string + respondData interface{} + errDescription string + errCode string + errData []byte + expectedMessage string + expectedCode string + expectedResponse []byte + withRespondError error + }{ + { + name: "byte response", + respondData: []byte("OK"), + expectedResponse: []byte("OK"), + }, + { + name: "byte response, connection closed", + respondData: []byte("OK"), + withRespondError: ErrRespond, + }, + { + name: "struct response", + respondData: x{"abc", 5}, + expectedResponse: []byte(`{"a":"abc","b":5}`), + }, + { + name: "invalid response data", + respondData: func() {}, + withRespondError: ErrMarshalResponse, + }, + { + name: "generic error", + errDescription: "oops", + errCode: "500", + errData: []byte("error!"), + expectedMessage: "oops", + expectedCode: "500", + }, + { + name: "error without response payload", + errDescription: "oops", + errCode: "500", + expectedMessage: "oops", + expectedCode: "500", + }, + { + name: "missing error code", + errDescription: "oops", + withRespondError: ErrArgRequired, + }, + { + name: "missing error description", + errCode: "500", + withRespondError: ErrArgRequired, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + respData := test.respondData + respError := test.withRespondError + errCode := test.errCode + errDesc := test.errDescription + errData := test.errData + // Stub service. + handler := func(req *Request) error { + if errors.Is(test.withRespondError, ErrRespond) { + nc.Close() + } + if errCode == "" && errDesc == "" { + if resp, ok := respData.([]byte); ok { + err := req.Respond(resp) + if respError != nil { + if !errors.Is(err, respError) { + t.Fatalf("Expected error: %v; got: %v", respError, err) + } + return nil + } + if err != nil { + t.Fatalf("Unexpected error when sending response: %v", err) + } + } else { + err := req.RespondJSON(respData) + if respError != nil { + if !errors.Is(err, respError) { + t.Fatalf("Expected error: %v; got: %v", respError, err) + } + return nil + } + if err != nil { + t.Fatalf("Unexpected error when sending response: %v", err) + } + } + return nil + } + + err := req.Error(errCode, errDesc, errData) + if respError != nil { + if !errors.Is(err, respError) { + t.Fatalf("Expected error: %v; got: %v", respError, err) + } + return nil + } + if err != nil { + t.Fatalf("Unexpected error when sending response: %v", err) + } + return nil + } + + svc, err := AddService(nc, Config{ + Name: "CoolService", + Version: "0.1.0", + Description: "Erroring service", + Endpoint: Endpoint{ + Subject: "svc.fail", + Handler: handler, + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer svc.Stop() + + resp, err := nc.Request("svc.fail", nil, 50*time.Millisecond) + if test.withRespondError != nil { + return + } + if err != nil { + t.Fatalf("request error: %v", err) + } + + if test.errCode != "" { + description := resp.Header.Get("Nats-Service-Error") + if description != test.expectedMessage { + t.Fatalf("Invalid response message; want: %q; got: %q", test.expectedMessage, description) + } + code := resp.Header.Get("Nats-Service-Error-Code") + if code != test.expectedCode { + t.Fatalf("Invalid response code; want: %q; got: %q", test.expectedCode, code) + } + if !bytes.Equal(resp.Data, test.errData) { + t.Fatalf("Invalid response payload; want: %q; got: %q", string(test.errData), resp.Data) + } + return + } + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if !bytes.Equal(bytes.TrimSpace(resp.Data), bytes.TrimSpace(test.expectedResponse)) { + t.Fatalf("Invalid response; want: %s; got: %s", string(test.expectedResponse), string(resp.Data)) + } + }) + } +} + +func RunServerOnPort(port int) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = port + return RunServerWithOptions(&opts) +} + +func RunServerWithOptions(opts *server.Options) *server.Server { + return natsserver.RunServer(opts) +} + +func TestControlSubject(t *testing.T) { + tests := []struct { + name string + verb Verb + srvName string + id string + expectedSubject string + withError error + }{ + { + name: "PING ALL", + verb: PingVerb, + expectedSubject: "$SRV.PING", + }, + { + name: "PING name", + verb: PingVerb, + srvName: "test", + expectedSubject: "$SRV.PING.TEST", + }, + { + name: "PING id", + verb: PingVerb, + srvName: "test", + id: "123", + expectedSubject: "$SRV.PING.TEST.123", + }, + { + name: "invalid verb", + verb: Verb(100), + withError: ErrVerbNotSupported, + }, + { + name: "name not provided", + verb: PingVerb, + srvName: "", + id: "123", + withError: ErrServiceNameRequired, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + res, err := ControlSubject(test.verb, test.srvName, test.id) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if res != test.expectedSubject { + t.Errorf("Invalid subject; want: %q; got: %q", test.expectedSubject, res) + } + }) + } +} diff --git a/netchan.go b/netchan.go index 3f2a33e60..a1af9e06e 100644 --- a/netchan.go +++ b/netchan.go @@ -1,4 +1,4 @@ -// Copyright 2013-2018 The NATS Authors +// Copyright 2013-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/parser.go b/parser.go index 4540f5c1a..a0f0cca2e 100644 --- a/parser.go +++ b/parser.go @@ -1,4 +1,4 @@ -// Copyright 2012-2020 The NATS Authors +// Copyright 2012-2122 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/services/errors.go b/services/errors.go deleted file mode 100644 index 9ad1c5452..000000000 --- a/services/errors.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package services - -import "fmt" - -type ServiceAPIError struct { - ErrorCode int - Description string -} - -func (e *ServiceAPIError) Error() string { - return fmt.Sprintf("%d %s", e.ErrorCode, e.Description) -} diff --git a/services/service.go b/services/service.go deleted file mode 100644 index e9e3a652e..000000000 --- a/services/service.go +++ /dev/null @@ -1,404 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package services - -import ( - "encoding/json" - "errors" - "fmt" - "strings" - "sync" - "time" - - "github.com/nats-io/nats.go" - "github.com/nats-io/nuid" -) - -// Notice: Experimental Preview -// -// This functionality is EXPERIMENTAL and may be changed in later releases. - -type ( - - // Service is an interface for service management. - // It exposes methods to stop/reset a service, as well as get information on a service. - Service interface { - ID() string - Name() string - Description() string - Version() string - Stats() ServiceStats - Reset() - Stop() - } - - // A request handler. - // TODO (could make error more and return more info to user automatically?) - ServiceHandler func(svc Service, req *nats.Msg) error - - // Clients can request as well. - ServiceStats struct { - Name string `json:"name"` - ID string `json:"id"` - Version string `json:"version"` - Started time.Time `json:"started"` - Endpoints []Stats `json:"stats"` - } - - Stats struct { - Name string `json:"name"` - NumRequests int `json:"num_requests"` - NumErrors int `json:"num_errors"` - TotalLatency time.Duration `json:"total_latency"` - AverageLatency time.Duration `json:"average_latency"` - Data interface{} `json:"data"` - } - - // ServiceInfo is the basic information about a service type - ServiceInfo struct { - Name string `json:"name"` - ID string `json:"id"` - Description string `json:"description"` - Version string `json:"version"` - Subject string `json:"subject"` - } - - ServiceSchema struct { - Request string `json:"request"` - Response string `json:"response"` - } - - Endpoint struct { - Subject string `json:"subject"` - Handler ServiceHandler - } - - InternalEndpoint struct { - Name string - Handler nats.MsgHandler - } - - ServiceVerb int64 - - ServiceConfig struct { - Name string `json:"name"` - Description string `json:"description"` - Version string `json:"version"` - Schema ServiceSchema `json:"schema"` - Endpoint Endpoint `json:"endpoint"` - StatusHandler func(Endpoint) interface{} - } - - // service is the internal implementation of a Service - service struct { - sync.Mutex - ServiceConfig - id string - // subs - reqSub *nats.Subscription - internal map[string]*nats.Subscription - statuses map[string]*Stats - stats *ServiceStats - conn *nats.Conn - } -) - -const ( - // We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. - QG = "svc" - - // ServiceApiPrefix is the root of all control subjects - ServiceApiPrefix = "$SRV" - - ServiceErrorHeader = "Nats-Service-Error" -) - -const ( - SrvPing ServiceVerb = iota - SrvStatus - SrvInfo - SrvSchema -) - -func (s *ServiceConfig) Valid() error { - if s.Name == "" { - return errors.New("name is required") - } - return s.Endpoint.Valid() -} - -func (e *Endpoint) Valid() error { - s := strings.TrimSpace(e.Subject) - if len(s) == 0 { - return errors.New("subject is required") - } - if e.Handler == nil { - return errors.New("handler is required") - } - return nil -} - -func (s ServiceVerb) String() string { - switch s { - case SrvPing: - return "PING" - case SrvStatus: - return "STATUS" - case SrvInfo: - return "INFO" - case SrvSchema: - return "SCHEMA" - default: - return "" - } -} - -// Add adds a microservice. -// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and -// also have a version that talkes full blown OpenAPI spec and we can pull these things out. -func Add(nc *nats.Conn, config ServiceConfig) (Service, error) { - if err := config.Valid(); err != nil { - return nil, err - } - - id := nuid.Next() - svc := &service{ - ServiceConfig: config, - conn: nc, - id: id, - } - svc.internal = make(map[string]*nats.Subscription) - svc.statuses = make(map[string]*Stats) - svc.statuses[""] = &Stats{ - Name: config.Name, - } - - svc.stats = &ServiceStats{ - Name: config.Name, - ID: id, - Version: config.Version, - Started: time.Now(), - } - - // Setup internal subscriptions. - var err error - - svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { - svc.reqHandler(m) - }) - if err != nil { - return nil, err - } - - info := &ServiceInfo{ - Name: config.Name, - ID: id, - Description: config.Description, - Version: config.Version, - Subject: config.Endpoint.Subject, - } - - infoHandler := func(m *nats.Msg) { - response, _ := json.MarshalIndent(info, "", " ") - m.Respond(response) - } - - pingHandler := func(m *nats.Msg) { - infoHandler(m) - } - - statusHandler := func(m *nats.Msg) { - response, _ := json.MarshalIndent(svc.Stats(), "", " ") - m.Respond(response) - } - - schemaHandler := func(m *nats.Msg) { - response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") - m.Respond(response) - } - - if err := svc.addInternalHandlerGroup(nc, SrvInfo, infoHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandlerGroup(nc, SrvPing, pingHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandlerGroup(nc, SrvStatus, statusHandler); err != nil { - return nil, err - } - - if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { - if err := svc.addInternalHandlerGroup(nc, SrvSchema, schemaHandler); err != nil { - return nil, err - } - } - - svc.stats.ID = id - svc.stats.Started = time.Now() - return svc, nil -} - -// addInternalHandlerGroup generates control handlers for a specific verb -// each request generates 3 subscriptions, one for the general verb -// affecting all services written with the framework, one that handles -// all services of a particular kind, and finally a specific service. -func (svc *service) addInternalHandlerGroup(nc *nats.Conn, verb ServiceVerb, handler nats.MsgHandler) error { - name := fmt.Sprintf("%s-all", verb.String()) - if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { - return err - } - name = fmt.Sprintf("%s-kind", verb.String()) - if err := svc.addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { - return err - } - return svc.addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) -} - -// addInternalHandler registers a control subject handler -func (svc *service) addInternalHandler(nc *nats.Conn, verb ServiceVerb, kind, id, name string, handler nats.MsgHandler) error { - subj, err := SvcControlSubject(verb, kind, id) - if err != nil { - svc.Stop() - return err - } - - svc.internal[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[name] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) - svc.Unlock() - }() - handler(msg) - }) - if err != nil { - svc.Stop() - return err - } - - svc.statuses[name] = &Stats{ - Name: name, - } - return nil -} - -// reqHandler itself -func (svc *service) reqHandler(req *nats.Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[""] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) - svc.Unlock() - }() - - if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { - hdr := make(nats.Header) - apiErr := &ServiceAPIError{} - if ok := errors.As(err, &apiErr); !ok { - hdr[ServiceErrorHeader] = []string{fmt.Sprintf("%d %s", 500, err.Error())} - } else { - hdr[ServiceErrorHeader] = []string{apiErr.Error()} - } - svc.Lock() - stats := svc.statuses[""] - stats.NumErrors++ - svc.Unlock() - - svc.conn.PublishMsg(&nats.Msg{ - Subject: req.Reply, - Header: hdr, - }) - } -} - -func (svc *service) Stop() { - if svc.reqSub != nil { - svc.reqSub.Drain() - svc.reqSub = nil - } - var keys []string - for key, sub := range svc.internal { - keys = append(keys, key) - sub.Drain() - } - for _, key := range keys { - delete(svc.internal, key) - } -} - -func (svc *service) ID() string { - return svc.id -} - -func (svc *service) Name() string { - return svc.ServiceConfig.Name -} - -func (svc *service) Description() string { - return svc.ServiceConfig.Description -} - -func (svc *service) Version() string { - return svc.ServiceConfig.Version -} - -func (svc *service) Stats() ServiceStats { - svc.Lock() - defer func() { - svc.Unlock() - }() - if svc.ServiceConfig.StatusHandler != nil { - stats := svc.statuses[""] - stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) - } - idx := 0 - v := make([]Stats, len(svc.statuses)) - for _, se := range svc.statuses { - v[idx] = *se - idx++ - } - svc.stats.Endpoints = v - return *svc.stats -} - -func (svc *service) Reset() { - for _, se := range svc.statuses { - se.NumRequests = 0 - se.TotalLatency = 0 - se.NumErrors = 0 - se.Data = nil - } -} - -// SvcControlSubject returns monitoring subjects used by the ServiceImpl -func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { - sverb := verb.String() - if sverb == "" { - return "", fmt.Errorf("unsupported service verb") - } - kind = strings.ToUpper(kind) - if kind == "" && id == "" { - return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil - } - if id == "" { - return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil - } - return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil -} diff --git a/services/service_test.go b/services/service_test.go deleted file mode 100644 index 532be8915..000000000 --- a/services/service_test.go +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package services - -import ( - "encoding/json" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/nats-io/nats-server/v2/server" - natsserver "github.com/nats-io/nats-server/v2/test" - "github.com/nats-io/nats.go" -) - -func TestServiceBasics(t *testing.T) { - s := RunServerOnPort(-1) - defer s.Shutdown() - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Expected to connect to server, got %v", err) - } - defer nc.Close() - - // Stub service. - doAdd := func(svc Service, req *nats.Msg) error { - if rand.Intn(10) == 0 { - return fmt.Errorf("Unexpected Error!") - } - // Happy Path. - // Random delay between 5-10ms - time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) - if err := req.Respond([]byte("42")); err != nil { - return err - } - return nil - } - - var svcs []Service - - // Create 5 service responders. - config := ServiceConfig{ - Name: "CoolAddService", - Version: "v0.1", - Description: "Add things together", - Endpoint: Endpoint{ - Subject: "svc.add", - Handler: doAdd, - }, - Schema: ServiceSchema{Request: "", Response: ""}, - } - - for i := 0; i < 5; i++ { - svc, err := Add(nc, config) - if err != nil { - t.Fatalf("Expected to create Service, got %v", err) - } - defer svc.Stop() - svcs = append(svcs, svc) - } - - // Now send 50 requests. - for i := 0; i < 50; i++ { - _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - } - - for _, svc := range svcs { - if svc.Name() != "CoolAddService" { - t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) - } - if len(svc.Description()) == 0 || len(svc.Version()) == 0 { - t.Fatalf("Expected non empty description and version") - } - } - - // Make sure we can request info, 1 response. - // This could be exported as well as main ServiceImpl. - subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") - if err != nil { - t.Fatalf("Failed to building info subject %v", err) - } - info, err := nc.Request(subj, nil, time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - var inf ServiceInfo - if err := json.Unmarshal(info.Data, &inf); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if inf.Subject != "svc.add" { - t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) - } - - // Ping all services. Multiple responses. - // could do STATZ too? - inbox := nats.NewInbox() - sub, err := nc.SubscribeSync(inbox) - if err != nil { - t.Fatalf("subscribe failed: %s", err) - } - pingSubject, err := SvcControlSubject(SrvPing, "CoolAddService", "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - var pingCount int - for { - _, err := sub.NextMsg(250 * time.Millisecond) - if err != nil { - break - } - pingCount++ - } - if pingCount != 5 { - t.Fatalf("Expected 5 ping responses, got: %d", pingCount) - } - - // Get stats from all services - statsInbox := nats.NewInbox() - sub, err = nc.SubscribeSync(statsInbox) - if err != nil { - t.Fatalf("subscribe failed: %s", err) - } - statsSubject, err := SvcControlSubject(SrvStatus, "CoolAddService", "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - stats := make([]ServiceStats, 0) - var requestsNum int - for { - resp, err := sub.NextMsg(250 * time.Millisecond) - if err != nil { - break - } - var srvStats ServiceStats - if err := json.Unmarshal(resp.Data, &srvStats); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if len(srvStats.Endpoints) != 10 { - t.Fatalf("Expected 10 endpoints on a serivce, got: %d", len(srvStats.Endpoints)) - } - for _, e := range srvStats.Endpoints { - if e.Name == "CoolAddService" { - requestsNum += e.NumRequests - } - } - stats = append(stats, srvStats) - } - if len(stats) != 5 { - t.Fatalf("Expected stats for 5 services, got: %d", len(stats)) - } - - // Services should process 50 requests total - if requestsNum != 50 { - t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum) - } -} - -func TestServiceErrors(t *testing.T) { - tests := []struct { - name string - handlerResponse error - expectedStatus string - }{ - { - name: "generic error", - handlerResponse: fmt.Errorf("oops"), - expectedStatus: "500 oops", - }, - { - name: "api error", - handlerResponse: &ServiceAPIError{ErrorCode: 400, Description: "oops"}, - expectedStatus: "400 oops", - }, - { - name: "no error", - handlerResponse: nil, - expectedStatus: "", - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - s := RunServerOnPort(-1) - defer s.Shutdown() - - nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Expected to connect to server, got %v", err) - } - defer nc.Close() - - // Stub service. - handler := func(svc Service, req *nats.Msg) error { - if test.handlerResponse == nil { - if err := req.Respond([]byte("ok")); err != nil { - return err - } - } - return test.handlerResponse - } - - svc, err := Add(nc, ServiceConfig{ - Name: "CoolService", - Description: "Erroring service", - Endpoint: Endpoint{ - Subject: "svc.fail", - Handler: handler, - }, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer svc.Stop() - - resp, err := nc.Request("svc.fail", nil, 1*time.Second) - if err != nil { - t.Fatalf("request error") - } - - status := resp.Header.Get("Nats-Service-Error") - if status != test.expectedStatus { - t.Fatalf("Invalid response status; want: %q; got: %q", test.expectedStatus, status) - } - }) - } -} - -func RunServerOnPort(port int) *server.Server { - opts := natsserver.DefaultTestOptions - opts.Port = port - return RunServerWithOptions(&opts) -} - -func RunServerWithOptions(opts *server.Options) *server.Server { - return natsserver.RunServer(opts) -} diff --git a/timer.go b/timer.go index 1216762d4..4fb02ecb4 100644 --- a/timer.go +++ b/timer.go @@ -1,4 +1,4 @@ -// Copyright 2017-2018 The NATS Authors +// Copyright 2017-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/timer_test.go b/timer_test.go index d561f9675..7b216b598 100644 --- a/timer_test.go +++ b/timer_test.go @@ -1,4 +1,4 @@ -// Copyright 2017-2018 The NATS Authors +// Copyright 2017-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/ws.go b/ws.go index d3732e371..c4919a3ae 100644 --- a/ws.go +++ b/ws.go @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/ws_test.go b/ws_test.go index eafe7b67f..1de473b60 100644 --- a/ws_test.go +++ b/ws_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at