
Closes #2405: Add server <-> server transfers of pdarrays #2406

Merged: 38 commits from array-transfer into Bears-R-Us:master on Sep 11, 2023

Conversation

@bmcdonald3 (Contributor) commented on May 1, 2023

This PR adds the ability to send a pdarray from one server to another server through ZMQ sockets. This is useful when multiple Arkouda servers want to operate on smaller subsets of one giant dataset that can't fit on each Arkouda server individually. For example, one large "mother" Arkouda server could load the entire dataset, downselect the data as needed for each "child" server, and then send each portion to the child server that will operate on it.

Prior to this PR, this would need to be done by writing the downselected portion out from the mother server, reading those files into the child server, and then operating on them. This PR aims to remove that file-system go-between, reducing the number of steps needed to get a pdarray from one server to another and potentially achieving better performance than is possible through the file system.

Performance could be improved, especially for the case where the sending and receiving servers are different sizes, but this initial work is focused on correctness.

Performance numbers were collected on a Cray CS with transfers over 100G HDR-IB:

| Nodes     | Receiving 2  | Receiving 3  | Receiving 4  |
|-----------|--------------|--------------|--------------|
| Sending 2 | 1.2385 GiB/s | 0.6264 GiB/s | 1.4077 GiB/s |
| Sending 3 | 0.6680 GiB/s | 1.6672 GiB/s | 0.6270 GiB/s |
| Sending 4 | 1.3586 GiB/s | 0.7278 GiB/s | 1.9504 GiB/s |

Types supported in the initial implementation with the send_array/receive_array interface:

  • all native Arkouda datatypes (int, uint, bool, float)
  • categorical
  • strings
  • segmented array

Additionally, DataFrames containing columns of the above supported types can be transferred with a slightly different interface: DataFrame.transfer(hostname, port) / ak.receive_dataframe(hostname, port).
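
As a rough illustration, here is a hypothetical client-side sketch of that interface. The send_array/receive_array and DataFrame.transfer/ak.receive_dataframe names come from this PR, but the hostnames, ports, and exact argument order shown below are assumptions, not the finalized signatures:

```python
# Hypothetical usage sketch of the transfer interface described above.
# The send_array/receive_array and DataFrame.transfer/ak.receive_dataframe
# names come from this PR; the hostnames, ports, and argument order here
# are illustrative assumptions, not the finalized API.
import arkouda as ak

# --- client session attached to the "mother" server holding the full data ---
ak.connect("mother-server", 5555)
full = ak.randint(0, 100, 10**9)             # stand-in for the giant dataset
subset = full[full < 10]                     # downselect the portion one child needs
ak.send_array(subset, "child-server", 5556)  # stream the pdarray over ZMQ sockets

# --- client session attached to the "child" server ---
ak.connect("child-server", 5557)
portion = ak.receive_array("mother-server", 5556)  # materialize the transferred pdarray

# DataFrames of supported column types use a slightly different spelling:
#   df.transfer(hostname, port)          on the sending side
#   ak.receive_dataframe(hostname, port) on the receiving side
```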

Closes: #2405

@hokiegeek2 (Contributor):

@bmcdonald3 this looks cool. Question: when an array is being transferred from one Arkouda instance to another, the ZMQ ports of the sending and receiving servers block until the array transfer completes, correct?

@stress-tess stress-tess requested a review from a team May 3, 2023 16:06
@bmcdonald3 (Contributor, Author):

@hokiegeek2 Yes, that is correct

@hokiegeek2 (Contributor):

@bmcdonald3 Do you have any response time numbers for an xfer of a really large array like in the 50-100TB range? I may be wrong, but I am wondering if xfers of relatively large arrays may make the sending and receiving arkouda instances unresponsive for several minutes.

@bmcdonald3 (Contributor, Author):

@hokiegeek2 No, I don't have that information, but based on the numbers above we observed 0.1 GiB/s per node as-is, and that was without targeting the high-speed network (as mentioned, this code is unoptimized and there are clear next steps for optimization). If we assume this 50 TB array is spread over 50 nodes, that would be 5 GiB/s of injection, so about 15 minutes.

But aside from the raw numbers of this initial implementation, my response would be that this aims to replace writing and then reading files, which is the only way to move data from one server to another today. If we target the high-speed network (as noted, the numbers in the table in the original PR description do not, but we saw large speedups when targeting InfiniBand), we have a much higher potential injection rate than going through the filesystem. Assume 100 GiB/s InfiniBand as the high bound (an actual number) and an optimistic 25 GiB/s for reading or writing files. Because the file route requires a full write followed by a full read, its effective injection rate is 12.5 GiB/s even though each stage runs at 25 GiB/s, which is 8x lower than the 100 GiB/s potential of transferring over sockets.

So while this would block the server for a while when transferring a large array, I don't see that as unique to this operation, and I think we would see significant performance improvements from transferring over sockets as opposed to writing and reading files (which translates directly into the amount of unresponsive time you mention).
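
To make that arithmetic concrete, here is a back-of-the-envelope sketch using the same assumed rates from the comment above (100 GiB/s InfiniBand, 25 GiB/s filesystem reads/writes):

```python
# Back-of-the-envelope comparison using the rates assumed in the comment above.
ib_rate = 100.0   # GiB/s: assumed InfiniBand injection upper bound
fs_rate = 25.0    # GiB/s: optimistic filesystem read *or* write rate

# Going through files requires a full write followed by a full read,
# so the effective end-to-end rate is half the raw read/write rate:
fs_effective = 1.0 / (1.0 / fs_rate + 1.0 / fs_rate)   # 12.5 GiB/s

speedup = ib_rate / fs_effective                        # 8.0x in favor of sockets
print(f"file-based effective rate: {fs_effective} GiB/s, socket speedup: {speedup}x")
```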

@hokiegeek2 (Contributor):

@bmcdonald3 gotcha, okay. Just curious, what kind of network did you test on?

@bmcdonald3 (Contributor, Author):

Good question! This was tested using 1G Ethernet (I've updated the original description to say that).

In practice we wouldn't want to use Ethernet for these transfers in most cases, so I don't think those performance numbers are particularly valuable; they were really collected as a correctness check more than a performance study. Elliot advised against putting out that performance table, but the table looked nice and I had already collected the numbers, so I had to include it haha.

I plan to run some actual studies using IPoIB in the near future to get more realistic numbers, but since this initial implementation was focused on establishing a correctness baseline rather than performance, I haven't had a chance to do a full sweep of that yet.

@stress-tess (Member):

> No, I don't have that information, but based on the numbers above we observed 0.1 GiB/s per node as-is, and that was without targeting the high-speed network ... If we assume 100 GiB/s InfiniBand as the high bound and an optimistic 25 GiB/s to read/write files, this would go 8x faster than reading and writing the files ...

@bmcdonald3 and @hokiegeek2 I gotta question. I think this is really interesting; I'm curious how this would compare in the case when you have multiple "child" servers. So if everyone on a team of N people wanted to play with their own copy of the filtered-down dataset, with the transfer approach we'd need to do N transfers (one for each "child" server), while with the file system approach we'd only have to do a single write and N reads. Also (my ignorance of file IO is showing :) ) would it be possible for those reads to be done simultaneously? Because if so, it feels like for a large enough number of team members the file system would be faster, right? It's possible that number is too high to ever realistically hit, and it's possible that none of what I said made any sense lol

@stress-tess (Member):

> Elliot advised against putting out that performance table, but the table looked nice and I had already collected the numbers, so I had to include it haha.

lmao the table does look nice 😁

@bmcdonald3 (Contributor, Author):

> I'm curious how this would compare in the case when you have multiple "child" servers? So if everyone on a team of N people wanted to play with their own copy of the filtered-down dataset. With the transfer approach, we'd need to do N transfers (one for each "child" server). With the file system approach, we'd only have to do a single write and N reads. Also (my ignorance of file IO is showing :) ) would it be possible for those reads to be done simultaneously?

This is an interesting question, but I think there are a lot of unknowns, and this gets into a classic networking problem. If every single server did want the exact same dataset, then I think the most efficient way would be to have the head server transfer to a child and have that child become another worker node, so that child would then transfer its data to another server. You'd get something like: 1 node -> transfer -> 2 nodes -> transfer -> 4 nodes -> transfer -> 8 nodes ..., where each new child node starts doing transfers as well, so I think you could get log(N) time there.

That would definitely be a best-case scenario, though; it would be pretty hard to get a bunch of different people acting in unison to make something like that happen. But if the goal was just freeing up the one big server, then it could do a single transfer and have that child be the new server to farm things out. All of this is speculative, since I don't have a concrete use case in mind that I know this is looking to help with; it's possible that IO is better, but it would depend on the use case.

What I was imagining based on past conversations was that every child server wanted a different portion of the dataset, which is a very different problem.

This is information that I stole from Elliot, so I can't say a ton about it, but networks and file systems typically have three different limits: a limit for a single-thread transfer, a limit for a single-node transfer, and a limit for total simultaneous transfer among all nodes. InfiniBand has a much higher total-simultaneous-transfer upper bound than the typical file system. So if we were really working with huge 50 TB arrays like John mentioned, we would definitely be hitting that third bottleneck: if we were trying to read 50 TB in from the same filesystem on 5 different servers, that would basically be serialized and all the servers would be doing their reads one at a time, not in parallel. On much smaller datasets where you aren't capping out that total transfer rate, the parallelism from reading on multiple servers could come in handy, but that wouldn't be a case where the server was stalling for 15 minutes.

In summary, I am not an expert on this stuff and I think that the real value here is dependent on the actual use case in question.
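
For what it's worth, the doubling fan-out described above can be sketched in a few lines. This is purely illustrative (the server names and the scheduling loop are made up), but it shows why distributing the same dataset to N servers takes roughly ceil(log2(N)) rounds of transfers rather than N:

```python
# Toy illustration of the doubling fan-out: every server that already holds
# the data forwards it to one new server per round, so N servers are covered
# in ceil(log2(N)) rounds. Names and scheduling are illustrative only.
import math

def broadcast_rounds(servers):
    have = [servers[0]]            # the "mother" server starts with the data
    need = list(servers[1:])
    rounds = []
    while need:
        this_round = []
        for sender in list(have):  # snapshot: new receivers send next round
            if not need:
                break
            receiver = need.pop(0)
            this_round.append((sender, receiver))
            have.append(receiver)
        rounds.append(this_round)
    return rounds

servers = [f"server{i}" for i in range(8)]
for i, transfers in enumerate(broadcast_rounds(servers), start=1):
    print(f"round {i}: {transfers}")
print("rounds needed:", math.ceil(math.log2(len(servers))))   # 3 for 8 servers
```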

@stress-tess (Member):

> If every single server did want the exact same dataset, then I think the most efficient way would be to have the head server transfer to a child and have that child become another worker node ... you'd get something like: 1 node -> transfer -> 2 nodes -> transfer -> 4 nodes -> transfer -> 8 nodes ..., so I think you could get log(N) time there.

Ooooo i didn't think about using the child servers to do transfers, that's clever

> if we were trying to read 50 TB in from the same filesystem on 5 different servers, that would basically be serialized and all the servers would be doing their reads one at a time, not in parallel. On much smaller datasets where you aren't capping out that total transfer rate, the parallelism from reading on multiple servers could come in handy, but that wouldn't be a case where the server was stalling for 15 minutes.

Okay that makes a lot of sense! thanks ben/elliot

@bmcdonald3 bmcdonald3 force-pushed the array-transfer branch 6 times, most recently from 3ead159 to 5276ae3, on June 14, 2023
@bmcdonald3 bmcdonald3 force-pushed the array-transfer branch 2 times, most recently from 1405f97 to b64b394, on June 21, 2023
@stress-tess (Member) left a review comment:


My initial review. I've got a couple questions, mostly me not understanding ports and stuff that well.

I didn't really dig into the Chapel side too much. I figured I would let @hokiegeek2 and @Ethan-DeBandi99 take a crack at that first since they have more expertise; I'll take a look on my next pass through.

Review comments were left on arkouda/dataframe.py, arkouda/categorical.py, arkouda/io.py, and src/TransferMsg.chpl (threads since resolved).
@Ethan-DeBandi99 (Contributor) left a review comment:


A couple of things to address, and one question for clarification.

Review comments were left on arkouda/categorical.py, arkouda/dataframe.py, and src/TransferMsg.chpl (threads since resolved).
@bmcdonald3 bmcdonald3 force-pushed the array-transfer branch 2 times, most recently from fc3d328 to a2bafed, on July 11, 2023
@bmcdonald3 bmcdonald3 force-pushed the array-transfer branch 4 times, most recently from c6e2f86 to 77a436f, on July 28, 2023
@stress-tess stress-tess merged commit cdc2457 into Bears-R-Us:master Sep 11, 2023
@bradcray (Contributor):

Sweet!
