Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into string-view-to-pandas
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche committed Feb 21, 2024
2 parents 5955d5f + 6a22a1d commit 615398e
Show file tree
Hide file tree
Showing 102 changed files with 4,251 additions and 459 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev_pr/link.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async function commentJIRAURL(github, context, pullRequestNumber, jiraID) {
async function commentGitHubURL(github, context, pullRequestNumber, issueID) {
// Make the call to ensure issue exists before adding comment
const issueInfo = await helpers.getGitHubInfo(github, context, issueID, pullRequestNumber);
const message = "* Closes: #" + issueInfo.number
const message = "* GitHub Issue: #" + issueInfo.number
if (issueInfo) {
const body = context.payload.pull_request.body || "";
if (body.includes(message)) {
Expand Down
2 changes: 1 addition & 1 deletion ci/conda_env_archery.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jira
pygit2
pygithub
ruamel.yaml
setuptools_scm<8.0.0
setuptools_scm
toolz

# benchmark
Expand Down
2 changes: 1 addition & 1 deletion ci/conda_env_crossbow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ jinja2
jira
pygit2
ruamel.yaml
setuptools_scm<8.0.0
setuptools_scm
toolz
2 changes: 1 addition & 1 deletion ci/conda_env_python.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ pytest<8
pytest-faulthandler
s3fs>=2023.10.0
setuptools
setuptools_scm<8.0.0
setuptools_scm
2 changes: 1 addition & 1 deletion ci/docker/conda-python.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ COPY ci/conda_env_python.txt \
RUN mamba install -q -y \
--file arrow/ci/conda_env_python.txt \
$([ "$python" == $(gdb --batch --eval-command 'python import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")') ] && echo "gdb") \
python=${python} \
"python=${python}.*=*_cpython" \
nomkl && \
mamba clean --all

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ class AsofJoinNode : public ExecNode {
auto inputs = this->inputs();
for (size_t i = 0; i < inputs.size(); i++) {
RETURN_NOT_OK(key_hashers_[i]->Init(plan()->query_context()->exec_context(),
output_schema()));
inputs[i]->output_schema()));
ARROW_ASSIGN_OR_RAISE(
auto input_state,
InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(),
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,70 @@ TEST(AsofJoinTest, BatchSequencing) {
return TestSequencing(MakeIntegerBatches, /*num_batches=*/32, /*batch_size=*/1);
}

template <typename BatchesMaker>
void TestSchemaResolution(BatchesMaker maker, int num_batches, int batch_size) {
// GH-39803: The key hasher needs to resolve the types of key columns. All other
// tests use int32 for all columns, but this test converts the key columns to
// strings via a projection node to test that the column is correctly resolved
// to string.
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r_schema =
schema({field("time", int32()), field("key", int32()), field("r0_value", int32())});

auto make_shift = [&maker, num_batches, batch_size](
const std::shared_ptr<Schema>& schema, int shift) {
return maker({[](int row) -> int64_t { return row; },
[num_batches](int row) -> int64_t { return row / num_batches; },
[shift](int row) -> int64_t { return row * 10 + shift; }},
schema, num_batches, batch_size);
};
ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0));
ASSERT_OK_AND_ASSIGN(auto r_batches, make_shift(r_schema, 1));

Declaration l_src = {"source",
SourceNodeOptions(l_schema, l_batches.gen(false, false))};
Declaration r_src = {"source",
SourceNodeOptions(r_schema, r_batches.gen(false, false))};
Declaration l_project = {
"project",
{std::move(l_src)},
ProjectNodeOptions({compute::field_ref("time"),
compute::call("cast", {compute::field_ref("key")},
compute::CastOptions::Safe(utf8())),
compute::field_ref("l_value")},
{"time", "key", "l_value"})};
Declaration r_project = {
"project",
{std::move(r_src)},
ProjectNodeOptions({compute::call("cast", {compute::field_ref("key")},
compute::CastOptions::Safe(utf8())),
compute::field_ref("r0_value"), compute::field_ref("time")},
{"key", "r0_value", "time"})};

Declaration asofjoin = {
"asofjoin", {l_project, r_project}, GetRepeatedOptions(2, "time", {"key"}, 1000)};

QueryOptions query_options;
query_options.use_threads = false;
ASSERT_OK_AND_ASSIGN(auto table, DeclarationToTable(asofjoin, query_options));

Int32Builder expected_r0_b;
for (int i = 1; i <= 91; i += 10) {
ASSERT_OK(expected_r0_b.Append(i));
}
ASSERT_OK_AND_ASSIGN(auto expected_r0, expected_r0_b.Finish());

auto actual_r0 = table->GetColumnByName("r0_value");
std::vector<std::shared_ptr<arrow::Array>> chunks = {expected_r0};
auto expected_r0_chunked = std::make_shared<arrow::ChunkedArray>(chunks);
ASSERT_TRUE(actual_r0->Equals(expected_r0_chunked));
}

TEST(AsofJoinTest, OutputSchemaResolution) {
return TestSchemaResolution(MakeIntegerBatches, /*num_batches=*/1, /*batch_size=*/10);
}

namespace {

Result<AsyncGenerator<std::optional<ExecBatch>>> MakeIntegerBatchGenForTest(
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/builder_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ constexpr int64_t kRounds = 256;
static VectorType kData = AlmostU8CompressibleVector();
constexpr int64_t kBytesProcessPerRound = kNumberOfElements * sizeof(ValueType);
constexpr int64_t kBytesProcessed = kRounds * kBytesProcessPerRound;
constexpr int64_t kItemsProcessed = kRounds * kNumberOfElements;

static const char* kBinaryString = "12345678";
static std::string_view kBinaryView(kBinaryString);
Expand All @@ -73,6 +74,7 @@ static void BuildIntArrayNoNulls(benchmark::State& state) { // NOLINT non-const
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildAdaptiveIntNoNulls(
Expand All @@ -89,6 +91,7 @@ static void BuildAdaptiveIntNoNulls(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildAdaptiveIntNoNullsScalarAppend(
Expand All @@ -107,6 +110,7 @@ static void BuildAdaptiveIntNoNullsScalarAppend(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildBooleanArrayNoNulls(
Expand All @@ -127,6 +131,7 @@ static void BuildBooleanArrayNoNulls(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildBinaryArray(benchmark::State& state) { // NOLINT non-const reference
Expand All @@ -142,6 +147,7 @@ static void BuildBinaryArray(benchmark::State& state) { // NOLINT non-const ref
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildChunkedBinaryArray(
Expand All @@ -161,6 +167,7 @@ static void BuildChunkedBinaryArray(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildFixedSizeBinaryArray(
Expand All @@ -179,6 +186,7 @@ static void BuildFixedSizeBinaryArray(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

static void BuildDecimalArray(benchmark::State& state) { // NOLINT non-const reference
Expand All @@ -199,6 +207,7 @@ static void BuildDecimalArray(benchmark::State& state) { // NOLINT non-const re
}

state.SetBytesProcessed(state.iterations() * kRounds * kNumberOfElements * 16);
state.SetItemsProcessed(state.iterations() * kRounds * kNumberOfElements);
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -317,6 +326,7 @@ static void BenchmarkDictionaryArray(
fodder_nbytes = fodder.size() * sizeof(Scalar);
}
state.SetBytesProcessed(state.iterations() * fodder_nbytes * kRounds);
state.SetItemsProcessed(state.iterations() * fodder.size() * kRounds);
}

static void BuildInt64DictionaryArrayRandom(
Expand Down Expand Up @@ -361,6 +371,7 @@ static void ArrayDataConstructDestruct(
InitArrays();
arrays.clear();
}
state.SetItemsProcessed(state.iterations() * kNumArrays);
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -430,6 +441,7 @@ static void ReferenceBuildVectorNoNulls(
}

state.SetBytesProcessed(state.iterations() * kBytesProcessed);
state.SetItemsProcessed(state.iterations() * kItemsProcessed);
}

BENCHMARK(ReferenceBuildVectorNoNulls);
Expand Down
Loading

0 comments on commit 615398e

Please sign in to comment.