Skip to content

Commit

Permalink
feat(nwaku)_: receive messages via relay
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Dec 10, 2024
1 parent cc2022b commit 648bff7
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 22 deletions.
2 changes: 1 addition & 1 deletion third_party/nwaku
Submodule nwaku updated 49 files
+65 −2 CHANGELOG.md
+2 −2 README.md
+1 −1 apps/chat2/chat2.nim
+1 −1 apps/chat2bridge/chat2bridge.nim
+3 −2 apps/liteprotocoltester/diagnose_connections.nim
+10 −2 apps/liteprotocoltester/filter_subscriber.nim
+1 −1 apps/liteprotocoltester/infra.env
+3 −1 apps/liteprotocoltester/lightpush_publisher.nim
+3 −3 apps/liteprotocoltester/lpt_metrics.nim
+16 −8 apps/liteprotocoltester/service_peer_management.nim
+39 −1 apps/liteprotocoltester/statistics.nim
+1 −1 apps/liteprotocoltester/tester_config.nim
+1 −1 apps/liteprotocoltester/tester_message.nim
+14 −11 apps/networkmonitor/networkmonitor.nim
+1 −1 examples/README.md
+15 −10 library/events/json_message_event.nim
+4 −2 library/libwaku.nim
+20 −0 library/utils.nim
+6 −4 library/waku_thread/inter_thread_communication/requests/discovery_request.nim
+16 −25 library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim
+5,924 −553 metrics/waku-network-monitor-dashboard.json
+1 −1 tests/factory/test_node_factory.nim
+1 −1 tests/test_waku_dnsdisc.nim
+56 −44 tests/test_waku_rendezvous.nim
+1 −0 tests/testlib/wakunode.nim
+1 −1 tests/waku_rln_relay/test_wakunode_rln_relay.nim
+2 −2 tests/wakunode2/test_app.nim
+1 −1 tests/wakunode2/test_validators.nim
+1 −1 vendor/nim-dnsdisc
+1 −1 vendor/nimbus-build-system
+10 −0 waku/discovery/waku_discv5.nim
+13 −11 waku/discovery/waku_dnsdisc.nim
+9 −2 waku/factory/external_config.nim
+3 −6 waku/factory/node_factory.nim
+55 −23 waku/factory/waku.nim
+15 −15 waku/node/waku_node.nim
+4 −4 waku/waku_api/rest/admin/types.nim
+2 −1 waku/waku_api/rest/debug/types.nim
+7 −7 waku/waku_api/rest/filter/types.nim
+1 −1 waku/waku_api/rest/health/types.nim
+1 −1 waku/waku_api/rest/lightpush/types.nim
+1 −1 waku/waku_api/rest/relay/types.nim
+2 −2 waku/waku_api/rest/serdes.nim
+7 −0 waku/waku_core/peers.nim
+3 −0 waku/waku_rendezvous.nim
+36 −0 waku/waku_rendezvous/common.nim
+267 −0 waku/waku_rendezvous/protocol.nim
+8 −0 waku/waku_rln_relay/group_manager/group_manager_base.nim
+19 −4 waku/waku_rln_relay/protocol_metrics.nim
79 changes: 79 additions & 0 deletions wakuv2/common/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package common

