
way to map/query across all workers #2

Open
bmeck opened this issue Oct 15, 2014 · 29 comments


@bmeck
Contributor

bmeck commented Oct 15, 2014

design / bikeshed

this is a common problem, but being able to run a reduce across workers is important.

right now we can use a separate registry from the broker to generate a list of workers and run a reduce across them.

being able to wait on / run a reduce across all workers of a service would be a big win. technically all that is needed is the ability to:

  • get a list of workers for a service
  • send targeted messages to workers

since pigato is more than just a simple omdp, we should discuss whether this should be a service or built into the broker.
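The two capabilities above can be sketched with an in-memory registry standing in for the broker's worker list (all names here are illustrative, not pigato's actual API):

```javascript
// Hypothetical in-memory registry standing in for the broker's worker list.
const registry = new Map(); // serviceName -> Map(workerId -> handler)

function registerWorker(service, workerId, handler) {
  if (!registry.has(service)) registry.set(service, new Map());
  registry.get(service).set(workerId, handler);
}

// Capability 1: get a list of workers for a service.
function listWorkers(service) {
  return [...(registry.get(service) || new Map()).keys()];
}

// Capability 2: send a targeted message to one worker.
function sendTo(service, workerId, msg) {
  return registry.get(service).get(workerId)(msg);
}

// A reduce across all workers of a service is then just:
function reduceAcross(service, msg, reducer, init) {
  return listWorkers(service)
    .map((id) => sendTo(service, id, msg))
    .reduce(reducer, init);
}

// Example: three workers each report a per-box process count.
registerWorker('monitor', 'w1', () => 2);
registerWorker('monitor', 'w2', () => 3);
registerWorker('monitor', 'w3', () => 1);
const total = reduceAcross('monitor', 'count', (a, b) => a + b, 0);
```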

@prdn
Owner

prdn commented Oct 15, 2014

Do you mean a map reduce implementation in pigato?
I like it!

The best thing for me is to create a worker that handles this kind of request.

I think that there should be no need to know how many workers are available. A client, when it sends a REDUCE request, could also define how many workers to use.
Then all the generated requests will be queued and any available worker for the given service will be used automatically.

Please tell me if I misunderstood your thoughts.

Paolo


@bmeck
Contributor Author

bmeck commented Oct 15, 2014

yes, this is a map reduce across workers, where the workers are the source of state. In our usage we have workers sitting on boxes monitoring processes. Since the processes are tied to physical machines, we cannot reach out to a shared DB etc. to get a single source of data.

so we need some way to query all workers. We can do the reduce client side or server side. I would prefer not to send a reduce method over the wire like many map/reduce implementations do.

Renaming issue to just be a global map()

@bmeck bmeck changed the title way to run a reduce across workers way to map/query across all workers Oct 15, 2014
@bmeck
Contributor Author

bmeck commented Nov 24, 2014

@prdn starting impl, will need a new set of queueing on the controller.

controller.wrq[workerId] is a worker request queue; if a worker's id matches [workerId], it will pick up work from this queue rather than calling dispatch. This means mapping could starve srq, but you might have a better idea.
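A minimal sketch of the dispatch order being proposed, with wrq and srq as plain arrays (this is not the actual controller code):

```javascript
// Sketch of the proposed dispatch order; names like wrq/srq follow the
// comment above, but the structures here are illustrative.
const controller = {
  wrq: {}, // per-worker request queues: workerId -> [request]
  srq: [], // service request queue (any worker may take these)
};

function enqueueForWorker(workerId, request) {
  (controller.wrq[workerId] = controller.wrq[workerId] || []).push(request);
}

// When workerId asks for work, targeted requests win over the shared queue.
// Note: if targeted (map) requests keep arriving, srq can starve.
function nextRequestFor(workerId) {
  const wq = controller.wrq[workerId];
  if (wq && wq.length) return wq.shift();
  return controller.srq.shift();
}

controller.srq.push({ id: 'r1' });
enqueueForWorker('w1', { id: 'map-1' });
const first = nextRequestFor('w1');  // targeted map request first
const second = nextRequestFor('w1'); // then the shared queue
```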

@bmeck
Contributor Author

bmeck commented Nov 24, 2014

just going to have map send normal-looking requests to all workers; they should be indistinguishable for the worker, unless you can think of a reason why workers need to know that a specific request is a map request.

@prdn
Owner

prdn commented Nov 24, 2014

I think that a worker shouldn't know that a request is a map request.

I have a couple of questions to better suggest a solution.

How do you know that all required workers are connected to the broker?
Will the client specify how many workers are required?
Who will perform the reduce operation?
And it is important to remember that the client could receive incomplete replies due to the fact that not all required workers were connected.

We might think of a slightly different pattern. For example, a broadcast request from the client to a specific service, where all the workers answer that request.

Let me know what you think

@bmeck
Contributor Author

bmeck commented Nov 24, 2014

How do we know that all required workers are connected?

You don't; just query what you have and see if the results have the needed info

How many workers required?

I was just going to queue the request to all currently connected workers for a service

Who performs the reduce?

Reduce will be left to the client. Since the request is not seen as a map request, the broker needs to tag responses for multiplexing purposes.

Incomplete replies

Unsure what you mean; if we are pulling multiple streams of data, the concept of a complete reply is difficult. We may not know when a stream ends (until it does).
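The response tagging mentioned above might be sketched like this, assuming a hypothetical frame shape of { workerId, payload }:

```javascript
// Sketch: the broker tags each worker reply with the worker's id so the
// client can demultiplex a single stream (the frame shape is hypothetical).
function tagReply(workerId, payload) {
  return { workerId, payload };
}

// Client side: group a multiplexed stream back into per-worker streams.
function demultiplex(frames) {
  const perWorker = {};
  for (const f of frames) {
    (perWorker[f.workerId] = perWorker[f.workerId] || []).push(f.payload);
  }
  return perWorker;
}

// Interleaved frames from two workers arriving on one connection.
const stream = [
  tagReply('w1', 'a'), tagReply('w2', 'x'), tagReply('w1', 'b'),
];
const grouped = demultiplex(stream);
```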

@prdn
Owner

prdn commented Nov 24, 2014

Ok, now it's clearer. Thank you.

If I've understood correctly, the client receives a single multiplexed stream from the broker that is the result of all the workers' streams. Is that correct?

@bmeck
Contributor Author

bmeck commented Nov 24, 2014

yes

@prdn
Owner

prdn commented Nov 24, 2014

Ok, so the client does not need any built-in reducing logic; it just receives a stream as normal.
I like it.

@prdn
Owner

prdn commented Nov 24, 2014

I have to think about why we need a new queue.
Starting from the request id, the broker could multiplex the streams from multiple workers into one single stream.

@bmeck
Contributor Author

bmeck commented Dec 1, 2014

@prdn let me know if you have any thoughts to avoid the new queue.

right now we need a new queue because we can only queue based upon service name and cannot target requests to workers. if we wait for a dispatch on a service and see that the queued request is not for our service id, we would need to skip that request (but keep it at the front of the queue to avoid starvation). I did not see a sanctioned way to do this.

@prdn
Owner

prdn commented Dec 2, 2014

Ok.
Let's do this.
In the future we may merge the two queues into a more generic one. I can't say before seeing what you have in mind.

Thank you

@alexeygolev
Contributor

Maybe I'm wrong, but isn't the ventilator/sink pattern a better fit for this?

@bmeck
Contributor Author

bmeck commented Jan 8, 2015

@alexeygolev it would be a better fit in the sense of doing parallel tasks, but we start to get into territory where zmq is not exactly a good fit, in my experience. We would need a multiplexed sink and ventilator per service on the broker, preferably without opening a port per sink/ventilator. If you know a good way to do this I would be game, and it would make things easier.

@alexeygolev
Contributor

@bmeck sounds like a mission... will research/play around

@bmeck
Contributor Author

bmeck commented Jan 8, 2015

@alexeygolev one thing to note is that service workers may contain state, so we need to be sure that the query hits all the workers

@prdn
Owner

prdn commented Jan 9, 2015

Guys.
What about this solution:

  • a client sends a map request on a particular service (i.e. '$mapreduce'). The request contains in its payload the real service for the request (e.g. 'echo') and, for example, the minimum number of workers needed
  • a 'special' worker handles that type of request
  • when the worker receives a $mapreduce request, it asks for the list of available workers for the 'echo' service
  • the $mapreduce worker generates a request for each one of the 'echo' workers (including in each request the respective workerId that must handle it)
  • the $mapreduce worker collects partial and final replies and replies back to the client

To implement this scenario we only have to:

  • create a specialised worker that handles $mapreduce requests
  • add to the broker the ability to be queried for the available workers of a given service (we can set up authentication for this kind of query)
  • allow the broker to route a request to a particular workerId

This approach is interesting to me because it requires only small modifications to the broker, and we can move the complexity into specialised workers.
In the future we may implement other patterns without increasing broker complexity.

What do you think about this?
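The flow above could be sketched with stubs in place of the broker and the 'echo' workers (all names here are illustrative, not pigato's API):

```javascript
// Stub for "ask the broker for the available workers on a service".
function directory(service) {
  return ['w1', 'w2'];
}

// Stub for a broker request pinned to one workerId.
function requestWorker(service, workerId, payload) {
  return `${workerId}:${payload}`;
}

// The $mapreduce worker: check the minimum worker count, fan out to every
// 'echo' worker, collect the replies, and hand one combined reply back.
function mapreduceWorker(req) {
  const { service, payload, minWorkers } = req;
  const workers = directory(service);
  if (workers.length < minWorkers) throw new Error('not enough workers');
  return workers.map((id) => requestWorker(service, id, payload));
}

const replies = mapreduceWorker({ service: 'echo', payload: 'hi', minWorkers: 2 });
```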

@bmeck
Contributor Author

bmeck commented Jan 9, 2015

as long as there is a way to query all of the workers reliably, that's fine, but it seems to complicate things, since you have to manage mapreduce separately and cannot just grab a normal query from all.


@prdn
Owner

prdn commented Jan 9, 2015

For the Client this is totally transparent.
We may, for example, let the Client send requests to the 'echo' service and specify the option $mapreduce=true in the options object.
Basically the multiplexing logic is moved to a Worker instead of being kept in the Broker.


@bmeck
Contributor Author

bmeck commented Apr 10, 2015

@prdn are we tied to the special worker approach? Would a PR allowing normal workers to act as part of a reduce be fine?

@prdn
Owner

prdn commented Apr 10, 2015

We are not tied to that approach. I'm truly interested in seeing your PR.
I've started to work on broadcasting and related things in order to then reach the map-reduce functionality, but I'm open to anything.

@prdn
Owner

prdn commented Apr 10, 2015

@bmeck could you explain a bit what the proposed modifications to the worker are and how they might impact the current behaviour?
I only wish to remain as generic as possible with the base classes

@bmeck
Contributor Author

bmeck commented Apr 10, 2015

This would be completely transparent to the worker. It would require that the broker be able to multiplex multiple streams received from different workers. The worker would have no need to know that it is being asked as part of a batch request. The map function is a bit more complicated, though; we can perform it client side or via a well-known service. In a pseudocode sort of summary it looks like:

// client sends a batch request
request = client.queryBatch(time_service, get_time);
// ... on the broker
broker.startMultiplexer(get_all_workers(time_service))
//
// dispatches go out when appropriate
//
// ... on any of the workers requested
worker.sendToBroker(some_data)
// ... on the broker
broker.sendToClient(multiplex(worker.id, some_data))
// ... on the client
// for each worker that replies
request.on('reply', function (rep) {
  // handle our reply like normal
});
//
// continues until all workers end
//
broker.sendBatchDoneToClient()

In this case the broker and the client need to be altered, but not the Worker. This means that you can send batch requests without any issues.

Now onto mapping...

Mapping is a bit interesting because I don't think it should be in the broker at all. For now I think it should live on the client, and the client should just deal with the multiple replies as they come in. We could do the map on the broker or make a well-known function signature for how to accept a map request, but I think that should be put off until the ups/downs of this approach are more visible.

@bmeck
Contributor Author

bmeck commented Apr 10, 2015

as a side note, we are faking this in a similar way today to do multiplexed responses from a worker and send back Server-Sent Events; here we would just be using it on the broker for multiplexing worker responses rather than parts of a single worker's reply stream.

@prdn
Owner

prdn commented Apr 11, 2015

If I understand correctly, the Broker should offer a way to retrieve all workers available for a given service.
The client is the real multiplexer, right?

For example, let's say that the Broker answers a Client request called $directory, giving all the workerIds belonging to a given service.
Then the client sends N requests (N = number of workers). Each request specifies the workerId (possible in the pigato master). The Client then receives all the replies and multiplexes them into a single stream.

Does this correspond to your scenario?
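The scenario described above might be sketched like this, with stubs for the $directory reply and the per-workerId requests (names are illustrative, not the actual API):

```javascript
// Stub for the Broker's $directory reply: all workerIds for a service.
function dollarDirectory(service) {
  return ['w1', 'w2', 'w3'];
}

// Stub for a request pinned to one workerId via request options.
function requestTo(workerId, service, payload) {
  return { workerId, reply: `${service}(${payload})` };
}

// Client-driven fan-out: look up the worker list, then send one request
// per workerId and collect the replies into a single array.
function clientFanOut(service, payload) {
  return dollarDirectory(service).map((id) => requestTo(id, service, payload));
}

const fanned = clientFanOut('echo', 'ping');
```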

@bmeck
Contributor Author

bmeck commented Apr 11, 2015

in my scenario the client uses one connection and the broker makes multiple
connections. The client demultiplexes a single stream, received over that
single connection to the broker, into multiple replies.

We could make the client do a request for all the current workers of a
service, then request data from each one; however, this would be multiple
connections for the client which also means many connections through the
broker. I am not sure if there would be an advantage to doing it this way.


@prdn
Owner

prdn commented Apr 11, 2015

in my suggested scenario the client uses a single connection but sends multiple requests on that connection. The broker maintains its simplicity and lets requests flow as normal.
You are probably right that your solution is a bit more performant, but on the other hand it implies adding complexity to the broker.

If it is not too much effort for you, would you mind if we start with a client demultiplexer?
But if you prefer and feel more comfortable with your scenario, and we are able to keep the current broker's complexity low, please go ahead with that and we can discuss it in the future.

@bmeck
Contributor Author

bmeck commented Apr 11, 2015

we can do it client multiplexed first, but then we will need to expose what the worker ids are, which is fine.

@prdn
Owner

prdn commented Apr 12, 2015

@bmeck please take a look at this commit f7127f2 , this file https://github.com/prdn/pigato/blob/master/services/Directory.js and this test https://github.com/prdn/pigato/blob/master/test/directory.js

Basically I have now added a new socket to the Broker (inproc) that the Broker uses to publish its internal status to subscribed core services.
The first core service is Directory, which offers a '$dir' service that sends a client an array of workerIds belonging to the given service.

Now we should have all the pieces for the map-reduce.
This approach also lets us create other services very easily.

Let me know what you think about this.
