
Add server <-> server transfers of pdarrays #2405

Closed
bmcdonald3 opened this issue May 1, 2023 · 0 comments · Fixed by #2406
bmcdonald3 (Contributor) commented May 1, 2023

Add the ability to send a pdarray from one server to
another server through ZMQ sockets. This can be useful when
multiple Arkouda servers would like to operate on smaller
subsets of one giant dataset that can't fit on each Arkouda
server individually. For example, one large "mother" Arkouda
server could load in the entire dataset, down-select the data
as needed for each "child" server, then send the portion of
that array to the child server that would like to operate on
that portion of the dataset.

Prior to this PR, this would need to be done by writing the
down-selected portion from the mother server, reading those
files into the child server, and then operating. This aims
to replace the file-system go-between for this use case,
reducing the number of steps needed to get a pdarray
from one server to another and potentially achieving
better performance than would be possible through the file
system.
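
For contrast, the prior file-based workflow looked roughly like the
following sketch. The to_hdf call and the shared file path are
illustrative assumptions and not part of the original text; only
ak.read appears in the usage example below.

Mother server:

>>> a = ak.read('my_file')
>>> b = a[0:a.size // 2]           # down-select the portion the child needs
>>> b.to_hdf('/shared/fs/subset')  # hypothetical write of the subset to a shared file system

Child server:

>>> c = ak.read('/shared/fs/subset')  # read the files back in, then operate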

Usage:

Server-1 ("mother" server):

>>> a = ak.read('my_file')  # file that's too big for other servers
>>> b = a[0:a.size // 2]  # get a portion of the array to send to the child
>>> b.send_array("child-name", 1234)  # send smaller array to server running on "child-name"

Server-2 ("child" server):

>>> a = ak.receive_array("mother-name", 1234)  # receive array from "mother-name" (port must match the send_array call)

Types supported in the initial implementation with the send_array/receive_array interface:

  • all native Arkouda datatypes (int, uint, bool, float)
  • categorical
  • strings
  • segmented array

Additionally, DataFrames containing columns of the above supported types can be transferred with a slightly different interface: DataFrame.transfer(hostname, port) / ak.receive_dataframe(hostname, port), as shown in the sketch below.
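
A minimal usage sketch of the DataFrame interface; the column contents, hostnames, and port are made up for illustration:

Server-1 ("mother" server):

>>> df = ak.DataFrame({'ints': ak.arange(10), 'strs': ak.array(['a', 'b'] * 5)})
>>> df.transfer("child-name", 1234)  # send DataFrame to server running on "child-name"

Server-2 ("child" server):

>>> df = ak.receive_dataframe("mother-name", 1234)  # port must match the transfer call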

@bmcdonald3 bmcdonald3 self-assigned this May 1, 2023
stress-tess added a commit that referenced this issue Sep 11, 2023
* Add server <-> server transfers of pdarrays
This PR adds the ability to send a pdarray from one server to
another server through ZMQ sockets. This can be useful when
multiple Arkouda servers would like to operate on smaller
subsets of one giant dataset that can't fit on each Arkouda
server individually. For example, one large "mother" Arkouda
server could load in the entire dataset, downselect the data
as needed for each "child" server, then send the portion of
that array to the child server that would like to operate on
that portion of the dataset.

Prior to this PR, this would need to be done by writing the
downselected portion from the mother server, reading those
files into the child server, and then operating. This aims
to replace the file system go-between of this use case.

The performance could be improved, especially for the case
where the sending/receiving servers are of different element
sizes, but this initial work is focused on correctness.

Performance numbers collected on a Cray CS HDR:

| Nodes     | Receiving 2  | Receiving 3  | Receiving 4  |
| --------: | -----------: | -----------: | -----------: |
| Sending 2 | 0.2101 GiB/s | 0.1029 GiB/s | 0.2062 GiB/s |
| Sending 3 | 0.1034 GiB/s | 0.2995 GiB/s | 0.1004 GiB/s |
| Sending 4 | 0.2026 GiB/s | 0.1041 GiB/s | 0.4035 GiB/s |

There are clear next steps for performance improvements, such
as targeting the high-speed network (such as InfiniBand) when
it is available, which was observed to improve performance by
~10x in initial trials.

* Cleanup

* Add docstring and cleanup

* Add transfer message to ServerModules

* Fix flake8

* Make 1.29 compatible

* Fix bug when sending and receiving nodes are the same

* Remove unnecessary transfer

* Start adding multiple objtypes

* Add server-side transfers of seg strings, still need client

* Add strings sending/receiving to client side

* Working for integer segmented arrays

* Save

* Pull out sendSetupInfo

* Working for int dataframe transfer

* Update

* fix flake8

* save

* Remove last `c_memcpy` deprecation warnings
Certain code is only compiled when `CHPL_COMM!=none` and since most
of our compilations were with `CHPL_COMM=none`, a couple of last
`c_memcpy` references were missed.

* save

* Add all segarray transfer types

* Add segarray transfer to DF

* Categorical transfer added

* Fix flake8

* Clean up docs

* Remove list modification

* Add review feedback

* Fix na codes for categorical dataframe transfer

* Fix naming of receive functions

* Fix flake8

* Add memory limit checking on receiving side

* Fix flake8

* Fix missed merge text

* Move receive_dataframe to IO

* Update transfer code to work with new OOM sym entry creation

* Fix dataframe circular dependency

* Fix flake8 whitespace

---------

Co-authored-by: pierce <[email protected]>