I can not found Broadcast message to a connectionId #1

Closed
majidbigdeli opened this issue Jun 1, 2019 · 38 comments · Fixed by kataras/iris#1316
Labels
enhancement New feature or request good first issue Good for newcomers

Comments

@majidbigdeli

majidbigdeli commented Jun 1, 2019

How can I send a message to a specific user with neffos?
I cannot find a way to broadcast a message to a single connection ID.

@kataras
Owner

kataras commented Jun 1, 2019

Hello @majidbigdeli,

First of all I am very glad that you are posting questions while this library is still (on the final steps of) work-in-progress!

Secondly, you can already achieve this by joining the connection to a room that uses the connection's ID as the room name; you can then call nsConn.Room("ID").Emit(...). (This is exactly how the old Iris websocket library sent To a specific connection, except that here you have to JoinRoom manually.) But let me discuss your alternative below.

It's easy to do something like this: we just have to add a "To" field to the Message and check its value inside the connected namespaces (and rooms) on Server#Broadcast. But to complete a feature like this, the "To" connection should be able to see who the original client sender, the "From", was (because you may want to print its name before the message's body). That was one of the questions I had when designing the library: should the library expose to the subscribers (the other clients) the connection ID of the message's original publisher, i.e. the client whose server-side connection received the message and called Server.Broadcast(Message{})? Back then I decided that it's better not to, for security reasons: the Server#IDGenerator can be used to set connection IDs based on a custom algorithm, and a developer may set this ID to a private database user row's ID, so we should not expose the sender ID through a "From" field.

On the other hand, the "From" can be solved by the end-developer themselves by using a custom object: set the Message.Body to the object's serialized form (e.g. JSON) and deserialize it on read. Examples are available for encoding/json and even protobufs; check _examples/basic and _examples/protobuf.
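
A minimal sketch of that idea, using only encoding/json. The chatPayload type, its field names and the two helper functions are hypothetical and not part of neffos; the encoded bytes are what would be assigned to Message.Body, and the receiver would decode the incoming body the same way.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// chatPayload is a hypothetical application-level envelope; neffos does not
// define it. The application decides what "From" contains (a display name
// here, never a private database ID).
type chatPayload struct {
	From string `json:"from"`
	Text string `json:"text"`
}

// encodePayload serializes the envelope; the result would be used as Message.Body.
func encodePayload(from, text string) ([]byte, error) {
	return json.Marshal(chatPayload{From: from, Text: text})
}

// decodePayload is what the receiving side would call on the incoming body.
func decodePayload(body []byte) (chatPayload, error) {
	var p chatPayload
	err := json.Unmarshal(body, &p)
	return p, err
}

func main() {
	body, err := encodePayload("majid", "hello")
	if err != nil {
		panic(err)
	}
	p, err := decodePayload(body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", p.From, p.Text)
}
```

Because the sender identity travels inside the body, the library never has to reveal connection IDs, and the application stays in control of what is disclosed.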

As you probably already know from the Go and JavaScript examples, so far the messages from Server#Broadcast are sent to all connections that are connected to the Message's Namespace (and Room, if Message{Room: "is_not_empty"}), like the NSConn/Room#Emit(...) methods but across all of the server's active connections. (It does not loop over them, because that would require a lock to add/remove active connections; instead I used a custom sync.Cond, so new connections are still able to connect and old ones can disconnect even while something is broadcasting.)

However, we can have an optional Message.To string field alongside the Namespace and Room fields to decide whether an active connection's Emit should be called, AND a From field can be sent IF the given To field is not empty and the first parameter of Server#Broadcast is not nil [OR if it is set manually (better, but end-developers may not expect a manually set From field?)].

Sounds good to you?

@kataras
Owner

kataras commented Jun 2, 2019

It's done @majidbigdeli, just fill the Message.To field.

server.Broadcast(c, Message{To: "other connection id", Namespace: "...", Room: "...", Body: ...})

I decided not to expose a From field on the Message in any case; end-developers can send and receive any custom data, so they are able to fill it in themselves if really required, as explained above.

kataras added a commit that referenced this issue Jun 2, 2019
…ested at: #1

keep note that namespaces and rooms can send to a group of conns but 1-1 too
@majidbigdeli
Author

Hello my friend @kataras.
I read all your commits every day (because you are a great programmer).
I work on an Asterisk Manager Interface (AMI) project
and I need to work with websockets.
I used socket.io before, but after looking at the neffos project carefully
I decided to use neffos instead of socket.io. neffos is a very clean project.

Finally, I think you made the right decision.
In my opinion, if someone needs to know who the original client sender of a message is, they can use custom data.
I agree with this: "can be solved by the end-developer him/herself by using a custom object and set the Message.Body to the object's serialized form(i.e JSON), and deserialize it on read."

@kataras Thanks for implementing Message.To. ❤️❤️

@kataras Thanks for implementing neffos and neffos.js . ❤️❤️

@kataras
Owner

kataras commented Jun 2, 2019

This post of yours, my friend @majidbigdeli, is one of the most inspirational messages I have ever received for my work, I am really thankful!

Yes, I had a lot of issues working with socket.io earlier in my career as well. I think a library based on raw websockets is the best option nowadays. Note that socket.io goes through a lot of steps to decide which subprotocol to use for communication (this is the top reason it is incompatible by design with the Node.js ws package and the browser's websockets), and it has a lot of code which, over the years, has become unreadable for most developers. This is one of the main reasons I decided to design and code a decent alternative from scratch; hopefully this will be more visible in the coming months, when Iris v11.2 is released as well ❤️

@kataras kataras added enhancement New feature or request good first issue Good for newcomers labels Jun 9, 2019
@lauretagabriel

Hi @kataras, may I know the estimated date when this project will be fully integrated and stable with the Iris framework? Sorry to sound demanding, but I am really looking forward to using it together with the Iris framework. Thank you!!

@kataras
Owner

kataras commented Jun 20, 2019

Hello @lauretagabriel, I feel you, it's a good question. Expect it by the end of next week, maybe a bit sooner.

@kataras
Owner

kataras commented Jun 28, 2019

@lauretagabriel @majidbigdeli can I ask you to take some time to read the new neffos wiki Getting Started page and give me feedback on it? Is it understandable enough for newcomers and for you? In general, offer some feedback on how to proceed with it if you want; I want to minimize the README and push a complete guide to the project's wiki, which can be converted to a dedicated book in the future as well. Thank you a lot, guys.

@kataras
Owner

kataras commented Jun 29, 2019

@majidbigdeli Thanks a lot!

About JMeter: we can use JMeter with neffos only with native messages, because, as you understand, JMeter and its plugins use the plain websocket API, while neffos has its own way to verify a client and to support namespaces, rooms, etc. So we can't use those features directly with JMeter: it will get stuck on the "ack" part of neffos, and messages will not pass unless the same steps as the neffos websocket client are reproduced. However, if you are an experienced JMeter user, you are better placed to do that and share the results with the rest of us.

I'll add a wiki page about native messages, although Iris v11.2 already has its own example. It looks like this:

[image not preserved]

Also note that we have a stress-test for neffos specifically, it's located at: https://github.com/kataras/neffos/tree/master/_examples/stress-test

@kataras
Owner

kataras commented Jun 29, 2019

@kataras
Owner

kataras commented Jun 30, 2019

Thanks for your nice words and the greek translation too!!! I really appreciate it!!! I am so lucky having you around too! I answered you at the neffos.js project: kataras/neffos.js#2 (comment)

kataras added a commit to kataras/neffos.js that referenced this issue Jun 30, 2019
kataras added a commit that referenced this issue Jul 5, 2019
@kataras
Owner

kataras commented Jul 5, 2019

Hey @majidbigdeli,

I don't want to depend on Iris for the neffos package because it will make its download heavier, instead I added this feature on the Iris side itself. Using the websocket.Handler which now accepts an optional second argument of func(ctx iris.Context) string:

import "github.com/kataras/iris/websocket"

[...]
irisIDGenerator := func(ctx iris.Context) string {
  return ctx.GetHeader("X-Username")
}
app.Get("/my_endpoint", websocket.Handler(websocketNeffosServer, irisIDGenerator))

Wiki: https://github.com/kataras/neffos/wiki/Getting-started#run-the-server

@kataras
Owner

kataras commented Jul 7, 2019

Thank YOU @majidbigdeli, you are free to give me feedback about the wikis as well or if you found any spelling/language mistakes that I missed please let me know, meanwhile the iris documentation will be centralized to its wiki pages as well.

@majidbigdeli
Author

@kataras
Yes, I read all the wiki pages and the PDF. Thanks for the PDF that you sent me.
The wiki pages were well ordered and I enjoyed them.
For the PDF, I think the fonts could be more appropriate.
I'll read the wiki again.
Your work is commendable. 🙏🙏🙏

@kataras
Owner

kataras commented Jul 10, 2019

@majidbigdeli Both server and client connections share the same methods. For a Conn you have to use Conn.Ask; inside events, or after nsConn := conn.Connect(...), you can also use NSConn.Ask.

Why did you expect the server to have an Ask method? Which client/connection to ask?

method 1

// [server := neffos.New(..., events)]
server.OnConnect = func(c *neffos.Conn) error {
	// Force-connect the client connection to a namespace,
	// or let the client connect and wait for it with:
	// nsConn, err := c.WaitConnect(ctx, "namespace")
	nsConn, err := c.Connect(ctx, "namespace")
	if err != nil {
		return err
	}
	response, err := nsConn.Ask(ctx, neffos.Message{...})
	// [use response...]
	return err
}

method 2

var events = neffos.Events{
	// if you want to wait for a message once the namespace is connected.
	neffos.OnNamespaceConnected: func(c *neffos.NSConn, msg neffos.Message) error {
		response, err := c.Ask(ctx, neffos.Message{...Event: "an event from which response will come"})
		// [use response...]
		return err
	},
	// if you want to wait for a message inside an event.
	"myEvent": func(c *neffos.NSConn, msg neffos.Message) error {
		response, err := c.Ask(ctx, neffos.Message{...Event: "otherClientEventOrTheSame"})
		// [use response...]
		return err
	},
}

@kataras
Owner

kataras commented Jul 11, 2019

So you want to send something from a http request to a known upgraded websocket connection and block until response, you may want to check out the Server.Do method, example:

server.Do(func(c *neffos.Conn){
   if c.ID() == extensionMessage {
      response, err := c.Ask(ctx, neffos.Message{...})
      // [...]
   }
   
}, false)

About the server.Connect: yes, you are right, sorry for my mistake, that wouldn't be possible :P I meant conn.Connect; do it on:

server.OnConnect = func(c *neffos.Conn) error {
	// Force-connect the client connection to a namespace,
	// or let the client connect and wait for it with:
	// nsConn, err := c.WaitConnect(ctx, "namespace")
	nsConn, err := c.Connect(ctx, "namespace")
	if err != nil {
		return err
	}
	response, err := nsConn.Ask(ctx, neffos.Message{...})
	// [use response...]
	return err
}

@kataras
Owner

kataras commented Jul 11, 2019

@majidbigdeli Thanks!!! Let's wait for a review then.

Also, I think you will find interesting the latest feature I've pushed as requested on #3

@kataras
Owner

kataras commented Jul 11, 2019

Yes, Do is slow, as documented, but the other options must be done manually. Here are my two solutions:

  1. Cache the connections yourself in a map[string]*neffos.Conn: register them on server.OnConnect and retrieve them by connection ID manually. This also requires deleting a connection on server.OnDisconnect. Neffos does not keep a connection map like this because more than one connection may share the same ID, and for other performance reasons.
  2. If you have a way to "link" the websocket connection and a route handler (e.g. by user ID?), you can also wrap a handler like this in order to have direct access to that conn:
func(c *Conn) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request){
            c.Ask...
   }
}
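
For the first solution, a minimal sketch of such a cache could look like the following. It is self-contained, so conn is only a stand-in for *neffos.Conn, and the registry type with its add, remove and get methods is hypothetical. It also assumes that connection IDs are unique; if several connections may share an ID, the map value would have to be a slice or a set.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for *neffos.Conn so the sketch runs on its own.
type conn struct{ id string }

// registry is a hypothetical ID -> connection cache guarded by a RWMutex.
type registry struct {
	mu    sync.RWMutex
	conns map[string]*conn
}

func newRegistry() *registry {
	return &registry{conns: make(map[string]*conn)}
}

// add would be called from server.OnConnect.
func (r *registry) add(c *conn) {
	r.mu.Lock()
	r.conns[c.id] = c
	r.mu.Unlock()
}

// remove would be called from server.OnDisconnect.
func (r *registry) remove(id string) {
	r.mu.Lock()
	delete(r.conns, id)
	r.mu.Unlock()
}

// get returns the cached connection for id, if any.
func (r *registry) get(id string) (*conn, bool) {
	r.mu.RLock()
	c, ok := r.conns[id]
	r.mu.RUnlock()
	return c, ok
}

func main() {
	reg := newRegistry()
	reg.add(&conn{id: "1001"})
	if c, ok := reg.get("1001"); ok {
		fmt.Println("found", c.id)
	}
	reg.remove("1001")
	_, ok := reg.get("1001")
	fmt.Println("still cached:", ok)
}
```

An HTTP handler could then look up the connection by ID and call Ask on it directly instead of walking every connection with Server.Do.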

But to be honest, in your case I would switch to a different pattern; the problem is that you should not be on this path in the first place. Maybe I can suggest a different approach if you explain the infrastructure to me.

Yes, I didn't expect to add a feature like this in the zero version either; many websocket frameworks don't even have it and they are 4+ years old. I am now coding a nats exchange too, which also has a lightweight client library.

@majidbigdeli
Author

majidbigdeli commented Jul 11, 2019

@kataras

Please excuse me; my English is not so good.

I have a cron job that runs every second:

	c := cron.New()
	_ = c.AddFunc("@every 1s", func() {
		controller.NotificationHandler()
	})
	c.Start()

And

NotificationHandler()

func NotificationHandler() {
	mutex.Lock()
	// defer guarantees the unlock on every path, including the early return below.
	defer mutex.Unlock()

	// listExtensionNumber holds the IDs of the currently connected clients.
	var listExtensionNumber []model.IDTvp

	connections := Server.GetConnections()
	for c := range connections {
		var extentionNumber model.IDTvp
		connectionID, err := strconv.Atoi(c)
		if err != nil {
			fmt.Println(err)
			continue // skip IDs that are not numeric instead of sending 0
		}
		extentionNumber.ID = connectionID
		listExtensionNumber = append(listExtensionNumber, extentionNumber)
	}

	// Get the notification list from the database for the given extension numbers.
	// Attention { list ExtensionNumber == list ConnectionID }
	listNotification, err := data.GetNotificationList(listExtensionNumber)
	if err != nil {
		fmt.Println(err)
		return
	}

	if listNotification != nil {
		for _, element := range *listNotification {
			// extensionNumber = my unique connection ID
			extensionNumber := strconv.Itoa(element.Number)
			Server.Broadcast(nil, neffos.Message{
				To:        extensionNumber,
				Namespace: variable.Agent,
				Event:     variable.OnNotification,
				Body:      neffos.Marshal(element),
			})
		}
	}
}

Attention { Extension Number == Connection Id }

I need to get the list of notifications with status = start from the database every second
and send each notification to its user; if the user receives the notification, I update the notification's status in the database to send.

The problem is that if the cron job runs again before the notification status is updated, the notification is sent to the user again.

Because I use Server.Broadcast, I cannot wait for a response, so the cron job runs again:

Server.Broadcast(nil, neffos.Message{
					To:        extensionNumber,
					Namespace: variable.Agent,
					Event:     variable.OnNotification,
					Body:      neffos.Marshal(element),
				})
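
One possible way to avoid the duplicate send, independent of neffos, is to remember which notification IDs are still waiting for a confirmation and skip them on the next tick. The sketch below is only an illustration under that assumption; the inFlight type and its tryMark and done methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight is a hypothetical guard that remembers which notification IDs have
// been sent but not yet acknowledged, so that a later tick skips them.
type inFlight struct {
	mu  sync.Mutex
	ids map[int]struct{}
}

func newInFlight() *inFlight {
	return &inFlight{ids: make(map[int]struct{})}
}

// tryMark reports true only the first time an ID is seen; the caller sends
// the notification only when it returns true.
func (f *inFlight) tryMark(id int) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, pending := f.ids[id]; pending {
		return false
	}
	f.ids[id] = struct{}{}
	return true
}

// done is called once the client has confirmed the notification (or the send
// failed), so the ID can be picked up again if the database still lists it.
func (f *inFlight) done(id int) {
	f.mu.Lock()
	delete(f.ids, id)
	f.mu.Unlock()
}

func main() {
	guard := newInFlight()
	// Two overlapping ticks both read notification 7 from the database.
	for tick := 1; tick <= 2; tick++ {
		if guard.tryMark(7) {
			fmt.Printf("tick %d: sending notification 7\n", tick)
		} else {
			fmt.Printf("tick %d: notification 7 already in flight, skipping\n", tick)
		}
	}
	guard.done(7)
}
```

This only hides the race inside one process; waiting for the client's reply before updating the status is still what removes it.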

For solution 1, neffos has:

func (s *Server) GetConnections() map[string]*Conn {
	conns := make(map[string]*Conn)

	s.mu.RLock()
	for c := range s.connections {
		conns[c.ID()] = c
	}
	s.mu.RUnlock()

	return conns
}

so I can use
server.GetConnections()[extensionNumber].Ask(ctx, neffos.Message{...})

@kataras
Owner

kataras commented Jul 12, 2019

@majidbigdeli I've just pushed a Server.Ask(context, message) method; can you test it out and tell me the results? If "msg.To" is filled, it sends the request to that specific connection; otherwise the first responder's reply is returned.

@majidbigdeli
Author

@kataras,
I don't see a reply() method in neffos.js.

@majidbigdeli
Author

majidbigdeli commented Jul 12, 2019

@kataras
I get the online connections with:

	connections := Server.GetConnections()
	for c := range connections {
		var extentionNumber model.IDTvp
		connectionID, err := strconv.Atoi(c)
		if err != nil {
			fmt.Println(err)
		}
		extentionNumber.ID = connectionID
		listExtensionNumber = append(listExtensionNumber, extentionNumber)
	}

This is not good, because it blocks the server and it is called every second.
What do you suggest for getting the online connections?

@kataras
Owner

kataras commented Jul 12, 2019

You don't need this @majidbigdeli, Reply(body) is just a helper for nsConn.Emit(incomingMsg.Event, body)

@majidbigdeli
Author

helper for

thank you

@kataras
Owner

kataras commented Jul 12, 2019

i get online connections with Server.GetConnections(), what is your offer to get online connections? because it call every 1s.

Move your whole logic PER CONN inside server.OnConnect. Have a function that creates a cron job for a connection and returns its job wrapper. Conn has Set and Get methods, which set and get values from a temporary per-connection storage; you can use them to store the cron job and to stop it on server.OnDisconnect. It's the best and most correct approach:

server.OnConnect = func(c *neffos.Conn) error {
	// getCronJobForConnection is a function that wraps your cron and
	// returns a value holding the connection to work with.
	cronJob := getCronJobForConnection(c)
	c.Set("cron", cronJob)

	// The OnDisconnect below will call cronJob.Stop, but just in case,
	// check MyCronType.Conn.IsClosed() inside your cronJob.Start method
	// and exit from the goroutine if the connection is closed.
	go cronJob.Start()

	return nil
}

server.OnDisconnect = func(c *neffos.Conn) {
	if v := c.Get("cron"); v != nil {
		// optional type check, or simply v.(MyCronType).Stop()
		if cronJob, ok := v.(MyCronType); ok {
			cronJob.Stop()
		}
	}
}

UPDATE Conn.Set and Conn.Get methods added.

@majidbigdeli
Author

@kataras
I tested server.ask(); it works. Thank you.

@kataras
Owner

kataras commented Jul 12, 2019

OK nice @majidbigdeli, check my updated answer above and tell me if that helped you with the cron thing, otherwise I will push an example :P So it's time to think a way to make this Server.Ask feature accessible when users use a StackExchange like redis or nats for scaling-out.

@majidbigdeli
Author

majidbigdeli commented Jul 12, 2019

Do you suggest creating a cron job for each connection?
I have a lot of connections.
As a result, a large number of requests would go to the database at the same time.

Please see https://github.com/majidbigdeli/neffosAmiClient

I send the list of connection IDs (extension numbers) to the database and I get back the list of notifications.

Update

My database is SQL Server and I send the list of connection IDs to the database as a TVP.

@majidbigdeli
Author

OK nice @majidbigdeli, check my updated answer above and tell me if that helped you with the cron thing, otherwise I will push an example :P So it's time to think a way to make this Server.Ask feature accessible when users use a StackExchange like redis or nats for scaling-out.

Yes, I would be thankful if you push an example.

@kataras
Owner

kataras commented Jul 12, 2019

Do you suggest that for each connection makes a cron job ?

No, I suggested returning a wrapper for this, but now that I see the whole code you use I understand some things. Please confirm the following so I can provide more help:

  • Each database entry ID is linked with a websocket connection ID; this link is made in server.IDGenerator
  • You have another database table with the subset of IDs that are allowed to get notifications; not every database ID is allowed to get notified
  • and all of this can change at runtime

So what you need is a Broadcast to more than one To field, where the targets depend on a database call at runtime.

My recommendation is very simple and efficient:

We could add a message.ToFunc(c *neffos.Conn) bool, which would perform custom checks to decide whether a connection is allowed to receive the message broadcast by a server.Broadcast call.

In that ToFunc you would make a select database call filtered by c.ID() and return true if it matches the database criteria for getting notified.

Sounds good?

EDIT: However, a feature like this could never be compatible with StackExchange across different neffos instances, because of the func. So I have to ask you two more things:

  • Can you describe the reason behind the decision not to use the Rooms feature instead?

  • If you know which connection IDs are registered to get notifications, then on Server.OnConnect you can make a database call and, if the criteria are met, "force-connect" the connection with c.Connect(ctx, variable.Agent) to a special namespace that contains a notify event (or, in your case, the variable.OnNotification event) through which the cron notifications are fired to the client. The clients need to declare that namespace and have that notify event, which pushes the incoming server notifications to the UI; they don't need to connect manually, the server does it on OnConnect. And in your NotificationHandler you can delete all the lines and mutexes that you use and just fire:

Server.Broadcast(nil, neffos.Message{
// this is the namespace that  should be force-connected on `Server.OnConnect`.
	Namespace: variable.Agent, 
	Event:     variable.OnNotification,
	Body:      neffos.Marshal(element),
})

With the above you could easily call server.Broadcast and it would be sent to the subcollections as you want, without blocking and with the scaling-out option available.

@majidbigdeli
Author

I just need a list of connection IDs.
I get the list of connection IDs with:

var listConnectionId []string

	connections := Server.GetConnections()
	for c := range connections {
		listConnectionId = append(listConnectionId, c)
	}

@majidbigdeli
Author

majidbigdeli commented Jul 12, 2019

UPDATE Conn.Set and Conn.Get methods added.

Thank you for this

@kataras
Owner

kataras commented Jul 12, 2019

I just need a List connection Id.
I get List connection Id with

var listConnectionId []string

	connections := Server.GetConnections()
	for c := range connections {
		listConnectionId = append(listConnectionId, c)
	}

As I already mentioned, neffos does not keep a collection like this, because by default there is no guarantee that an ID maps to only one connection, and for other performance reasons.
So you have to do it yourself, using channels or locks.
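
As an illustration of the channel-based variant, here is a minimal self-contained sketch: one goroutine owns the set of connection IDs and everything else talks to it over channels, so no lock is held while a snapshot is being used. The idTracker type and its fields are hypothetical and not part of neffos; the join and leave sends are where server.OnConnect and server.OnDisconnect would report a connection ID. It counts connections per ID because an ID is not guaranteed to be unique.

```go
package main

import (
	"fmt"
	"sort"
)

// idTracker is a hypothetical helper that tracks connection IDs in a single
// owning goroutine.
type idTracker struct {
	join     chan string
	leave    chan string
	snapshot chan chan []string
}

func newIDTracker() *idTracker {
	t := &idTracker{
		join:     make(chan string),
		leave:    make(chan string),
		snapshot: make(chan chan []string),
	}
	go t.loop()
	return t
}

func (t *idTracker) loop() {
	ids := make(map[string]int) // ID -> number of connections using it
	for {
		select {
		case id := <-t.join:
			ids[id]++
		case id := <-t.leave:
			if ids[id] > 1 {
				ids[id]--
			} else {
				delete(ids, id)
			}
		case reply := <-t.snapshot:
			list := make([]string, 0, len(ids))
			for id := range ids {
				list = append(list, id)
			}
			sort.Strings(list)
			reply <- list
		}
	}
}

// IDs returns a sorted copy of the currently known connection IDs.
func (t *idTracker) IDs() []string {
	reply := make(chan []string)
	t.snapshot <- reply
	return <-reply
}

func main() {
	tracker := newIDTracker()
	tracker.join <- "1002" // would be sent from server.OnConnect
	tracker.join <- "1001"
	tracker.leave <- "1002" // would be sent from server.OnDisconnect
	fmt.Println(tracker.IDs())
}
```

A cron job can call IDs() every second without touching the server's own connection set.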

I am preparing an example for you; it will take some time because I am doing 1000 things at the same time :p

UPDATE I tried to follow the logic of your application and pushed an example at: https://github.com/kataras/neffos/blob/master/_examples/cronjob/main.go - that example shows various things that may help you in general, please tell me if that helped.

kataras added a commit that referenced this issue Jul 13, 2019
… can be done very easly but it's based on a problem described at: #1 (comment)
@kataras
Owner

kataras commented Jul 13, 2019

@majidbigdeli neffos.reply is pushed on neffos.js, works exactly the same as the Go's side.

@majidbigdeli
Author

majidbigdeli commented Jul 13, 2019

UPDATE I tried to follow the logic of your application and pushed an example at: https://github.com/kataras/neffos/blob/master/_examples/cronjob/main.go - that example shows various things that may help you in general, please tell me if that helped.

@kataras, I do not have a brother, but you are like a brother to me. Thank you.
Yes, that's exactly what I wanted.

I cannot use neffos.Message without To, because websocket connection ID 1 has (messageId 1, messageId 2, messageId 3), websocket connection ID 2 has (messageId 4, messageId 5) and websocket connection ID 3 has (messageId 6, messageId 8, messageId 12).

The user with websocket connection ID 1 should not see messageId 5.

I get all the messages for all online users by sending the list of websocket connection IDs to the database.

I do not intend to send all the messages to all online users. Each user must receive only their own messages.

I need the list of websocket connection IDs without blocking the server.
To determine the messages for each user, I send the list of websocket connection IDs to the database.
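
The per-user routing described above can be sketched as a simple grouping step. The notification type and the groupByConnID function are hypothetical stand-ins for the rows returned by the database query; each bucket would then be sent with Message.To set to its key.

```go
package main

import (
	"fmt"
	"strconv"
)

// notification is a hypothetical row as returned by the database query:
// Number is the extension number, which equals the websocket connection ID.
type notification struct {
	ID     int
	Number int
}

// groupByConnID buckets notifications by the connection ID they belong to.
func groupByConnID(rows []notification) map[string][]notification {
	out := make(map[string][]notification)
	for _, n := range rows {
		connID := strconv.Itoa(n.Number)
		out[connID] = append(out[connID], n)
	}
	return out
}

func main() {
	rows := []notification{{ID: 1, Number: 1}, {ID: 2, Number: 1}, {ID: 5, Number: 2}}
	for connID, list := range groupByConnID(rows) {
		// In the real application this is where Server.Broadcast would be
		// called with To: connID for every element of list.
		fmt.Printf("connection %s gets %d notification(s)\n", connID, len(list))
	}
}
```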

your example is very good..

@kataras
Owner

kataras commented Jul 13, 2019

Thanks @majidbigdeli I will try to not let you down then!! I am very honored.

So that's exactly what the example does. We will leave it there, and if you want you can improve it for other users that may have the same application requirements as yours. Sounds good?

@majidbigdeli
Author

@kataras Thank you.
Yes. I'm going to implement a real-world project with the Iris websocket version 11.2 and push it to Iris,
for example Iris 11.2 with the Asterisk Manager Interface (AMI). Thank you so much.
I would like to translate Iris into Persian; my English is not good, but I will try.
@kataras Thank you for this amazing support.

@majidbigdeli
Author

I don't want to depend on Iris for the neffos package because it will make its download heavier, instead I added this feature on the Iris side itself. Using the websocket.Handler which now accepts an optional second argument of func(ctx iris.Context) string:

import "github.com/kataras/iris/websocket"

[...]
irisIDGenerator := func(ctx iris.Context) string {
  return ctx.GetHeader("X-Username")
}
app.Get("/my_endpoint", websocket.Handler(websocketNeffosServer, irisIDGenerator))

Wiki: https://github.com/kataras/neffos/wiki/Getting-started#run-the-server

@kataras
ctx.GetHeader("X-Username") always returns "".

@kataras
Owner

kataras commented Aug 2, 2019

You are right @majidbigdeli, this happens because the parsing of URL parameters as headers (for browser-side header settings) is done after the Iris ID generator call.

Fixed now; it will be pushed with Iris v11.2.3 and neffos 0.0.9 (I made a change here too, which fixes this issue and another one where an Iris ID generator was called before the handshake, which is logically wrong).

@kataras kataras closed this as completed in 9b3aafa Aug 3, 2019
github-actions bot pushed a commit to goproxies/github.aaakk.us.kg-kataras-iris that referenced this issue Jul 27, 2020
…quested at: kataras/neffos#1 (comment)

Former-commit-id: 0994b63373ebe2b5383a28f042aa2133061cbd18
github-actions bot pushed a commit to goproxies/github.aaakk.us.kg-kataras-iris that referenced this issue Jul 27, 2020
…/neffos#1 (comment)

Former-commit-id: 9753e3e45c7c24788b97814d3ecfb4b03f5ff414