import (
"encoding/json"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

// Envelope contains information about the pubsub topic of a WakuMessage
// and a hash used to identify a message based on the bytes of a WakuMessage
// protobuffer
type Envelope interface {
Message() *pb.WakuMessage
PubsubTopic() string
Hash() pb.MessageHash
}

type envelopeImpl struct {
msg *pb.WakuMessage
topic string
hash pb.MessageHash
}

type tmpWakuMessageJson struct {
Payload []byte `json:"payload,omitempty"`
ContentTopic string `json:"contentTopic,omitempty"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
Ephemeral *bool `json:"ephemeral,omitempty"`
RateLimitProof []byte `json:"proof,omitempty"`
}

type tmpEnvelopeStruct struct {
WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
PubsubTopic string `json:"pubsubTopic"`
MessageHash string `json:"messageHash"`
}

// NewEnvelope creates a new Envelope from a json string generated in nwaku
func NewEnvelope(jsonEventStr string) (Envelope, error) {
tmpEnvelopeStruct := tmpEnvelopeStruct{}
err := json.Unmarshal([]byte(jsonEventStr), &tmpEnvelopeStruct)
if err != nil {
return nil, err
}

hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
if err != nil {
return nil, err
}

return &envelopeImpl{
msg: &pb.WakuMessage{
Payload: tmpEnvelopeStruct.WakuMessage.Payload,
ContentTopic: tmpEnvelopeStruct.WakuMessage.ContentTopic,
Version: tmpEnvelopeStruct.WakuMessage.Version,
Timestamp: tmpEnvelopeStruct.WakuMessage.Timestamp,
Meta: tmpEnvelopeStruct.WakuMessage.Meta,
Ephemeral: tmpEnvelopeStruct.WakuMessage.Ephemeral,
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
},
topic: tmpEnvelopeStruct.PubsubTopic,
hash: pb.ToMessageHash(hash),
}, nil
}

func (e *envelopeImpl) Message() *pb.WakuMessage {
return e.msg
}

func (e *envelopeImpl) PubsubTopic() string {
return e.topic
}

func (e *envelopeImpl) Hash() pb.MessageHash {
return e.hash
}
5 changes: 2 additions & 3 deletions wakuv2/common/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.uber.org/zap"

"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"

"github.com/status-im/status-go/logutils"

Expand Down Expand Up @@ -41,7 +40,7 @@ type MessageParams struct {
// ReceivedMessage represents a data packet to be received through the
// WakuV2 protocol and successfully decrypted.
type ReceivedMessage struct {
Envelope *protocol.Envelope // Wrapped Waku Message
Envelope Envelope // Wrapped Waku Message

MsgType MessageType

Expand Down Expand Up @@ -105,7 +104,7 @@ type MemoryMessageStore struct {
messages map[common.Hash]*ReceivedMessage
}

func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage {
func NewReceivedMessage(env Envelope, msgType MessageType) *ReceivedMessage {
ct, err := ExtractTopicFromContentTopic(env.Message().ContentTopic)
if err != nil {
logutils.ZapLogger().Debug("failed to extract content topic from message",
Expand Down
97 changes: 79 additions & 18 deletions wakuv2/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -380,7 +381,7 @@ const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute

type SentEnvelope struct {
Envelope *protocol.Envelope
Envelope common.Envelope
PublishMethod publish.PublishMethod
}

Expand Down Expand Up @@ -580,7 +581,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
}
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
waku.bandwidthCounter = metrics.NewBandwidthCounter()
if nodeKey == nil {
Expand Down Expand Up @@ -1020,12 +1020,12 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P
w.logger.Error("could not unsubscribe", zap.Error(err))
}
return
// TODO-nwaku
/*case env := <-sub[0].Ch:

case env := <-w.node.MsgChan:
err := w.OnNewEnvelopes(env, common.RelayedMessageType, false)
if err != nil {
w.logger.Error("OnNewEnvelopes error", zap.Error(err))
}*/
}
}
}
}()
Expand Down Expand Up @@ -1363,7 +1363,7 @@ func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
}

// OnNewEnvelope is an interface from Waku FilterManager API that gets invoked when any new message is received by Filter.
func (w *Waku) OnNewEnvelope(env *protocol.Envelope) error {
func (w *Waku) OnNewEnvelope(env common.Envelope) error {
return w.OnNewEnvelopes(env, common.RelayedMessageType, false)
}

Expand Down Expand Up @@ -1503,6 +1503,7 @@ func (w *Waku) Start() error {
w.node.FilterLightnode(),
filterapi.WithBatchInterval(300*time.Millisecond))
}
*/

err = w.setupRelaySubscriptions()
if err != nil {
Expand All @@ -1514,7 +1515,6 @@ func (w *Waku) Start() error {
w.wg.Add(1)
go w.processQueueLoop()
}
*/

w.wg.Add(1)

Expand Down Expand Up @@ -1748,7 +1748,7 @@ func (w *Waku) Stop() error {
return nil
}

func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
func (w *Waku) OnNewEnvelopes(envelope common.Envelope, msgType common.MessageType, processImmediately bool) error {
if envelope == nil {
return nil
}
Expand Down Expand Up @@ -2326,7 +2326,7 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,

ctx, cancel := context.WithCancel(context.Background())

wakunode, err := newWakuNode(ctx, nwakuCfg)
wakunode, err := newWakuNode(ctx, nwakuCfg, logger)
if err != nil {
cancel()
return nil, err
Expand Down Expand Up @@ -2360,20 +2360,49 @@ func wakuNew(nodeKey *ecdsa.PrivateKey,
onPeerStats: onPeerStats,
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
filters: common.NewFilters(cfg.DefaultShardPubsubTopic, logger),
}, nil

}

//export globalEventCallback
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
// This is shared among all Golang instances
// TODO-nwaku
// self := Waku{wakuCtx: userData}
// self.MyEventCallback(callerRet, msg, len)
// The event callback sends back the node's ctx to know to which
// node is the event being emited for. Since we only have a global
// callback in the go side, We register all the nodes that we create
// so we can later obtain which instance of `WakuNode` is should
// be invoked depending on the ctx received

var nodeRegistry map[unsafe.Pointer]*WakuNode

func init() {
nodeRegistry = make(map[unsafe.Pointer]*WakuNode)
}

func (self *Waku) MyEventCallback(callerRet C.int, msg *C.char, len C.size_t) {
fmt.Println("Event received:", C.GoStringN(msg, C.int(len)))
func registerNode(node *WakuNode) {
_, ok := nodeRegistry[node.wakuCtx]
if !ok {
nodeRegistry[node.wakuCtx] = node
}
}

func unregisterNode(node *WakuNode) {
delete(nodeRegistry, node.wakuCtx)
}

//export globalEventCallback
func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
if callerRet == C.RET_OK {
eventStr := C.GoStringN(msg, C.int(len))
node, ok := nodeRegistry[userData]
if ok {
node.OnEvent(eventStr)
}
} else {
errMsgField := zap.Skip()
if len != 0 {
errMsgField = zap.String("error", C.GoStringN(msg, C.int(len)))
}
log.Error("globalEventCallback retCode not ok", zap.Int("retCode", int(callerRet)), errMsgField)
}
}

type response struct {
Expand All @@ -2396,10 +2425,12 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
// WakuNode represents an instance of an nwaku node
type WakuNode struct {
wakuCtx unsafe.Pointer
logger *zap.Logger
cancel context.CancelFunc
MsgChan chan common.Envelope
}

func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
func newWakuNode(ctx context.Context, config *WakuConfig, logger *zap.Logger) (*WakuNode, error) {
ctx, cancel := context.WithCancel(ctx)

n := &WakuNode{
Expand Down Expand Up @@ -2440,6 +2471,8 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {

wg.Add(1)
n.wakuCtx = C.cGoWakuNew(cJsonConfig, resp)
n.MsgChan = make(chan common.Envelope, 100)
n.logger = logger.Named("nwaku")
wg.Wait()

// Notice that the events for self node are handled by the 'MyEventCallback' method
Expand All @@ -2448,6 +2481,32 @@ func newWakuNode(ctx context.Context, config *WakuConfig) (*WakuNode, error) {
return n, nil
}

type jsonEvent struct {
EventType string `json:"eventType"`
}

func (n *WakuNode) OnEvent(eventStr string) {
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
n.logger.Error("could not unmarshal nwaku event string", zap.Error(err))
return
}

switch jsonEvent.EventType {
case "message":
n.parseMessageEvent(eventStr)
}
}

func (n *WakuNode) parseMessageEvent(eventStr string) {
envelope, err := common.NewEnvelope(eventStr)
if err != nil {
n.logger.Error("could not parse message", zap.Error(err))
}
n.MsgChan <- envelope
}

func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, error) {
var pubsubTopic string
if len(optPubsubTopic) == 0 {
Expand Down Expand Up @@ -2811,6 +2870,7 @@ func (n *WakuNode) Start() error {
C.cGoWakuStart(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
registerNode(n)
return nil
}

Expand All @@ -2828,6 +2888,7 @@ func (n *WakuNode) Stop() error {
C.cGoWakuStop(n.wakuCtx, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
unregisterNode(n)
return nil
}

Expand Down

0 comments on commit 648bff7

Please sign in to comment.