diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml
index 35c4460d47bc6..eb00bc5f92a8d 100644
--- a/.github/workflows/ruby.yml
+++ b/.github/workflows/ruby.yml
@@ -197,9 +197,7 @@ jobs:
         mingw-n-bits:
           - 64
         ruby-version:
-          # TODO: Use the latest Ruby again when we fix GH-39130.
-          # - ruby
-          - "3.1"
+          - ruby
     env:
       ARROW_BUILD_STATIC: OFF
       ARROW_BUILD_TESTS: OFF
diff --git a/.gitignore b/.gitignore
index 3192069d1ac7a..52ffa6c6124c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -102,8 +102,8 @@ __debug_bin
 .envrc
 
 # Develocity
-.mvn/.gradle-enterprise/
-.mvn/.develocity/
+java/.mvn/.gradle-enterprise/
+java/.mvn/.develocity/
 
 # rat
 filtered_rat.txt
diff --git a/ci/appveyor-cpp-setup.bat b/ci/appveyor-cpp-setup.bat
index 5c4a11832d5ee..5a9dffa166fb7 100644
--- a/ci/appveyor-cpp-setup.bat
+++ b/ci/appveyor-cpp-setup.bat
@@ -66,6 +66,9 @@ set CONDA_PACKAGES=%CONDA_PACKAGES% --file=ci\conda_env_cpp.txt
 @rem Force conda to use conda-forge
 conda config --add channels conda-forge
 conda config --remove channels defaults
+@rem Ensure using the latest information. If there are invalid caches,
+@rem mamba may use an invalid download URL.
+mamba clean --all -y
 @rem Arrow conda environment
 mamba create -n arrow -y -c conda-forge ^
   --file=ci\conda_env_python.txt ^
diff --git a/ci/scripts/java_full_build.sh b/ci/scripts/java_full_build.sh
index 2734f3e9dbec2..d914aa2d8472e 100755
--- a/ci/scripts/java_full_build.sh
+++ b/ci/scripts/java_full_build.sh
@@ -49,21 +49,13 @@ fi
 # build the entire project
 mvn clean \
     install \
-    assembly:single \
-    source:jar \
-    javadoc:jar \
     -Papache-release \
     -Parrow-c-data \
     -Parrow-jni \
     -Darrow.cpp.build.dir=$dist_dir \
-    -Darrow.c.jni.dist.dir=$dist_dir \
-    -DdescriptorId=source-release
+    -Darrow.c.jni.dist.dir=$dist_dir
 
 # copy all jar, zip and pom files to the distribution folder
-find . \
-     "(" -name "*-javadoc.jar" -o -name "*-sources.jar" ")" \
-     -exec echo {} ";" \
-     -exec cp {} $dist_dir ";"
 find ~/.m2/repository/org/apache/arrow \
      "(" \
      -name "*.jar" -o \
diff --git a/ci/scripts/python_wheel_manylinux_build.sh b/ci/scripts/python_wheel_manylinux_build.sh
index 6e29ef58d2318..aa86494a9d47d 100755
--- a/ci/scripts/python_wheel_manylinux_build.sh
+++ b/ci/scripts/python_wheel_manylinux_build.sh
@@ -160,6 +160,26 @@ export CMAKE_PREFIX_PATH=/tmp/arrow-dist
 pushd /arrow/python
 python setup.py bdist_wheel
 
+echo "=== Strip symbols from wheel ==="
+mkdir -p dist/temp-fix-wheel
+mv dist/pyarrow-*.whl dist/temp-fix-wheel
+
+pushd dist/temp-fix-wheel
+wheel_name=$(ls pyarrow-*.whl)
+# Unzip and remove old wheel
+unzip $wheel_name
+rm $wheel_name
+for filename in $(ls pyarrow/*.so pyarrow/*.so.*); do
+    echo "Stripping debug symbols from: $filename";
+    strip --strip-debug $filename
+done
+# Zip wheel again after stripping symbols
+zip -r $wheel_name .
+mv $wheel_name ..
+popd
+
+rm -rf dist/temp-fix-wheel
+
 echo "=== (${PYTHON_VERSION}) Tag the wheel with manylinux${MANYLINUX_VERSION} ==="
 auditwheel repair -L . dist/pyarrow-*.whl -w repaired_wheels
 popd
diff --git a/cpp/cmake_modules/Findutf8proc.cmake b/cpp/cmake_modules/Findutf8proc.cmake
index e347414090549..9721f76f0631b 100644
--- a/cpp/cmake_modules/Findutf8proc.cmake
+++ b/cpp/cmake_modules/Findutf8proc.cmake
@@ -19,7 +19,7 @@ if(utf8proc_FOUND)
   return()
 endif()
 
-if(ARROW_PACKAGE_KIND STREQUAL "vcpkg")
+if(ARROW_PACKAGE_KIND STREQUAL "vcpkg" OR VCPKG_TOOLCHAIN)
   set(find_package_args "")
   if(utf8proc_FIND_VERSION)
     list(APPEND find_package_args ${utf8proc_FIND_VERSION})
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index f102c7bb81683..9dcb426f079fe 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -2819,11 +2819,13 @@ macro(build_utf8proc)
 endmacro()
 
 if(ARROW_WITH_UTF8PROC)
-  resolve_dependency(utf8proc
-                     PC_PACKAGE_NAMES
-                     libutf8proc
-                     REQUIRED_VERSION
-                     "2.2.0")
+  set(utf8proc_resolve_dependency_args utf8proc PC_PACKAGE_NAMES libutf8proc)
+  if(NOT VCPKG_TOOLCHAIN)
+    # utf8proc in vcpkg doesn't provide version information:
+    # https://github.com/microsoft/vcpkg/issues/39176
+    list(APPEND utf8proc_resolve_dependency_args REQUIRED_VERSION "2.2.0")
+  endif()
+  resolve_dependency(${utf8proc_resolve_dependency_args})
 endif()
 
 macro(build_cares)
@@ -4611,8 +4613,11 @@ macro(build_opentelemetry)
     set(_OPENTELEMETRY_LIBS
         common
         http_client_curl
+        logs
+        ostream_log_record_exporter
         ostream_span_exporter
         otlp_http_client
+        otlp_http_log_record_exporter
        otlp_http_exporter
         otlp_recordable
         proto
@@ -4645,6 +4650,14 @@ macro(build_opentelemetry)
       set(_OPENTELEMETRY_STATIC_LIBRARY
           "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_exporter_otlp_http${CMAKE_STATIC_LIBRARY_SUFFIX}"
       )
+    elseif(_OPENTELEMETRY_LIB STREQUAL "otlp_http_log_record_exporter")
+      set(_OPENTELEMETRY_STATIC_LIBRARY
+          "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_exporter_otlp_http_log${CMAKE_STATIC_LIBRARY_SUFFIX}"
+      )
+    elseif(_OPENTELEMETRY_LIB STREQUAL "ostream_log_record_exporter")
+      set(_OPENTELEMETRY_STATIC_LIBRARY
+          "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_exporter_ostream_logs${CMAKE_STATIC_LIBRARY_SUFFIX}"
+      )
     else()
       set(_OPENTELEMETRY_STATIC_LIBRARY
           "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}"
@@ -4679,9 +4692,16 @@ macro(build_opentelemetry)
                       IMPORTED_LOCATION)
   list(APPEND
        OPENTELEMETRY_CMAKE_ARGS
-       -DWITH_OTLP=ON
        -DWITH_OTLP_HTTP=ON
        -DWITH_OTLP_GRPC=OFF
+       # Disabled because it seemed to cause linking errors. May be worth a closer look.
+       -DWITH_FUNC_TESTS=OFF
+       # These options are slated for removal in v1.14 and their features are deemed stable
+       # as of v1.13. However, setting their corresponding ENABLE_* macros in headers seems
+       # finicky, resulting in build failures or ABI-related runtime errors during HTTP
+       # client initialization. There may still be a solution, but we disable them for now.
+       -DWITH_OTLP_HTTP_SSL_PREVIEW=OFF
+       -DWITH_OTLP_HTTP_SSL_TLS_PREVIEW=OFF
        "-DProtobuf_INCLUDE_DIR=${OPENTELEMETRY_PROTOBUF_INCLUDE_DIR}"
        "-DProtobuf_LIBRARY=${OPENTELEMETRY_PROTOBUF_INCLUDE_DIR}"
        "-DProtobuf_PROTOC_EXECUTABLE=${OPENTELEMETRY_PROTOC_EXECUTABLE}")
@@ -4755,19 +4775,25 @@ macro(build_opentelemetry)
   target_link_libraries(opentelemetry-cpp::resources INTERFACE opentelemetry-cpp::common)
   target_link_libraries(opentelemetry-cpp::trace INTERFACE opentelemetry-cpp::common
                                                            opentelemetry-cpp::resources)
+  target_link_libraries(opentelemetry-cpp::logs INTERFACE opentelemetry-cpp::common
+                                                          opentelemetry-cpp::resources)
   target_link_libraries(opentelemetry-cpp::http_client_curl
-                        INTERFACE opentelemetry-cpp::ext CURL::libcurl)
+                        INTERFACE opentelemetry-cpp::common opentelemetry-cpp::ext
+                                  CURL::libcurl)
   target_link_libraries(opentelemetry-cpp::proto INTERFACE ${ARROW_PROTOBUF_LIBPROTOBUF})
   target_link_libraries(opentelemetry-cpp::otlp_recordable
-                        INTERFACE opentelemetry-cpp::trace opentelemetry-cpp::resources
-                                  opentelemetry-cpp::proto)
+                        INTERFACE opentelemetry-cpp::logs opentelemetry-cpp::trace
+                                  opentelemetry-cpp::resources opentelemetry-cpp::proto)
   target_link_libraries(opentelemetry-cpp::otlp_http_client
-                        INTERFACE opentelemetry-cpp::sdk opentelemetry-cpp::proto
+                        INTERFACE opentelemetry-cpp::common opentelemetry-cpp::proto
                                   opentelemetry-cpp::http_client_curl
                                   nlohmann_json::nlohmann_json)
   target_link_libraries(opentelemetry-cpp::otlp_http_exporter
                         INTERFACE opentelemetry-cpp::otlp_recordable
                                   opentelemetry-cpp::otlp_http_client)
+  target_link_libraries(opentelemetry-cpp::otlp_http_log_record_exporter
+                        INTERFACE opentelemetry-cpp::otlp_recordable
+                                  opentelemetry-cpp::otlp_http_client)
 
   foreach(_OPENTELEMETRY_LIB ${_OPENTELEMETRY_LIBS})
     add_dependencies(opentelemetry-cpp::${_OPENTELEMETRY_LIB} opentelemetry_ep)
@@ -4789,7 +4815,11 @@ if(ARROW_WITH_OPENTELEMETRY)
   set(opentelemetry-cpp_SOURCE "AUTO")
   resolve_dependency(opentelemetry-cpp)
   set(ARROW_OPENTELEMETRY_LIBS
-      opentelemetry-cpp::trace opentelemetry-cpp::ostream_span_exporter
+      opentelemetry-cpp::trace
+      opentelemetry-cpp::logs
+      opentelemetry-cpp::otlp_http_log_record_exporter
+      opentelemetry-cpp::ostream_log_record_exporter
+      opentelemetry-cpp::ostream_span_exporter
       opentelemetry-cpp::otlp_http_exporter)
   get_target_property(OPENTELEMETRY_INCLUDE_DIR opentelemetry-cpp::api
                       INTERFACE_INCLUDE_DIRECTORIES)
diff --git a/cpp/cmake_modules/Usevcpkg.cmake b/cpp/cmake_modules/Usevcpkg.cmake
index 37a732f4b85a0..b6192468da342 100644
--- a/cpp/cmake_modules/Usevcpkg.cmake
+++ b/cpp/cmake_modules/Usevcpkg.cmake
@@ -237,9 +237,6 @@ set(LZ4_ROOT
     CACHE STRING "")
 
 if(CMAKE_HOST_WIN32)
-  set(utf8proc_MSVC_STATIC_LIB_SUFFIX
-      ""
-      CACHE STRING "")
   set(LZ4_MSVC_LIB_PREFIX
       ""
       CACHE STRING "")
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 150a304975cad..6dc8358f502f5 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -200,22 +200,29 @@ function(arrow_add_object_library PREFIX)
   set(SOURCES ${ARGN})
   string(TOLOWER "${PREFIX}" prefix)
   if(WIN32)
-    add_library(${prefix}_shared OBJECT ${SOURCES})
-    add_library(${prefix}_static OBJECT ${SOURCES})
-    set_target_properties(${prefix}_shared PROPERTIES POSITION_INDEPENDENT_CODE ON)
-    set_target_properties(${prefix}_static PROPERTIES POSITION_INDEPENDENT_CODE ON)
-    target_compile_definitions(${prefix}_shared PRIVATE ARROW_EXPORTING)
-    target_compile_definitions(${prefix}_static PRIVATE ARROW_STATIC)
-    target_compile_features(${prefix}_shared PRIVATE cxx_std_17)
-    target_compile_features(${prefix}_static PRIVATE cxx_std_17)
-    set(${PREFIX}_TARGET_SHARED
-        ${prefix}_shared
-        PARENT_SCOPE)
-    set(${PREFIX}_TARGET_STATIC
-        ${prefix}_static
-        PARENT_SCOPE)
+    set(targets)
+    if(ARROW_BUILD_SHARED)
+      add_library(${prefix}_shared OBJECT ${SOURCES})
+      set_target_properties(${prefix}_shared PROPERTIES POSITION_INDEPENDENT_CODE ON)
+      target_compile_definitions(${prefix}_shared PRIVATE ARROW_EXPORTING)
+      target_compile_features(${prefix}_shared PRIVATE cxx_std_17)
+      set(${PREFIX}_TARGET_SHARED
+          ${prefix}_shared
+          PARENT_SCOPE)
+      list(APPEND targets ${prefix}_shared)
+    endif()
+    if(ARROW_BUILD_STATIC)
+      add_library(${prefix}_static OBJECT ${SOURCES})
+      set_target_properties(${prefix}_static PROPERTIES POSITION_INDEPENDENT_CODE ON)
+      target_compile_definitions(${prefix}_static PRIVATE ARROW_STATIC)
+      target_compile_features(${prefix}_static PRIVATE cxx_std_17)
+      set(${PREFIX}_TARGET_STATIC
+          ${prefix}_static
+          PARENT_SCOPE)
+      list(APPEND targets ${prefix}_static)
+    endif()
     set(${PREFIX}_TARGETS
-        ${prefix}_shared ${prefix}_static
+        ${targets}
         PARENT_SCOPE)
   else()
     add_library(${prefix} OBJECT ${SOURCES})
@@ -515,6 +522,7 @@ set(ARROW_UTIL_SRCS
     util/int_util.cc
     util/io_util.cc
     util/list_util.cc
+    util/logger.cc
     util/logging.cc
     util/key_value_metadata.cc
     util/memory.cc
@@ -620,6 +628,17 @@ if(ARROW_WITH_ZSTD)
   endforeach()
 endif()
 
+if(ARROW_WITH_OPENTELEMETRY)
+  arrow_add_object_library(ARROW_TELEMETRY telemetry/logging.cc)
+
+  foreach(ARROW_TELEMETRY_TARGET ${ARROW_TELEMETRY_TARGETS})
+    target_link_libraries(${ARROW_TELEMETRY_TARGET} PRIVATE ${ARROW_OPENTELEMETRY_LIBS})
+  endforeach()
+else()
+  set(ARROW_TELEMETRY_TARGET_SHARED)
+  set(ARROW_TELEMETRY_TARGET_STATIC)
+endif()
+
 set(ARROW_TESTING_SHARED_LINK_LIBS arrow_shared ${ARROW_GTEST_GTEST})
 set(ARROW_TESTING_SHARED_PRIVATE_LINK_LIBS arrow::flatbuffers RapidJSON)
 set(ARROW_TESTING_STATIC_LINK_LIBS arrow::flatbuffers RapidJSON arrow_static
@@ -1009,6 +1028,7 @@ add_arrow_lib(arrow
               ${ARROW_JSON_TARGET_SHARED}
               ${ARROW_MEMORY_POOL_TARGET_SHARED}
               ${ARROW_ORC_TARGET_SHARED}
+              ${ARROW_TELEMETRY_TARGET_SHARED}
               ${ARROW_UTIL_TARGET_SHARED}
               ${ARROW_VENDORED_TARGET_SHARED}
               ${ARROW_SHARED_PRIVATE_LINK_LIBS}
@@ -1024,6 +1044,7 @@ add_arrow_lib(arrow
               ${ARROW_JSON_TARGET_STATIC}
               ${ARROW_MEMORY_POOL_TARGET_STATIC}
               ${ARROW_ORC_TARGET_STATIC}
+              ${ARROW_TELEMETRY_TARGET_STATIC}
               ${ARROW_UTIL_TARGET_STATIC}
               ${ARROW_VENDORED_TARGET_STATIC}
               ${ARROW_SYSTEM_LINK_LIBS}
@@ -1253,6 +1274,10 @@ if(ARROW_SUBSTRAIT)
   add_subdirectory(engine)
 endif()
 
+if(ARROW_WITH_OPENTELEMETRY)
+  add_subdirectory(telemetry)
+endif()
+
 if(ARROW_TENSORFLOW)
   add_subdirectory(adapters/tensorflow)
 endif()
diff --git a/cpp/src/arrow/array/builder_nested.h b/cpp/src/arrow/array/builder_nested.h
index 6089cf04d421f..1851ef9122274 100644
--- a/cpp/src/arrow/array/builder_nested.h
+++ b/cpp/src/arrow/array/builder_nested.h
@@ -642,6 +642,8 @@ class ARROW_EXPORT MapBuilder : public ArrayBuilder {
 /// \brief Builder class for fixed-length list array value types
 class ARROW_EXPORT FixedSizeListBuilder : public ArrayBuilder {
  public:
+  using TypeClass = FixedSizeListType;
+
   /// Use this constructor to define the built array's type explicitly. If value_builder
   /// has indeterminate type, this builder will also.
   FixedSizeListBuilder(MemoryPool* pool,
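The `using TypeClass = FixedSizeListType;` addition above lets FixedSizeListBuilder participate in generic code that inspects a builder's Arrow type at compile time, which other concrete builders already support through their own TypeClass typedefs. A minimal sketch of the kind of template code this enables (the helper below is hypothetical, not an Arrow API):

    #include "arrow/array/builder_nested.h"

    // Hypothetical generic helper: map a builder type to its Arrow type id at
    // compile time through Builder::TypeClass.
    template <typename Builder>
    constexpr arrow::Type::type TypeIdOf() {
      return Builder::TypeClass::type_id;
    }

    // Compiles only once FixedSizeListBuilder exposes TypeClass.
    static_assert(TypeIdOf<arrow::FixedSizeListBuilder>() ==
                      arrow::Type::FIXED_SIZE_LIST,
                  "FixedSizeListBuilder builds fixed-size list arrays");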
diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc
index 9cc5cc10917ee..5c87ef2cd0561 100644
--- a/cpp/src/arrow/compute/kernel.cc
+++ b/cpp/src/arrow/compute/kernel.cc
@@ -361,7 +361,8 @@ size_t InputType::Hash() const {
     case InputType::EXACT_TYPE:
       hash_combine(result, type_->Hash());
       break;
-    default:
+    case InputType::ANY_TYPE:
+    case InputType::USE_TYPE_MATCHER:
       break;
   }
   return result;
@@ -378,10 +379,8 @@ std::string InputType::ToString() const {
       break;
     case InputType::USE_TYPE_MATCHER: {
       ss << type_matcher_->ToString();
-    } break;
-    default:
-      DCHECK(false);
       break;
+    }
   }
   return ss.str();
 }
@@ -400,9 +399,8 @@ bool InputType::Equals(const InputType& other) const {
       return type_->Equals(*other.type_);
     case InputType::USE_TYPE_MATCHER:
       return type_matcher_->Equals(*other.type_matcher_);
-    default:
-      return false;
   }
+  return false;
 }
@@ -411,21 +409,23 @@ bool InputType::Matches(const DataType& type) const {
       return type_->Equals(type);
     case InputType::USE_TYPE_MATCHER:
       return type_matcher_->Matches(type);
-    default:
-      // ANY_TYPE
+    case InputType::ANY_TYPE:
       return true;
   }
+  return false;
 }
 
 bool InputType::Matches(const Datum& value) const {
   switch (value.kind()) {
+    case Datum::NONE:
+    case Datum::RECORD_BATCH:
+    case Datum::TABLE:
+      DCHECK(false) << "Matches expects ARRAY, CHUNKED_ARRAY or SCALAR";
+      return false;
     case Datum::ARRAY:
     case Datum::CHUNKED_ARRAY:
    case Datum::SCALAR:
       break;
-    default:
-      DCHECK(false);
-      return false;
   }
   return Matches(*value.type());
 }
@@ -445,11 +445,13 @@ const TypeMatcher& InputType::type_matcher() const {
 
 Result<TypeHolder> OutputType::Resolve(KernelContext* ctx,
                                        const std::vector<TypeHolder>& types) const {
-  if (kind_ == OutputType::FIXED) {
-    return type_.get();
-  } else {
-    return resolver_(ctx, types);
+  switch (kind_) {
+    case OutputType::FIXED:
+      return type_;
+    case OutputType::COMPUTED:
+      break;
   }
+  return resolver_(ctx, types);
 }
 
 const std::shared_ptr<DataType>& OutputType::type() const {
@@ -463,11 +465,13 @@ const OutputType::Resolver& OutputType::resolver() const {
 }
 
 std::string OutputType::ToString() const {
-  if (kind_ == OutputType::FIXED) {
-    return type_->ToString();
-  } else {
-    return "computed";
+  switch (kind_) {
+    case OutputType::FIXED:
+      return type_->ToString();
+    case OutputType::COMPUTED:
+      break;
   }
+  return "computed";
 }
 
 // ----------------------------------------------------------------------
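All of the kernel.cc hunks above apply the same pattern: enumerate every enum value explicitly instead of relying on `default:`, and hoist the shared fallback below the switch. The apparent payoff is compile-time exhaustiveness checking: without a `default:` label, `-Wswitch` (part of `-Wall` in GCC, on by default in Clang) warns on every switch that misses a newly added enumerator, instead of silently taking the old fallback. A standalone illustration of the pattern, not taken from the patch:

    enum class Kind { FIXED, COMPUTED };

    const char* ToString(Kind kind) {
      switch (kind) {
        case Kind::FIXED:
          return "fixed";
        case Kind::COMPUTED:
          break;  // fall through to the shared return below
      }
      // Reached for COMPUTED; also satisfies the compiler's missing-return
      // analysis without reintroducing a default: label.
      return "computed";
    }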
diff --git a/cpp/src/arrow/compute/kernels/gather_internal.h b/cpp/src/arrow/compute/kernels/gather_internal.h
new file mode 100644
index 0000000000000..4c161533a7277
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/gather_internal.h
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+
+#include "arrow/array/data.h"
+#include "arrow/util/bit_block_counter.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/macros.h"
+
+// Implementation helpers for kernels that need to load/gather fixed-width
+// data from multiple, arbitrary indices.
+//
+// https://en.wikipedia.org/wiki/Gather/scatter_(vector_addressing)
+
+namespace arrow::internal {
+
+// CRTP [1] base class for Gather that provides a gathering loop in terms of
+// Write*() methods that must be implemented by the derived class.
+//
+// [1] https://en.wikipedia.org/wiki/Curiously_recurring_template_pattern
+template <class GatherImpl>
+class GatherBaseCRTP {
+ public:
+  // Output offset is not supported by Gather and idx is supposed to have offset
+  // pre-applied. idx_validity parameters on functions can use the offset they
+  // carry to read the validity bitmap as bitmaps can't have pre-applied offsets
+  // (they might not align to byte boundaries).
+
+  GatherBaseCRTP() = default;
+  ARROW_DISALLOW_COPY_AND_ASSIGN(GatherBaseCRTP);
+  ARROW_DEFAULT_MOVE_AND_ASSIGN(GatherBaseCRTP);
+
+ protected:
+  ARROW_FORCE_INLINE int64_t ExecuteNoNulls(int64_t idx_length) {
+    auto* self = static_cast<GatherImpl*>(this);
+    for (int64_t position = 0; position < idx_length; position++) {
+      self->WriteValue(position);
+    }
+    return idx_length;
+  }
+
+  // See derived Gather classes below for the meaning of the parameters, pre and
+  // post-conditions.
+  //
+  // src_validity is not necessarily the source of the values that are being
+  // gathered (e.g. the source could be a nested fixed-size list array and the
+  // values being gathered are from the innermost buffer), so the ArraySpan is
+  // used solely to check for nulls in the source values and nothing else.
+  //
+  // idx_length is the number of elements in idx and consequently the number of
+  // bits that might be written to out_is_valid. Member `Write*()` functions will be
+  // called with positions from 0 to idx_length - 1.
+  //
+  // If `kOutputIsZeroInitialized` is true, then `WriteZero()` or `WriteZeroSegment()`
+  // doesn't have to be called for resulting null positions. A position is
+  // considered null if either the index or the source value is null at that
+  // position.
+  template <bool kOutputIsZeroInitialized, typename IndexCType>
+  ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ArraySpan& src_validity,
+                                              int64_t idx_length, const IndexCType* idx,
+                                              const ArraySpan& idx_validity,
+                                              uint8_t* out_is_valid) {
+    auto* self = static_cast<GatherImpl*>(this);
+    OptionalBitBlockCounter indices_bit_counter(idx_validity.buffers[0].data,
+                                                idx_validity.offset, idx_length);
+    int64_t position = 0;
+    int64_t valid_count = 0;
+    while (position < idx_length) {
+      BitBlockCount block = indices_bit_counter.NextBlock();
+      if (!src_validity.MayHaveNulls()) {
+        // Source values are never null, so things are easier
+        valid_count += block.popcount;
+        if (block.popcount == block.length) {
+          // Fastest path: neither source values nor index nulls
+          bit_util::SetBitsTo(out_is_valid, position, block.length, true);
+          for (int64_t i = 0; i < block.length; ++i) {
+            self->WriteValue(position);
+            ++position;
+          }
+        } else if (block.popcount > 0) {
+          // Slow path: some indices but not all are null
+          for (int64_t i = 0; i < block.length; ++i) {
+            ARROW_COMPILER_ASSUME(idx_validity.buffers[0].data != nullptr);
+            if (idx_validity.IsValid(position)) {
+              // index is not null
+              bit_util::SetBit(out_is_valid, position);
+              self->WriteValue(position);
+            } else if constexpr (!kOutputIsZeroInitialized) {
+              self->WriteZero(position);
+            }
+            ++position;
+          }
+        } else {
+          self->WriteZeroSegment(position, block.length);
+          position += block.length;
+        }
+      } else {
+        // Source values may be null, so we must do random access into src_validity
+        if (block.popcount == block.length) {
+          // Faster path: indices are not null but source values may be
+          for (int64_t i = 0; i < block.length; ++i) {
+            ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
+            if (src_validity.IsValid(idx[position])) {
+              // value is not null
+              self->WriteValue(position);
+              bit_util::SetBit(out_is_valid, position);
+              ++valid_count;
+            } else if constexpr (!kOutputIsZeroInitialized) {
+              self->WriteZero(position);
+            }
+            ++position;
+          }
+        } else if (block.popcount > 0) {
+          // Slow path: some but not all indices are null. Since we are doing
+          // random access in general we have to check the value nullness one by
+          // one.
+          for (int64_t i = 0; i < block.length; ++i) {
+            ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
+            ARROW_COMPILER_ASSUME(idx_validity.buffers[0].data != nullptr);
+            if (idx_validity.IsValid(position) && src_validity.IsValid(idx[position])) {
+              // index is not null && value is not null
+              self->WriteValue(position);
+              bit_util::SetBit(out_is_valid, position);
+              ++valid_count;
+            } else if constexpr (!kOutputIsZeroInitialized) {
+              self->WriteZero(position);
+            }
+            ++position;
+          }
+        } else {
+          if constexpr (!kOutputIsZeroInitialized) {
+            self->WriteZeroSegment(position, block.length);
+          }
+          position += block.length;
+        }
+      }
+    }
+    return valid_count;
+  }
+};
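For readers unfamiliar with CRTP: the base class above casts `this` to the derived class and calls its `Write*()` members directly, so every per-element call is resolved statically and can be inlined into the gathering loops; no virtual dispatch happens on the hot path. A stripped-down, self-contained sketch of the mechanism (illustrative only, not the actual Arrow classes):

    #include <cstdint>

    template <class Impl>
    class GatherLoopBase {
     protected:
      // The derived type is known at compile time, so WriteValue can inline.
      int64_t RunNoNulls(int64_t n) {
        auto* self = static_cast<Impl*>(this);
        for (int64_t i = 0; i < n; ++i) self->WriteValue(i);
        return n;
      }
    };

    class CountingGather : public GatherLoopBase<CountingGather> {
     public:
      int64_t written = 0;
      void WriteValue(int64_t) { ++written; }
      int64_t Run(int64_t n) { return RunNoNulls(n); }
    };

    int main() {
      CountingGather g;
      return g.Run(4) == 4 && g.written == 4 ? 0 : 1;
    }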
+
+// A gather primitive for primitive fixed-width types with an integral byte width. If
+// `kWithFactor` is true, the actual width is a runtime multiple of `kValueWidthInBits`
+// (this can be useful for fixed-size list inputs and other input types with unusual byte
+// widths that don't deserve value specialization).
+template <int kValueWidthInBits, typename IndexCType, bool kWithFactor>
+class Gather : public GatherBaseCRTP<Gather<kValueWidthInBits, IndexCType, kWithFactor>> {
+ public:
+  static_assert(kValueWidthInBits >= 0 && kValueWidthInBits % 8 == 0);
+  static constexpr int kValueWidth = kValueWidthInBits / 8;
+
+ private:
+  const int64_t src_length_;  // number of elements of kValueWidth bytes in src_
+  const uint8_t* src_;
+  const int64_t idx_length_;  // number of IndexCType elements in idx_
+  const IndexCType* idx_;
+  uint8_t* out_;
+  int64_t factor_;
+
+ public:
+  void WriteValue(int64_t position) {
+    if constexpr (kWithFactor) {
+      const int64_t scaled_factor = kValueWidth * factor_;
+      memcpy(out_ + position * scaled_factor, src_ + idx_[position] * scaled_factor,
+             scaled_factor);
+    } else {
+      memcpy(out_ + position * kValueWidth, src_ + idx_[position] * kValueWidth,
+             kValueWidth);
+    }
+  }
+
+  void WriteZero(int64_t position) {
+    if constexpr (kWithFactor) {
+      const int64_t scaled_factor = kValueWidth * factor_;
+      memset(out_ + position * scaled_factor, 0, scaled_factor);
+    } else {
+      memset(out_ + position * kValueWidth, 0, kValueWidth);
+    }
+  }
+
+  void WriteZeroSegment(int64_t position, int64_t length) {
+    if constexpr (kWithFactor) {
+      const int64_t scaled_factor = kValueWidth * factor_;
+      memset(out_ + position * scaled_factor, 0, length * scaled_factor);
+    } else {
+      memset(out_ + position * kValueWidth, 0, length * kValueWidth);
+    }
+  }
+
+ public:
+  Gather(int64_t src_length, const uint8_t* src, int64_t zero_src_offset,
+         int64_t idx_length, const IndexCType* idx, uint8_t* out, int64_t factor)
+      : src_length_(src_length),
+        src_(src),
+        idx_length_(idx_length),
+        idx_(idx),
+        out_(out),
+        factor_(factor) {
+    assert(zero_src_offset == 0);
+    assert(src && idx && out);
+    assert((kWithFactor || factor == 1) &&
+           "When kWithFactor is false, the factor is assumed to be 1 at compile time");
+  }
+
+  ARROW_FORCE_INLINE int64_t Execute() { return this->ExecuteNoNulls(idx_length_); }
+
+  /// \pre If kOutputIsZeroInitialized, then this->out_ has to be zero initialized.
+  /// \pre Bits in out_is_valid have to always be zero initialized.
+  /// \post The bits for the valid elements (and only those) are set in out_is_valid.
+  /// \post If !kOutputIsZeroInitialized, then positions in this->out_ containing null
+  ///       elements have 0s written to them. This might be less efficient than
+  ///       zero-initializing first and calling this->Execute() afterwards.
+  /// \return The number of valid elements in out.
+  template <bool kOutputIsZeroInitialized>
+  ARROW_FORCE_INLINE int64_t Execute(const ArraySpan& src_validity,
+                                     const ArraySpan& idx_validity,
+                                     uint8_t* out_is_valid) {
+    assert(src_length_ == src_validity.length);
+    assert(idx_length_ == idx_validity.length);
+    assert(out_is_valid);
+    return this->template ExecuteWithNulls<kOutputIsZeroInitialized>(
+        src_validity, idx_length_, idx_, idx_validity, out_is_valid);
+  }
+};
+
+// A gather primitive for boolean inputs. Unlike its counterpart above,
+// this does not support passing a non-trivial factor parameter.
+template <typename IndexCType>
+class Gather</*kValueWidthInBits=*/1, IndexCType, /*kWithFactor=*/false>
+    : public GatherBaseCRTP<Gather<1, IndexCType, false>> {
+ private:
+  const int64_t src_length_;  // number of elements (bits) in src_ after offset
+  const uint8_t* src_;        // the boolean array data buffer in bits
+  const int64_t src_offset_;  // offset in bits
+  const int64_t idx_length_;  // number of IndexCType elements in idx_
+  const IndexCType* idx_;
+  uint8_t* out_;              // output boolean array data buffer in bits
+
+ public:
+  Gather(int64_t src_length, const uint8_t* src, int64_t src_offset, int64_t idx_length,
+         const IndexCType* idx, uint8_t* out, int64_t factor)
+      : src_length_(src_length),
+        src_(src),
+        src_offset_(src_offset),
+        idx_length_(idx_length),
+        idx_(idx),
+        out_(out) {
+    assert(src && idx && out);
+    assert(factor == 1 &&
+           "factor != 1 is not supported when Gather is used to gather bits/booleans");
+  }
+
+  void WriteValue(int64_t position) {
+    bit_util::SetBitTo(out_, position,
+                       bit_util::GetBit(src_, src_offset_ + idx_[position]));
+  }
+
+  void WriteZero(int64_t position) { bit_util::ClearBit(out_, position); }
+
+  void WriteZeroSegment(int64_t position, int64_t block_length) {
+    bit_util::SetBitsTo(out_, position, block_length, false);
+  }
+
+  ARROW_FORCE_INLINE int64_t Execute() { return this->ExecuteNoNulls(idx_length_); }
+
+  /// \pre If kOutputIsZeroInitialized, then this->out_ has to be zero initialized.
+  /// \pre Bits in out_is_valid have to always be zero initialized.
+  /// \post The bits for the valid elements (and only those) are set in out_is_valid.
+  /// \post If !kOutputIsZeroInitialized, then positions in this->out_ containing null
+  ///       elements have 0s written to them. This might be less efficient than
+  ///       zero-initializing first and calling this->Execute() afterwards.
+  /// \return The number of valid elements in out.
+  template <bool kOutputIsZeroInitialized>
+  ARROW_FORCE_INLINE int64_t Execute(const ArraySpan& src_validity,
+                                     const ArraySpan& idx_validity,
+                                     uint8_t* out_is_valid) {
+    assert(src_length_ == src_validity.length);
+    assert(idx_length_ == idx_validity.length);
+    assert(out_is_valid);
+    return this->template ExecuteWithNulls<kOutputIsZeroInitialized>(
+        src_validity, idx_length_, idx_, idx_validity, out_is_valid);
+  }
+};
+
+}  // namespace arrow::internal
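To make the template parameters concrete, here is a usage sketch for the primary Gather specialization: int32 values (32 bits wide, no runtime factor) gathered through uint32 indices on the null-free fast path. The header is internal to Arrow, so treat this as an illustration of the contract rather than a supported API:

    #include <cstdint>

    #include "arrow/compute/kernels/gather_internal.h"

    int main() {
      const int32_t src[] = {10, 20, 30, 40};
      const uint32_t idx[] = {3, 0, 2};
      int32_t out[3] = {};
      arrow::internal::Gather</*kValueWidthInBits=*/32, uint32_t, /*kWithFactor=*/false>
          gather{/*src_length=*/4, reinterpret_cast<const uint8_t*>(src),
                 /*zero_src_offset=*/0, /*idx_length=*/3, idx,
                 reinterpret_cast<uint8_t*>(out), /*factor=*/1};
      const int64_t n = gather.Execute();  // out is now {40, 10, 30}
      return n == 3 && out[0] == 40 && out[1] == 10 && out[2] == 30 ? 0 : 1;
    }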
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_test.cc b/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
index a6d7f6097b59b..f60d8f2e19e98 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_test.cc
@@ -2281,7 +2281,7 @@ TEST(Cast, ListToPrimitive) {
       Cast(*ArrayFromJSON(list(binary()), R"([["1", "2"], ["3", "4"]])"), utf8()));
 }
 
-using make_list_t = std::shared_ptr<DataType>(const std::shared_ptr<DataType>&);
+using make_list_t = std::shared_ptr<DataType>(std::shared_ptr<DataType>);
 
 static const auto list_factories = std::vector<make_list_t*>{&list, &large_list};
 
diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc
index 44bb7372c3f68..5067298858132 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -698,13 +698,12 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty)
     DCHECK_OK(func->AddKernel(base));
   }
 
-  // Example parametric types that we want to match only on Type::type
-  auto parametric_types = {time32(TimeUnit::SECOND), time64(TimeUnit::MICRO),
-                           timestamp(TimeUnit::SECOND), duration(TimeUnit::SECOND),
-                           fixed_size_binary(0)};
-  for (const auto& ty : parametric_types) {
-    base.init = GetHashInit<Action>(ty->id());
-    base.signature = KernelSignature::Make({ty->id()}, out_ty);
+  // Parametric types that we want matching to be dependent only on type id
+  auto parametric_types = {Type::TIME32, Type::TIME64, Type::TIMESTAMP, Type::DURATION,
+                           Type::FIXED_SIZE_BINARY};
+  for (const auto& type_id : parametric_types) {
+    base.init = GetHashInit<Action>(type_id);
+    base.signature = KernelSignature::Make({type_id}, out_ty);
     DCHECK_OK(func->AddKernel(base));
   }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
index 2ba660e49ac38..1009bea5e7b1b 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.cc
@@ -547,39 +547,6 @@ struct VarBinarySelectionImpl : public Selection<VarBinarySelectionImpl<Type>, T
   }
 };
 
-struct FSBSelectionImpl : public Selection<FSBSelectionImpl, FixedSizeBinaryType> {
-  using Base = Selection<FSBSelectionImpl, FixedSizeBinaryType>;
-  LIFT_BASE_MEMBERS();
-
-  TypedBufferBuilder<uint8_t> data_builder;
-
-  FSBSelectionImpl(KernelContext* ctx, const ExecSpan& batch, int64_t output_length,
-                   ExecResult* out)
-      : Base(ctx, batch, output_length, out), data_builder(ctx->memory_pool()) {}
-
-  template <typename Adapter>
-  Status GenerateOutput() {
-    FixedSizeBinaryArray typed_values(this->values.ToArrayData());
-    int32_t value_size = typed_values.byte_width();
-
-    RETURN_NOT_OK(data_builder.Reserve(value_size * output_length));
-    Adapter adapter(this);
-    return adapter.Generate(
-        [&](int64_t index) {
-          auto val = typed_values.GetView(index);
-          data_builder.UnsafeAppend(reinterpret_cast<const uint8_t*>(val.data()),
-                                    value_size);
-          return Status::OK();
-        },
-        [&]() {
-          data_builder.UnsafeAppend(value_size, static_cast<uint8_t>(0x00));
-          return Status::OK();
-        });
-  }
-
-  Status Finish() override { return data_builder.Finish(&out->buffers[1]); }
-};
-
 template <typename Type>
 struct ListSelectionImpl : public Selection<ListSelectionImpl<Type>, Type> {
   using offset_type = typename Type::offset_type;
@@ -939,23 +906,6 @@ Status LargeVarBinaryTakeExec(KernelContext* ctx, const ExecSpan& batch,
   return TakeExec<VarBinarySelectionImpl<LargeBinaryType>>(ctx, batch, out);
 }
 
-Status FSBTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
-  const ArraySpan& values = batch[0].array;
-  const auto byte_width = values.type->byte_width();
-  // Use primitive Take implementation (presumably faster) for some byte widths
-  switch (byte_width) {
-    case 1:
-    case 2:
-    case 4:
-    case 8:
-    case 16:
-    case 32:
-      return PrimitiveTakeExec(ctx, batch, out);
-    default:
-      return TakeExec<FSBSelectionImpl>(ctx, batch, out);
-  }
-}
-
 Status ListTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
   return TakeExec<ListSelectionImpl<ListType>>(ctx, batch, out);
 }
@@ -968,26 +918,12 @@ Status FSLTakeExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
   const ArraySpan& values = batch[0].array;
 
   // If a FixedSizeList wraps a fixed-width type we can, in some cases, use
-  // PrimitiveTakeExec for a fixed-size list array.
+  // FixedWidthTakeExec for a fixed-size list array.
   if (util::IsFixedWidthLike(values,
                              /*force_null_count=*/true,
                              /*exclude_bool_and_dictionary=*/true)) {
-    const auto byte_width = util::FixedWidthInBytes(*values.type);
-    // Additionally, PrimitiveTakeExec is only implemented for specific byte widths.
-    // TODO(GH-41301): Extend PrimitiveTakeExec for any fixed-width type.
-    switch (byte_width) {
-      case 1:
-      case 2:
-      case 4:
-      case 8:
-      case 16:
-      case 32:
-        return PrimitiveTakeExec(ctx, batch, out);
-      default:
-        break;  // fallback to TakeExec
-    }
+    return FixedWidthTakeExec(ctx, batch, out);
   }
-
   return TakeExec<FSLSelectionImpl>(ctx, batch, out);
 }
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_internal.h b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
index a169f4b38a2b8..c5075d6dfe87b 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_selection_internal.h
@@ -73,8 +73,7 @@ Status MapFilterExec(KernelContext*, const ExecSpan&, ExecResult*);
 
 Status VarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status LargeVarBinaryTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
-Status PrimitiveTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
-Status FSBTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
+Status FixedWidthTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status ListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status LargeListTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
 Status FSLTakeExec(KernelContext*, const ExecSpan&, ExecResult*);
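The FSLTakeExec change above relies on the factor mechanism from gather_internal.h: a fixed_size_list<int32, 2> slot occupies 8 contiguous bytes, so the 1-byte Gather specialization with a runtime factor of 8 moves whole slots per index. The self-contained sketch below mirrors what Gather<8, IndexCType, /*kWithFactor=*/true> does per index; the data is hypothetical:

    #include <cstdint>
    #include <cstring>

    // Gather fixed-size records of `factor` bytes each, as the kWithFactor=true
    // 1-byte Gather specialization does.
    void GatherBytes(const uint8_t* src, const uint32_t* idx, int64_t n_idx,
                     int64_t factor, uint8_t* out) {
      for (int64_t i = 0; i < n_idx; ++i) {
        std::memcpy(out + i * factor, src + idx[i] * factor,
                    static_cast<size_t>(factor));
      }
    }

    int main() {
      // Two int32 values per list slot, i.e. 8 bytes per slot: [1,2] [3,4] [5,6]
      const int32_t values[] = {1, 2, 3, 4, 5, 6};
      const uint32_t idx[] = {2, 0};
      int32_t out[4] = {};
      GatherBytes(reinterpret_cast<const uint8_t*>(values), idx, 2, /*factor=*/8,
                  reinterpret_cast<uint8_t*>(out));
      return out[0] == 5 && out[1] == 6 && out[2] == 1 && out[3] == 2 ? 0 : 1;
    }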
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
index 1a9af0efcd700..dee80e9d258fb 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <limits>
 #include <optional>
+#include <tuple>
 #include <utility>
 
 #include "arrow/array/builder_primitive.h"
@@ -27,6 +28,7 @@
 #include "arrow/chunked_array.h"
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/gather_internal.h"
 #include "arrow/compute/kernels/vector_selection_internal.h"
 #include "arrow/compute/kernels/vector_selection_take_internal.h"
 #include "arrow/memory_pool.h"
@@ -324,238 +326,79 @@ namespace {
 
 using TakeState = OptionsWrapper<TakeOptions>;
 
 // ----------------------------------------------------------------------
-// Implement optimized take for primitive types from boolean to 1/2/4/8/16/32-byte
-// C-type based types. Use common implementation for every byte width and only
-// generate code for unsigned integer indices, since after boundschecking to
-// check for negative numbers in the indices we can safely reinterpret_cast
-// signed integers as unsigned.
-
-/// \brief The Take implementation for primitive (fixed-width) types does not
-/// use the logical Arrow type but rather the physical C type. This way we
-/// only generate one take function for each byte width.
+// Implement optimized take for primitive types from boolean to
+// 1/2/4/8/16/32-byte C-type based types and fixed-size binary (0 or more
+// bytes).
+//
+// Use one specialization for each of these primitive byte widths so the
+// compiler can specialize the memcpy to dedicated CPU instructions; for
+// fixed-width binary, use the 1-byte specialization but pass WithFactor=true,
+// which makes the kernel consider the factor parameter provided at runtime.
+//
+// Only unsigned index types need to be instantiated, since after
+// boundschecking to check for negative numbers in the indices we can safely
+// reinterpret_cast signed integers as unsigned.
+
+/// \brief The Take implementation for primitive types and fixed-width binary.
 ///
 /// Also note that this function can also handle fixed-size-list arrays if
 /// they fit the criteria described in fixed_width_internal.h, so use the
 /// function defined in that file to access values and destination pointers
 /// and DO NOT ASSUME `values.type()` is a primitive type.
 ///
+/// NOTE: Template parameters are types instead of values to let
+/// `TakeIndexDispatch<>` forward `typename... Args` after the index type.
+///
 /// \pre the indices have been boundschecked
-template <typename IndexCType, typename ValueWidthConstant>
-struct PrimitiveTakeImpl {
-  static constexpr int kValueWidth = ValueWidthConstant::value;
-
-  static void Exec(const ArraySpan& values, const ArraySpan& indices,
-                   ArrayData* out_arr) {
-    DCHECK_EQ(util::FixedWidthInBytes(*values.type), kValueWidth);
-    const auto* values_data = util::OffsetPointerOfFixedByteWidthValues(values);
-    const uint8_t* values_is_valid = values.buffers[0].data;
-    auto values_offset = values.offset;
-
-    const auto* indices_data = indices.GetValues<IndexCType>(1);
-    const uint8_t* indices_is_valid = indices.buffers[0].data;
-    auto indices_offset = indices.offset;
-
-    DCHECK_EQ(out_arr->offset, 0);
-    auto* out = util::MutableFixedWidthValuesPointer(out_arr);
-    auto out_is_valid = out_arr->buffers[0]->mutable_data();
-
-    // If either the values or indices have nulls, we preemptively zero out the
-    // out validity bitmap so that we don't have to use ClearBit in each
-    // iteration for nulls.
-    if (values.null_count != 0 || indices.null_count != 0) {
-      bit_util::SetBitsTo(out_is_valid, 0, indices.length, false);
-    }
-
-    auto WriteValue = [&](int64_t position) {
-      memcpy(out + position * kValueWidth,
-             values_data + indices_data[position] * kValueWidth, kValueWidth);
-    };
-
-    auto WriteZero = [&](int64_t position) {
-      memset(out + position * kValueWidth, 0, kValueWidth);
-    };
-
-    auto WriteZeroSegment = [&](int64_t position, int64_t length) {
-      memset(out + position * kValueWidth, 0, kValueWidth * length);
-    };
-
-    OptionalBitBlockCounter indices_bit_counter(indices_is_valid, indices_offset,
-                                                indices.length);
-    int64_t position = 0;
+template <typename IndexCType, typename ValueBitWidthConstant,
+          typename WithFactor = std::false_type>
+struct FixedWidthTakeImpl {
+  static constexpr int kValueWidthInBits = ValueBitWidthConstant::value;
+
+  static Status Exec(KernelContext* ctx, const ArraySpan& values,
+                     const ArraySpan& indices, ArrayData* out_arr, int64_t factor) {
+#ifndef NDEBUG
+    int64_t bit_width = util::FixedWidthInBits(*values.type);
+    DCHECK(WithFactor::value || (kValueWidthInBits == bit_width && factor == 1));
+    DCHECK(!WithFactor::value ||
+           (factor > 0 && kValueWidthInBits == 8 &&  // factors are used with bytes
+            static_cast<int64_t>(factor * kValueWidthInBits) == bit_width));
+#endif
+    const bool out_has_validity = values.MayHaveNulls() || indices.MayHaveNulls();
+
+    const uint8_t* src;
+    int64_t src_offset;
+    std::tie(src_offset, src) = util::OffsetPointerOfFixedBitWidthValues(values);
+    uint8_t* out = util::MutableFixedWidthValuesPointer(out_arr);
     int64_t valid_count = 0;
-    while (position < indices.length) {
-      BitBlockCount block = indices_bit_counter.NextBlock();
-      if (values.null_count == 0) {
-        // Values are never null, so things are easier
-        valid_count += block.popcount;
-        if (block.popcount == block.length) {
-          // Fastest path: neither values nor index nulls
-          bit_util::SetBitsTo(out_is_valid, position, block.length, true);
-          for (int64_t i = 0; i < block.length; ++i) {
-            WriteValue(position);
-            ++position;
-          }
-        } else if (block.popcount > 0) {
-          // Slow path: some indices but not all are null
-          for (int64_t i = 0; i < block.length; ++i) {
-            if (bit_util::GetBit(indices_is_valid, indices_offset + position)) {
-              // index is not null
-              bit_util::SetBit(out_is_valid, position);
-              WriteValue(position);
-            } else {
-              WriteZero(position);
-            }
-            ++position;
-          }
-        } else {
-          WriteZeroSegment(position, block.length);
-          position += block.length;
-        }
-      } else {
-        // Values have nulls, so we must do random access into the values bitmap
-        if (block.popcount == block.length) {
-          // Faster path: indices are not null but values may be
-          for (int64_t i = 0; i < block.length; ++i) {
-            if (bit_util::GetBit(values_is_valid,
-                                 values_offset + indices_data[position])) {
-              // value is not null
-              WriteValue(position);
-              bit_util::SetBit(out_is_valid, position);
-              ++valid_count;
-            } else {
-              WriteZero(position);
-            }
-            ++position;
-          }
-        } else if (block.popcount > 0) {
-          // Slow path: some but not all indices are null. Since we are doing
-          // random access in general we have to check the value nullness one by
-          // one.
-          for (int64_t i = 0; i < block.length; ++i) {
-            if (bit_util::GetBit(indices_is_valid, indices_offset + position) &&
-                bit_util::GetBit(values_is_valid,
-                                 values_offset + indices_data[position])) {
-              // index is not null && value is not null
-              WriteValue(position);
-              bit_util::SetBit(out_is_valid, position);
-              ++valid_count;
-            } else {
-              WriteZero(position);
-            }
-            ++position;
-          }
-        } else {
-          WriteZeroSegment(position, block.length);
-          position += block.length;
-        }
-      }
-    }
-    out_arr->null_count = out_arr->length - valid_count;
-  }
-};
-
-template <typename IndexCType>
-struct BooleanTakeImpl {
-  static void Exec(const ArraySpan& values, const ArraySpan& indices,
-                   ArrayData* out_arr) {
-    const uint8_t* values_data = values.buffers[1].data;
-    const uint8_t* values_is_valid = values.buffers[0].data;
-    auto values_offset = values.offset;
-
-    const auto* indices_data = indices.GetValues<IndexCType>(1);
-    const uint8_t* indices_is_valid = indices.buffers[0].data;
-    auto indices_offset = indices.offset;
-
-    auto out = out_arr->buffers[1]->mutable_data();
-    auto out_is_valid = out_arr->buffers[0]->mutable_data();
-    auto out_offset = out_arr->offset;
-
-    // If either the values or indices have nulls, we preemptively zero out the
-    // out validity bitmap so that we don't have to use ClearBit in each
-    // iteration for nulls.
-    if (values.null_count != 0 || indices.null_count != 0) {
-      bit_util::SetBitsTo(out_is_valid, out_offset, indices.length, false);
-    }
-    // Avoid uninitialized data in values array
-    bit_util::SetBitsTo(out, out_offset, indices.length, false);
-
-    auto PlaceDataBit = [&](int64_t loc, IndexCType index) {
-      bit_util::SetBitTo(out, out_offset + loc,
-                         bit_util::GetBit(values_data, values_offset + index));
-    };
-
-    OptionalBitBlockCounter indices_bit_counter(indices_is_valid, indices_offset,
-                                                indices.length);
-    int64_t position = 0;
-    int64_t valid_count = 0;
-    while (position < indices.length) {
-      BitBlockCount block = indices_bit_counter.NextBlock();
-      if (values.null_count == 0) {
-        // Values are never null, so things are easier
-        valid_count += block.popcount;
-        if (block.popcount == block.length) {
-          // Fastest path: neither values nor index nulls
-          bit_util::SetBitsTo(out_is_valid, out_offset + position, block.length, true);
-          for (int64_t i = 0; i < block.length; ++i) {
-            PlaceDataBit(position, indices_data[position]);
-            ++position;
-          }
-        } else if (block.popcount > 0) {
-          // Slow path: some but not all indices are null
-          for (int64_t i = 0; i < block.length; ++i) {
-            if (bit_util::GetBit(indices_is_valid, indices_offset + position)) {
-              // index is not null
-              bit_util::SetBit(out_is_valid, out_offset + position);
-              PlaceDataBit(position, indices_data[position]);
-            }
-            ++position;
-          }
-        } else {
-          position += block.length;
-        }
-      } else {
-        // Values have nulls, so we must do random access into the values bitmap
-        if (block.popcount == block.length) {
-          // Faster path: indices are not null but values may be
-          for (int64_t i = 0; i < block.length; ++i) {
-            if (bit_util::GetBit(values_is_valid,
-                                 values_offset + indices_data[position])) {
-              // value is not null
-              bit_util::SetBit(out_is_valid, out_offset + position);
-              PlaceDataBit(position, indices_data[position]);
-              ++valid_count;
-            }
-            ++position;
-          }
-        } else if (block.popcount > 0) {
-          // Slow path: some but not all indices are null. Since we are doing
-          // random access in general we have to check the value nullness one by
-          // one.
-          for (int64_t i = 0; i < block.length; ++i) {
-            if (bit_util::GetBit(indices_is_valid, indices_offset + position)) {
-              // index is not null
-              if (bit_util::GetBit(values_is_valid,
-                                   values_offset + indices_data[position])) {
-                // value is not null
-                PlaceDataBit(position, indices_data[position]);
-                bit_util::SetBit(out_is_valid, out_offset + position);
-                ++valid_count;
-              }
-            }
-            ++position;
-          }
-        } else {
-          position += block.length;
-        }
-      }
-    }
+    arrow::internal::Gather<kValueWidthInBits, IndexCType, WithFactor::value> gather{
+        /*src_length=*/values.length,
+        src,
+        src_offset,
+        /*idx_length=*/indices.length,
+        /*idx=*/indices.GetValues<IndexCType>(1),
+        out,
+        factor};
+    if (out_has_validity) {
+      DCHECK_EQ(out_arr->offset, 0);
+      // out_is_valid must be zero-initialized, because Gather::Execute
+      // saves time by not having to ClearBit on every null element.
+      auto out_is_valid = out_arr->GetMutableValues<uint8_t>(0);
+      memset(out_is_valid, 0, bit_util::BytesForBits(out_arr->length));
+      valid_count = gather.template Execute</*kOutputIsZeroInitialized=*/false>(
+          /*src_validity=*/values, /*idx_validity=*/indices, out_is_valid);
+    } else {
+      valid_count = gather.Execute();
+    }
     out_arr->null_count = out_arr->length - valid_count;
+    return Status::OK();
   }
 };
 
 template