From 7028a0b1cbc225031b770e1d7d9c4ad9e02b49eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 14 May 2024 09:06:11 -0400 Subject: [PATCH] feat(REST): storev3 client (#1100) --- cmd/waku/server/rest/legacy_store.go | 212 ++++++++++++ cmd/waku/server/rest/legacy_store_api.yaml | 203 ++++++++++++ .../{store_test.go => legacy_store_test.go} | 10 +- cmd/waku/server/rest/store.go | 189 +++++------ cmd/waku/server/rest/store_api.yaml | 310 +++++++----------- cmd/waku/server/rest/waku_rest.go | 3 +- waku/v2/protocol/store/client.go | 37 ++- waku/v2/protocol/store/result.go | 17 +- 8 files changed, 656 insertions(+), 325 deletions(-) create mode 100644 cmd/waku/server/rest/legacy_store.go create mode 100644 cmd/waku/server/rest/legacy_store_api.yaml rename cmd/waku/server/rest/{store_test.go => legacy_store_test.go} (91%) diff --git a/cmd/waku/server/rest/legacy_store.go b/cmd/waku/server/rest/legacy_store.go new file mode 100644 index 000000000..3fb0331a8 --- /dev/null +++ b/cmd/waku/server/rest/legacy_store.go @@ -0,0 +1,212 @@ +package rest + +import ( + "context" + "encoding/base64" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/go-chi/chi/v5" + "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" +) + +type LegacyStoreService struct { + node *node.WakuNode + mux *chi.Mux +} + +type LegacyStoreResponse struct { + Messages []LegacyStoreWakuMessage `json:"messages"` + Cursor *LegacyHistoryCursor `json:"cursor,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type LegacyHistoryCursor struct { + PubsubTopic string `json:"pubsubTopic"` + SenderTime string `json:"senderTime"` + StoreTime string `json:"storeTime"` + Digest []byte `json:"digest"` +} + +type LegacyStoreWakuMessage struct { + Payload []byte `json:"payload"` + ContentTopic string `json:"contentTopic"` + Version *uint32 `json:"version,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + Meta []byte `json:"meta,omitempty"` +} + +const routeLegacyStoreMessagesV1 = "/store/v1/messages" + +func NewLegacyStoreService(node *node.WakuNode, m *chi.Mux) *LegacyStoreService { + s := &LegacyStoreService{ + node: node, + mux: m, + } + + m.Get(routeLegacyStoreMessagesV1, s.getV1Messages) + + return s +} + +func getLegacyStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.HistoryRequestOption, error) { + query := &legacy_store.Query{} + var options []legacy_store.HistoryRequestOption + var err error + peerAddrStr := r.URL.Query().Get("peerAddr") + var m multiaddr.Multiaddr + if peerAddrStr != "" { + m, err = multiaddr.NewMultiaddr(peerAddrStr) + if err != nil { + return nil, nil, err + } + options = append(options, legacy_store.WithPeerAddr(m)) + } else { + // The user didn't specify a peer address and self-node is configured as a store node. + // In this case we assume that the user is willing to retrieve the messages stored by + // the local/self store node. + options = append(options, legacy_store.WithLocalQuery()) + } + + query.PubsubTopic = r.URL.Query().Get("pubsubTopic") + + contentTopics := r.URL.Query().Get("contentTopics") + if contentTopics != "" { + query.ContentTopics = strings.Split(contentTopics, ",") + } + + startTimeStr := r.URL.Query().Get("startTime") + if startTimeStr != "" { + startTime, err := strconv.ParseInt(startTimeStr, 10, 64) + if err != nil { + return nil, nil, err + } + query.StartTime = &startTime + } + + endTimeStr := r.URL.Query().Get("endTime") + if endTimeStr != "" { + endTime, err := strconv.ParseInt(endTimeStr, 10, 64) + if err != nil { + return nil, nil, err + } + query.EndTime = &endTime + } + + var cursor *pb.Index + + senderTimeStr := r.URL.Query().Get("senderTime") + storeTimeStr := r.URL.Query().Get("storeTime") + digestStr := r.URL.Query().Get("digest") + + if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" { + cursor = &pb.Index{} + + if senderTimeStr != "" { + cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) + if err != nil { + return nil, nil, err + } + } + + if storeTimeStr != "" { + cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) + if err != nil { + return nil, nil, err + } + } + + if digestStr != "" { + cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) + if err != nil { + return nil, nil, err + } + } + + cursor.PubsubTopic = query.PubsubTopic + + options = append(options, legacy_store.WithCursor(cursor)) + } + + pageSizeStr := r.URL.Query().Get("pageSize") + ascendingStr := r.URL.Query().Get("ascending") + if ascendingStr != "" || pageSizeStr != "" { + ascending := true + pageSize := uint64(legacy_store.DefaultPageSize) + if ascendingStr != "" { + ascending, err = strconv.ParseBool(ascendingStr) + if err != nil { + return nil, nil, err + } + } + + if pageSizeStr != "" { + pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) + if err != nil { + return nil, nil, err + } + if pageSize > legacy_store.MaxPageSize { + pageSize = legacy_store.MaxPageSize + } + } + + options = append(options, legacy_store.WithPaging(ascending, pageSize)) + } + + return query, options, nil +} + +func writeLegacyStoreError(w http.ResponseWriter, code int, err error) { + writeResponse(w, LegacyStoreResponse{ErrorMessage: err.Error()}, code) +} + +func toLegacyStoreResponse(result *legacy_store.Result) LegacyStoreResponse { + response := LegacyStoreResponse{} + + cursor := result.Cursor() + if cursor != nil { + response.Cursor = &LegacyHistoryCursor{ + PubsubTopic: cursor.PubsubTopic, + SenderTime: fmt.Sprintf("%d", cursor.SenderTime), + StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime), + Digest: cursor.Digest, + } + } + + for _, m := range result.Messages { + response.Messages = append(response.Messages, LegacyStoreWakuMessage{ + Payload: m.Payload, + ContentTopic: m.ContentTopic, + Version: m.Version, + Timestamp: m.Timestamp, + Meta: m.Meta, + }) + } + + return response +} + +func (d *LegacyStoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { + query, options, err := getLegacyStoreParams(r) + if err != nil { + writeLegacyStoreError(w, http.StatusBadRequest, err) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + result, err := d.node.LegacyStore().Query(ctx, *query, options...) + if err != nil { + writeLegacyStoreError(w, http.StatusInternalServerError, err) + return + } + + writeErrOrResponse(w, nil, toLegacyStoreResponse(result)) +} diff --git a/cmd/waku/server/rest/legacy_store_api.yaml b/cmd/waku/server/rest/legacy_store_api.yaml new file mode 100644 index 000000000..41a313fe5 --- /dev/null +++ b/cmd/waku/server/rest/legacy_store_api.yaml @@ -0,0 +1,203 @@ +openapi: 3.0.3 +info: + title: Waku V2 node Store REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: store + description: Store REST API for WakuV2 node + +paths: + /store/v1/messages: + get: + summary: Gets message history + description: > + Retrieves WakuV2 message history. The returned history + can be potentially filtered by optional request parameters. + operationId: getMessageHistory + tags: + - store + parameters: + - name: peerAddr + in: query + schema: + type: string + required: true + description: > + P2P fully qualified peer multiaddress + in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded. + example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN' + + - name: pubsubTopic + in: query + schema: + type: string + description: > + The pubsub topic on which a WakuMessage is published. + If left empty, no filtering is applied. + It is also intended for pagination purposes. + It should be a URL-encoded string. + example: 'my%20pubsub%20topic' + + - name: contentTopics + in: query + schema: string + description: > + Comma-separated list of content topics. When specified, + only WakuMessages that are linked to any of the given + content topics will be delivered in the get response. + It should be a URL-encoded-comma-separated string. + example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic' + + - name: startTime + in: query + schema: + type: string + description: > + The inclusive lower bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' + + - name: endTime + in: query + schema: + type: string + description: > + The inclusive upper bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' + + - name: senderTime + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + Represents the Unix time in nanoseconds at which a message was generated. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: '1680590947000000000' + + - name: storeTime + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + Represents the Unix time in nanoseconds at which a message was stored. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: '1680590945000000000' + + - name: digest + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + URL-base64-encoded string computed as a hash of the + a message content topic plus a message payload. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' + + - name: pageSize + in: query + schema: + type: string + description: > + Number of messages to retrieve per page + example: '5' + + - name: ascending + in: query + schema: + type: string + description: > + "true" for paging forward, "false" for paging backward + example: "true" + + responses: + '200': + description: WakuV2 message history. + content: + application/json: + schema: + $ref: '#/components/schemas/StoreResponse' + '400': + description: Bad request error. + content: + text/plain: + type: string + '412': + description: Precondition failed. + content: + text/plain: + type: string + '500': + description: Internal server error. + content: + text/plain: + type: string + +components: + schemas: + StoreResponse: + type: object + properties: + messages: + type: array + items: + $ref: '#/components/schemas/WakuMessage' + cursor: + $ref: '#/components/schemas/HistoryCursor' + error_message: + type: string + required: + - messages + + HistoryCursor: + type: object + properties: + pubsub_topic: + type: string + sender_time: + type: string + store_time: + type: string + digest: + type: string + required: + - pubsub_topic + - sender_time + - store_time + - digest + + WakuMessage: + type: object + properties: + payload: + type: string + content_topic: + type: string + version: + type: integer + format: int32 + timestamp: + type: integer + format: int64 + ephemeral: + type: boolean + required: + - payload + - content_topic diff --git a/cmd/waku/server/rest/store_test.go b/cmd/waku/server/rest/legacy_store_test.go similarity index 91% rename from cmd/waku/server/rest/store_test.go rename to cmd/waku/server/rest/legacy_store_test.go index 3b7eca13e..b4c80b1e3 100644 --- a/cmd/waku/server/rest/store_test.go +++ b/cmd/waku/server/rest/legacy_store_test.go @@ -52,7 +52,7 @@ func TestGetMessages(t *testing.T) { defer node2.Stop() router := chi.NewRouter() - _ = NewStoreService(node2, router) + _ = NewLegacyStoreService(node2, router) // TEST: get cursor // TEST: get no messages @@ -64,12 +64,12 @@ func TestGetMessages(t *testing.T) { "pubsubTopic": {pubsubTopic1}, "pageSize": {"2"}, } - path := routeStoreMessagesV1 + "?" + queryParams.Encode() + path := routeLegacyStoreMessagesV1 + "?" + queryParams.Encode() req, _ := http.NewRequest(http.MethodGet, path, nil) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) - response := StoreResponse{} + response := LegacyStoreResponse{} err = json.Unmarshal(rr.Body.Bytes(), &response) require.NoError(t, err) require.Len(t, response.Messages, 2) @@ -84,12 +84,12 @@ func TestGetMessages(t *testing.T) { "digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)}, "pageSize": {"2"}, } - path = routeStoreMessagesV1 + "?" + queryParams.Encode() + path = routeLegacyStoreMessagesV1 + "?" + queryParams.Encode() req, _ = http.NewRequest(http.MethodGet, path, nil) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) - response = StoreResponse{} + response = LegacyStoreResponse{} err = json.Unmarshal(rr.Body.Bytes(), &response) require.NoError(t, err) require.Len(t, response.Messages, 1) diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go index f1435eb81..991fc89e5 100644 --- a/cmd/waku/server/rest/store.go +++ b/cmd/waku/server/rest/store.go @@ -3,7 +3,7 @@ package rest import ( "context" "encoding/base64" - "fmt" + "errors" "net/http" "strconv" "strings" @@ -12,52 +12,34 @@ import ( "github.com/go-chi/chi/v5" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" ) -type StoreService struct { +type StoreQueryService struct { node *node.WakuNode mux *chi.Mux } -type StoreResponse struct { - Messages []StoreWakuMessage `json:"messages"` - Cursor *HistoryCursor `json:"cursor,omitempty"` - ErrorMessage string `json:"error_message,omitempty"` -} - -type HistoryCursor struct { - PubsubTopic string `json:"pubsubTopic"` - SenderTime string `json:"senderTime"` - StoreTime string `json:"storeTime"` - Digest []byte `json:"digest"` -} +const routeStoreMessagesV1 = "/store/v3/messages" -type StoreWakuMessage struct { - Payload []byte `json:"payload"` - ContentTopic string `json:"contentTopic"` - Version *uint32 `json:"version,omitempty"` - Timestamp *int64 `json:"timestamp,omitempty"` - Meta []byte `json:"meta,omitempty"` -} - -const routeStoreMessagesV1 = "/store/v1/messages" - -func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { - s := &StoreService{ +func NewStoreQueryService(node *node.WakuNode, m *chi.Mux) *StoreQueryService { + s := &StoreQueryService{ node: node, mux: m, } - m.Get(routeStoreMessagesV1, s.getV1Messages) + m.Get(routeStoreMessagesV1, s.getV3Messages) return s } -func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.HistoryRequestOption, error) { - query := &legacy_store.Query{} - var options []legacy_store.HistoryRequestOption +func getStoreParams(r *http.Request) (store.Criteria, []store.RequestOption, error) { + var options []store.RequestOption var err error peerAddrStr := r.URL.Query().Get("peerAddr") var m multiaddr.Multiaddr @@ -66,72 +48,78 @@ func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.Histor if err != nil { return nil, nil, err } - options = append(options, legacy_store.WithPeerAddr(m)) - } else { - // The user didn't specify a peer address and self-node is configured as a store node. - // In this case we assume that the user is willing to retrieve the messages stored by - // the local/self store node. - options = append(options, legacy_store.WithLocalQuery()) + options = append(options, store.WithPeerAddr(m)) } - query.PubsubTopic = r.URL.Query().Get("pubsubTopic") + includeData := false + includeDataStr := r.URL.Query().Get("includeData") + if includeDataStr != "" { + includeData, err = strconv.ParseBool(includeDataStr) + if err != nil { + return nil, nil, errors.New("invalid value for includeData. Use true|false") + } + } + options = append(options, store.IncludeData(includeData)) + + pubsubTopic := r.URL.Query().Get("pubsubTopic") contentTopics := r.URL.Query().Get("contentTopics") + var contentTopicsArr []string if contentTopics != "" { - query.ContentTopics = strings.Split(contentTopics, ",") + contentTopicsArr = strings.Split(contentTopics, ",") + } + + hashesStr := r.URL.Query().Get("hashes") + var hashes []pb.MessageHash + if hashesStr != "" { + hashesStrArr := strings.Split(hashesStr, ",") + for _, hashStr := range hashesStrArr { + hash, err := base64.URLEncoding.DecodeString(hashStr) + if err != nil { + return nil, nil, err + } + hashes = append(hashes, pb.ToMessageHash(hash)) + } + } + + isMsgHashCriteria := false + if len(hashes) != 0 { + isMsgHashCriteria = true + if pubsubTopic != "" || len(contentTopics) != 0 { + return nil, nil, errors.New("cant use content filters while specifying message hashes") + } + } else { + if pubsubTopic == "" || len(contentTopicsArr) != 0 { + return nil, nil, errors.New("pubsubTOpic and contentTopics are required") + } } startTimeStr := r.URL.Query().Get("startTime") + var startTime int64 if startTimeStr != "" { - startTime, err := strconv.ParseInt(startTimeStr, 10, 64) + startTime, err = strconv.ParseInt(startTimeStr, 10, 64) if err != nil { return nil, nil, err } - query.StartTime = &startTime } endTimeStr := r.URL.Query().Get("endTime") + var endTime int64 if endTimeStr != "" { - endTime, err := strconv.ParseInt(endTimeStr, 10, 64) + endTime, err = strconv.ParseInt(endTimeStr, 10, 64) if err != nil { return nil, nil, err } - query.EndTime = &endTime } - var cursor *pb.Index - - senderTimeStr := r.URL.Query().Get("senderTime") - storeTimeStr := r.URL.Query().Get("storeTime") - digestStr := r.URL.Query().Get("digest") - - if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" { - cursor = &pb.Index{} - - if senderTimeStr != "" { - cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) - if err != nil { - return nil, nil, err - } - } - - if storeTimeStr != "" { - cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) - if err != nil { - return nil, nil, err - } - } - - if digestStr != "" { - cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) - if err != nil { - return nil, nil, err - } + var cursor []byte + cursorStr := r.URL.Query().Get("cursor") + if cursorStr != "" { + cursor, err = base64.URLEncoding.DecodeString(cursorStr) + if err != nil { + return nil, nil, err } - - cursor.PubsubTopic = query.PubsubTopic - - options = append(options, legacy_store.WithCursor(cursor)) + options = append(options, store.WithCursor(cursor)) } pageSizeStr := r.URL.Query().Get("pageSize") @@ -156,43 +144,30 @@ func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.Histor } } - options = append(options, legacy_store.WithPaging(ascending, pageSize)) + options = append(options, store.WithPaging(ascending, pageSize)) } - return query, options, nil -} - -func writeStoreError(w http.ResponseWriter, code int, err error) { - writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code) -} - -func toStoreResponse(result *legacy_store.Result) StoreResponse { - response := StoreResponse{} - - cursor := result.Cursor() - if cursor != nil { - response.Cursor = &HistoryCursor{ - PubsubTopic: cursor.PubsubTopic, - SenderTime: fmt.Sprintf("%d", cursor.SenderTime), - StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime), - Digest: cursor.Digest, + var query store.Criteria + if isMsgHashCriteria { + query = store.MessageHashCriteria{ + MessageHashes: hashes, + } + } else { + query = store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopicsArr...), + TimeStart: &startTime, + TimeEnd: &endTime, } } - for _, m := range result.Messages { - response.Messages = append(response.Messages, StoreWakuMessage{ - Payload: m.Payload, - ContentTopic: m.ContentTopic, - Version: m.Version, - Timestamp: m.Timestamp, - Meta: m.Meta, - }) - } + return query, options, nil +} - return response +func writeStoreError(w http.ResponseWriter, code int, err error) { + writeResponse(w, &storepb.StoreQueryResponse{StatusCode: proto.Uint32(uint32(code)), StatusDesc: proto.String(err.Error())}, code) } -func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { +func (d *StoreQueryService) getV3Messages(w http.ResponseWriter, r *http.Request) { query, options, err := getStoreParams(r) if err != nil { writeStoreError(w, http.StatusBadRequest, err) @@ -202,11 +177,11 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - result, err := d.node.LegacyStore().Query(ctx, *query, options...) + result, err := d.node.Store().Request(ctx, query, options...) if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) + writeLegacyStoreError(w, http.StatusInternalServerError, err) return } - writeErrOrResponse(w, nil, toStoreResponse(result)) + writeErrOrResponse(w, nil, result.Response()) } diff --git a/cmd/waku/server/rest/store_api.yaml b/cmd/waku/server/rest/store_api.yaml index 41a313fe5..542de742d 100644 --- a/cmd/waku/server/rest/store_api.yaml +++ b/cmd/waku/server/rest/store_api.yaml @@ -1,203 +1,135 @@ -openapi: 3.0.3 -info: - title: Waku V2 node Store REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: store - description: Store REST API for WakuV2 node +# /store/v3/messages: +get: + summary: Gets message history + description: > + Retrieves Waku message history. The returned history + can be potentially filtered by optional request parameters. + operationId: getMessageHistory + tags: + - store + parameters: + - name: peerAddr + in: query + schema: + type: string + required: true + description: > + P2P fully qualified peer multiaddress + in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded. + example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN' -paths: - /store/v1/messages: - get: - summary: Gets message history + - name: includeData + in: query + schema: + type: string description: > - Retrieves WakuV2 message history. The returned history - can be potentially filtered by optional request parameters. - operationId: getMessageHistory - tags: - - store - parameters: - - name: peerAddr - in: query - schema: - type: string - required: true - description: > - P2P fully qualified peer multiaddress - in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded. - example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN' + Boolean indicating if the query should return messages (data) or hashes only. + A value of 'false' returns hashes only. + A value of 'true' returns hashes AND messages. + Default value is 'false' + example: 'true' - - name: pubsubTopic - in: query - schema: - type: string - description: > - The pubsub topic on which a WakuMessage is published. - If left empty, no filtering is applied. - It is also intended for pagination purposes. - It should be a URL-encoded string. - example: 'my%20pubsub%20topic' + - name: pubsubTopic + in: query + schema: + type: string + description: > + The pubsub topic on which a WakuMessage is published. + If left empty, no filtering is applied. + It is also intended for pagination purposes. + It should be a URL-encoded string. + example: 'my%20pubsub%20topic' - - name: contentTopics - in: query - schema: string - description: > - Comma-separated list of content topics. When specified, - only WakuMessages that are linked to any of the given - content topics will be delivered in the get response. - It should be a URL-encoded-comma-separated string. - example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic' + - name: contentTopics + in: query + schema: string + description: > + Comma-separated list of content topics. When specified, + only WakuMessages that are linked to any of the given + content topics will be delivered in the get response. + It should be a URL-encoded-comma-separated string. + example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic' - - name: startTime - in: query - schema: - type: string - description: > - The inclusive lower bound on the timestamp of - queried WakuMessages. This field holds the - Unix epoch time in nanoseconds as a 64-bits - integer value. - example: '1680590945000000000' + - name: startTime + in: query + schema: + type: string + description: > + The inclusive lower bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' - - name: endTime - in: query - schema: - type: string - description: > - The inclusive upper bound on the timestamp of - queried WakuMessages. This field holds the - Unix epoch time in nanoseconds as a 64-bits - integer value. - example: '1680590945000000000' + - name: endTime + in: query + schema: + type: string + description: > + The inclusive upper bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' - - name: senderTime - in: query - schema: - type: string - description: > - Cursor field intended for pagination purposes. - Represents the Unix time in nanoseconds at which a message was generated. - It could be empty for retrieving the first page, - and will be returned from the GET response so that - it can be part of the next page request. - example: '1680590947000000000' + - name: hashes + in: query + schema: + type: string + description: > + Comma-separated list of message hashes. + URL-base64-encoded string computed as a hash of messages. + Used to find messages by hash. + example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' - - name: storeTime - in: query - schema: - type: string - description: > - Cursor field intended for pagination purposes. - Represents the Unix time in nanoseconds at which a message was stored. - It could be empty for retrieving the first page, - and will be returned from the GET response so that - it can be part of the next page request. - example: '1680590945000000000' + - name: cursor + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + URL-base64-encoded string computed as a hash of a message. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' - - name: digest - in: query - schema: - type: string - description: > - Cursor field intended for pagination purposes. - URL-base64-encoded string computed as a hash of the - a message content topic plus a message payload. - It could be empty for retrieving the first page, - and will be returned from the GET response so that - it can be part of the next page request. - example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' + - name: pageSize + in: query + schema: + type: string + description: > + Number of messages to retrieve per page + example: '5' - - name: pageSize - in: query - schema: - type: string - description: > - Number of messages to retrieve per page - example: '5' + - name: ascending + in: query + schema: + type: string + description: > + "true" for paging forward, "false" for paging backward. + If not specified or if specified with an invalid value, the default is "true". + example: "true" - - name: ascending - in: query + responses: + '200': + description: Waku message history. + content: + application/json: schema: - type: string - description: > - "true" for paging forward, "false" for paging backward - example: "true" - - responses: - '200': - description: WakuV2 message history. - content: - application/json: - schema: - $ref: '#/components/schemas/StoreResponse' - '400': - description: Bad request error. - content: - text/plain: - type: string - '412': - description: Precondition failed. - content: - text/plain: - type: string - '500': - description: Internal server error. - content: - text/plain: - type: string - -components: - schemas: - StoreResponse: - type: object - properties: - messages: - type: array - items: - $ref: '#/components/schemas/WakuMessage' - cursor: - $ref: '#/components/schemas/HistoryCursor' - error_message: - type: string - required: - - messages - - HistoryCursor: - type: object - properties: - pubsub_topic: - type: string - sender_time: - type: string - store_time: - type: string - digest: - type: string - required: - - pubsub_topic - - sender_time - - store_time - - digest - - WakuMessage: - type: object - properties: - payload: + $ref: '#/components/schemas/StoreQueryResponse' + '400': + description: Bad request error. + content: + text/plain: type: string - content_topic: + '412': + description: Precondition failed. + content: + text/plain: type: string - version: - type: integer - format: int32 - timestamp: - type: integer - format: int64 - ephemeral: - type: boolean - required: - - payload - - content_topic + '500': + description: Internal server error. + content: + text/plain: + type: string \ No newline at end of file diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index cd49feef6..6e4acb749 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -51,7 +51,8 @@ func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuR _ = NewDebugService(node, mux) _ = NewHealthService(node, mux) - _ = NewStoreService(node, mux) + _ = NewStoreQueryService(node, mux) + _ = NewLegacyStoreService(node, mux) _ = NewLightpushService(node, mux, log) listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port) diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 9eb3da40a..5ebc62e41 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -177,11 +177,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ } result := &Result{ - store: s, - messages: response.Messages, - storeRequest: storeRequest, - peerID: params.selectedPeer, - cursor: response.PaginationCursor, + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: params.selectedPeer, + cursor: response.PaginationCursor, } return result, nil @@ -213,12 +214,13 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { if r.IsComplete() { return &Result{ - store: s, - started: true, - messages: []*pb.WakuMessageKeyValue{}, - cursor: nil, - storeRequest: r.storeRequest, - peerID: r.PeerID(), + store: s, + started: true, + messages: []*pb.WakuMessageKeyValue{}, + cursor: nil, + storeRequest: r.storeRequest, + storeResponse: r.storeResponse, + peerID: r.PeerID(), }, nil } @@ -232,12 +234,13 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } result := &Result{ - started: true, - store: s, - messages: response.Messages, - storeRequest: storeRequest, - peerID: r.PeerID(), - cursor: response.PaginationCursor, + started: true, + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: r.PeerID(), + cursor: response.PaginationCursor, } return result, nil diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go index 180d27de1..7bf2fa235 100644 --- a/waku/v2/protocol/store/result.go +++ b/waku/v2/protocol/store/result.go @@ -9,12 +9,13 @@ import ( // Result represents a valid response from a store node type Result struct { - started bool - messages []*pb.WakuMessageKeyValue - store *WakuStore - storeRequest *pb.StoreQueryRequest - cursor []byte - peerID peer.ID + started bool + messages []*pb.WakuMessageKeyValue + store *WakuStore + storeRequest *pb.StoreQueryRequest + storeResponse *pb.StoreQueryResponse + cursor []byte + peerID peer.ID } func (r *Result) Cursor() []byte { @@ -33,6 +34,10 @@ func (r *Result) Query() *pb.StoreQueryRequest { return r.storeRequest } +func (r *Result) Response() *pb.StoreQueryResponse { + return r.storeResponse +} + func (r *Result) Next(ctx context.Context) (bool, error) { if !r.started { r.started = true