Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Nov 20, 2023
1 parent 68ba49d commit ef0774f
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 22 deletions.
1 change: 1 addition & 0 deletions ci/scripts/integration_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ fi
# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1


# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
time archery integration \
--run-c-data \
Expand Down
3 changes: 2 additions & 1 deletion ci/scripts/rust_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set -e

arrow_dir=${1}
source_dir=${1}/rust
build_dir=${2}/rust

# This file is used to build the rust binaries needed for the archery
# integration tests. Testing of the rust implementation in normal CI is handled
Expand Down Expand Up @@ -54,7 +55,7 @@ rustup show
pushd ${source_dir}

# build only the integration testing binaries
cargo build -p arrow-integration-testing
cargo build -p arrow-integration-testing --target-dir ${build_dir}

# Save disk space by removing large temporary build products
rm -rf target/debug/deps
Expand Down
9 changes: 9 additions & 0 deletions dev/archery/archery/integration/cdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@
import cffi
from contextlib import contextmanager
import functools
import os
import sys

from .tester import CDataExporter, CDataImporter


if sys.platform == "darwin":
dll_suffix = ".dylib"
elif os.name == "nt":
dll_suffix = ".dll"
else:
dll_suffix = ".so"

_c_data_decls = """
struct ArrowSchema {
// Array type description
Expand Down
10 changes: 1 addition & 9 deletions dev/archery/archery/integration/tester_cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import contextlib
import functools
import os
import sys
import subprocess

from . import cdata
Expand All @@ -42,15 +41,8 @@
"localhost",
]

if sys.platform == "darwin":
_dll_suffix = ".dylib"
elif os.name == "nt":
_dll_suffix = ".dll"
else:
_dll_suffix = ".so"

_DLL_PATH = _EXE_PATH
_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + _dll_suffix)
_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + cdata.dll_suffix)


class CppTester(Tester):
Expand Down
10 changes: 1 addition & 9 deletions dev/archery/archery/integration/tester_go.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import contextlib
import functools
import os
import sys
import subprocess

from . import cdata
Expand All @@ -43,17 +42,10 @@
"localhost",
]

if sys.platform == "darwin":
_dll_suffix = ".dylib"
elif os.name == "nt":
_dll_suffix = ".dll"
else:
_dll_suffix = ".so"

_DLL_PATH = os.path.join(
ARROW_ROOT_DEFAULT,
"go/arrow/internal/cdata_integration")
_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + _dll_suffix)
_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + cdata.dll_suffix)


class GoTester(Tester):
Expand Down
114 changes: 112 additions & 2 deletions dev/archery/archery/integration/tester_rust.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
# under the License.

import contextlib
import functools
import os
import subprocess

from .tester import Tester
from . import cdata
from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
from ..utils.source import ARROW_ROOT_DEFAULT


_EXE_PATH = os.path.join(ARROW_ROOT_DEFAULT, "rust/target/debug")
_EXE_PATH = os.environ.get(
"ARROW_RUST_EXE_PATH", os.path.join(ARROW_ROOT_DEFAULT, "rust/target/debug")
)
_INTEGRATION_EXE = os.path.join(_EXE_PATH, "arrow-json-integration-test")
_STREAM_TO_FILE = os.path.join(_EXE_PATH, "arrow-stream-to-file")
_FILE_TO_STREAM = os.path.join(_EXE_PATH, "arrow-file-to-stream")
Expand All @@ -37,12 +41,19 @@
"localhost",
]

_INTEGRATION_DLL = os.path.join(_EXE_PATH,
"libarrow_integration_testing" + cdata.dll_suffix)


class RustTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
C_DATA_SCHEMA_EXPORTER = True
C_DATA_ARRAY_EXPORTER = True
C_DATA_SCHEMA_IMPORTER = True
C_DATA_ARRAY_IMPORTER = True

name = 'Rust'

Expand Down Expand Up @@ -117,3 +128,102 @@ def flight_request(self, port, json_path=None, scenario_name=None):
if self.debug:
log(' '.join(cmd))
run_cmd(cmd)

def make_c_data_exporter(self):
return RustCDataExporter(self.debug, self.args)

def make_c_data_importer(self):
return RustCDataImporter(self.debug, self.args)


_rust_c_data_entrypoints = """
const char* arrow_rs_cdata_integration_export_schema_from_json(
const char* json_path, uintptr_t out);
const char* arrow_rs_cdata_integration_import_schema_and_compare_to_json(
const char* json_path, uintptr_t c_schema);
const char* arrow_rs_cdata_integration_export_batch_from_json(
const char* json_path, int num_batch, uintptr_t out);
const char* arrow_rs_cdata_integration_import_batch_and_compare_to_json(
const char* json_path, int num_batch, uintptr_t c_array);
void arrow_rs_free_error(const char*);
"""


@functools.lru_cache
def _load_ffi(ffi, lib_path=_INTEGRATION_DLL):
ffi.cdef(_rust_c_data_entrypoints)
dll = ffi.dlopen(lib_path)
return dll


class _CDataBase:

def __init__(self, debug, args):
self.debug = debug
self.args = args
self.ffi = cdata.ffi()
self.dll = _load_ffi(self.ffi)

def _pointer_to_int(self, c_ptr):
return self.ffi.cast('uintptr_t', c_ptr)

def _check_rust_error(self, rs_error):
"""
Check a `const char*` error return from an integration entrypoint.
A null means success, a non-empty string is an error message.
The string is dynamically allocated on the Rust side.
"""
assert self.ffi.typeof(rs_error) is self.ffi.typeof("const char*")
if rs_error != self.ffi.NULL:
try:
error = self.ffi.string(rs_error).decode(
'utf8', errors='replace')
raise RuntimeError(
f"Rust C Data Integration call failed: {error}")
finally:
self.dll.arrow_rs_free_error(rs_error)


class RustCDataExporter(CDataExporter, _CDataBase):

def export_schema_from_json(self, json_path, c_schema_ptr):
rs_error = self.dll.arrow_rs_cdata_integration_export_schema_from_json(
str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
self._check_rust_error(rs_error)

def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
rs_error = self.dll.arrow_rs_cdata_integration_export_batch_from_json(
str(json_path).encode(), num_batch,
self._pointer_to_int(c_array_ptr))
self._check_rust_error(rs_error)

@property
def supports_releasing_memory(self):
return True

def record_allocation_state(self):
# FIXME is it possible to measure the amount of Rust-allocated memory?
return 0


class RustCDataImporter(CDataImporter, _CDataBase):

def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
rs_error = \
self.dll.arrow_rs_cdata_integration_import_schema_and_compare_to_json(
str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
self._check_rust_error(rs_error)

def import_batch_and_compare_to_json(self, json_path, num_batch,
c_array_ptr):
rs_error = \
self.dll.arrow_rs_cdata_integration_import_batch_and_compare_to_json(
str(json_path).encode(), num_batch, self._pointer_to_int(c_array_ptr))
self._check_rust_error(rs_error)

@property
def supports_releasing_memory(self):
return True
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1716,8 +1716,9 @@ services:
environment:
<<: [*common, *ccache]
ARCHERY_INTEGRATION_WITH_RUST: 0
# Tell Archery where the arrow C++ binaries are located
# Tell Archery where Arrow binaries are located
ARROW_CPP_EXE_PATH: /build/cpp/debug
ARROW_RUST_EXE_PATH: /build/rust/debug
command:
["/arrow/ci/scripts/integration_arrow_build.sh /arrow /build &&
/arrow/ci/scripts/integration_arrow.sh /arrow /build"]
Expand Down

0 comments on commit ef0774f

Please sign in to comment.