Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Service api improvements #1160

Merged
merged 14 commits into from
Dec 20, 2022
4 changes: 1 addition & 3 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1528,13 +1528,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
if stream == _EMPTY_ {
stream, err = js.StreamNameBySubject(subj)
if err != nil {
return nil, err
}
} else {
stream = o.stream
}

// With an explicit durable name, we can lookup the consumer first
Expand Down
70 changes: 70 additions & 0 deletions micro/example_package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package micro

import (
"fmt"
"log"
"strconv"
"time"

"github.com/nats-io/nats.go"
)

func Example() {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
log.Fatal(err)
}
defer nc.Close()

// Service handler is a function which takes Service.Request as argument.
// req.Respond or req.Error should be used to respond to the request.
incrementHandler := func(req *Request) {
val, err := strconv.Atoi(string(req.Data))
if err != nil {
req.Error("400", "request data should be a number")
return
}

responseData := val + 1
req.Respond([]byte(strconv.Itoa(responseData)))
}

config := Config{
Name: "IncrementService",
Version: "0.1.0",
Description: "Increment numbers",
Endpoint: Endpoint{
// service handler
Handler: incrementHandler,
// a unique subject serving as a service endpoint
Subject: "numbers.increment",
},
}
// Multiple instances of the servcice with the same name can be created.
// Requests to a service with the same name will be load-balanced.
for i := 0; i < 5; i++ {
svc, err := AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer svc.Stop()
}

// send a request to a service
resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second)
if err != nil {
log.Fatal(err)
}
responseVal, err := strconv.Atoi(string(resp.Data))
if err != nil {
log.Fatal(err)
}
fmt.Println(responseVal)

//
// Output: 4
//
}
251 changes: 251 additions & 0 deletions micro/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package micro

import (
"fmt"
"log"

"github.com/nats-io/nats.go"
)

func ExampleAddService() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req *Request) {
req.Respond(req.Data)
}

config := Config{
Name: "EchoService",
Version: "v1.0.0",
Description: "Send back what you receive",
Endpoint: Endpoint{
Subject: "echo",
Handler: echoHandler,
},

// DoneHandler can be set to customize behavior on stopping a service.
DoneHandler: func(srv Service) {
info := srv.Info()
fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
},

// ErrorHandler can be used to customize behavior on service execution error.
ErrorHandler: func(srv Service, err *NATSError) {
info := srv.Info()
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
},
}

srv, err := AddService(nc, config)
if err != nil {
log.Fatal(err)
}
defer srv.Stop()
}

func ExampleService_Info() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
},
}

srv, _ := AddService(nc, config)

// service info
info := srv.Info()

fmt.Println(info.ID)
fmt.Println(info.Name)
fmt.Println(info.Description)
fmt.Println(info.Version)
fmt.Println(info.Subject)
}

func ExampleService_Stats() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
},
}

srv, _ := AddService(nc, config)

// stats of a service instance
stats := srv.Stats()

fmt.Println(stats.AverageProcessingTime)
fmt.Println(stats.TotalProcessingTime)

}

func ExampleService_Stop() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
},
}

srv, _ := AddService(nc, config)

// stop a service
err = srv.Stop()
if err != nil {
log.Fatal(err)
}

// stop is idempotent so multiple executions will not return an error
err = srv.Stop()
if err != nil {
log.Fatal(err)
}
}

func ExampleService_Stopped() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
},
}

srv, _ := AddService(nc, config)

// stop a service
err = srv.Stop()
if err != nil {
log.Fatal(err)
}

if srv.Stopped() {
fmt.Println("service stopped")
}
}

func ExampleService_Reset() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

config := Config{
Name: "EchoService",
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
},
}

srv, _ := AddService(nc, config)

// reset endpoint stats on this service
srv.Reset()

empty := Stats{
ServiceIdentity: srv.Info().ServiceIdentity,
}
if srv.Stats() != empty {
log.Fatal("Expected endpoint stats to be empty")
}
}

func ExampleControlSubject() {

// subject used to get PING from all services
subjectPINGAll, _ := ControlSubject(PingVerb, "", "")
fmt.Println(subjectPINGAll)

// subject used to get PING from services with provided name
subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "")
fmt.Println(subjectPINGName)

// subject used to get PING from a service with provided name and ID
subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123")
fmt.Println(subjectPINGInstance)

// Output:
// $SRV.PING
// $SRV.PING.COOLSERVICE
// $SRV.PING.COOLSERVICE.123
}

func ExampleRequest_Respond() {
handler := func(req *Request) {
// respond to the request
if err := req.Respond(req.Data); err != nil {
log.Fatal(err)
}
}

fmt.Printf("%T", handler)
}

func ExampleRequest_RespondJSON() {
type Point struct {
X int `json:"x"`
Y int `json:"y"`
}

handler := func(req *Request) {
resp := Point{5, 10}
// respond to the request
// response will be serialized to {"x":5,"y":10}
if err := req.RespondJSON(resp); err != nil {
log.Fatal(err)
}
}

fmt.Printf("%T", handler)
}

func ExampleRequest_Error() {
handler := func(req *Request) {
// respond with an error
// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
if err := req.Error("400", "bad request"); err != nil {
log.Fatal(err)
}
}

fmt.Printf("%T", handler)
}
61 changes: 61 additions & 0 deletions micro/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package micro

import (
"encoding/json"
"errors"
"fmt"

"github.com/nats-io/nats.go"
)

type (
Request struct {
*nats.Msg
errResponse bool
}

// RequestHandler is a function used as a Handler for a service.
RequestHandler func(*Request)
)

var (
ErrRespond = errors.New("NATS error when sending response")
ErrMarshalResponse = errors.New("marshaling response")
ErrArgRequired = errors.New("argument required")
)

func (r *Request) Respond(response []byte) error {
if err := r.Msg.Respond(response); err != nil {
return fmt.Errorf("%w: %s", ErrRespond, err)
}

return nil
}

func (r *Request) RespondJSON(response interface{}) error {
resp, err := json.Marshal(response)
if err != nil {
return ErrMarshalResponse
}

return r.Respond(resp)
}

// Error prepares and publishes error response from a handler.
// A response error should be set containing an error code and description.
func (r *Request) Error(code, description string) error {
if code == "" {
return fmt.Errorf("%w: error code", ErrArgRequired)
}
if description == "" {
return fmt.Errorf("%w: description", ErrArgRequired)
}
response := &nats.Msg{
Header: nats.Header{
ErrorHeader: []string{description},
ErrorCodeHeader: []string{code},
},
}
r.errResponse = true
return r.RespondMsg(response)
}
Loading