Merge branch 'apache:main' into patch-1
tmct authored Feb 22, 2024
2 parents 619345c + b089c6a commit 0656c92
Showing 133 changed files with 4,980 additions and 993 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev_pr/link.js
@@ -82,7 +82,7 @@ async function commentJIRAURL(github, context, pullRequestNumber, jiraID) {
async function commentGitHubURL(github, context, pullRequestNumber, issueID) {
// Make the call to ensure issue exists before adding comment
const issueInfo = await helpers.getGitHubInfo(github, context, issueID, pullRequestNumber);
const message = "* Closes: #" + issueInfo.number
const message = "* GitHub Issue: #" + issueInfo.number
if (issueInfo) {
const body = context.payload.pull_request.body || "";
if (body.includes(message)) {
1 change: 1 addition & 0 deletions .github/workflows/python.yml
@@ -132,6 +132,7 @@ jobs:
timeout-minutes: 60
env:
ARROW_HOME: /usr/local
+ARROW_AZURE: ON
ARROW_DATASET: ON
ARROW_FLIGHT: ON
ARROW_GANDIVA: ON
2 changes: 1 addition & 1 deletion ci/conda_env_archery.txt
@@ -25,7 +25,7 @@ jira
pygit2
pygithub
ruamel.yaml
-setuptools_scm<8.0.0
+setuptools_scm
toolz

# benchmark
6 changes: 6 additions & 0 deletions ci/conda_env_cpp.txt
@@ -16,6 +16,11 @@
# under the License.

aws-sdk-cpp=1.11.68
+azure-core-cpp>=1.10.3
+azure-identity-cpp>=1.6.0
+azure-storage-blobs-cpp>=12.10.0
+azure-storage-common-cpp>=12.5.0
+azure-storage-files-datalake-cpp>=12.9.0
benchmark>=1.6.0
boost-cpp>=1.68.0
brotli
@@ -34,6 +39,7 @@ libutf8proc
lz4-c
make
ninja
+nodejs
orc
pkg-config
python
2 changes: 1 addition & 1 deletion ci/conda_env_crossbow.txt
@@ -21,5 +21,5 @@ jinja2
jira
pygit2
ruamel.yaml
-setuptools_scm<8.0.0
+setuptools_scm
toolz
2 changes: 1 addition & 1 deletion ci/conda_env_python.txt
@@ -27,4 +27,4 @@ pytest<8
pytest-faulthandler
s3fs>=2023.10.0
setuptools
-setuptools_scm<8.0.0
+setuptools_scm
8 changes: 8 additions & 0 deletions ci/docker/conda-cpp.dockerfile
@@ -42,6 +42,13 @@ RUN mamba install -q -y \
valgrind && \
mamba clean --all

+# Ensure npm, node and azurite are on path. npm and node are required to install azurite, which will then need to
+# be on the path for the tests to run.
+ENV PATH=/opt/conda/envs/arrow/bin:$PATH
+
+COPY ci/scripts/install_azurite.sh /arrow/ci/scripts/
+RUN /arrow/ci/scripts/install_azurite.sh

# We want to install the GCS testbench using the same Python binary that the Conda code will use.
COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts
RUN /arrow/ci/scripts/install_gcs_testbench.sh default
@@ -50,6 +57,7 @@ COPY ci/scripts/install_sccache.sh /arrow/ci/scripts/
RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin

ENV ARROW_ACERO=ON \
+ARROW_AZURE=ON \
ARROW_BUILD_TESTS=ON \
ARROW_DATASET=ON \
ARROW_DEPENDENCY_SOURCE=CONDA \
2 changes: 1 addition & 1 deletion ci/docker/conda-python.dockerfile
@@ -28,7 +28,7 @@ COPY ci/conda_env_python.txt \
RUN mamba install -q -y \
--file arrow/ci/conda_env_python.txt \
$([ "$python" == $(gdb --batch --eval-command 'python import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")') ] && echo "gdb") \
-python=${python} \
+"python=${python}.*=*_cpython" \
nomkl && \
mamba clean --all

1 change: 1 addition & 0 deletions ci/scripts/cpp_test.sh
@@ -86,6 +86,7 @@ ctest \
--label-regex unittest \
--output-on-failure \
--parallel ${n_jobs} \
+--repeat until-pass:3 \
--timeout ${ARROW_CTEST_TIMEOUT:-300} \
"${ctest_options[@]}" \
"$@"
1 change: 1 addition & 0 deletions ci/scripts/python_sdist_test.sh
@@ -28,6 +28,7 @@ export PARQUET_TEST_DATA=${arrow_dir}/cpp/submodules/parquet-testing/data
export PYARROW_CMAKE_GENERATOR=${CMAKE_GENERATOR:-Ninja}
export PYARROW_BUILD_TYPE=${CMAKE_BUILD_TYPE:-debug}
export PYARROW_WITH_ACERO=${ARROW_ACERO:-ON}
+export PYARROW_WITH_AZURE=${ARROW_AZURE:-OFF}
export PYARROW_WITH_S3=${ARROW_S3:-OFF}
export PYARROW_WITH_ORC=${ARROW_ORC:-OFF}
export PYARROW_WITH_CUDA=${ARROW_CUDA:-OFF}
1 change: 1 addition & 0 deletions ci/scripts/python_test.sh
@@ -52,6 +52,7 @@ fi
: ${PYARROW_TEST_S3:=${ARROW_S3:-ON}}

export PYARROW_TEST_ACERO
+export PYARROW_TEST_AZURE
export PYARROW_TEST_CUDA
export PYARROW_TEST_DATASET
export PYARROW_TEST_FLIGHT
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/asof_join_node.cc
@@ -1098,7 +1098,7 @@ class AsofJoinNode : public ExecNode {
auto inputs = this->inputs();
for (size_t i = 0; i < inputs.size(); i++) {
RETURN_NOT_OK(key_hashers_[i]->Init(plan()->query_context()->exec_context(),
-output_schema()));
+inputs[i]->output_schema()));
ARROW_ASSIGN_OR_RAISE(
auto input_state,
InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(),
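The one-line fix above matters because the join's output schema concatenates fields from all inputs, so resolving a key column against output_schema() can yield the wrong type once an input's key has been projected (for example, cast to string). The following is an illustration only, not the commit's code: a minimal standalone C++ sketch of the schema mismatch, using only the public Arrow C++ API.

// Illustration only (not part of the commit): shows why key columns must be
// resolved against each input's schema rather than the join's output schema.
// Assumes Arrow C++ headers and library are available.
#include <arrow/api.h>

#include <iostream>

int main() {
  // One input's "key" column was projected to string...
  std::shared_ptr<arrow::Schema> input = arrow::schema(
      {arrow::field("time", arrow::int32()), arrow::field("key", arrow::utf8())});
  // ...but the join's output schema carries the key type of another input.
  std::shared_ptr<arrow::Schema> output = arrow::schema(
      {arrow::field("time", arrow::int32()), arrow::field("key", arrow::int32()),
       arrow::field("l_value", arrow::int32())});

  // A key hasher initialized with the output schema would see int32 here and
  // hash the string key column incorrectly:
  std::cout << "input key type:  "
            << input->GetFieldByName("key")->type()->ToString() << std::endl;   // string
  std::cout << "output key type: "
            << output->GetFieldByName("key")->type()->ToString() << std::endl;  // int32
  return 0;
}

The new test below exercises exactly this path by casting the key columns to string through a projection node before the join.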
64 changes: 64 additions & 0 deletions cpp/src/arrow/acero/asof_join_node_test.cc
@@ -1582,6 +1582,70 @@ TEST(AsofJoinTest, BatchSequencing) {
return TestSequencing(MakeIntegerBatches, /*num_batches=*/32, /*batch_size=*/1);
}

template <typename BatchesMaker>
void TestSchemaResolution(BatchesMaker maker, int num_batches, int batch_size) {
// GH-39803: The key hasher needs to resolve the types of key columns. All other
// tests use int32 for all columns, but this test converts the key columns to
// strings via a projection node to test that the column is correctly resolved
// to string.
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r_schema =
schema({field("time", int32()), field("key", int32()), field("r0_value", int32())});

auto make_shift = [&maker, num_batches, batch_size](
const std::shared_ptr<Schema>& schema, int shift) {
return maker({[](int row) -> int64_t { return row; },
[num_batches](int row) -> int64_t { return row / num_batches; },
[shift](int row) -> int64_t { return row * 10 + shift; }},
schema, num_batches, batch_size);
};
ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0));
ASSERT_OK_AND_ASSIGN(auto r_batches, make_shift(r_schema, 1));

Declaration l_src = {"source",
SourceNodeOptions(l_schema, l_batches.gen(false, false))};
Declaration r_src = {"source",
SourceNodeOptions(r_schema, r_batches.gen(false, false))};
Declaration l_project = {
"project",
{std::move(l_src)},
ProjectNodeOptions({compute::field_ref("time"),
compute::call("cast", {compute::field_ref("key")},
compute::CastOptions::Safe(utf8())),
compute::field_ref("l_value")},
{"time", "key", "l_value"})};
Declaration r_project = {
"project",
{std::move(r_src)},
ProjectNodeOptions({compute::call("cast", {compute::field_ref("key")},
compute::CastOptions::Safe(utf8())),
compute::field_ref("r0_value"), compute::field_ref("time")},
{"key", "r0_value", "time"})};

Declaration asofjoin = {
"asofjoin", {l_project, r_project}, GetRepeatedOptions(2, "time", {"key"}, 1000)};

QueryOptions query_options;
query_options.use_threads = false;
ASSERT_OK_AND_ASSIGN(auto table, DeclarationToTable(asofjoin, query_options));

Int32Builder expected_r0_b;
for (int i = 1; i <= 91; i += 10) {
ASSERT_OK(expected_r0_b.Append(i));
}
ASSERT_OK_AND_ASSIGN(auto expected_r0, expected_r0_b.Finish());

auto actual_r0 = table->GetColumnByName("r0_value");
std::vector<std::shared_ptr<arrow::Array>> chunks = {expected_r0};
auto expected_r0_chunked = std::make_shared<arrow::ChunkedArray>(chunks);
ASSERT_TRUE(actual_r0->Equals(expected_r0_chunked));
}

TEST(AsofJoinTest, OutputSchemaResolution) {
return TestSchemaResolution(MakeIntegerBatches, /*num_batches=*/1, /*batch_size=*/10);
}

namespace {

Result<AsyncGenerator<std::optional<ExecBatch>>> MakeIntegerBatchGenForTest(
12 changes: 12 additions & 0 deletions cpp/src/arrow/builder_benchmark.cc
@@ -56,6 +56,7 @@ constexpr int64_t kRounds = 256;
static VectorType kData = AlmostU8CompressibleVector();
constexpr int64_t kBytesProcessPerRound = kNumberOfElements * sizeof(ValueType);
constexpr int64_t kBytesProcessed = kRounds * kBytesProcessPerRound;
+constexpr int64_t kItemsProcessed = kRounds * kNumberOfElements;

static const char* kBinaryString = "12345678";
static std::string_view kBinaryView(kBinaryString);
@@ -73,6 +74,7 @@ static void BuildIntArrayNoNulls(benchmark::State& state) { // NOLINT non-const
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildAdaptiveIntNoNulls(
@@ -89,6 +91,7 @@ static void BuildAdaptiveIntNoNulls(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildAdaptiveIntNoNullsScalarAppend(
@@ -107,6 +110,7 @@ static void BuildAdaptiveIntNoNullsScalarAppend(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildBooleanArrayNoNulls(
@@ -127,6 +131,7 @@ static void BuildBooleanArrayNoNulls(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildBinaryArray(benchmark::State& state) { // NOLINT non-const reference
@@ -142,6 +147,7 @@ static void BuildBinaryArray(benchmark::State& state) { // NOLINT non-const ref
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildChunkedBinaryArray(
@@ -161,6 +167,7 @@ static void BuildChunkedBinaryArray(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildFixedSizeBinaryArray(
@@ -179,6 +186,7 @@ static void BuildFixedSizeBinaryArray(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildDecimalArray(benchmark::State& state) { // NOLINT non-const reference
@@ -199,6 +207,7 @@ static void BuildDecimalArray(benchmark::State& state) { // NOLINT non-const re
}

state.SetBytesProcessed(state.iterations() * kRounds * kNumberOfElements * 16);
+state.SetItemsProcessed(state.iterations() * kRounds * kNumberOfElements);
}

// ----------------------------------------------------------------------
@@ -317,6 +326,7 @@ static void BenchmarkDictionaryArray(
fodder_nbytes = fodder.size() * sizeof(Scalar);
}
state.SetBytesProcessed(state.iterations() * fodder_nbytes * kRounds);
+state.SetItemsProcessed(state.iterations() * fodder.size() * kRounds);
}

static void BuildInt64DictionaryArrayRandom(
@@ -361,6 +371,7 @@ static void ArrayDataConstructDestruct(
InitArrays();
arrays.clear();
}
+state.SetItemsProcessed(state.iterations() * kNumArrays);
}

// ----------------------------------------------------------------------
@@ -430,6 +441,7 @@ static void ReferenceBuildVectorNoNulls(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
+state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

BENCHMARK(ReferenceBuildVectorNoNulls);
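For context on the lines added throughout this file: Google Benchmark's SetBytesProcessed and SetItemsProcessed populate the bytes_per_second and items_per_second columns of the benchmark report. A self-contained sketch, independent of Arrow's builders (the vector-sum workload and names are made up for illustration):

#include <benchmark/benchmark.h>

#include <cstdint>
#include <numeric>
#include <vector>

constexpr int64_t kNumElements = 1 << 16;

static void BM_SumVector(benchmark::State& state) {  // NOLINT non-const reference
  std::vector<int32_t> data(kNumElements, 1);
  for (auto _ : state) {
    int64_t sum = std::accumulate(data.begin(), data.end(), int64_t{0});
    benchmark::DoNotOptimize(sum);
  }
  // Drives the bytes_per_second column in the benchmark report.
  state.SetBytesProcessed(state.iterations() * kNumElements *
                          static_cast<int64_t>(sizeof(int32_t)));
  // Drives the items_per_second column -- the counter this commit adds to the
  // builder benchmarks above.
  state.SetItemsProcessed(state.iterations() * kNumElements);
}
BENCHMARK(BM_SumVector);
BENCHMARK_MAIN();

Reporting items/s alongside bytes/s makes regressions easier to compare across element widths, since the item count does not depend on sizeof(ValueType).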
14 changes: 9 additions & 5 deletions cpp/src/arrow/compute/kernels/scalar_cast_dictionary.cc
@@ -45,11 +45,12 @@ Status CastToDictionary(KernelContext* ctx, const ExecSpan& batch, ExecResult* o
return Status::OK();
}

-// If the input type is STRING, it is first encoded as a dictionary to facilitate
-// processing. This approach allows the subsequent code to uniformly handle STRING
-// inputs as if they were originally provided in dictionary format. Encoding as a
-// dictionary helps in reusing the same logic for dictionary operations.
-if (batch[0].type()->id() == Type::STRING) {
+// If the input type is string or binary-like, it is first encoded as a dictionary to
+// facilitate processing. This approach allows the subsequent code to uniformly handle
+// string or binary-like inputs as if they were originally provided in dictionary
+// format. Encoding as a dictionary helps in reusing the same logic for dictionary
+// operations.
+if (is_base_binary_like(in_array->type->id())) {
in_array = DictionaryEncode(in_array)->array();
}
const auto& in_type = checked_cast<const DictionaryType&>(*in_array->type);
@@ -98,6 +99,9 @@ std::vector<std::shared_ptr<CastFunction>> GetDictionaryCasts() {
AddCommonCasts(Type::DICTIONARY, kOutputTargetType, cast_dict.get());
AddDictionaryCast<DictionaryType>(cast_dict.get());
AddDictionaryCast<StringType>(cast_dict.get());
+AddDictionaryCast<LargeStringType>(cast_dict.get());
+AddDictionaryCast<BinaryType>(cast_dict.get());
+AddDictionaryCast<LargeBinaryType>(cast_dict.get());

return {cast_dict};
}
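At the user level, the extra registrations above widen the cast kernel so that large_string, binary, and large_binary inputs (not just string) can be dictionary-encoded through a cast. A minimal sketch of that usage, assuming Arrow C++ with the compute module (the sample values are made up):

// Sketch only: dictionary-encoding a large_string array via the "cast"
// kernel, which this change enables for binary-like types beyond string.
#include <arrow/api.h>
#include <arrow/compute/cast.h>

#include <iostream>
#include <memory>

arrow::Status RunExample() {
  // Build a large_string array with repeated values.
  arrow::LargeStringBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({"a", "b", "a", "c", "b"}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

  // Cast large_string -> dictionary<values=large_string, indices=int32>.
  auto dict_type = arrow::dictionary(arrow::int32(), arrow::large_utf8());
  ARROW_ASSIGN_OR_RAISE(arrow::Datum encoded,
                        arrow::compute::Cast(values, dict_type));
  std::cout << encoded.make_array()->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() {
  arrow::Status st = RunExample();
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}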
(Diff truncated; the remaining changed files are not shown.)
