From dc44f932ba665797af0231dd1a3f601925ffe646 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 14 Dec 2022 15:13:57 +0100 Subject: [PATCH] Revert to using service interface, rename package to micro --- {service => micro}/example_package_test.go | 6 +- {service => micro}/example_test.go | 24 +- {service => micro}/request.go | 2 +- {service => micro}/service.go | 208 +++++++--- {service => micro}/service_test.go | 436 ++++++++++++++++++++- 5 files changed, 584 insertions(+), 92 deletions(-) rename {service => micro}/example_package_test.go (91%) rename {service => micro}/example_test.go (90%) rename {service => micro}/request.go (98%) rename {service => micro}/service.go (71%) rename {service => micro}/service_test.go (58%) diff --git a/service/example_package_test.go b/micro/example_package_test.go similarity index 91% rename from service/example_package_test.go rename to micro/example_package_test.go index a5019a7b5..1e919aa84 100644 --- a/service/example_package_test.go +++ b/micro/example_package_test.go @@ -1,4 +1,4 @@ -package service +package micro import ( "fmt" @@ -19,7 +19,7 @@ func Example() { } defer nc.Close() - // Service handler is a function which takes *service.Request as argument. + // Service handler is a function which takes Service.Request as argument. // req.Respond or req.Error should be used to respond to the request. incrementHandler := func(req *Request) { val, err := strconv.Atoi(string(req.Data)) @@ -46,7 +46,7 @@ func Example() { // Multiple instances of the servcice with the same name can be created. // Requests to a service with the same name will be load-balanced. for i := 0; i < 5; i++ { - svc, err := Add(nc, config) + svc, err := AddService(nc, config) if err != nil { log.Fatal(err) } diff --git a/service/example_test.go b/micro/example_test.go similarity index 90% rename from service/example_test.go rename to micro/example_test.go index 4c472cc92..df657cfaa 100644 --- a/service/example_test.go +++ b/micro/example_test.go @@ -1,4 +1,4 @@ -package service +package micro import ( "fmt" @@ -7,7 +7,7 @@ import ( "github.com/nats-io/nats.go" ) -func ExampleAdd() { +func ExampleAddService() { nc, err := nats.Connect("127.0.0.1:4222") if err != nil { log.Fatal(err) @@ -28,17 +28,17 @@ func ExampleAdd() { }, // DoneHandler can be set to customize behavior on stopping a service. - DoneHandler: func(srv *Service) { - fmt.Printf("stopped service %q with ID %q\n", srv.Name, srv.ID()) + DoneHandler: func(srv Service) { + fmt.Printf("stopped service %q with ID %q\n", srv.Name(), srv.ID()) }, // ErrorHandler can be used to customize behavior on service execution error. - ErrorHandler: func(srv *Service, err *NATSError) { - fmt.Printf("Service %q returned an error on subject %q: %s", srv.Name, err.Subject, err.Description) + ErrorHandler: func(srv Service, err *NATSError) { + fmt.Printf("Service %q returned an error on subject %q: %s", srv.Name(), err.Subject, err.Description) }, } - srv, err := Add(nc, config) + srv, err := AddService(nc, config) if err != nil { log.Fatal(err) } @@ -60,7 +60,7 @@ func ExampleService_ID() { }, } - srv, _ := Add(nc, config) + srv, _ := AddService(nc, config) // unique service ID id := srv.ID() @@ -82,7 +82,7 @@ func ExampleService_Stats() { }, } - srv, _ := Add(nc, config) + srv, _ := AddService(nc, config) // stats of a service instance stats := srv.Stats() @@ -108,7 +108,7 @@ func ExampleService_Stop() { }, } - srv, _ := Add(nc, config) + srv, _ := AddService(nc, config) // stop a service err = srv.Stop() @@ -138,7 +138,7 @@ func ExampleService_Stopped() { }, } - srv, _ := Add(nc, config) + srv, _ := AddService(nc, config) // stop a service err = srv.Stop() @@ -166,7 +166,7 @@ func ExampleService_Reset() { }, } - srv, _ := Add(nc, config) + srv, _ := AddService(nc, config) // reset endpoint stats on this service srv.Reset() diff --git a/service/request.go b/micro/request.go similarity index 98% rename from service/request.go rename to micro/request.go index 72cc7f345..d391e8af1 100644 --- a/service/request.go +++ b/micro/request.go @@ -1,4 +1,4 @@ -package service +package micro import ( "encoding/json" diff --git a/service/service.go b/micro/service.go similarity index 71% rename from service/service.go rename to micro/service.go index 1ac4b5be0..dcab61d48 100644 --- a/service/service.go +++ b/micro/service.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package service +package micro import ( "encoding/json" @@ -31,10 +31,37 @@ import ( // This functionality is EXPERIMENTAL and may be changed in later releases. type ( - // Service represents a configured NATS service. + Service interface { + // ID returns the service instance's unique ID. + ID() string + + // Name returns the name of the service. + // It can be shared between multiple service instances. + Name() string + + // Description returns the service description. + Description() string + + // Version returns the service version. + Version() string + + // Stats returns statisctics for the service endpoint and all monitoring endpoints. + Stats() Stats + + // Reset resets all statistics on a service instance. + Reset() + + // Stop drains the endpoint subscriptions and marks the service as stopped. + Stop() error + + // Stopped informs whether [Stop] was executed on the service. + Stopped() bool + } + + // service represents a configured NATS service. // It should be created using [Add] in order to configure the appropriate NATS subscriptions // for request handler and monitoring. - Service struct { + service struct { // Config contains a configuration of the service Config @@ -49,10 +76,10 @@ type ( } // ErrHandler is a function used to configure a custom error handler for a service, - ErrHandler func(*Service, *NATSError) + ErrHandler func(Service, *NATSError) // DoneHandler is a function used to configure a custom done handler for a service. - DoneHandler func(*Service) + DoneHandler func(Service) // StatsHandleris a function used to configure a custom STATS endpoint. // It should return a value which can be serialized to JSON. @@ -169,6 +196,9 @@ var ( // ErrVerbNotSupported is returned when invalid [Verb] is used (PING, SCHEMA, INFO, STATS) ErrVerbNotSupported = errors.New("unsupported verb") + + // ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name + ErrServiceNameRequired = errors.New("service name is required to generate ID control subject") ) func (s Verb) String() string { @@ -186,19 +216,19 @@ func (s Verb) String() string { } } -// Add adds a microservice. +// AddService adds a microservice. // It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as // the actual service handler on the subject provided in config.Endpoint // A service name and Endpoint configuration are required to add a service. -// Add returns a [Service] interface, allowing service menagement. +// AddService returns a [Service] interface, allowing service menagement. // Each service is assigned a unique ID. -func Add(nc *nats.Conn, config Config) (*Service, error) { +func AddService(nc *nats.Conn, config Config) (Service, error) { if err := config.valid(); err != nil { return nil, err } id := nuid.Next() - svc := &Service{ + svc := &service{ Config: config, conn: nc, id: id, @@ -209,6 +239,8 @@ func Add(nc *nats.Conn, config Config) (*Service, error) { Name: config.Name, } + svc.setupAsyncCallbacks() + // Setup internal subscriptions. var err error @@ -232,24 +264,40 @@ func Add(nc *nats.Conn, config Config) (*Service, error) { ID: id, } - infoHandler := func(m *nats.Msg) { + infoHandler := func(req *Request) { response, _ := json.Marshal(info) - m.Respond(response) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling INFO request: %s", err)); err != nil && config.ErrorHandler != nil { + go config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) + } + } } - pingHandler := func(m *nats.Msg) { + pingHandler := func(req *Request) { response, _ := json.Marshal(ping) - m.Respond(response) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling PING request: %s", err)); err != nil && config.ErrorHandler != nil { + go config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) + } + } } - statusHandler := func(m *nats.Msg) { + statsHandler := func(req *Request) { response, _ := json.Marshal(svc.Stats()) - m.Respond(response) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling STATS request: %s", err)); err != nil && config.ErrorHandler != nil { + go config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) + } + } } - schemaHandler := func(m *nats.Msg) { + schemaHandler := func(req *Request) { response, _ := json.Marshal(svc.Schema) - m.Respond(response) + if err := req.Respond(response); err != nil { + if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err)); err != nil && config.ErrorHandler != nil { + go config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) + } + } } if err := svc.verbHandlers(nc, InfoVerb, infoHandler); err != nil { @@ -258,7 +306,7 @@ func Add(nc *nats.Conn, config Config) (*Service, error) { if err := svc.verbHandlers(nc, PingVerb, pingHandler); err != nil { return nil, err } - if err := svc.verbHandlers(nc, StatsVerb, statusHandler); err != nil { + if err := svc.verbHandlers(nc, StatsVerb, statsHandler); err != nil { return nil, err } @@ -268,80 +316,100 @@ func Add(nc *nats.Conn, config Config) (*Service, error) { } } - svc.natsHandlers.closed = nc.Opts.ClosedCB - if nc.Opts.ClosedCB != nil { - nc.SetClosedHandler(func(c *nats.Conn) { + return svc, nil +} + +func (s *Config) valid() error { + if !serviceNameRegexp.MatchString(s.Name) { + return fmt.Errorf("%w: service name: name should not be empty and should consist of alphanumerical charactest, dashes and underscores", ErrConfigValidation) + } + return s.Endpoint.valid() +} + +func (e *Endpoint) valid() error { + if e.Subject == "" { + return fmt.Errorf("%w: endpoint: subject is required", ErrConfigValidation) + } + if e.Handler == nil { + return fmt.Errorf("%w: endpoint: handler is required", ErrConfigValidation) + } + return nil +} + +func (svc *service) setupAsyncCallbacks() { + svc.natsHandlers.closed = svc.conn.Opts.ClosedCB + if svc.conn.Opts.ClosedCB != nil { + svc.conn.SetClosedHandler(func(c *nats.Conn) { svc.Stop() svc.natsHandlers.closed(c) }) } else { - nc.SetClosedHandler(func(c *nats.Conn) { + svc.conn.SetClosedHandler(func(c *nats.Conn) { svc.Stop() }) } - svc.natsHandlers.asyncErr = nc.Opts.AsyncErrorCB - if nc.Opts.AsyncErrorCB != nil { - nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if config.ErrorHandler != nil { - config.ErrorHandler(svc, &NATSError{ - Description: err.Error(), + svc.natsHandlers.asyncErr = svc.conn.Opts.AsyncErrorCB + if svc.conn.Opts.AsyncErrorCB != nil { + svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if !svc.matchSubscriptionSubject(s.Subject) { + svc.natsHandlers.asyncErr(c, s, err) + } + if svc.Config.ErrorHandler != nil { + svc.Config.ErrorHandler(svc, &NATSError{ Subject: s.Subject, + Description: err.Error(), }) } svc.Stop() svc.natsHandlers.asyncErr(c, s, err) }) } else { - nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if config.ErrorHandler != nil { - config.ErrorHandler(svc, &NATSError{ - Description: err.Error(), + svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if !svc.matchSubscriptionSubject(s.Subject) { + return + } + if svc.Config.ErrorHandler != nil { + svc.Config.ErrorHandler(svc, &NATSError{ Subject: s.Subject, + Description: err.Error(), }) } svc.Stop() }) } - - return svc, nil } -func (s *Config) valid() error { - if !serviceNameRegexp.MatchString(s.Name) { - return fmt.Errorf("%w: service name: name should not be empty and should consist of alphanumerical charactest, dashes and underscores", ErrConfigValidation) +func (svc *service) matchSubscriptionSubject(subj string) bool { + if svc.reqSub.Subject == subj { + return true } - return s.Endpoint.valid() -} - -func (e *Endpoint) valid() error { - if e.Subject == "" { - return fmt.Errorf("%w: endpoint: subject is required", ErrConfigValidation) - } - if e.Handler == nil { - return fmt.Errorf("%w: endpoint: handler is required", ErrConfigValidation) + for _, verbSub := range svc.verbSubs { + if verbSub.Subject == subj { + return true + } } - return nil + return false } // verbHandlers generates control handlers for a specific verb. // Each request generates 3 subscriptions, one for the general verb // affecting all services written with the framework, one that handles // all services of a particular kind, and finally a specific service instance. -func (svc *Service) verbHandlers(nc *nats.Conn, verb Verb, handler nats.MsgHandler) error { +func (svc *service) verbHandlers(nc *nats.Conn, verb Verb, handler RequestHandler) error { name := fmt.Sprintf("%s-all", verb.String()) if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { return err } name = fmt.Sprintf("%s-kind", verb.String()) - if err := svc.addInternalHandler(nc, verb, svc.Name, "", name, handler); err != nil { + if err := svc.addInternalHandler(nc, verb, svc.Config.Name, "", name, handler); err != nil { return err } - return svc.addInternalHandler(nc, verb, svc.Name, svc.ID(), verb.String(), handler) + return svc.addInternalHandler(nc, verb, svc.Config.Name, svc.ID(), verb.String(), handler) } // addInternalHandler registers a control subject handler. -func (s *Service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name string, handler nats.MsgHandler) error { +func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name string, handler RequestHandler) error { subj, err := ControlSubject(verb, kind, id) if err != nil { s.Stop() @@ -350,7 +418,7 @@ func (s *Service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st s.verbSubs[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { start := time.Now() - handler(msg) + handler(&Request{Msg: msg}) s.m.Lock() stats := s.endpointStats[name] @@ -371,7 +439,7 @@ func (s *Service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st } // reqHandler itself -func (s *Service) reqHandler(req *Request) { +func (s *service) reqHandler(req *Request) { start := time.Now() s.Endpoint.Handler(req) s.m.Lock() @@ -388,7 +456,7 @@ func (s *Service) reqHandler(req *Request) { } // Stop drains the endpoint subscriptions and marks the service as stopped. -func (s *Service) Stop() error { +func (s *service) Stop() error { s.m.Lock() if s.stopped { return nil @@ -413,7 +481,7 @@ func (s *Service) Stop() error { restoreAsyncHandlers(s.conn, s.natsHandlers) s.stopped = true if s.DoneHandler != nil { - s.DoneHandler(s) + go s.DoneHandler(s) } return nil } @@ -424,12 +492,27 @@ func restoreAsyncHandlers(nc *nats.Conn, handlers handlers) { } // ID returns the service instance's unique ID. -func (s *Service) ID() string { +func (s *service) ID() string { return s.id } +// ID returns the service instance's unique ID. +func (s *service) Name() string { + return s.Config.Name +} + +// ID returns the service instance's unique ID. +func (s *service) Version() string { + return s.Config.Version +} + +// ID returns the service instance's unique ID. +func (s *service) Description() string { + return s.Config.Description +} + // Stats returns statisctics for the service endpoint and all monitoring endpoints. -func (s *Service) Stats() Stats { +func (s *service) Stats() Stats { s.m.Lock() defer s.m.Unlock() if s.StatsHandler != nil { @@ -443,15 +526,15 @@ func (s *Service) Stats() Stats { idx++ } return Stats{ - Name: s.Name, + Name: s.Config.Name, ID: s.ID(), - Version: s.Version, + Version: s.Config.Version, Endpoints: v, } } // Reset resets all statistics on a service instance. -func (s *Service) Reset() { +func (s *service) Reset() { s.m.Lock() for _, se := range s.endpointStats { se.NumRequests = 0 @@ -465,7 +548,7 @@ func (s *Service) Reset() { } // Stopped informs whether [Stop] was executed on the service. -func (s *Service) Stopped() bool { +func (s *service) Stopped() bool { s.m.Lock() defer s.m.Unlock() return s.stopped @@ -482,6 +565,9 @@ func ControlSubject(verb Verb, name, id string) (string, error) { if verbStr == "" { return "", fmt.Errorf("%w: %q", ErrVerbNotSupported, verbStr) } + if name == "" && id != "" { + return "", ErrServiceNameRequired + } name = strings.ToUpper(name) if name == "" && id == "" { return fmt.Sprintf("%s.%s", APIPrefix, verbStr), nil diff --git a/service/service_test.go b/micro/service_test.go similarity index 58% rename from service/service_test.go rename to micro/service_test.go index ecb35ac08..768591a70 100644 --- a/service/service_test.go +++ b/micro/service_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package service +package micro import ( "bytes" @@ -19,6 +19,7 @@ import ( "errors" "fmt" "math/rand" + "reflect" "testing" "time" @@ -56,7 +57,7 @@ func TestServiceBasics(t *testing.T) { } } - var svcs []*Service + var svcs []Service // Create 5 service responders. config := Config{ @@ -71,7 +72,7 @@ func TestServiceBasics(t *testing.T) { } for i := 0; i < 5; i++ { - svc, err := Add(nc, config) + svc, err := AddService(nc, config) if err != nil { t.Fatalf("Expected to create Service, got %v", err) } @@ -88,10 +89,10 @@ func TestServiceBasics(t *testing.T) { } for _, svc := range svcs { - if svc.Name != "CoolAddService" { - t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name) + if svc.Name() != "CoolAddService" { + t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) } - if len(svc.Description) == 0 || len(svc.Version) == 0 { + if len(svc.Description()) == 0 || len(svc.Version()) == 0 { t.Fatalf("Expected non empty description and version") } } @@ -205,6 +206,7 @@ func TestAddService(t *testing.T) { givenConfig Config natsClosedHandler nats.ConnHandler natsErrorHandler nats.ErrHandler + asyncErrorSubject string expectedPing Ping withError error }{ @@ -229,7 +231,7 @@ func TestAddService(t *testing.T) { Subject: "test.sub", Handler: testHandler, }, - DoneHandler: func(*Service) { + DoneHandler: func(Service) { doneService <- struct{}{} }, }, @@ -245,13 +247,31 @@ func TestAddService(t *testing.T) { Subject: "test.sub", Handler: testHandler, }, - ErrorHandler: func(*Service, *NATSError) { + ErrorHandler: func(Service, *NATSError) { errService <- struct{}{} }, }, expectedPing: Ping{ Name: "test_service", }, + asyncErrorSubject: "test.sub", + }, + { + name: "with error handler, no handlers on nats connection, error on monitoring subject", + givenConfig: Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + ErrorHandler: func(Service, *NATSError) { + errService <- struct{}{} + }, + }, + expectedPing: Ping{ + Name: "test_service", + }, + asyncErrorSubject: "$SVC.PING.TEST_SERVICE", }, { name: "with done handler, append to nats handlers", @@ -261,7 +281,7 @@ func TestAddService(t *testing.T) { Subject: "test.sub", Handler: testHandler, }, - DoneHandler: func(*Service) { + DoneHandler: func(Service) { doneService <- struct{}{} }, }, @@ -274,6 +294,7 @@ func TestAddService(t *testing.T) { expectedPing: Ping{ Name: "test_service", }, + asyncErrorSubject: "test.sub", }, { name: "with error handler, append to nats handlers", @@ -283,7 +304,7 @@ func TestAddService(t *testing.T) { Subject: "test.sub", Handler: testHandler, }, - DoneHandler: func(*Service) { + DoneHandler: func(Service) { doneService <- struct{}{} }, }, @@ -297,6 +318,29 @@ func TestAddService(t *testing.T) { Name: "test_service", }, }, + { + name: "with error handler, append to nats handlers, error on monitoring subject", + givenConfig: Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: testHandler, + }, + DoneHandler: func(Service) { + doneService <- struct{}{} + }, + }, + natsClosedHandler: func(c *nats.Conn) { + closedNats <- struct{}{} + }, + natsErrorHandler: func(*nats.Conn, *nats.Subscription, error) { + errNats <- struct{}{} + }, + expectedPing: Ping{ + Name: "test_service", + }, + asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + }, { name: "validation error, invalid service name", givenConfig: Config{ @@ -346,7 +390,7 @@ func TestAddService(t *testing.T) { } defer nc.Close() - srv, err := Add(nc, test.givenConfig) + srv, err := AddService(nc, test.givenConfig) if test.withError != nil { if !errors.Is(err, test.withError) { t.Fatalf("Expected error: %v; got: %v", test.withError, err) @@ -354,7 +398,7 @@ func TestAddService(t *testing.T) { return } - pingSubject, err := ControlSubject(PingVerb, srv.Name, srv.ID()) + pingSubject, err := ControlSubject(PingVerb, srv.Name(), srv.ID()) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -389,7 +433,7 @@ func TestAddService(t *testing.T) { } if test.givenConfig.ErrorHandler != nil { - go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: "test.sub"}, fmt.Errorf("oops")) + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops")) select { case <-errService: case <-time.After(1 * time.Second): @@ -421,7 +465,7 @@ func TestAddService(t *testing.T) { } } if test.natsErrorHandler != nil { - go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: "test.sub"}, fmt.Errorf("oops")) + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops")) select { case <-errService: t.Fatalf("Expected to restore nats error handler") @@ -437,6 +481,308 @@ func TestAddService(t *testing.T) { } } +func TestMonitoringHandlers(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + asyncErr := make(chan struct{}) + errHandler := func(s Service, n *NATSError) { + asyncErr <- struct{}{} + } + + config := Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) {}, + }, + Schema: Schema{ + Request: "some_schema", + }, + ErrorHandler: errHandler, + } + srv, err := AddService(nc, config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer func() { + srv.Stop() + if !srv.Stopped() { + t.Fatalf("Expected service to be stopped") + } + }() + + tests := []struct { + name string + subject string + withError bool + expectedResponse interface{} + }{ + { + name: "PING all", + subject: "$SRV.PING", + expectedResponse: Ping{ + Name: "test_service", + ID: srv.ID(), + }, + }, + { + name: "PING name", + subject: "$SRV.PING.TEST_SERVICE", + expectedResponse: Ping{ + Name: "test_service", + ID: srv.ID(), + }, + }, + { + name: "PING ID", + subject: fmt.Sprintf("$SRV.PING.TEST_SERVICE.%s", srv.ID()), + expectedResponse: Ping{ + Name: "test_service", + ID: srv.ID(), + }, + }, + { + name: "INFO all", + subject: "$SRV.INFO", + expectedResponse: Info{ + Name: "test_service", + ID: srv.ID(), + Subject: "test.sub", + }, + }, + { + name: "INFO name", + subject: "$SRV.INFO.TEST_SERVICE", + expectedResponse: Info{ + Name: "test_service", + ID: srv.ID(), + Subject: "test.sub", + }, + }, + { + name: "INFO ID", + subject: fmt.Sprintf("$SRV.INFO.TEST_SERVICE.%s", srv.ID()), + expectedResponse: Info{ + Name: "test_service", + ID: srv.ID(), + Subject: "test.sub", + }, + }, + { + name: "SCHEMA all", + subject: "$SRV.SCHEMA", + expectedResponse: Schema{ + Request: "some_schema", + }, + }, + { + name: "SCHEMA name", + subject: "$SRV.SCHEMA.TEST_SERVICE", + expectedResponse: Schema{ + Request: "some_schema", + }, + }, + { + name: "SCHEMA ID", + subject: fmt.Sprintf("$SRV.SCHEMA.TEST_SERVICE.%s", srv.ID()), + expectedResponse: Schema{ + Request: "some_schema", + }, + }, + { + name: "PING error", + subject: "$SRV.PING", + withError: true, + }, + { + name: "INFO error", + subject: "$SRV.INFO", + withError: true, + }, + { + name: "STATS error", + subject: "$SRV.STATS", + withError: true, + }, + { + name: "SCHEMA error", + subject: "$SRV.SCHEMA", + withError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.withError { + // use publish instead of request, so Respond will fail inside the handler + if err := nc.Publish(test.subject, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case <-asyncErr: + return + case <-time.After(1 * time.Second): + t.Fatalf("Timeout waiting for async error") + } + return + } + + resp, err := nc.Request(test.subject, nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + respMap := make(map[string]interface{}) + if err := json.Unmarshal(resp.Data, &respMap); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedResponseJSON, err := json.Marshal(test.expectedResponse) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedRespMap := make(map[string]interface{}) + if err := json.Unmarshal(expectedResponseJSON, &expectedRespMap); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(respMap, expectedRespMap) { + t.Fatalf("Invalid response; want: %+v; got: %+v", expectedRespMap, respMap) + } + }) + } +} + +func TestServiceStats(t *testing.T) { + tests := []struct { + name string + config Config + expectedEndpointsLen int + expectedStats map[string]interface{} + }{ + { + name: "without schema or stats handler", + config: Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) {}, + }, + }, + expectedEndpointsLen: 10, + }, + { + name: "with stats handler", + config: Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) {}, + }, + StatsHandler: func(e Endpoint) interface{} { + return map[string]interface{}{ + "key": "val", + } + }, + }, + expectedEndpointsLen: 10, + expectedStats: map[string]interface{}{ + "key": "val", + }, + }, + { + name: "with schema", + config: Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) {}, + }, + Schema: Schema{ + Request: "some_schema", + }, + }, + expectedEndpointsLen: 13, + }, + { + name: "with schema and stats handler", + config: Config{ + Name: "test_service", + Endpoint: Endpoint{ + Subject: "test.sub", + Handler: func(*Request) {}, + }, + Schema: Schema{ + Request: "some_schema", + }, + StatsHandler: func(e Endpoint) interface{} { + return map[string]interface{}{ + "key": "val", + } + }, + }, + expectedEndpointsLen: 13, + expectedStats: map[string]interface{}{ + "key": "val", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := AddService(nc, test.config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer srv.Stop() + + resp, err := nc.Request(fmt.Sprintf("$SRV.STATS.TEST_SERVICE.%s", srv.ID()), nil, 1*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var stats Stats + if err := json.Unmarshal(resp.Data, &stats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if len(stats.Endpoints) != test.expectedEndpointsLen { + t.Errorf("Unexpected endpoint count; want: %d; got: %d", test.expectedEndpointsLen, len(stats.Endpoints)) + } + if stats.Name != srv.Name() { + t.Errorf("Unexpected service name; want: %s; got: %s", srv.Name(), stats.Name) + } + if stats.ID != srv.ID() { + t.Errorf("Unexpected service name; want: %s; got: %s", srv.ID(), stats.ID) + } + if test.expectedStats != nil { + for _, e := range stats.Endpoints { + if e.Name != "test_service" { + continue + } + if val, ok := e.Data.(map[string]interface{}); !ok || !reflect.DeepEqual(val, test.expectedStats) { + t.Fatalf("Invalid data from stats handler; want: %v; got: %v", test.expectedStats, val) + } + } + } + }) + } +} + func TestRequestRespond(t *testing.T) { type x struct { A string `json:"a"` @@ -551,7 +897,7 @@ func TestRequestRespond(t *testing.T) { } } - svc, err := Add(nc, Config{ + svc, err := AddService(nc, Config{ Name: "CoolService", Description: "Erroring service", Endpoint: Endpoint{ @@ -604,3 +950,63 @@ func RunServerOnPort(port int) *server.Server { func RunServerWithOptions(opts *server.Options) *server.Server { return natsserver.RunServer(opts) } + +func TestControlSubject(t *testing.T) { + tests := []struct { + name string + verb Verb + srvName string + id string + expectedSubject string + withError error + }{ + { + name: "PING ALL", + verb: PingVerb, + expectedSubject: "$SRV.PING", + }, + { + name: "PING name", + verb: PingVerb, + srvName: "test", + expectedSubject: "$SRV.PING.TEST", + }, + { + name: "PING id", + verb: PingVerb, + srvName: "test", + id: "123", + expectedSubject: "$SRV.PING.TEST.123", + }, + { + name: "invalid verb", + verb: Verb(100), + withError: ErrVerbNotSupported, + }, + { + name: "name not provided", + verb: PingVerb, + srvName: "", + id: "123", + withError: ErrServiceNameRequired, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + res, err := ControlSubject(test.verb, test.srvName, test.id) + if test.withError != nil { + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: %v; got: %v", test.withError, err) + } + return + } + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if res != test.expectedSubject { + t.Errorf("Invalid subject; want: %q; got: %q", test.expectedSubject, res) + } + }) + } +}