From 7c4a43841aabbd0610546368f8f044d1b15f0559 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Thu, 19 Oct 2023 10:24:40 +0700 Subject: [PATCH 1/6] feat: add lightPush Rest endpoints --- cmd/waku/server/rest/lightpush_api.yaml | 84 +++++++++++++++++++++ cmd/waku/server/rest/lightpush_rest.go | 66 ++++++++++++++++ cmd/waku/server/rest/lightpush_rest_test.go | 75 ++++++++++++++++++ 3 files changed, 225 insertions(+) create mode 100644 cmd/waku/server/rest/lightpush_api.yaml create mode 100644 cmd/waku/server/rest/lightpush_rest.go create mode 100644 cmd/waku/server/rest/lightpush_rest_test.go diff --git a/cmd/waku/server/rest/lightpush_api.yaml b/cmd/waku/server/rest/lightpush_api.yaml new file mode 100644 index 000000000..b2c342f64 --- /dev/null +++ b/cmd/waku/server/rest/lightpush_api.yaml @@ -0,0 +1,84 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: lightpush + description: Lightpush REST API for WakuV2 node + +paths: + /lightpush/v1/message: + post: + summary: Request a message relay from a LightPush service provider + description: Push a message to be relayed on a PubSub topic. + operationId: postMessagesToPubsubTopic + tags: + - lightpush + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PushRequest' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '500': + description: Internal server error + content: + text/plain: + schema: + type: string + '503': + description: Service not available + content: + text/plain: + schema: + type: string + +components: + schemas: + PubsubTopic: + type: string + + ContentTopic: + type: string + + WakuMessage: + type: object + properties: + payload: + type: string + format: byte + contentTopic: + $ref: '#/components/schemas/ContentTopic' + version: + type: number + timestamp: + type: number + required: + - payload + - contentTopic + + PushRequest: + type: object + properties: + pusbsubTopic: + $ref: '#/components/schemas/PubsubTopic' + message: + $ref: '#/components/schemas/WakuMessage' + required: + - message \ No newline at end of file diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go new file mode 100644 index 000000000..3030d965d --- /dev/null +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -0,0 +1,66 @@ +package rest + +import ( + "encoding/json" + "errors" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" +) + +const routeLightPushV1Messages = "/lightpush/v1/message" + +type LightpushService struct { + node *node.WakuNode + log *zap.Logger + id peer.ID +} + +func NewLightpushService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *LightpushService { + serv := &LightpushService{ + node: node, + log: log.Named("lightpush"), + } + + m.Post(routeLightPushV1Messages, serv.postMessagev1) + + return serv +} + +func (msg lightpushRequest) Check() error { + if msg.Message == nil { + return errors.New("waku message is required") + } + return nil +} + +type lightpushRequest struct { + PubSubTopic string `json:"pubsubTopic"` + Message *pb.WakuMessage `json:"message"` +} + +// handled error codes are 200, 400, 500, 503 +func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) { + msg := &lightpushRequest{} + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(msg); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer req.Body.Close() + // + + if serv.node.Lightpush() == nil { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + serv.log.Info("", zap.String("", serv.id.String())) + _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic), lightpush.WithPeer(serv.id)) + writeErrOrResponse(w, err, true) +} diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go new file mode 100644 index 000000000..8b453f0f3 --- /dev/null +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -0,0 +1,75 @@ +package rest + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-chi/chi/v5" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/node" + wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func createLightPushNode(t *testing.T) *node.WakuNode { + node, err := node.New(node.WithLightPush(), node.WithWakuRelay()) + require.NoError(t, err) + + err = node.Start(context.Background()) + require.NoError(t, err) + + return node +} + +// node2 connects to node1 +func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) { + node1 := createLightPushNode(t) + node2 := createLightPushNode(t) + + node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) + err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) + require.NoError(t, err) + node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) + + return node1, node2 +} + +func TestLightpushMessagev1(t *testing.T) { + pubSubTopic := "/waku/2/default-waku/proto" + node1, node2 := twoLightPushConnectedNodes(t, pubSubTopic) + defer func() { + node1.Stop() + node2.Stop() + }() + + router := chi.NewRouter() + serv := NewLightpushService(node2, router, utils.Logger()) + // serv.id = node1.Host().ID() + _ = serv + + msg := lightpushRequest{ + PubSubTopic: pubSubTopic, + Message: &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "abc", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + }, + } + msgJSONBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeLightPushV1Messages, bytes.NewReader(msgJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) +} From 3204ba8899df4c2f53d6b6e39f04bb855cd76fd9 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Thu, 19 Oct 2023 10:25:55 +0700 Subject: [PATCH 2/6] test: lightPush Rest Service --- cmd/waku/server/rest/waku_rest.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index ec0325905..0041ee087 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -36,6 +36,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool _ = NewDebugService(node, mux) _ = NewHealthService(node, mux) _ = NewStoreService(node, mux) + _ = NewLightpushService(node, mux, log) listenAddr := fmt.Sprintf("%s:%d", address, port) From 9a5541419a1b2b2740dc30e2dc3645dc9ad20eb2 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Thu, 19 Oct 2023 10:45:23 +0700 Subject: [PATCH 3/6] fix: check error --- cmd/waku/server/rest/lightpush_rest_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go index 8b453f0f3..a7f643fde 100644 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -37,7 +37,8 @@ func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNod node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) require.NoError(t, err) - node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) + err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) + require.NoError(t, err) return node1, node2 } From c9a339b385df98bac5fed32369a557c3d7113f62 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Thu, 19 Oct 2023 11:39:38 +0700 Subject: [PATCH 4/6] nit: fixes --- cmd/waku/server/rest/lightpush_rest.go | 10 ++++++---- cmd/waku/server/rest/lightpush_rest_test.go | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index 3030d965d..e7b360bbe 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -6,7 +6,6 @@ import ( "net/http" "github.com/go-chi/chi/v5" - "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -18,7 +17,6 @@ const routeLightPushV1Messages = "/lightpush/v1/message" type LightpushService struct { node *node.WakuNode log *zap.Logger - id peer.ID } func NewLightpushService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *LightpushService { @@ -53,6 +51,11 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req return } defer req.Body.Close() + + if err := msg.Check(); err != nil { + writeErrOrResponse(w, err, http.StatusBadRequest) + return + } // if serv.node.Lightpush() == nil { @@ -60,7 +63,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req return } - serv.log.Info("", zap.String("", serv.id.String())) - _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic), lightpush.WithPeer(serv.id)) + _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic)) writeErrOrResponse(w, err, true) } diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go index a7f643fde..d4b8a760e 100644 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ b/cmd/waku/server/rest/lightpush_rest_test.go @@ -53,7 +53,6 @@ func TestLightpushMessagev1(t *testing.T) { router := chi.NewRouter() serv := NewLightpushService(node2, router, utils.Logger()) - // serv.id = node1.Host().ID() _ = serv msg := lightpushRequest{ From 0161bcbf6b4ca6e89511aabb29f17e8e6bf44862 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Fri, 20 Oct 2023 15:58:20 +0700 Subject: [PATCH 5/6] fix: lightpush returned Header is text/plain --- cmd/waku/server/rest/lightpush_rest.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index e7b360bbe..b15cd6a96 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -53,7 +53,8 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req defer req.Body.Close() if err := msg.Check(); err != nil { - writeErrOrResponse(w, err, http.StatusBadRequest) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) return } // @@ -64,5 +65,10 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req } _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic)) - writeErrOrResponse(w, err, true) + if err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(err.Error())) + } else { + w.WriteHeader(http.StatusOK) + } } From 40e7409f210684985f7e8cd0808d45703b639398 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Sat, 21 Oct 2023 18:11:58 +0700 Subject: [PATCH 6/6] fix: handle error --- cmd/waku/server/rest/lightpush_rest.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index b15cd6a96..8500df4a5 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -54,7 +54,8 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req if err := msg.Check(); err != nil { w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) + _, err = w.Write([]byte(err.Error())) + serv.log.Error("writing response", zap.Error(err)) return } // @@ -67,7 +68,8 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req _, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic)) if err != nil { w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte(err.Error())) + _, err = w.Write([]byte(err.Error())) + serv.log.Error("writing response", zap.Error(err)) } else { w.WriteHeader(http.StatusOK) }