diff --git a/NOTICE b/NOTICE
index 5e01c7e14c..4c06d1da90 100644
--- a/NOTICE
+++ b/NOTICE
@@ -17,4 +17,24 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
-limitations under the License.
\ No newline at end of file
+limitations under the License.
+
+--------------------------------------------------------------------------------
+
+This project includes code from flatbuffers (https://github.com/google/flatbuffers).
+
+Copyright 2021 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+--------------------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 24daa4635e..c13cdd2030 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
OFF
OFF
ON
+ ON
false
false
cuda11
@@ -345,6 +346,8 @@
${cuda.version}-arm64
+
+ OFF
@@ -457,6 +460,7 @@
+
libcufilejni.so
+
+ ${native.build.path}/profiler
+
+ libprofilerjni.so
+
+
${libcudfjni.build.path}
diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt
index fd9d671d6f..88d48e1587 100644
--- a/src/main/cpp/CMakeLists.txt
+++ b/src/main/cpp/CMakeLists.txt
@@ -44,6 +44,7 @@ option(USE_GDS "Build with GPUDirect Storage (GDS)/cuFile support" OFF)
option(BUILD_TESTS "Configure CMake to build tests" OFF)
option(BUILD_BENCHMARKS "Configure CMake to build (google) benchmarks" OFF)
option(BUILD_FAULTINJ "Configure CMake to build fault injection" ON)
+option(BUILD_PROFILER "Configure CMake to build profiler" ON)
message(
VERBOSE "SPARK_RAPIDS_JNI: Build with per-thread default stream:
@@ -60,6 +61,12 @@ set(SPARK_RAPIDS_JNI_CUDA_DEFINITIONS "")
set(SPARK_RAPIDS_JNI_BUILD_TESTS ${BUILD_TESTS})
set(SPARK_RAPIDS_JNI_BUILD_BENCHMARKS ${BUILD_BENCHMARKS})
set(SPARK_RAPIDS_JNI_BUILD_FAULTINJ ${BUILD_FAULTINJ})
+if(NOT SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR)
+ set(SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR ${SPARK_RAPIDS_JNI_BINARY_DIR}/generated/include)
+endif()
+if(NOT SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR)
+ set(SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR ${SPARK_RAPIDS_JNI_BINARY_DIR}/generated/src)
+endif()
# Set RMM logging level
set(RMM_LOGGING_LEVEL
@@ -94,6 +101,21 @@ include(cmake/Modules/ConfigureCUDA.cmake) # set other CUDA compilation flags
# ##################################################################################################
# * dependencies ----------------------------------------------------------------------------------
+# version header
+find_package(Git REQUIRED)
+execute_process(COMMAND
+ "${GIT_EXECUTABLE}" describe --abbrev=40 --always --dirty --long
+ WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}"
+ OUTPUT_VARIABLE SPARK_RAPIDS_JNI_COMMIT_DETAILS
+ ERROR_QUIET
+ OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+configure_file(
+ src/spark_rapids_jni_version.cpp.in
+ "${SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR}/spark_rapids_jni_version.cpp"
+ @ONLY
+)
+
# find NVTX
include(${CUDF_DIR}/cpp/cmake/thirdparty/get_nvtx.cmake)
@@ -256,7 +278,7 @@ add_dependencies(cudfjnistub spark_rapids_jni)
if(USE_GDS)
include(${CUDF_DIR}/cpp/cmake/Modules/FindcuFile.cmake)
find_library(CUFILEJNI_LIB "libcufilejni.a" REQUIRED NO_DEFAULT_PATH
- HINTS "${PROJECT_BINARY_DIR}/../libcudfjni"
+ HINTS "${SPARK_RAPIDS_JNI_BINARY_DIR}/../libcudfjni"
)
add_library(cufilejni SHARED src/emptyfile.cpp)
set_target_properties(
@@ -300,3 +322,7 @@ endif()
if(SPARK_RAPIDS_JNI_BUILD_FAULTINJ)
add_subdirectory(faultinj)
endif()
+
+if(BUILD_PROFILER)
+ add_subdirectory(profiler)
+endif()
diff --git a/src/main/cpp/cmake/get_flatbuffers.cmake b/src/main/cpp/cmake/get_flatbuffers.cmake
new file mode 100644
index 0000000000..c7e0dfb549
--- /dev/null
+++ b/src/main/cpp/cmake/get_flatbuffers.cmake
@@ -0,0 +1,33 @@
+# =============================================================================
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+# =============================================================================
+
+# Use CPM to find or clone flatbuffers
+function(find_and_configure_flatbuffers VERSION)
+
+ rapids_cpm_find(
+ flatbuffers ${VERSION}
+ GLOBAL_TARGETS flatbuffers
+ CPM_ARGS
+ GIT_REPOSITORY https://github.com/google/flatbuffers.git
+ GIT_TAG v${VERSION}
+ GIT_SHALLOW TRUE
+ )
+
+ rapids_export_find_package_root(
+ BUILD flatbuffers "${flatbuffers_BINARY_DIR}" EXPORT_SET profilerjni-exports
+ )
+
+endfunction()
+
+find_and_configure_flatbuffers(24.3.25)
diff --git a/src/main/cpp/profiler/CMakeLists.txt b/src/main/cpp/profiler/CMakeLists.txt
new file mode 100644
index 0000000000..03a552b3ea
--- /dev/null
+++ b/src/main/cpp/profiler/CMakeLists.txt
@@ -0,0 +1,98 @@
+# =============================================================================
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+# =============================================================================
+
+include(../cmake/get_flatbuffers.cmake)
+
+# ##################################################################################################
+# * flatbuffer generation---------------------------------------------------------------------------
+
+set(SPARK_RAPIDS_JNI_FBS_DIR "${SPARK_RAPIDS_JNI_SOURCE_DIR}/../fbs")
+add_custom_command(
+ OUTPUT ${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}/profiler_generated.h
+ DEPENDS "${SPARK_RAPIDS_JNI_FBS_DIR}/profiler.fbs"
+ WORKING_DIRECTORY "${SPARK_RAPIDS_JNI_FBS_DIR}"
+ VERBATIM
+ COMMAND ${CMAKE_COMMAND} -E make_directory "${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}"
+ COMMAND
+    $<TARGET_FILE:flatbuffers::flatc> --cpp -o "${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}" profiler.fbs
+ COMMENT "Generating profiler flatbuffer code"
+)
+
+# ##################################################################################################
+# * profiler JNI -----------------------------------------------------------------------------------
+
+add_library(profilerjni SHARED
+ ProfilerJni.cpp
+ profiler_debug.cpp
+ profiler_serializer.cpp
+ "${SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR}/spark_rapids_jni_version.cpp"
+ "${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}/profiler_generated.h"
+)
+
+set_target_properties(
+ profilerjni
+ PROPERTIES BUILD_RPATH "\$ORIGIN"
+ INSTALL_RPATH "\$ORIGIN"
+ # set target compile options
+ CXX_STANDARD 17
+ CXX_STANDARD_REQUIRED ON
+ CXX_VISIBILITY_PRESET "hidden"
+ VISIBILITY_INLINES_HIDDEN TRUE
+)
+
+target_include_directories(
+ profilerjni
+ PRIVATE "${JNI_INCLUDE_DIRS}"
+ "${CUDAToolkit_INCLUDE_DIRS}"
+ "${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}"
+ "${SPARK_RAPIDS_JNI_SOURCE_DIR}/src"
+)
+
+find_library(CUPTI_LIBRARY_PATH cupti_static PATHS
+ "/usr/local/cuda/lib64"
+ "/usr/local/cuda/extras/CUPTI/lib64"
+)
+
+target_link_libraries(profilerjni
+ PRIVATE ${CUPTI_LIBRARY_PATH} nvtx3::nvtx3-cpp flatbuffers::flatbuffers
+)
+
+file(READ "${SPARK_RAPIDS_JNI_FBS_DIR}/profiler.fbs" SPARK_RAPIDS_JNI_PROFILER_SCHEMA)
+configure_file(
+ profiler_schema.cpp.in
+ "${SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR}/profiler_schema.cpp"
+ @ONLY
+)
+
+add_executable(spark_rapids_profile_converter
+ spark_rapids_profile_converter.cpp
+ "${SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR}/profiler_schema.cpp"
+ "${SPARK_RAPIDS_JNI_GENERATED_SOURCE_DIR}/spark_rapids_jni_version.cpp"
+ "${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}/profiler_generated.h"
+)
+
+target_include_directories(
+ spark_rapids_profile_converter
+ PRIVATE
+ "${CUDAToolkit_INCLUDE_DIRS}"
+ "${SPARK_RAPIDS_JNI_SOURCE_DIR}/src"
+ "${SPARK_RAPIDS_JNI_GENERATED_INCLUDE_DIR}"
+)
+
+target_link_libraries(spark_rapids_profile_converter
+ "${CUPTI_LIBRARY_PATH}"
+ flatbuffers::flatbuffers
+ dl
+ pthread
+ rt)
diff --git a/src/main/cpp/profiler/ProfilerJni.cpp b/src/main/cpp/profiler/ProfilerJni.cpp
new file mode 100644
index 0000000000..1271b89d7b
--- /dev/null
+++ b/src/main/cpp/profiler/ProfilerJni.cpp
@@ -0,0 +1,527 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "profiler_generated.h"
+#include "profiler_serializer.hpp"
+#include "spark_rapids_jni_version.h"
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+// Set this to true to have each CUPTI buffer dumped to stderr as it arrives.
+#define PROFILER_DEBUG_LOG_BUFFER 0
+
+#define JNI_EXCEPTION_OCCURRED_CHECK(env, ret_val) \
+ { \
+ if (env->ExceptionOccurred()) { return ret_val; } \
+ }
+
+#define JNI_THROW_NEW(env, class_name, message, ret_val) \
+ { \
+ jclass ex_class = env->FindClass(class_name); \
+ if (ex_class == NULL) { return ret_val; } \
+ env->ThrowNew(ex_class, message); \
+ return ret_val; \
+ }
+
+#define CATCH_STD_CLASS(env, class_name, ret_val) \
+ catch (const std::exception& e) { JNI_THROW_NEW(env, class_name, e.what(), ret_val) }
+
+#define CATCH_STD(env, ret_val) CATCH_STD_CLASS(env, "java/lang/RuntimeException", ret_val)
+
+namespace spark_rapids_jni::profiler {
+
+namespace {
+
+// Encapsulates a buffer of profile data
+struct profile_buffer {
+ explicit profile_buffer(size_t size) : size_(size), valid_size_(0)
+ {
+    auto err = posix_memalign(reinterpret_cast<void**>(&data_), ALIGN_BYTES, size_);
+ if (err != 0) {
+ std::cerr << "PROFILER: Failed to allocate CUPTI buffer: " << strerror(err) << std::endl;
+ data_ = nullptr;
+ size_ = 0;
+ }
+ }
+
+ profile_buffer(uint8_t* data, size_t size, size_t valid_size)
+ : data_(data), size_(size), valid_size_(valid_size)
+ {
+ }
+
+ // Disconnects the underlying buffer of memory from the instance.
+ // The caller is responsible for freeing the resulting buffer.
+ void release(uint8_t** data_ptr_ptr, size_t* size_ptr)
+ {
+ *data_ptr_ptr = data_;
+ *size_ptr = size_;
+ data_ = nullptr;
+ size_ = 0;
+ }
+
+ ~profile_buffer()
+ {
+ free(data_);
+ data_ = nullptr;
+ size_ = 0;
+ }
+
+ uint8_t const* data() const { return data_; }
+ uint8_t* data() { return data_; }
+ size_t size() const { return size_; }
+ size_t valid_size() const { return valid_size_; }
+ void set_valid_size(size_t size) { valid_size_ = size; }
+
+ private:
+ static constexpr size_t ALIGN_BYTES = 8;
+ uint8_t* data_;
+ size_t size_;
+ size_t valid_size_;
+};
+
+// Queue of profile buffers that have been filled with profile data.
+struct completed_buffer_queue {
+ // Gets the next available buffer of profile data, blocking until a buffer is available
+ // or the queue is shutdown. If the queue is shutdown, a nullptr is returned.
+  std::unique_ptr<profile_buffer> get()
+ {
+ std::unique_lock lock(lock_);
+ cv_.wait(lock, [this] { return shutdown_ || buffers_.size() > 0; });
+ if (buffers_.size() > 0) {
+ auto result = std::move(buffers_.front());
+ buffers_.pop();
+ return result;
+ }
+    return std::unique_ptr<profile_buffer>(nullptr);
+ }
+
+  void put(std::unique_ptr<profile_buffer>&& buffer)
+ {
+ std::unique_lock lock(lock_);
+ if (!shutdown_) {
+ buffers_.push(std::move(buffer));
+ lock.unlock();
+ cv_.notify_one();
+ }
+ }
+
+ void shutdown()
+ {
+ std::unique_lock lock(lock_);
+ shutdown_ = true;
+ lock.unlock();
+ cv_.notify_one();
+ }
+
+ private:
+ std::mutex lock_;
+ std::condition_variable cv_;
+  std::queue<std::unique_ptr<profile_buffer>> buffers_;
+ bool shutdown_ = false;
+};
+
+// Stack of profile buffers that are ready to be filled with profile data.
+struct free_buffer_tracker {
+ explicit free_buffer_tracker(size_t size) : buffer_size_(size) {}
+
+ // Returns the next available profile buffer or creates one if none are available.
+  std::unique_ptr<profile_buffer> get()
+ {
+ {
+ std::lock_guard lock(lock_);
+ if (buffers_.size() > 0) {
+ auto result = std::move(buffers_.top());
+ buffers_.pop();
+ return result;
+ }
+ }
+    return std::make_unique<profile_buffer>(buffer_size_);
+ }
+
+  void put(std::unique_ptr<profile_buffer>&& buffer)
+ {
+ buffer->set_valid_size(0);
+ std::lock_guard lock(lock_);
+ if (buffers_.size() < NUM_CACHED_BUFFERS) {
+ buffers_.push(std::move(buffer));
+ } else {
+ buffer.reset(nullptr);
+ }
+ }
+
+ private:
+ static constexpr size_t NUM_CACHED_BUFFERS = 2;
+ std::mutex lock_;
+  std::stack<std::unique_ptr<profile_buffer>> buffers_;
+ size_t buffer_size_;
+};
+
+void writer_thread_process(JavaVM* vm,
+ jobject j_writer,
+ size_t buffer_size,
+ size_t flush_threshold);
+
+struct subscriber_state {
+ CUpti_SubscriberHandle subscriber_handle;
+ jobject j_writer;
+ std::thread writer_thread;
+ free_buffer_tracker free_buffers;
+ completed_buffer_queue completed_buffers;
+ bool has_cupti_callback_errored;
+ bool is_shutdown;
+
+ subscriber_state(jobject writer, size_t buffer_size)
+ : j_writer(writer),
+ free_buffers(buffer_size),
+ has_cupti_callback_errored(false),
+ is_shutdown(false)
+ {
+ }
+};
+
+// Global variables
+subscriber_state* State = nullptr;
+uint64_t Flush_period_msec;
+std::atomic_uint64_t Last_flush_time_msec;
+
+JavaVM* get_jvm(JNIEnv* env)
+{
+ JavaVM* vm;
+ if (env->GetJavaVM(&vm) != 0) { throw std::runtime_error("Unable to get JavaVM"); }
+ return vm;
+}
+
+JNIEnv* attach_to_jvm(JavaVM* vm)
+{
+ JavaVMAttachArgs args;
+ args.version = JNI_VERSION_1_6;
+  args.name = const_cast<char*>("profiler writer");
+ args.group = nullptr;
+ JNIEnv* env;
+  if (vm->AttachCurrentThread(reinterpret_cast<void**>(&env), &args) != JNI_OK) {
+ char const* msg = "PROFILER: unable to attach to JVM";
+ std::cerr << msg << std::endl;
+ throw std::runtime_error(msg);
+ }
+ return env;
+}
+
+char const* get_cupti_error(CUptiResult rc)
+{
+ char const* err;
+ if (cuptiGetResultString(rc, &err) != CUPTI_SUCCESS) { err = "UNKNOWN"; }
+ return err;
+}
+
+void check_cupti(CUptiResult rc, std::string msg)
+{
+ if (rc != CUPTI_SUCCESS) { throw std::runtime_error(msg + ": " + get_cupti_error(rc)); }
+}
+
+uint64_t timestamp_now()
+{
+ timespec info;
+ if (clock_gettime(CLOCK_MONOTONIC_RAW, &info) != 0) {
+ static bool have_logged_error = false;
+ if (!have_logged_error) {
+ std::cerr << "PROFILER: Unable to determine current time!" << std::endl;
+ have_logged_error = true;
+ }
+ // No idea what time it is, so return the last flush time which will effectively
+ // disable periodic flushing but avoid pathologic flushing on every kernel launch.
+ return Last_flush_time_msec;
+ }
+ return info.tv_sec * 1e3 + info.tv_nsec / 1e6;
+}
+
+void on_driver_launch_exit()
+{
+ auto now = timestamp_now();
+ if (now - Last_flush_time_msec >= Flush_period_msec) {
+ auto rc = cuptiActivityFlushAll(0);
+ if (rc != CUPTI_SUCCESS) {
+ std::cerr << "PROFILER: Error interval flushing records: " << get_cupti_error(rc)
+ << std::endl;
+ }
+ Last_flush_time_msec = now;
+ }
+}
+
+void domain_driver_callback(CUpti_CallbackId callback_id, CUpti_CallbackData const* cb_data)
+{
+ if (cb_data->callbackSite == CUPTI_API_ENTER) { return; }
+
+ switch (callback_id) {
+ case CUPTI_DRIVER_TRACE_CBID_cuGraphLaunch:
+ case CUPTI_DRIVER_TRACE_CBID_cuGraphLaunch_ptsz:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunch:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchCooperativeKernel:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchCooperativeKernel_ptsz:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchCooperativeKernelMultiDevice:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchGrid:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchGridAsync:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel:
+ case CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel_ptsz: on_driver_launch_exit(); break;
+ default:
+ std::cerr << "PROFILER: Unexpected driver API callback for " << callback_id << std::endl;
+ break;
+ }
+}
+
+void domain_runtime_callback(CUpti_CallbackId callback_id, CUpti_CallbackData const* data_ptr)
+{
+ switch (callback_id) {
+ case CUPTI_RUNTIME_TRACE_CBID_cudaDeviceReset_v3020:
+ if (data_ptr->callbackSite == CUPTI_API_ENTER) {
+ auto rc = cuptiActivityFlushAll(0);
+ if (rc != CUPTI_SUCCESS) {
+ std::cerr << "PROFILER: Error flushing CUPTI activity on device reset: "
+ << get_cupti_error(rc) << std::endl;
+ }
+ }
+ break;
+ default: break;
+ }
+}
+
+// Invoked by CUPTI when something occurs for which we previously requested a callback.
+void CUPTIAPI callback_handler(void*,
+ CUpti_CallbackDomain domain,
+ CUpti_CallbackId callback_id,
+ const void* callback_data_ptr)
+{
+ auto rc = cuptiGetLastError();
+ if (rc != CUPTI_SUCCESS && !State->has_cupti_callback_errored) {
+ // State->has_cupti_callback_errored = true;
+ std::cerr << "PROFILER: Error handling callback: " << get_cupti_error(rc) << std::endl;
+ return;
+ }
+
+  auto cb_data = static_cast<CUpti_CallbackData const*>(callback_data_ptr);
+ switch (domain) {
+ case CUPTI_CB_DOMAIN_DRIVER_API: domain_driver_callback(callback_id, cb_data); break;
+ case CUPTI_CB_DOMAIN_RUNTIME_API: domain_runtime_callback(callback_id, cb_data); break;
+ default: break;
+ }
+}
+
+// Invoked by CUPTI when a new buffer is needed to record CUPTI activity events.
+void CUPTIAPI buffer_requested_callback(uint8_t** buffer_ptr_ptr,
+ size_t* size_ptr,
+ size_t* max_num_records_ptr)
+{
+ *max_num_records_ptr = 0;
+ if (!State->is_shutdown) {
+ auto buffer = State->free_buffers.get();
+ buffer->release(buffer_ptr_ptr, size_ptr);
+ } else {
+ *buffer_ptr_ptr = nullptr;
+ *size_ptr = 0;
+ }
+}
+
+// Invoked by CUPTI when an activity event buffer has completed.
+void CUPTIAPI buffer_completed_callback(
+ CUcontext, uint32_t, uint8_t* buffer, size_t buffer_size, size_t valid_size)
+{
+ auto pb = std::make_unique(buffer, buffer_size, valid_size);
+ if (!State->is_shutdown) { State->completed_buffers.put(std::move(pb)); }
+}
+
+// Setup the environment variables for NVTX library injection so we can capture NVTX events.
+void setup_nvtx_env(JNIEnv* env, jstring j_lib_path)
+{
+ auto lib_path = env->GetStringUTFChars(j_lib_path, 0);
+ if (lib_path == NULL) { throw std::runtime_error("Error getting library path"); }
+ setenv("NVTX_INJECTION64_PATH", lib_path, 1);
+ env->ReleaseStringUTFChars(j_lib_path, lib_path);
+}
+
+// Main processing loop for the background writer thread
+void writer_thread_process(JavaVM* vm, jobject j_writer, size_t buffer_size, size_t flush_threshold)
+{
+ try {
+ JNIEnv* env = attach_to_jvm(vm);
+ profiler_serializer serializer(env, j_writer, buffer_size, flush_threshold);
+ auto buffer = State->completed_buffers.get();
+ while (buffer) {
+ serializer.process_cupti_buffer(buffer->data(), buffer->valid_size());
+ State->free_buffers.put(std::move(buffer));
+ buffer = State->completed_buffers.get();
+ }
+ serializer.flush();
+ } catch (std::exception const& e) {
+ std::cerr << "PROFILER: WRITER THREAD ERROR: " << e.what() << std::endl;
+ // no-op process buffers
+ auto buffer = State->completed_buffers.get();
+ while (buffer) {
+ State->free_buffers.put(std::move(buffer));
+ buffer = State->completed_buffers.get();
+ }
+ }
+ vm->DetachCurrentThread();
+}
+
+// Enable/disable capture of CUPTI activity events
+void update_activity_enable(bool enable)
+{
+ CUpti_ActivityKind const activity_ids[] = {CUPTI_ACTIVITY_KIND_DEVICE,
+ CUPTI_ACTIVITY_KIND_DRIVER,
+ CUPTI_ACTIVITY_KIND_RUNTIME,
+ CUPTI_ACTIVITY_KIND_MEMCPY,
+ CUPTI_ACTIVITY_KIND_MEMSET,
+ CUPTI_ACTIVITY_KIND_NAME,
+ CUPTI_ACTIVITY_KIND_MARKER,
+ CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL,
+ CUPTI_ACTIVITY_KIND_OVERHEAD};
+ if (enable) {
+ for (CUpti_ActivityKind const id : activity_ids) {
+ check_cupti(cuptiActivityEnable(id), "Error enabling device activity");
+ }
+ } else {
+ for (CUpti_ActivityKind const id : activity_ids) {
+ check_cupti(cuptiActivityDisable(id), "Error disabling device activity");
+ }
+ check_cupti(cuptiActivityFlushAll(0), "Error flushing activity records");
+ }
+}
+
+} // anonymous namespace
+
+} // namespace spark_rapids_jni::profiler
+
+extern "C" {
+
+using namespace spark_rapids_jni::profiler;
+
+JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIEnv* env,
+ jclass,
+ jstring j_lib_path,
+ jobject j_writer,
+ jlong write_buffer_size,
+ jint flush_period_msec)
+{
+ try {
+ setup_nvtx_env(env, j_lib_path);
+ // grab a global reference to the writer instance so it isn't garbage collected
+    auto writer = static_cast<jobject>(env->NewGlobalRef(j_writer));
+ if (!writer) { throw std::runtime_error("Unable to create a global reference to writer"); }
+ State = new subscriber_state(writer, write_buffer_size);
+ State->writer_thread = std::thread(
+ writer_thread_process, get_jvm(env), writer, write_buffer_size, write_buffer_size);
+ auto rc = cuptiSubscribe(&State->subscriber_handle, callback_handler, nullptr);
+ check_cupti(rc, "Error initializing CUPTI");
+ rc = cuptiEnableCallback(1,
+ State->subscriber_handle,
+ CUPTI_CB_DOMAIN_RUNTIME_API,
+ CUPTI_RUNTIME_TRACE_CBID_cudaDeviceReset_v3020);
+ if (flush_period_msec > 0) {
+ std::cerr << "PROFILER: Flushing activity records every " << flush_period_msec
+ << " milliseconds" << std::endl;
+ Flush_period_msec = static_cast(flush_period_msec);
+ Last_flush_time_msec = timestamp_now();
+ // CUPTI's periodic flush does not appear to work in this environment. As a workaround,
+ // register a callback for all the various ways a GPU kernel gets launched. The callback
+ // checks if the flush period has elapsed since we last flushed, and if so, forces a flush.
+ CUpti_CallbackId const driver_launch_callback_ids[] = {
+ CUPTI_DRIVER_TRACE_CBID_cuGraphLaunch,
+ CUPTI_DRIVER_TRACE_CBID_cuGraphLaunch_ptsz,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunch,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchCooperativeKernel,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchCooperativeKernel_ptsz,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchCooperativeKernelMultiDevice,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchGrid,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchGridAsync,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel,
+ CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel_ptsz};
+ for (CUpti_CallbackId const id : driver_launch_callback_ids) {
+ rc = cuptiEnableCallback(1, State->subscriber_handle, CUPTI_CB_DOMAIN_DRIVER_API, id);
+ check_cupti(rc, "Error registering driver launch callbacks");
+ }
+ }
+ check_cupti(rc, "Error enabling device reset callback");
+ rc = cuptiActivityRegisterCallbacks(buffer_requested_callback, buffer_completed_callback);
+ check_cupti(rc, "Error registering activity buffer callbacks");
+ }
+ CATCH_STD(env, );
+}
+
+JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeStart(JNIEnv* env, jclass)
+{
+ try {
+ if (State && !State->is_shutdown) { update_activity_enable(true); }
+ }
+ CATCH_STD(env, );
+}
+
+JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeStop(JNIEnv* env, jclass)
+{
+ try {
+ if (State && !State->is_shutdown) { update_activity_enable(false); }
+ }
+ CATCH_STD(env, );
+}
+
+JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeShutdown(JNIEnv* env, jclass)
+{
+ try {
+ if (State && !State->is_shutdown) {
+ auto unsub_rc = cuptiUnsubscribe(State->subscriber_handle);
+ auto flush_rc = cuptiActivityFlushAll(1);
+ State->completed_buffers.shutdown();
+ State->writer_thread.join();
+ State->is_shutdown = true;
+ env->DeleteGlobalRef(State->j_writer);
+ // There can be late arrivals of CUPTI activity events and other callbacks, so it's safer
+ // and simpler to _not_ delete the State object on shutdown.
+ check_cupti(unsub_rc, "Error unsubscribing from CUPTI");
+ check_cupti(flush_rc, "Error flushing CUPTI records");
+ }
+ }
+ CATCH_STD(env, );
+}
+
+} // extern "C"
+
+// Extern the CUPTI NVTX initialization APIs. The APIs are thread-safe.
+extern "C" CUptiResult CUPTIAPI cuptiNvtxInitialize(void* pfnGetExportTable);
+extern "C" CUptiResult CUPTIAPI cuptiNvtxInitialize2(void* pfnGetExportTable);
+
+// Interface that may be called by NVTX to capture NVTX events
+extern "C" JNIEXPORT int InitializeInjectionNvtx(void* p)
+{
+ CUptiResult res = cuptiNvtxInitialize(p);
+ return (res == CUPTI_SUCCESS) ? 1 : 0;
+}
+
+// Interface that may be called by NVTX to capture NVTX events
+extern "C" JNIEXPORT int InitializeInjectionNvtx2(void* p)
+{
+ CUptiResult res = cuptiNvtxInitialize2(p);
+ return (res == CUPTI_SUCCESS) ? 1 : 0;
+}
diff --git a/src/main/cpp/profiler/profiler_debug.cpp b/src/main/cpp/profiler/profiler_debug.cpp
new file mode 100644
index 0000000000..3759b11e0d
--- /dev/null
+++ b/src/main/cpp/profiler/profiler_debug.cpp
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "profiler_debug.hpp"
+
+#include <iostream>
+#include <sstream>
+
+namespace spark_rapids_jni::profiler {
+
+namespace {
+
+std::string marker_flags_to_string(CUpti_ActivityFlag flags)
+{
+ std::string s("");
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS) { s += "INSTANTANEOUS "; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_START) { s += "START "; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_END) { s += "END "; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_ACQUIRE) { s += "SYNCACQUIRE "; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_ACQUIRE_SUCCESS) { s += "SYNCACQUIRESUCCESS "; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_ACQUIRE_FAILED) { s += "SYNCACQUIREFAILED "; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_RELEASE) { s += "SYNCRELEASE "; }
+ return s;
+}
+
+std::string activity_object_kind_to_string(CUpti_ActivityObjectKind kind)
+{
+ switch (kind) {
+ case CUPTI_ACTIVITY_OBJECT_PROCESS: return "PROCESS";
+ case CUPTI_ACTIVITY_OBJECT_THREAD: return "THREAD";
+ case CUPTI_ACTIVITY_OBJECT_DEVICE: return "DEVICE";
+ case CUPTI_ACTIVITY_OBJECT_CONTEXT: return "CONTEXT";
+ case CUPTI_ACTIVITY_OBJECT_STREAM: return "STREAM";
+ case CUPTI_ACTIVITY_OBJECT_UNKNOWN: return "UNKNOWN";
+ default: {
+ std::ostringstream oss;
+ oss << "UNRECOGNIZED(" << kind << ")";
+ return oss.str();
+ }
+ }
+}
+
+} // anonymous namespace
+
+std::string activity_kind_to_string(CUpti_ActivityKind kind)
+{
+ switch (kind) {
+ case CUPTI_ACTIVITY_KIND_MEMCPY: return "CUPTI_ACTIVITY_KIND_MEMCPY";
+ case CUPTI_ACTIVITY_KIND_MEMSET: return "CUPTI_ACTIVITY_KIND_MEMSET";
+ case CUPTI_ACTIVITY_KIND_KERNEL: return "CUPTI_ACTIVITY_KIND_KERNEL";
+ case CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL: return "CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL";
+    case CUPTI_ACTIVITY_KIND_DRIVER: return "CUPTI_ACTIVITY_KIND_DRIVER";
+ case CUPTI_ACTIVITY_KIND_RUNTIME: return "CUPTI_ACTIVITY_KIND_RUNTIME";
+ case CUPTI_ACTIVITY_KIND_INTERNAL_LAUNCH_API: return "CUPTI_ACTIVITY_KIND_INTERNAL_LAUNCH_API";
+ case CUPTI_ACTIVITY_KIND_EVENT: return "CUPTI_ACTIVITY_KIND_EVENT";
+ case CUPTI_ACTIVITY_KIND_METRIC: return "CUPTI_ACTIVITY_KIND_METRIC";
+ case CUPTI_ACTIVITY_KIND_DEVICE: return "CUPTI_ACTIVITY_KIND_DEVICE";
+ case CUPTI_ACTIVITY_KIND_CONTEXT: return "CUPTI_ACTIVITY_KIND_CONTEXT";
+ case CUPTI_ACTIVITY_KIND_NAME: return "CUPTI_ACTIVITY_KIND_NAME";
+ case CUPTI_ACTIVITY_KIND_MARKER: return "CUPTI_ACTIVITY_KIND_MARKER";
+ case CUPTI_ACTIVITY_KIND_MARKER_DATA: return "CUPTI_ACTIVITY_KIND_MARKER_DATA";
+ case CUPTI_ACTIVITY_KIND_SOURCE_LOCATOR: return "CUPTI_ACTIVITY_KIND_SOURCE_LOCATOR";
+ case CUPTI_ACTIVITY_KIND_GLOBAL_ACCESS: return "CUPTI_ACTIVITY_KIND_GLOBAL_ACCESS";
+ case CUPTI_ACTIVITY_KIND_BRANCH: return "CUPTI_ACTIVITY_KIND_BRANCH";
+ case CUPTI_ACTIVITY_KIND_OVERHEAD: return "CUPTI_ACTIVITY_KIND_OVERHEAD";
+ case CUPTI_ACTIVITY_KIND_CDP_KERNEL: return "CUPTI_ACTIVITY_KIND_CDP_KERNEL";
+ case CUPTI_ACTIVITY_KIND_PREEMPTION: return "CUPTI_ACTIVITY_KIND_PREEMPTION";
+ case CUPTI_ACTIVITY_KIND_ENVIRONMENT: return "CUPTI_ACTIVITY_KIND_ENVIRONMENT";
+ case CUPTI_ACTIVITY_KIND_EVENT_INSTANCE: return "CUPTI_ACTIVITY_KIND_EVENT_INSTANCE";
+ case CUPTI_ACTIVITY_KIND_MEMCPY2: return "CUPTI_ACTIVITY_KIND_MEMCPY2";
+ case CUPTI_ACTIVITY_KIND_METRIC_INSTANCE: return "CUPTI_ACTIVITY_KIND_METRIC_INSTANCE";
+ case CUPTI_ACTIVITY_KIND_INSTRUCTION_EXECUTION:
+ return "CUPTI_ACTIVITY_KIND_INSTRUCTION_EXECUTION";
+ case CUPTI_ACTIVITY_KIND_UNIFIED_MEMORY_COUNTER:
+ return "CUPTI_ACTIVITY_KIND_UNIFIED_MEMORY_COUNTER";
+ case CUPTI_ACTIVITY_KIND_FUNCTION: return "CUPTI_ACTIVITY_KIND_FUNCTION";
+ case CUPTI_ACTIVITY_KIND_MODULE: return "CUPTI_ACTIVITY_KIND_MODULE";
+ case CUPTI_ACTIVITY_KIND_DEVICE_ATTRIBUTE: return "CUPTI_ACTIVITY_KIND_DEVICE_ATTRIBUTE";
+ case CUPTI_ACTIVITY_KIND_SHARED_ACCESS: return "CUPTI_ACTIVITY_KIND_SHARED_ACCESS";
+ case CUPTI_ACTIVITY_KIND_PC_SAMPLING: return "CUPTI_ACTIVITY_KIND_PC_SAMPLING";
+ case CUPTI_ACTIVITY_KIND_PC_SAMPLING_RECORD_INFO:
+ return "CUPTI_ACTIVITY_KIND_PC_SAMPLING_RECORD_INFO";
+ case CUPTI_ACTIVITY_KIND_INSTRUCTION_CORRELATION:
+ return "CUPTI_ACTIVITY_KIND_INSTRUCTION_CORRELATION";
+ case CUPTI_ACTIVITY_KIND_OPENACC_DATA: return "CUPTI_ACTIVITY_KIND_OPENACC_DATA";
+ case CUPTI_ACTIVITY_KIND_OPENACC_LAUNCH: return "CUPTI_ACTIVITY_KIND_OPENACC_LAUNCH";
+ case CUPTI_ACTIVITY_KIND_OPENACC_OTHER: return "CUPTI_ACTIVITY_KIND_OPENACC_OTHER";
+ case CUPTI_ACTIVITY_KIND_CUDA_EVENT: return "CUPTI_ACTIVITY_KIND_CUDA_EVENT";
+ case CUPTI_ACTIVITY_KIND_STREAM: return "CUPTI_ACTIVITY_KIND_STREAM";
+ case CUPTI_ACTIVITY_KIND_SYNCHRONIZATION: return "CUPTI_ACTIVITY_KIND_SYNCHRONIZATION";
+ case CUPTI_ACTIVITY_KIND_EXTERNAL_CORRELATION:
+ return "CUPTI_ACTIVITY_KIND_EXTERNAL_CORRELATION";
+ case CUPTI_ACTIVITY_KIND_NVLINK: return "CUPTI_ACTIVITY_KIND_NVLINK";
+ case CUPTI_ACTIVITY_KIND_INSTANTANEOUS_EVENT: return "CUPTI_ACTIVITY_KIND_INSTANTANEOUS_EVENT";
+ case CUPTI_ACTIVITY_KIND_INSTANTANEOUS_EVENT_INSTANCE:
+ return "CUPTI_ACTIVITY_KIND_INSTANTANEOUS_EVENT_INSTANCE";
+ case CUPTI_ACTIVITY_KIND_INSTANTANEOUS_METRIC:
+ return "CUPTI_ACTIVITY_KIND_INSTANTANEOUS_METRIC";
+ case CUPTI_ACTIVITY_KIND_INSTANTANEOUS_METRIC_INSTANCE:
+ return "CUPTI_ACTIVITY_KIND_INSTANTANEOUS_METRIC_INSTANCE";
+ case CUPTI_ACTIVITY_KIND_MEMORY: return "CUPTI_ACTIVITY_KIND_MEMORY";
+ case CUPTI_ACTIVITY_KIND_PCIE: return "CUPTI_ACTIVITY_KIND_PCIE";
+ case CUPTI_ACTIVITY_KIND_OPENMP: return "CUPTI_ACTIVITY_KIND_OPENMP";
+ case CUPTI_ACTIVITY_KIND_MEMORY2: return "CUPTI_ACTIVITY_KIND_MEMORY2";
+ case CUPTI_ACTIVITY_KIND_MEMORY_POOL: return "CUPTI_ACTIVITY_KIND_MEMORY_POOL";
+ case CUPTI_ACTIVITY_KIND_GRAPH_TRACE: return "CUPTI_ACTIVITY_KIND_GRAPH_TRACE";
+ case CUPTI_ACTIVITY_KIND_JIT: return "CUPTI_ACTIVITY_KIND_JIT";
+ default: {
+ std::ostringstream oss;
+ oss << "UNRECOGNIZED(" << kind << ")";
+ return oss.str();
+ }
+ }
+}
+
+void print_cupti_buffer(uint8_t* buffer, size_t valid_size)
+{
+ if (valid_size > 0) {
+ std::cerr << "PROFILER: CUPTI buffer size: " << valid_size << std::endl;
+ CUpti_Activity* record_ptr = nullptr;
+ auto rc = cuptiActivityGetNextRecord(buffer, valid_size, &record_ptr);
+ while (rc == CUPTI_SUCCESS) {
+ std::cerr << "RECORD: " << activity_kind_to_string(record_ptr->kind) << std::endl;
+ switch (record_ptr->kind) {
+ case CUPTI_ACTIVITY_KIND_DRIVER: {
+          auto api_record = reinterpret_cast<CUpti_ActivityAPI const*>(record_ptr);
+ char const* name = nullptr;
+ cuptiGetCallbackName(CUPTI_CB_DOMAIN_DRIVER_API, api_record->cbid, &name);
+ name = name ? name : "NULL";
+ std::cerr << " NAME: " << name << " THREAD: " << api_record->threadId << std::endl;
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_DEVICE: {
+          auto device_record = reinterpret_cast<CUpti_ActivityDevice4 const*>(record_ptr);
+ char const* name = device_record->name != nullptr ? device_record->name : "NULL";
+ std::cerr << " " << activity_kind_to_string(device_record->kind) << " " << name
+ << std::endl;
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_RUNTIME: {
+          auto api_record = reinterpret_cast<CUpti_ActivityAPI const*>(record_ptr);
+ char const* name = nullptr;
+ cuptiGetCallbackName(CUPTI_CB_DOMAIN_RUNTIME_API, api_record->cbid, &name);
+ name = name ? name : "NULL";
+ std::cerr << " NAME: " << name << " THREAD: " << api_record->threadId << std::endl;
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_MARKER: {
+          auto marker_record = reinterpret_cast<CUpti_ActivityMarker2 const*>(record_ptr);
+ std::cerr << " FLAGS: " << marker_flags_to_string(marker_record->flags)
+ << " ID: " << marker_record->id
+ << " OBJECTKIND: " << activity_object_kind_to_string(marker_record->objectKind)
+ << " NAME: " << std::string(marker_record->name ? marker_record->name : "NULL")
+ << " DOMAIN: "
+ << std::string(marker_record->domain ? marker_record->domain : "NULL")
+ << std::endl;
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_MARKER_DATA: {
+          auto marker_record = reinterpret_cast<CUpti_ActivityMarkerData const*>(record_ptr);
+ std::cerr << " FLAGS: " << marker_flags_to_string(marker_record->flags)
+ << " ID: " << marker_record->id << " COLOR: " << marker_record->color
+ << " COLOR FLAG: " << marker_record->flags
+ << " CATEGORY: " << marker_record->category
+ << " DATA KIND: " << marker_record->payloadKind
+ << " DATA: " << marker_record->payload.metricValueUint64 << "/"
+ << marker_record->payload.metricValueDouble << std::endl;
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL: {
+          auto kernel_record = reinterpret_cast<CUpti_ActivityKernel8 const*>(record_ptr);
+ std::cerr << " NAME: " << kernel_record->name << std::endl;
+ }
+ default: break;
+ }
+ rc = cuptiActivityGetNextRecord(buffer, valid_size, &record_ptr);
+ }
+ }
+}
+
+} // namespace spark_rapids_jni::profiler
diff --git a/src/main/cpp/profiler/profiler_debug.hpp b/src/main/cpp/profiler/profiler_debug.hpp
new file mode 100644
index 0000000000..e44fdb87ff
--- /dev/null
+++ b/src/main/cpp/profiler/profiler_debug.hpp
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cupti.h>
+
+#include <cstdint>
+#include <string>
+
+namespace spark_rapids_jni::profiler {
+
+std::string activity_kind_to_string(CUpti_ActivityKind kind);
+
+void print_cupti_buffer(uint8_t* buffer, size_t valid_size);
+
+} // namespace spark_rapids_jni::profiler
diff --git a/src/main/cpp/profiler/profiler_schema.cpp.in b/src/main/cpp/profiler/profiler_schema.cpp.in
new file mode 100644
index 0000000000..f2940a91bf
--- /dev/null
+++ b/src/main/cpp/profiler/profiler_schema.cpp.in
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace spark_rapids_jni::profiler {
+char const* Profiler_Schema = R"raw(@SPARK_RAPIDS_JNI_PROFILER_SCHEMA@)raw";
+}
diff --git a/src/main/cpp/profiler/profiler_serializer.cpp b/src/main/cpp/profiler/profiler_serializer.cpp
new file mode 100644
index 0000000000..b47ff234ad
--- /dev/null
+++ b/src/main/cpp/profiler/profiler_serializer.cpp
@@ -0,0 +1,559 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "profiler_serializer.hpp"
+
+#include "profiler_debug.hpp"
+#include "profiler_generated.h"
+#include "spark_rapids_jni_version.h"
+
+#include <cupti.h>
+
+#include <iostream>
+
+namespace spark_rapids_jni::profiler {
+
+namespace {
+
+constexpr uint32_t PROFILE_VERSION = 1;
+
+flatbuffers::Offset<ActivityObjectId> add_object_id(flatbuffers::FlatBufferBuilder& fbb,
+ CUpti_ActivityObjectKind kind,
+ CUpti_ActivityObjectKindId const& object_id)
+{
+ switch (kind) {
+ case CUPTI_ACTIVITY_OBJECT_PROCESS:
+ case CUPTI_ACTIVITY_OBJECT_THREAD: {
+ ActivityObjectIdBuilder aoib(fbb);
+ aoib.add_process_id(object_id.pt.processId);
+ if (kind == CUPTI_ACTIVITY_OBJECT_THREAD) { aoib.add_thread_id(object_id.pt.threadId); }
+ return aoib.Finish();
+ }
+ case CUPTI_ACTIVITY_OBJECT_DEVICE:
+ case CUPTI_ACTIVITY_OBJECT_CONTEXT:
+ case CUPTI_ACTIVITY_OBJECT_STREAM: {
+ ActivityObjectIdBuilder aoib(fbb);
+ aoib.add_device_id(object_id.dcs.deviceId);
+ if (kind == CUPTI_ACTIVITY_OBJECT_CONTEXT || kind == CUPTI_ACTIVITY_OBJECT_STREAM) {
+ aoib.add_context_id(object_id.dcs.contextId);
+ if (kind == CUPTI_ACTIVITY_OBJECT_STREAM) { aoib.add_stream_id(object_id.dcs.streamId); }
+ }
+ return aoib.Finish();
+ }
+ default:
+ std::cerr << "PROFILER: Unrecognized object kind: " << kind << std::endl;
+ return flatbuffers::Offset<ActivityObjectId>();
+ }
+}
+
+MarkerFlags marker_flags_to_fb(CUpti_ActivityFlag flags)
+{
+ uint8_t result = 0;
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS) { result |= MarkerFlags_Instantaneous; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_START) { result |= MarkerFlags_Start; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_END) { result |= MarkerFlags_End; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_ACQUIRE) { result |= MarkerFlags_SyncAcquire; }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_ACQUIRE_SUCCESS) {
+ result |= MarkerFlags_SyncAcquireSuccess;
+ }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_ACQUIRE_FAILED) {
+ result |= MarkerFlags_SyncAcquireFailed;
+ }
+ if (flags & CUPTI_ACTIVITY_FLAG_MARKER_SYNC_RELEASE) { result |= MarkerFlags_SyncRelease; }
+ return static_cast<MarkerFlags>(result);
+}
+
+ChannelType to_channel_type(CUpti_ChannelType t)
+{
+ switch (t) {
+ case CUPTI_CHANNEL_TYPE_INVALID: return ChannelType_Invalid;
+ case CUPTI_CHANNEL_TYPE_COMPUTE: return ChannelType_Compute;
+ case CUPTI_CHANNEL_TYPE_ASYNC_MEMCPY: return ChannelType_AsyncMemcpy;
+ default:
+ std::cerr << "PROFILER: Unrecognized channel type: " << t << std::endl;
+ return ChannelType_Invalid;
+ }
+}
+
+LaunchType to_launch_type(uint8_t t)
+{
+ switch (t) {
+ case CUPTI_ACTIVITY_LAUNCH_TYPE_REGULAR: return LaunchType_Regular;
+ case CUPTI_ACTIVITY_LAUNCH_TYPE_COOPERATIVE_SINGLE_DEVICE:
+ return LaunchType_CooperativeSingleDevice;
+ case CUPTI_ACTIVITY_LAUNCH_TYPE_COOPERATIVE_MULTI_DEVICE:
+ return LaunchType_CooperativeMultiDevice;
+ default:
+ std::cerr << "PROFILER: Unrecognized launch type: " << t << std::endl;
+ return LaunchType_Regular;
+ }
+}
+
+MemcpyFlags to_memcpy_flags(uint32_t flags)
+{
+ uint8_t result = 0;
+ if (flags & CUPTI_ACTIVITY_FLAG_MEMCPY_ASYNC) { result |= MemcpyFlags_Async; }
+ return static_cast<MemcpyFlags>(result);
+}
+
+MemcpyKind to_memcpy_kind(uint8_t k)
+{
+ switch (k) {
+ case CUPTI_ACTIVITY_MEMCPY_KIND_UNKNOWN: return MemcpyKind_Unknown;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_HTOD: return MemcpyKind_HtoD;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_DTOH: return MemcpyKind_DtoH;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_HTOA: return MemcpyKind_HtoA;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_ATOH: return MemcpyKind_AtoH;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_ATOA: return MemcpyKind_AtoA;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_ATOD: return MemcpyKind_AtoD;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_DTOA: return MemcpyKind_DtoA;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_DTOD: return MemcpyKind_DtoD;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_HTOH: return MemcpyKind_HtoH;
+ case CUPTI_ACTIVITY_MEMCPY_KIND_PTOP: return MemcpyKind_PtoP;
+ default:
+ std::cerr << "PROFILER: Unrecognized memcpy kind: " << k << std::endl;
+ return MemcpyKind_Unknown;
+ }
+}
+
+MemoryKind to_memory_kind(uint8_t k)
+{
+ switch (k) {
+ case CUPTI_ACTIVITY_MEMORY_KIND_UNKNOWN: return MemoryKind_Unknown;
+ case CUPTI_ACTIVITY_MEMORY_KIND_PAGEABLE: return MemoryKind_Pageable;
+ case CUPTI_ACTIVITY_MEMORY_KIND_PINNED: return MemoryKind_Pinned;
+ case CUPTI_ACTIVITY_MEMORY_KIND_DEVICE: return MemoryKind_Device;
+ case CUPTI_ACTIVITY_MEMORY_KIND_ARRAY: return MemoryKind_Array;
+ case CUPTI_ACTIVITY_MEMORY_KIND_MANAGED: return MemoryKind_Managed;
+ case CUPTI_ACTIVITY_MEMORY_KIND_DEVICE_STATIC: return MemoryKind_DeviceStatic;
+ case CUPTI_ACTIVITY_MEMORY_KIND_MANAGED_STATIC: return MemoryKind_ManagedStatic;
+ default:
+ std::cerr << "PROFILER: Unrecognized memory kind: " << k << std::endl;
+ return MemoryKind_Unknown;
+ }
+}
+
+MemsetFlags to_memset_flags(uint32_t flags)
+{
+ uint8_t result = 0;
+ if (flags & CUPTI_ACTIVITY_FLAG_MEMSET_ASYNC) { result |= MemsetFlags_Async; }
+ return static_cast<MemsetFlags>(result);
+}
+
+OverheadKind to_overhead_kind(CUpti_ActivityOverheadKind k)
+{
+ switch (k) {
+ case CUPTI_ACTIVITY_OVERHEAD_UNKNOWN: return OverheadKind_Unknown;
+ case CUPTI_ACTIVITY_OVERHEAD_DRIVER_COMPILER: return OverheadKind_DriverCompiler;
+ case CUPTI_ACTIVITY_OVERHEAD_CUPTI_BUFFER_FLUSH: return OverheadKind_CUptiBufferFlush;
+ case CUPTI_ACTIVITY_OVERHEAD_CUPTI_INSTRUMENTATION: return OverheadKind_CUptiInstrumentation;
+ case CUPTI_ACTIVITY_OVERHEAD_CUPTI_RESOURCE: return OverheadKind_CUptiResource;
+ default:
+ std::cerr << "PROFILER: Unrecognized overhead kind: " << k << std::endl;
+ return OverheadKind_Unknown;
+ }
+}
+
+PartitionedGlobalCacheConfig to_partitioned_global_cache_config(
+ CUpti_ActivityPartitionedGlobalCacheConfig c)
+{
+ switch (c) {
+ case CUPTI_ACTIVITY_PARTITIONED_GLOBAL_CACHE_CONFIG_UNKNOWN:
+ return PartitionedGlobalCacheConfig_Unknown;
+ case CUPTI_ACTIVITY_PARTITIONED_GLOBAL_CACHE_CONFIG_NOT_SUPPORTED:
+ return PartitionedGlobalCacheConfig_NotSupported;
+ case CUPTI_ACTIVITY_PARTITIONED_GLOBAL_CACHE_CONFIG_OFF:
+ return PartitionedGlobalCacheConfig_Off;
+ case CUPTI_ACTIVITY_PARTITIONED_GLOBAL_CACHE_CONFIG_ON: return PartitionedGlobalCacheConfig_On;
+ default:
+ std::cerr << "PROFILER: Unrecognized partitioned global cache config: " << c << std::endl;
+ return PartitionedGlobalCacheConfig_Unknown;
+ }
+}
+
+ShmemLimitConfig to_shmem_limit_config(CUpti_FuncShmemLimitConfig c)
+{
+ switch (c) {
+ case CUPTI_FUNC_SHMEM_LIMIT_DEFAULT: return ShmemLimitConfig_Default;
+ case CUPTI_FUNC_SHMEM_LIMIT_OPTIN: return ShmemLimitConfig_Optin;
+ default:
+ std::cerr << "PROFILER: Unrecognized shmem limit config: " << c << std::endl;
+ return ShmemLimitConfig_Default;
+ }
+}
+
+} // anonymous namespace
+
+profiler_serializer::profiler_serializer(JNIEnv* env,
+ jobject writer,
+ size_t buffer_size,
+ size_t flush_threshold)
+ : env_(env), j_writer_(writer), flush_threshold_(flush_threshold), fbb_(buffer_size)
+{
+ auto writer_class = env->GetObjectClass(writer);
+ if (!writer_class) { throw std::runtime_error("Failed to locate class of data writer"); }
+ j_write_method_ = env->GetMethodID(writer_class, "write", "(Ljava/nio/ByteBuffer;)V");
+ if (!j_write_method_) { throw std::runtime_error("Failed to locate data writer write method"); }
+ write_profile_header();
+}
+
+void profiler_serializer::write_profile_header()
+{
+ auto writer_version = fbb_.CreateString(spark_rapids_jni::Version);
+ auto magic = fbb_.CreateString("spark-rapids profile");
+ auto header = CreateProfileHeader(fbb_, magic, PROFILE_VERSION, writer_version);
+ fbb_.FinishSizePrefixed(header);
+ write_current_fb();
+}
+
+void profiler_serializer::process_cupti_buffer(uint8_t* buffer, size_t valid_size)
+{
+ report_num_dropped_records();
+ if (valid_size > 0) {
+ CUpti_Activity* record_ptr = nullptr;
+ auto rc = cuptiActivityGetNextRecord(buffer, valid_size, &record_ptr);
+ while (rc == CUPTI_SUCCESS) {
+ switch (record_ptr->kind) {
+ case CUPTI_ACTIVITY_KIND_DEVICE: {
+ auto device_record = reinterpret_cast<CUpti_ActivityDevice4 const*>(record_ptr);
+ process_device_activity(device_record);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_DRIVER:
+ case CUPTI_ACTIVITY_KIND_RUNTIME: {
+ auto api_record = reinterpret_cast<CUpti_ActivityAPI const*>(record_ptr);
+ process_api_activity(api_record);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_MARKER: {
+ auto marker = reinterpret_cast<CUpti_ActivityMarker2 const*>(record_ptr);
+ process_marker_activity(marker);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_MARKER_DATA: {
+ auto marker = reinterpret_cast<CUpti_ActivityMarkerData const*>(record_ptr);
+ process_marker_data(marker);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_MEMCPY: {
+ auto r = reinterpret_cast<CUpti_ActivityMemcpy5 const*>(record_ptr);
+ process_memcpy(r);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_MEMSET: {
+ auto r = reinterpret_cast<CUpti_ActivityMemset4 const*>(record_ptr);
+ process_memset(r);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL: {
+ auto r = reinterpret_cast<CUpti_ActivityKernel8 const*>(record_ptr);
+ process_kernel(r);
+ break;
+ }
+ case CUPTI_ACTIVITY_KIND_OVERHEAD: {
+ auto r = reinterpret_cast<CUpti_ActivityOverhead const*>(record_ptr);
+ process_overhead(r);
+ break;
+ }
+ default:
+ std::cerr << "PROFILER: Ignoring activity record "
+ << activity_kind_to_string(record_ptr->kind) << std::endl;
+ break;
+ }
+ if (fbb_.GetSize() >= flush_threshold_) { flush(); }
+ rc = cuptiActivityGetNextRecord(buffer, valid_size, &record_ptr);
+ }
+ }
+}
+
+void profiler_serializer::flush()
+{
+ if (fbb_.GetSize() > 0) {
+ using flatbuffers::Offset;
+ using flatbuffers::Vector;
+ Offset<Vector<Offset<ApiActivity>>> api_vec;
+ Offset<Vector<Offset<DeviceActivity>>> device_vec;
+ Offset<Vector<Offset<DroppedRecords>>> dropped_vec;
+ Offset<Vector<Offset<KernelActivity>>> kernel_vec;
+ Offset<Vector<Offset<MarkerActivity>>> marker_vec;
+ Offset<Vector<Offset<MarkerData>>> marker_data_vec;
+ Offset<Vector<Offset<MemcpyActivity>>> memcpy_vec;
+ Offset<Vector<Offset<MemsetActivity>>> memset_vec;
+ Offset<Vector<Offset<OverheadActivity>>> overhead_vec;
+ if (api_offsets_.size() > 0) { api_vec = fbb_.CreateVector(api_offsets_); }
+ if (device_offsets_.size() > 0) { device_vec = fbb_.CreateVector(device_offsets_); }
+ if (dropped_offsets_.size() > 0) { dropped_vec = fbb_.CreateVector(dropped_offsets_); }
+ if (kernel_offsets_.size() > 0) { kernel_vec = fbb_.CreateVector(kernel_offsets_); }
+ if (marker_offsets_.size() > 0) { marker_vec = fbb_.CreateVector(marker_offsets_); }
+ if (marker_data_offsets_.size() > 0) {
+ marker_data_vec = fbb_.CreateVector(marker_data_offsets_);
+ }
+ if (memcpy_offsets_.size() > 0) { memcpy_vec = fbb_.CreateVector(memcpy_offsets_); }
+ if (memset_offsets_.size() > 0) { memset_vec = fbb_.CreateVector(memset_offsets_); }
+ if (overhead_offsets_.size() > 0) { overhead_vec = fbb_.CreateVector(overhead_offsets_); }
+ ActivityRecordsBuilder arb(fbb_);
+ arb.add_api(api_vec);
+ arb.add_device(device_vec);
+ arb.add_dropped(dropped_vec);
+ arb.add_kernel(kernel_vec);
+ arb.add_marker(marker_vec);
+ arb.add_marker_data(marker_data_vec);
+ arb.add_memcpy(memcpy_vec);
+ arb.add_memset(memset_vec);
+ arb.add_overhead(overhead_vec);
+ auto r = arb.Finish();
+ fbb_.FinishSizePrefixed(r);
+ write_current_fb();
+ }
+}
+
+void profiler_serializer::process_api_activity(CUpti_ActivityAPI const* r)
+{
+ auto api_kind = ApiKind_Runtime;
+ if (r->kind == CUPTI_ACTIVITY_KIND_DRIVER) {
+ api_kind = ApiKind_Driver;
+ } else if (r->kind == CUPTI_ACTIVITY_KIND_RUNTIME) {
+ // skip some very common and uninteresting APIs to reduce the profile size
+ switch (r->cbid) {
+ case CUPTI_RUNTIME_TRACE_CBID_cudaGetDevice_v3020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaGetLastError_v3020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaPeekAtLastError_v3020:
+ case CUPTI_RUNTIME_TRACE_CBID_cudaDeviceGetAttribute_v5000: return;
+ default: break;
+ }
+ } else {
+ std::cerr << "PROFILER: Ignoring API activity record kind: " << activity_kind_to_string(r->kind)
+ << std::endl;
+ return;
+ }
+ ApiActivityBuilder aab(fbb_);
+ aab.add_kind(api_kind);
+ aab.add_cbid(r->cbid);
+ aab.add_start(r->start);
+ aab.add_end(r->end);
+ aab.add_process_id(r->processId);
+ aab.add_thread_id(r->threadId);
+ aab.add_correlation_id(r->correlationId);
+ aab.add_return_value(r->returnValue);
+ api_offsets_.push_back(aab.Finish());
+}
+
+void profiler_serializer::process_device_activity(CUpti_ActivityDevice4 const* r)
+{
+ auto name = fbb_.CreateSharedString(r->name);
+ DeviceActivityBuilder dab(fbb_);
+ dab.add_global_memory_bandwidth(r->globalMemoryBandwidth);
+ dab.add_global_memory_size(r->globalMemorySize);
+ dab.add_constant_memory_size(r->constantMemorySize);
+ dab.add_l2_cache_size(r->l2CacheSize);
+ dab.add_num_threads_per_warp(r->numThreadsPerWarp);
+ dab.add_core_clock_rate(r->coreClockRate);
+ dab.add_num_memcpy_engines(r->numMemcpyEngines);
+ dab.add_num_multiprocessors(r->numMultiprocessors);
+ dab.add_max_ipc(r->maxIPC);
+ dab.add_max_warps_per_multiprocessor(r->maxWarpsPerMultiprocessor);
+ dab.add_max_blocks_per_multiprocessor(r->maxBlocksPerMultiprocessor);
+ dab.add_max_shared_memory_per_multiprocessor(r->maxSharedMemoryPerMultiprocessor);
+ dab.add_max_registers_per_multiprocessor(r->maxRegistersPerMultiprocessor);
+ dab.add_max_registers_per_block(r->maxRegistersPerBlock);
+ dab.add_max_shared_memory_per_block(r->maxSharedMemoryPerBlock);
+ dab.add_max_threads_per_block(r->maxThreadsPerBlock);
+ dab.add_max_block_dim_x(r->maxBlockDimX);
+ dab.add_max_block_dim_y(r->maxBlockDimY);
+ dab.add_max_block_dim_z(r->maxBlockDimZ);
+ dab.add_max_grid_dim_x(r->maxGridDimX);
+ dab.add_max_grid_dim_y(r->maxGridDimY);
+ dab.add_max_grid_dim_z(r->maxGridDimZ);
+ dab.add_compute_capability_major(r->computeCapabilityMajor);
+ dab.add_compute_capability_minor(r->computeCapabilityMinor);
+ dab.add_id(r->id);
+ dab.add_ecc_enabled(r->eccEnabled);
+ dab.add_name(name);
+ device_offsets_.push_back(dab.Finish());
+}
+
+void profiler_serializer::process_dropped_records(size_t num_dropped)
+{
+ auto dropped = CreateDroppedRecords(fbb_, num_dropped);
+ dropped_offsets_.push_back(dropped);
+}
+
+void profiler_serializer::process_kernel(CUpti_ActivityKernel8 const* r)
+{
+ auto name = fbb_.CreateSharedString(r->name);
+ KernelActivityBuilder kab(fbb_);
+ kab.add_requested(r->cacheConfig.config.requested);
+ kab.add_executed(r->cacheConfig.config.executed);
+ kab.add_shared_memory_config(r->sharedMemoryConfig);
+ kab.add_registers_per_thread(r->registersPerThread);
+ kab.add_partitioned_global_cache_requested(
+ to_partitioned_global_cache_config(r->partitionedGlobalCacheRequested));
+ kab.add_partitioned_global_cache_executed(
+ to_partitioned_global_cache_config(r->partitionedGlobalCacheExecuted));
+ kab.add_start(r->start);
+ kab.add_end(r->end);
+ kab.add_completed(r->completed);
+ kab.add_device_id(r->deviceId);
+ kab.add_context_id(r->contextId);
+ kab.add_stream_id(r->streamId);
+ kab.add_grid_x(r->gridX);
+ kab.add_grid_y(r->gridY);
+ kab.add_grid_z(r->gridZ);
+ kab.add_block_x(r->blockX);
+ kab.add_block_y(r->blockY);
+ kab.add_block_z(r->blockZ);
+ kab.add_static_shared_memory(r->staticSharedMemory);
+ kab.add_dynamic_shared_memory(r->dynamicSharedMemory);
+ kab.add_local_memory_per_thread(r->localMemoryPerThread);
+ kab.add_local_memory_total(r->localMemoryTotal);
+ kab.add_correlation_id(r->correlationId);
+ kab.add_grid_id(r->gridId);
+ kab.add_name(name);
+ kab.add_queued(r->queued);
+ kab.add_submitted(r->submitted);
+ kab.add_launch_type(to_launch_type(r->launchType));
+ kab.add_is_shared_memory_carveout_requested(r->isSharedMemoryCarveoutRequested);
+ kab.add_shared_memory_carveout_requested(r->sharedMemoryCarveoutRequested);
+ kab.add_shared_memory_executed(r->sharedMemoryExecuted);
+ kab.add_graph_node_id(r->graphNodeId);
+ kab.add_shmem_limit_config(to_shmem_limit_config(r->shmemLimitConfig));
+ kab.add_graph_id(r->graphId);
+ kab.add_channel_id(r->channelID);
+ kab.add_channel_type(to_channel_type(r->channelType));
+ kab.add_cluster_x(r->clusterX);
+ kab.add_cluster_y(r->clusterY);
+ kab.add_cluster_z(r->clusterZ);
+ kab.add_cluster_scheduling_policy(r->clusterSchedulingPolicy);
+ kab.add_local_memory_total_v2(r->localMemoryTotal_v2);
+ kernel_offsets_.push_back(kab.Finish());
+}
+
+void profiler_serializer::process_marker_activity(CUpti_ActivityMarker2 const* r)
+{
+ auto object_id = add_object_id(fbb_, r->objectKind, r->objectId);
+ auto has_name = r->name != nullptr;
+ auto has_domain = r->domain != nullptr;
+ flatbuffers::Offset<flatbuffers::String> name;
+ flatbuffers::Offset<flatbuffers::String> domain;
+ if (has_name) { name = fbb_.CreateSharedString(r->name); }
+ if (has_domain) { domain = fbb_.CreateSharedString(r->domain); }
+ MarkerActivityBuilder mab(fbb_);
+ mab.add_flags(marker_flags_to_fb(r->flags));
+ mab.add_timestamp(r->timestamp);
+ mab.add_id(r->id);
+ mab.add_object_id(object_id);
+ mab.add_name(name);
+ mab.add_domain(domain);
+ marker_offsets_.push_back(mab.Finish());
+}
+
+void profiler_serializer::process_marker_data(CUpti_ActivityMarkerData const* r)
+{
+ MarkerDataBuilder mdb(fbb_);
+ mdb.add_flags(marker_flags_to_fb(r->flags));
+ mdb.add_id(r->id);
+ mdb.add_color(r->color);
+ mdb.add_category(r->category);
+ marker_data_offsets_.push_back(mdb.Finish());
+}
+
+void profiler_serializer::process_memcpy(CUpti_ActivityMemcpy5 const* r)
+{
+ MemcpyActivityBuilder mab(fbb_);
+ mab.add_copy_kind(to_memcpy_kind(r->copyKind));
+ mab.add_src_kind(to_memory_kind(r->srcKind));
+ mab.add_dst_kind(to_memory_kind(r->dstKind));
+ mab.add_flags(to_memcpy_flags(r->flags));
+ mab.add_bytes(r->bytes);
+ mab.add_start(r->start);
+ mab.add_end(r->end);
+ mab.add_device_id(r->deviceId);
+ mab.add_context_id(r->contextId);
+ mab.add_stream_id(r->streamId);
+ mab.add_correlation_id(r->correlationId);
+ mab.add_runtime_correlation_id(r->runtimeCorrelationId);
+ mab.add_graph_node_id(r->graphNodeId);
+ mab.add_graph_id(r->graphId);
+ mab.add_channel_id(r->channelID);
+ mab.add_channel_type(to_channel_type(r->channelType));
+ memcpy_offsets_.push_back(mab.Finish());
+}
+
+void profiler_serializer::process_memset(CUpti_ActivityMemset4 const* r)
+{
+ MemsetActivityBuilder mab(fbb_);
+ mab.add_value(r->value);
+ mab.add_bytes(r->bytes);
+ mab.add_start(r->start);
+ mab.add_end(r->end);
+ mab.add_device_id(r->deviceId);
+ mab.add_context_id(r->contextId);
+ mab.add_stream_id(r->streamId);
+ mab.add_correlation_id(r->correlationId);
+ mab.add_flags(to_memset_flags(r->flags));
+ mab.add_memory_kind(to_memory_kind(r->memoryKind));
+ mab.add_graph_node_id(r->graphNodeId);
+ mab.add_graph_id(r->graphId);
+ mab.add_channel_id(r->channelID);
+ mab.add_channel_type(to_channel_type(r->channelType));
+ memset_offsets_.push_back(mab.Finish());
+}
+
+void profiler_serializer::process_overhead(CUpti_ActivityOverhead const* r)
+{
+ auto object_id = add_object_id(fbb_, r->objectKind, r->objectId);
+ OverheadActivityBuilder oab(fbb_);
+ oab.add_overhead_kind(to_overhead_kind(r->overheadKind));
+ oab.add_object_id(object_id);
+ oab.add_start(r->start);
+ oab.add_end(r->end);
+ overhead_offsets_.push_back(oab.Finish());
+}
+
+// Query CUPTI for dropped records, and if any, record in the current activity record
+void profiler_serializer::report_num_dropped_records()
+{
+ size_t num_dropped = 0;
+ auto rc = cuptiActivityGetNumDroppedRecords(NULL, 0, &num_dropped);
+ if (rc == CUPTI_SUCCESS && num_dropped > 0) { process_dropped_records(num_dropped); }
+}
+
+// Write out the current flatbuffer and reset state for the next flatbuffer.
+void profiler_serializer::write_current_fb()
+{
+ auto fb_size = fbb_.GetSize();
+ if (fb_size > 0) {
+ auto fb = fbb_.GetBufferPointer();
+ auto bytebuf_obj = env_->NewDirectByteBuffer(fb, fb_size);
+ if (bytebuf_obj != nullptr) {
+ env_->CallVoidMethod(j_writer_, j_write_method_, bytebuf_obj);
+ } else {
+ std::cerr << "PROFILER: Unable to create ByteBuffer for writer" << std::endl;
+ }
+ }
+ fbb_.Clear();
+ api_offsets_.clear();
+ device_offsets_.clear();
+ dropped_offsets_.clear();
+ kernel_offsets_.clear();
+ marker_offsets_.clear();
+ marker_data_offsets_.clear();
+ memcpy_offsets_.clear();
+ memset_offsets_.clear();
+ overhead_offsets_.clear();
+}
+
+} // namespace spark_rapids_jni::profiler
diff --git a/src/main/cpp/profiler/profiler_serializer.hpp b/src/main/cpp/profiler/profiler_serializer.hpp
new file mode 100644
index 0000000000..1feebf1b96
--- /dev/null
+++ b/src/main/cpp/profiler/profiler_serializer.hpp
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "profiler_generated.h"
+
+#include <cupti.h>
+#include <flatbuffers/flatbuffers.h>
+#include <jni.h>
+
+#include <cstddef>
+#include <vector>
+
+namespace spark_rapids_jni::profiler {
+
+// Serializes profile data as flatbuffers
+struct profiler_serializer {
+ profiler_serializer(JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold);
+ void process_cupti_buffer(uint8_t* buffer, size_t valid_size);
+ void flush();
+
+ private:
+ void write_profile_header();
+ void process_api_activity(CUpti_ActivityAPI const*);
+ void process_device_activity(CUpti_ActivityDevice4 const*);
+ void process_dropped_records(size_t num_dropped);
+ void process_marker_activity(CUpti_ActivityMarker2 const*);
+ void process_marker_data(CUpti_ActivityMarkerData const*);
+ void process_memcpy(CUpti_ActivityMemcpy5 const*);
+ void process_memset(CUpti_ActivityMemset4 const*);
+ void process_kernel(CUpti_ActivityKernel8 const*);
+ void process_overhead(CUpti_ActivityOverhead const*);
+ void report_num_dropped_records();
+ void write_current_fb();
+
+ JNIEnv* env_;
+ jmethodID j_write_method_;
+ jobject j_writer_;
+ size_t flush_threshold_;
+ flatbuffers::FlatBufferBuilder fbb_;
+ std::vector<flatbuffers::Offset<ApiActivity>> api_offsets_;
+ std::vector<flatbuffers::Offset<DeviceActivity>> device_offsets_;
+ std::vector<flatbuffers::Offset<DroppedRecords>> dropped_offsets_;
+ std::vector<flatbuffers::Offset<KernelActivity>> kernel_offsets_;
+ std::vector<flatbuffers::Offset<MarkerActivity>> marker_offsets_;
+ std::vector<flatbuffers::Offset<MarkerData>> marker_data_offsets_;
+ std::vector<flatbuffers::Offset<MemcpyActivity>> memcpy_offsets_;
+ std::vector<flatbuffers::Offset<MemsetActivity>> memset_offsets_;
+ std::vector<flatbuffers::Offset<OverheadActivity>> overhead_offsets_;
+};
+
+} // namespace spark_rapids_jni::profiler
diff --git a/src/main/cpp/profiler/spark_rapids_profile_converter.cpp b/src/main/cpp/profiler/spark_rapids_profile_converter.cpp
new file mode 100644
index 0000000000..b916020392
--- /dev/null
+++ b/src/main/cpp/profiler/spark_rapids_profile_converter.cpp
@@ -0,0 +1,754 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* A tool that converts a spark-rapids profile binary into other forms. */
+
+#if 0
+#include <stdexcept>
+#define FLATBUFFERS_ASSERT(x) \
+ do { \
+ if (!(x)) { throw std::runtime_error("flatbuffers assert"); } \
+ } while (0)
+#define FLATBUFFERS_DEBUG_VERIFICATION_FAILURE
+#endif
+
+#include "profiler_generated.h"
+#include "spark_rapids_jni_version.h"
+
+#include
+#include
+#include
+
+#include <charconv>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <optional>
+#include <sstream>
+#include <string_view>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+namespace spark_rapids_jni::profiler {
+extern char const* Profiler_Schema;
+}
+
+struct program_options {
+ std::optional<std::string_view> output_path;
+ bool help = false;
+ bool json = false;
+ bool nvtxt = false;
+ int json_indent = 2;
+ bool version = false;
+};
+
+struct event {
+ enum struct type_id { API, DEVICE, KERNEL, MARKER, MARKER_DATA, MEMCPY, MEMSET, OVERHEAD };
+ type_id id;
+ void const* fb_data;
+};
+
+struct thread_id {
+ uint32_t pid;
+ uint32_t tid;
+
+ bool operator==(thread_id const& o) const { return pid == o.pid && tid == o.tid; }
+};
+
+template <>
+struct std::hash<thread_id> {
+ std::size_t operator()(thread_id const& t) const
+ {
+ return std::hash<uint32_t>{}(t.pid) ^ (std::hash<uint32_t>{}(t.tid) << 1);
+ }
+};
+
+struct stream_id {
+ uint32_t device;
+ uint32_t context;
+ uint32_t stream;
+
+ bool operator==(stream_id const& s) const
+ {
+ return device == s.device && context == s.context && stream == s.stream;
+ }
+};
+
+template <>
+struct std::hash<stream_id> {
+ std::size_t operator()(stream_id const& s) const
+ {
+ return std::hash<uint32_t>{}(s.device) ^ (std::hash<uint32_t>{}(s.context) << 1) ^
+ (std::hash<uint32_t>{}(s.stream) << 2);
+ }
+};
+
+struct event_streams {
+ std::unordered_map<thread_id, std::vector<event>> cpu;
+ std::unordered_map<stream_id, std::vector<event>> gpu;
+};
+
+void print_usage()
+{
+ std::cout << "spark_rapids_profile_converter [OPTION]... profilebin" << std::endl;
+ std::cout << R"(
+Converts the spark-rapids profile in profile.bin into other forms.
+
+ -h, --help show this usage message
+ -j, --json convert to JSON, default output is stdout
+ -i, --json-indent=INDENT indentation to use for JSON. 0 is no indent, less than 0 also removes newlines
+ -o, --output=PATH use PATH as the output filename
+ -t. --nvtxt convert to NVTXT, default output is stdout
+ -V, --version print the version number
+ )" << std::endl;
+}
+
+void print_version()
+{
+ std::cout << "spark_rapids_profile_converter " << spark_rapids_jni::Version << std::endl;
+}
+
+std::pair<program_options, std::vector<std::string_view>> parse_options(
+ std::vector<std::string_view> args)
+{
+ program_options opts{};
+ std::string_view long_output("--output=");
+ std::string_view long_json_indent("--json-indent=");
+ bool seen_output = false;
+ bool seen_json_indent = false;
+ auto argp = args.begin();
+ while (argp != args.end()) {
+ if (*argp == "-o" || *argp == "--output") {
+ if (seen_output) { throw std::runtime_error("output path cannot be specified twice"); }
+ seen_output = true;
+ if (++argp != args.end()) {
+ opts.output_path = std::make_optional(*argp++);
+ } else {
+ throw std::runtime_error("missing argument for output path");
+ }
+ } else if (argp->substr(0, long_output.size()) == long_output) {
+ if (seen_output) { throw std::runtime_error("output path cannot be specified twice"); }
+ seen_output = true;
+ argp->remove_prefix(long_output.size());
+ if (argp->empty()) {
+ throw std::runtime_error("missing argument for output path");
+ } else {
+ opts.output_path = std::make_optional(*argp++);
+ }
+ } else if (*argp == "-h" || *argp == "--help") {
+ opts.help = true;
+ ++argp;
+ } else if (*argp == "-i" || *argp == "--json-indent") {
+ if (seen_json_indent) { throw std::runtime_error("JSON indent cannot be specified twice"); }
+ seen_json_indent = true;
+ if (++argp != args.end()) {
+ auto [ptr, err] = std::from_chars(argp->data(), argp->end(), opts.json_indent);
+ if (err != std::errc() || ptr != argp->end()) {
+ throw std::runtime_error("invalid JSON indent value");
+ }
+ ++argp;
+ } else {
+ throw std::runtime_error("missing argument for JSON indent");
+ }
+ } else if (argp->substr(0, long_json_indent.size()) == long_json_indent) {
+ if (seen_json_indent) { throw std::runtime_error("JSON indent cannot be specified twice"); }
+ seen_json_indent = true;
+ argp->remove_prefix(long_json_indent.size());
+ if (argp->empty()) {
+ throw std::runtime_error("missing argument for JSON indent");
+ } else {
+ auto [ptr, err] = std::from_chars(argp->data(), argp->end(), opts.json_indent);
+ if (err != std::errc() || ptr != argp->end()) {
+ throw std::runtime_error("invalid JSON indent value");
+ }
+ ++argp;
+ }
+ } else if (*argp == "-j" || *argp == "--json") {
+ if (opts.nvtxt) { throw std::runtime_error("JSON and NVTXT output are mutually exclusive"); }
+ opts.json = true;
+ ++argp;
+ } else if (*argp == "-t" || *argp == "--nvtxt") {
+ if (opts.json) { throw std::runtime_error("JSON and NVTXT output are mutually exclusive"); }
+ opts.nvtxt = true;
+ ++argp;
+ } else if (*argp == "-V" || *argp == "--version") {
+ opts.version = true;
+ ++argp;
+ } else if (argp->empty()) {
+ throw std::runtime_error("empty argument");
+ } else if (argp->at(0) == '-') {
+ throw std::runtime_error(std::string("unrecognized option: ") + std::string(*argp));
+ } else {
+ break;
+ }
+ }
+ return std::make_pair(opts, std::vector(argp, args.end()));
+}
+
+void checked_read(std::ifstream& in, char* buffer, size_t size)
+{
+ in.read(buffer, size);
+ if (in.fail()) {
+ if (in.eof()) {
+ throw std::runtime_error("Unexpected EOF");
+ } else {
+ throw std::runtime_error(std::strerror(errno));
+ }
+ }
+}
+
+flatbuffers::uoffset_t read_flatbuffer_size(std::ifstream& in)
+{
+ flatbuffers::uoffset_t fb_size;
+ checked_read(in, reinterpret_cast<char*>(&fb_size), sizeof(fb_size));
+ return flatbuffers::EndianScalar(fb_size);
+}
+
+std::unique_ptr<std::vector<char>> read_flatbuffer(std::ifstream& in)
+{
+ auto size = read_flatbuffer_size(in);
+ // Allocate a buffer that can hold the flatbuffer along with the prefixed size.
+ // SizePrefixed APIs require size to be at the front of the buffer and alignment
+ // of fields is planned out with that size.
+ auto buffer = std::make_unique<std::vector<char>>(size + sizeof(flatbuffers::uoffset_t));
+ auto size_ptr = reinterpret_cast<flatbuffers::uoffset_t*>(buffer->data());
+ *size_ptr = size;
+ checked_read(in, buffer->data() + sizeof(flatbuffers::uoffset_t), size);
+ return buffer;
+}
+
+// Open `path` for writing, refusing to clobber an existing file.
+// Throws std::runtime_error if the file exists or cannot be opened.
+std::ofstream open_output(std::filesystem::path const& path,
+                          std::ios::openmode mode = std::ios::out)
+{
+  if (std::filesystem::exists(path)) {
+    throw std::runtime_error(path.string() + " already exists");
+  }
+  std::ofstream out(path, mode);
+  // Fix: a failed open sets failbit, not badbit, so with only
+  // exceptions(badbit) an unwritable path went unreported. Check explicitly.
+  if (!out) { throw std::runtime_error("unable to open " + path.string() + " for writing"); }
+  out.exceptions(std::ios::badbit);
+  return out;
+}
+
+// Verify a size-prefixed flatbuffer of type T and return its typed root.
+// Throws std::runtime_error naming `name` when verification fails.
+// Fix: restore the template parameter list and template arguments lost from
+// the patch text.
+template <typename T>
+T const* validate_fb(std::vector<char> const& fb, std::string_view const& name)
+{
+  flatbuffers::Verifier::Options verifier_opts;
+  verifier_opts.assert = true;
+  flatbuffers::Verifier verifier(
+    reinterpret_cast<uint8_t const*>(fb.data()), fb.size(), verifier_opts);
+  if (not verifier.VerifySizePrefixedBuffer<T>(nullptr)) {
+    throw std::runtime_error(std::string("malformed ") + std::string(name) + " record");
+  }
+  return flatbuffers::GetSizePrefixedRoot<T>(fb.data());
+}
+
+// Read and validate the leading ProfileHeader flatbuffer, throwing if the
+// stream is not a supported spark-rapids profile.
+void verify_profile_header(std::ifstream& in)
+{
+  auto fb_ptr = read_flatbuffer(in);
+  // Fix: restore the validate_fb template argument lost from the patch text.
+  auto header = validate_fb<spark_rapids_jni::profiler::ProfileHeader>(*fb_ptr, "profile header");
+  auto magic  = header->magic();
+  if (magic == nullptr) {
+    throw std::runtime_error("does not appear to be a spark-rapids profile");
+  }
+  if (magic->str() != "spark-rapids profile") {
+    std::ostringstream oss;
+    oss << "bad profile magic, expected 'spark-rapids profile' found '" << magic->str() << "'";
+    throw std::runtime_error(oss.str());
+  }
+  auto version = header->version();
+  // Only version 1 of the profile format is understood by this converter.
+  if (version != 1) {
+    std::ostringstream oss;
+    oss << "unsupported profile version: " << version;
+    throw std::runtime_error(oss.str());
+  }
+}
+
+// Convert profile records toward an nsys-rep representation.
+// NOTE(review): currently only collects API events and prints record counts
+// to stderr; full conversion appears unimplemented at this point in the patch.
+void convert_to_nsys_rep(std::ifstream& in,
+                         std::string_view const& in_filename,
+                         program_options const& opts)
+{
+  event_streams events;
+  size_t num_dropped_records = 0;
+  while (!in.eof()) {
+    auto fb_ptr = read_flatbuffer(in);
+    auto records =
+      validate_fb<spark_rapids_jni::profiler::ActivityRecords>(*fb_ptr, "ActivityRecords");
+    auto api = records->api();
+    if (api != nullptr) {
+      for (int i = 0; i < api->size(); ++i) {
+        auto a = api->Get(i);
+        thread_id tid{a->process_id(), a->thread_id()};
+        event e{event::type_id::API, a};
+        auto it = events.cpu.find(tid);
+        if (it == events.cpu.end()) {
+          events.cpu.emplace(tid, std::initializer_list<event>{e});
+        } else {
+          it->second.push_back(e);
+        }
+      }
+    }
+    auto device = records->device();
+    if (device != nullptr) { std::cerr << "NUM DEVICES=" << device->size() << std::endl; }
+    auto dropped = records->dropped();
+    if (dropped != nullptr) {
+      for (int i = 0; i < dropped->size(); ++i) {
+        auto d = dropped->Get(i);
+        num_dropped_records += d->num_dropped();
+      }
+    }
+    auto kernel = records->kernel();
+    if (kernel != nullptr) { std::cerr << "NUM KERNEL=" << kernel->size() << std::endl; }
+    auto marker = records->marker();
+    if (marker != nullptr) { std::cerr << "NUM MARKERS=" << marker->size() << std::endl; }
+    auto marker_data = records->marker_data();
+    if (marker_data != nullptr) {
+      std::cerr << "NUM MARKER DATA=" << marker_data->size() << std::endl;
+      for (int i = 0; i < marker_data->size(); ++i) {
+        std::cerr << "MARKER DATA " << i << std::endl;
+        auto md = marker_data->Get(i);
+        std::cerr << " FLAGS: " << md->flags();
+        std::cerr << " ID: " << md->id();
+        std::cerr << " COLOR: " << md->color();
+        std::cerr << " CATEGORY: " << md->category() << std::endl;
+      }
+    }
+    auto memcpy = records->memcpy();
+    if (memcpy != nullptr) { std::cerr << "NUM MEMCPY=" << memcpy->size() << std::endl; }
+    auto memset = records->memset();
+    // Fix: this previously tested `device` (copy/paste error), so `memset`
+    // could be dereferenced while null, and memset counts were tied to the
+    // presence of device records.
+    if (memset != nullptr) { std::cerr << "NUM MEMSET=" << memset->size() << std::endl; }
+    auto overhead = records->overhead();
+    if (overhead != nullptr) { std::cerr << "NUM OVERHEADS=" << overhead->size() << std::endl; }
+
+    // Poke the stream so eofbit is set once the last record has been consumed.
+    in.peek();
+  }
+  if (not in.eof()) { throw std::runtime_error(std::strerror(errno)); }
+  if (num_dropped_records) {
+    std::cerr << "Warning: " << num_dropped_records
+              << " records were noted as dropped in the profile" << std::endl;
+  }
+}
+
+// Convert profile records to JSON text using the embedded profiler schema.
+void convert_to_json(std::ifstream& in, std::ostream& out, program_options const& opts)
+{
+  flatbuffers::Parser parser;
+  // Fix: Parser::Parse returns true on success, so the old `!= 0` test was
+  // inverted, and the runtime_error was constructed but never thrown.
+  if (!parser.Parse(spark_rapids_jni::profiler::Profiler_Schema)) {
+    throw std::runtime_error("Internal error: Unable to parse profiler schema");
+  }
+  parser.opts.strict_json  = true;
+  parser.opts.indent_step  = opts.json_indent;
+  while (!in.eof()) {
+    auto fb_ptr = read_flatbuffer(in);
+    auto records =
+      validate_fb<spark_rapids_jni::profiler::ActivityRecords>(*fb_ptr, "ActivityRecords");
+    std::string json;
+    // Skip the size prefix; GenText expects the flatbuffer root.
+    char const* err =
+      flatbuffers::GenText(parser, fb_ptr->data() + sizeof(flatbuffers::uoffset_t), &json);
+    if (err != nullptr) { throw std::runtime_error(std::string("Error generating JSON: ") + err); }
+    out << json;
+
+    // Poke the stream so eofbit is set once the last record has been consumed.
+    in.peek();
+  }
+  if (not in.eof()) { throw std::runtime_error(std::strerror(errno)); }
+}
+
+// Look up the CUPTI callback name for an API activity record.
+// Returns "unknown" if CUPTI cannot resolve the callback ID.
+char const* get_api_name(spark_rapids_jni::profiler::ApiActivity const* a)
+{
+  char const* name = nullptr;
+  CUptiResult rc   = CUPTI_SUCCESS;
+  switch (a->kind()) {
+    case spark_rapids_jni::profiler::ApiKind_Driver:
+      rc = cuptiGetCallbackName(CUPTI_CB_DOMAIN_DRIVER_API, a->cbid(), &name);
+      break;
+    case spark_rapids_jni::profiler::ApiKind_Runtime:
+      rc = cuptiGetCallbackName(CUPTI_CB_DOMAIN_RUNTIME_API, a->cbid(), &name);
+      break;
+    default: {
+      std::ostringstream oss;
+      oss << "unsupported API kind: " << a->kind();
+      throw std::runtime_error(oss.str());
+    }
+  }
+  // Fix: cuptiGetCallbackName's result was ignored; on failure `name` stayed
+  // null and was later streamed into an ostream, which is undefined behavior.
+  if (rc != CUPTI_SUCCESS || name == nullptr) { name = "unknown"; }
+  return name;
+}
+
+// Return the demangled form of a C++ symbol name, or the input unchanged
+// when it does not demangle.
+std::string demangle(char const* s)
+{
+  int status = 0;
+  char* buf  = abi::__cxa_demangle(s, nullptr, nullptr, &status);
+  if (status != 0) { return s; }
+  std::string result{buf};
+  free(buf);  // __cxa_demangle allocates with malloc
+  return result;
+}
+
+// Render a memcpy activity as a short description, e.g. "HtoD Pinned 64 bytes async".
+// The " Pinned" tag is applied when the host side of the copy is pinned memory.
+std::string memcpy_to_string(spark_rapids_jni::profiler::MemcpyActivity const* m)
+{
+  char const* kind_str;
+  char const* pinned = "";
+  switch (m->copy_kind()) {
+    case spark_rapids_jni::profiler::MemcpyKind_HtoD:
+      kind_str = "HtoD";
+      if (m->src_kind() == spark_rapids_jni::profiler::MemoryKind_Pinned) { pinned = " Pinned"; }
+      break;
+    case spark_rapids_jni::profiler::MemcpyKind_DtoH:
+      kind_str = "DtoH";
+      if (m->dst_kind() == spark_rapids_jni::profiler::MemoryKind_Pinned) { pinned = " Pinned"; }
+      break;
+    case spark_rapids_jni::profiler::MemcpyKind_HtoA:
+      kind_str = "HtoA";
+      // Fix: for HtoA the host side is the *source*, but the old code checked
+      // dst_kind (the array side, which is never Pinned), so the tag could
+      // never appear for host-to-array copies.
+      if (m->src_kind() == spark_rapids_jni::profiler::MemoryKind_Pinned) { pinned = " Pinned"; }
+      break;
+    case spark_rapids_jni::profiler::MemcpyKind_AtoH:
+      kind_str = "AtoH";
+      if (m->dst_kind() == spark_rapids_jni::profiler::MemoryKind_Pinned) { pinned = " Pinned"; }
+      break;
+    case spark_rapids_jni::profiler::MemcpyKind_AtoA: kind_str = "AtoA"; break;
+    case spark_rapids_jni::profiler::MemcpyKind_AtoD: kind_str = "AtoD"; break;
+    case spark_rapids_jni::profiler::MemcpyKind_DtoA: kind_str = "DtoA"; break;
+    case spark_rapids_jni::profiler::MemcpyKind_DtoD: kind_str = "DtoD"; break;
+    case spark_rapids_jni::profiler::MemcpyKind_HtoH:
+      kind_str = "HtoH";
+      if (m->src_kind() == spark_rapids_jni::profiler::MemoryKind_Pinned &&
+          m->dst_kind() == m->src_kind()) {
+        pinned = " Pinned";
+      }
+      break;
+    case spark_rapids_jni::profiler::MemcpyKind_PtoP: kind_str = "PtoP"; break;
+    case spark_rapids_jni::profiler::MemcpyKind_Unknown: kind_str = "Unknown"; break;
+    default: kind_str = "Unknown"; break;
+  }
+  std::ostringstream oss;
+  oss << kind_str << pinned;
+  oss << " " << m->bytes() << " bytes";
+  if (m->flags() == spark_rapids_jni::profiler::MemcpyFlags_Async) { oss << " async"; }
+  return oss.str();
+}
+
+// Map a memcpy activity to an NVTXT color name. Pinned host transfers get a
+// distinct color; unknown kinds are flagged in dark red.
+const char* memcpy_to_color(spark_rapids_jni::profiler::MemcpyActivity const* m)
+{
+  namespace p = spark_rapids_jni::profiler;
+  switch (m->copy_kind()) {
+    case p::MemcpyKind_HtoD:
+      return (m->src_kind() == p::MemoryKind_Pinned) ? "MediumPurple" : "Gold";
+    case p::MemcpyKind_DtoH:
+      return (m->dst_kind() == p::MemoryKind_Pinned) ? "MediumPurple" : "Gold";
+    case p::MemcpyKind_HtoA:
+    case p::MemcpyKind_AtoH:
+    case p::MemcpyKind_AtoA:
+    case p::MemcpyKind_AtoD:
+    case p::MemcpyKind_DtoA:
+    case p::MemcpyKind_DtoD: return "Gold";
+    case p::MemcpyKind_HtoH: return "Ivory";
+    case p::MemcpyKind_PtoP: return "LightSalmon";
+    case p::MemcpyKind_Unknown:
+    default: return "DarkRed";
+  }
+}
+
+// Render a memset activity as a short description, e.g. "Memset 64 bytes to 0 async".
+std::string memset_to_string(spark_rapids_jni::profiler::MemsetActivity const* m)
+{
+  std::ostringstream text;
+  text << "Memset " << m->bytes() << " bytes to " << m->value();
+  bool const is_async = m->flags() == spark_rapids_jni::profiler::MemsetFlags_Async;
+  if (is_async) { text << " async"; }
+  return text.str();
+}
+
+// Map an overhead kind to a human-readable label; anything unrecognized
+// (including OverheadKind_Unknown) reads "Unknown".
+char const* overhead_kind_to_string(spark_rapids_jni::profiler::OverheadKind k)
+{
+  namespace p = spark_rapids_jni::profiler;
+  switch (k) {
+    case p::OverheadKind_DriverCompiler: return "Driver compiler";
+    case p::OverheadKind_CUptiBufferFlush: return "Buffer flush";
+    case p::OverheadKind_CUptiInstrumentation: return "Instrumentation";
+    case p::OverheadKind_CUptiResource: return "Resource";
+    case p::OverheadKind_Unknown:
+    default: return "Unknown";
+  }
+}
+
+// Convert a CUPTI thread ID to an NVTXT thread ID.
+// NVTXT thread IDs are limited to 24 bits, so keep only the upper 24 bits,
+// which empirically are the most unique bits returned by CUPTI.
+uint32_t to_nvtxt_tid(uint32_t cupti_tid) { return cupti_tid >> 8; }
+
+// Convert profile records to NVTXT format. GPU activity (kernels, memcpys,
+// memsets, GPU-side markers/overhead) is represented by abusing NVTXT thread
+// IDs as stream IDs under a synthetic process 0 named "GPU".
+void convert_to_nvtxt(std::ifstream& in, std::ostream& out, program_options const& opts)
+{
+  // State captured at a marker-range start, needed to emit the
+  // RangePush/RangePop pair once the matching end marker is seen.
+  struct marker_start {
+    uint64_t timestamp;
+    uint32_t process_id;
+    uint32_t thread_id;
+    uint32_t color;
+    uint32_t category;
+    std::string name;
+  };
+  // Fix: restore the container template arguments lost from the patch text.
+  std::unordered_set<stream_id> streams_seen;
+  // Marker ID -> auxiliary data (color, category); valid within one batch.
+  std::unordered_map<int32_t, spark_rapids_jni::profiler::MarkerData const*> marker_data_map;
+  // Marker ID -> pending start record awaiting its matching end.
+  std::unordered_map<int32_t, marker_start> marker_start_map;
+  size_t num_dropped_records = 0;
+  out << "@NameProcess,ProcessId,Name" << std::endl;
+  out << "NameProcess,0,\"GPU\"" << std::endl;
+  out << "@NameOsThread,ProcessId,ThreadId,Name" << std::endl;
+  out << "@RangePush,Time,ProcessId,ThreadId,CategoryId,Color,Message" << std::endl;
+  out << "@RangePop,Time,ProcessId,ThreadId" << std::endl;
+  out << "TimeBase=Relative" << std::endl;
+  out << "Payload=0" << std::endl;
+  while (!in.eof()) {
+    auto fb_ptr = read_flatbuffer(in);
+    auto records =
+      validate_fb<spark_rapids_jni::profiler::ActivityRecords>(*fb_ptr, "ActivityRecords");
+    auto dropped = records->dropped();
+    if (dropped != nullptr) {
+      for (int i = 0; i < dropped->size(); ++i) {
+        auto d = dropped->Get(i);
+        num_dropped_records += d->num_dropped();
+      }
+    }
+    auto api = records->api();
+    if (api != nullptr) {
+      for (int i = 0; i < api->size(); ++i) {
+        auto a = api->Get(i);
+        out << "RangePush," << a->start() << "," << a->process_id() << ","
+            << to_nvtxt_tid(a->thread_id()) << ",0,PaleGreen"
+            << ","
+            << "\"" << get_api_name(a) << "\"" << std::endl;
+        out << "RangePop," << a->end() << "," << a->process_id() << ","
+            << to_nvtxt_tid(a->thread_id()) << std::endl;
+      }
+    }
+    auto marker_data = records->marker_data();
+    if (marker_data != nullptr) {
+      for (int i = 0; i < marker_data->size(); ++i) {
+        auto m = marker_data->Get(i);
+        auto [it, inserted] = marker_data_map.insert({m->id(), m});
+        if (not inserted) {
+          std::ostringstream oss;
+          oss << "duplicate marker data for " << m->id();
+          throw std::runtime_error(oss.str());
+        }
+      }
+    }
+    auto marker = records->marker();
+    if (marker != nullptr) {
+      for (int i = 0; i < marker->size(); ++i) {
+        auto m         = marker->Get(i);
+        auto object_id = m->object_id();
+        if (object_id != nullptr) {
+          uint32_t process_id = object_id->process_id();
+          uint32_t thread_id  = to_nvtxt_tid(object_id->thread_id());
+          if (process_id == 0) {
+            // abusing thread ID as stream ID since NVTXT does not support GPU activity directly
+            thread_id = object_id->stream_id();
+            // TODO: Ignoring device ID and context here
+            auto [it, inserted] = streams_seen.insert(stream_id{0, 0, thread_id});
+            // Fix: emit the ThreadId column declared in the @NameOsThread
+            // header; it was previously missing from this row.
+            if (inserted) {
+              out << "NameOsThread,0," << thread_id << ",\"Stream " << thread_id << "\""
+                  << std::endl;
+            }
+          }
+          if (m->flags() & spark_rapids_jni::profiler::MarkerFlags_Start) {
+            auto it           = marker_data_map.find(m->id());
+            uint32_t color    = 0x444444;
+            uint32_t category = 0;
+            if (it != marker_data_map.end()) {
+              color    = it->second->color();
+              category = it->second->category();
+            }
+            marker_start ms{
+              m->timestamp(), process_id, thread_id, color, category, m->name()->str()};
+            auto [ignored, inserted] = marker_start_map.insert({m->id(), ms});
+            if (not inserted) {
+              std::ostringstream oss;
+              oss << "duplicate marker start for ID " << m->id();
+              throw std::runtime_error(oss.str());
+            }
+          } else if (m->flags() & spark_rapids_jni::profiler::MarkerFlags_End) {
+            auto it = marker_start_map.find(m->id());
+            if (it != marker_start_map.end()) {
+              auto const& ms = it->second;
+              out << "RangePush," << ms.timestamp << "," << ms.process_id << "," << ms.thread_id
+                  << "," << ms.category << "," << ms.color << ","
+                  << "\"" << ms.name << "\"" << std::endl;
+              out << "RangePop," << m->timestamp() << "," << ms.process_id << "," << ms.thread_id
+                  << std::endl;
+              marker_start_map.erase(it);
+            } else {
+              std::cerr << "Ignoring marker end without start for ID " << m->id() << std::endl;
+            }
+          } else {
+            std::cerr << "Ignoring marker with unsupported flags: " << m->flags() << std::endl;
+          }
+        } else {
+          std::cerr << "Marker " << m->id() << " has no object ID" << std::endl;
+        }
+      }
+    }
+    // Marker data IDs are only referenced within the batch that carried them.
+    marker_data_map.clear();
+    auto kernel = records->kernel();
+    if (kernel != nullptr) {
+      for (int i = 0; i < kernel->size(); ++i) {
+        auto k              = kernel->Get(i);
+        uint32_t process_id = 0;
+        // abusing thread ID as stream ID since NVTXT does not support GPU activity directly
+        uint32_t thread_id = k->stream_id();
+        // TODO: Ignoring device ID and context here
+        auto [it, inserted] = streams_seen.insert(stream_id{0, 0, thread_id});
+        if (inserted) {
+          out << "NameOsThread,0," << thread_id << ",\"Stream " << thread_id << "\"" << std::endl;
+        }
+        out << "RangePush," << k->start() << "," << process_id << "," << thread_id << ",0,Blue"
+            << ","
+            << "\"" << demangle(k->name()->c_str()) << "\"" << std::endl;
+        out << "RangePop," << k->end() << "," << process_id << "," << thread_id << std::endl;
+      }
+    }
+    auto memcpy = records->memcpy();
+    if (memcpy != nullptr) {
+      for (int i = 0; i < memcpy->size(); ++i) {
+        auto m              = memcpy->Get(i);
+        uint32_t process_id = 0;
+        // abusing thread ID as stream ID since NVTXT does not support GPU activity directly
+        uint32_t thread_id = m->stream_id();
+        // TODO: Ignoring device ID and context here
+        auto [it, inserted] = streams_seen.insert(stream_id{0, 0, thread_id});
+        if (inserted) {
+          out << "NameOsThread,0," << thread_id << ",\"Stream " << thread_id << "\"" << std::endl;
+        }
+        out << "RangePush," << m->start() << "," << process_id << "," << thread_id << ",0,"
+            << memcpy_to_color(m) << ","
+            << "\"" << memcpy_to_string(m) << "\"" << std::endl;
+        out << "RangePop," << m->end() << "," << process_id << "," << thread_id << std::endl;
+      }
+    }
+    auto memset = records->memset();
+    if (memset != nullptr) {
+      for (int i = 0; i < memset->size(); ++i) {
+        auto m              = memset->Get(i);
+        uint32_t process_id = 0;
+        // abusing thread ID as stream ID since NVTXT does not support GPU activity directly
+        uint32_t thread_id = m->stream_id();
+        // TODO: Ignoring device ID and context here
+        auto [it, inserted] = streams_seen.insert(stream_id{0, 0, thread_id});
+        if (inserted) {
+          out << "NameOsThread,0," << thread_id << ",\"Stream " << thread_id << "\"" << std::endl;
+        }
+        out << "RangePush," << m->start() << "," << process_id << "," << thread_id << ",0,Olive"
+            << ","
+            << "\"" << memset_to_string(m) << "\"" << std::endl;
+        out << "RangePop," << m->end() << "," << process_id << "," << thread_id << std::endl;
+      }
+    }
+    auto overhead = records->overhead();
+    if (overhead != nullptr) {
+      for (int i = 0; i < overhead->size(); ++i) {
+        auto o         = overhead->Get(i);
+        auto object_id = o->object_id();
+        if (object_id != nullptr) {
+          uint32_t process_id = object_id->process_id();
+          uint32_t thread_id  = to_nvtxt_tid(object_id->thread_id());
+          if (process_id == 0) {
+            // abusing thread ID as stream ID since NVTXT does not support GPU activity directly
+            thread_id = object_id->stream_id();
+            // TODO: Ignoring device ID and context here
+            auto [it, inserted] = streams_seen.insert(stream_id{0, 0, thread_id});
+            // Fix: emit the ThreadId column declared in the @NameOsThread
+            // header; it was previously missing from this row.
+            if (inserted) {
+              out << "NameOsThread,0," << thread_id << ",\"Stream " << thread_id << "\""
+                  << std::endl;
+            }
+          }
+          out << "RangePush," << o->start() << "," << process_id << "," << thread_id
+              << ",0,OrangeRed"
+              << ","
+              << "\"" << overhead_kind_to_string(o->overhead_kind()) << "\"" << std::endl;
+          out << "RangePop," << o->end() << "," << process_id << "," << thread_id << std::endl;
+        } else {
+          std::cerr << "Overhead activity has no object ID" << std::endl;
+        }
+      }
+    }
+
+    // Poke the stream so eofbit is set once the last record has been consumed.
+    in.peek();
+  }
+  if (num_dropped_records) {
+    std::cerr << "Warning: " << num_dropped_records
+              << " records were noted as dropped in the profile" << std::endl;
+  }
+}
+
+// Entry point: parse options, validate the profile header, and dispatch to
+// the requested output converter (JSON, NVTXT, or nsys-rep by default).
+int main(int argc, char* argv[])
+{
+  constexpr int RESULT_SUCCESS = 0;
+  constexpr int RESULT_FAILURE = 1;
+  constexpr int RESULT_USAGE   = 2;
+  program_options opts;
+  // Fix: restore the std::vector<std::string_view> template arguments lost
+  // from the patch text.
+  std::vector<std::string_view> files;
+  if (argc < 2) {
+    print_usage();
+    return RESULT_USAGE;
+  }
+  std::vector<std::string_view> args(argv + 1, argv + argc);
+  try {
+    auto [options, inputs] = parse_options(args);
+    opts  = options;
+    files = inputs;
+  } catch (std::exception const& e) {
+    std::cerr << "spark_rapids_profile_converter: " << e.what() << std::endl;
+    print_usage();
+    return RESULT_USAGE;
+  }
+  if (opts.help) {
+    print_usage();
+    return RESULT_USAGE;
+  }
+  if (opts.version) {
+    print_version();
+    return RESULT_SUCCESS;
+  }
+  if (files.size() != 1) {
+    // Fix: the old "Missing input file." message was wrong when more than one
+    // file was supplied.
+    std::cerr << "Requires exactly one input file." << std::endl;
+    print_usage();
+    return RESULT_USAGE;
+  }
+  auto input_file = files.front();
+  try {
+    std::ifstream in(std::string(input_file), std::ios::binary | std::ios::in);
+    // Fix: report an unopenable input immediately instead of failing later
+    // with a confusing "Unexpected EOF" while reading the header.
+    if (!in) { throw std::runtime_error("unable to open input file"); }
+    in.exceptions(std::istream::badbit);
+    verify_profile_header(in);
+    if (opts.json) {
+      if (opts.output_path) {
+        std::ofstream out = open_output(opts.output_path.value());
+        convert_to_json(in, out, opts);
+      } else {
+        convert_to_json(in, std::cout, opts);
+      }
+    } else if (opts.nvtxt) {
+      if (opts.output_path) {
+        std::ofstream out = open_output(opts.output_path.value());
+        convert_to_nvtxt(in, out, opts);
+      } else {
+        convert_to_nvtxt(in, std::cout, opts);
+      }
+    } else {
+      convert_to_nsys_rep(in, input_file, opts);
+    }
+  } catch (std::system_error const& e) {
+    std::cerr << "Error converting " << input_file << ": " << e.code().message() << std::endl;
+    return RESULT_FAILURE;
+  } catch (std::exception const& e) {
+    std::cerr << "Error converting " << input_file << ": " << e.what() << std::endl;
+    return RESULT_FAILURE;
+  }
+  return RESULT_SUCCESS;
+}
diff --git a/src/main/cpp/src/spark_rapids_jni_version.cpp.in b/src/main/cpp/src/spark_rapids_jni_version.cpp.in
new file mode 100644
index 0000000000..fdc2aa3007
--- /dev/null
+++ b/src/main/cpp/src/spark_rapids_jni_version.cpp.in
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "spark_rapids_jni_version.h"
+
+namespace spark_rapids_jni {
+
+// Populated by CMake configure_file: project version followed by the
+// git commit details of the build.
+char const Version[] = "@CMAKE_PROJECT_VERSION@ @SPARK_RAPIDS_JNI_COMMIT_DETAILS@";
+
+}
diff --git a/src/main/cpp/src/spark_rapids_jni_version.h b/src/main/cpp/src/spark_rapids_jni_version.h
new file mode 100644
index 0000000000..c77a8ec5a9
--- /dev/null
+++ b/src/main/cpp/src/spark_rapids_jni_version.h
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace spark_rapids_jni {
+
+// Build version string; defined in spark_rapids_jni_version.cpp, which is
+// generated from spark_rapids_jni_version.cpp.in at configure time.
+extern char const Version[];
+
+}
diff --git a/src/main/fbs/profiler.fbs b/src/main/fbs/profiler.fbs
new file mode 100644
index 0000000000..0770be33cf
--- /dev/null
+++ b/src/main/fbs/profiler.fbs
@@ -0,0 +1,287 @@
+// Copyright (c) 2024, NVIDIA CORPORATION.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Flatbuffer schema for the profiler
+// NOTE: The schema needs to be in a single file because the build embeds it
+// into the converter tool to be able to emit profile records as JSON.
+
+// Profiling data is written as a series of size-prefixed flatbuffers.
+// The first flatbuffer is always ProfileHeader followed by zero or more ActivityRecords.
+
+namespace spark_rapids_jni.profiler;
+
+table ActivityObjectId {
+ process_id:uint32; // present if object kind is Process or Thread
+ thread_id:uint32; // present if object kind is Thread
+ device_id:uint32; // present if object kind is Device or Context or Stream
+ context_id:uint32; // present if object kind is Context or Stream
+ stream_id:uint32; // present if object kind is Stream
+}
+
+enum ApiKind:byte {
+ Driver = 0,
+ Runtime = 1
+}
+
+enum ChannelType:uint8 {
+ Invalid = 0,
+ Compute = 1,
+ AsyncMemcpy = 2
+}
+
+table CommandBufferFullData {
+ command_buffer_length:uint32;
+ channel_id:uint32;
+ channel_type:uint32;
+}
+
+enum LaunchType:uint8 {
+ Regular = 0,
+ CooperativeSingleDevice = 1,
+ CooperativeMultiDevice = 2
+}
+
+enum MarkerFlags:uint8 (bit_flags) {
+ Instantaneous = 0,
+ Start = 1,
+ End = 2,
+ SyncAcquire = 3,
+ SyncAcquireSuccess = 4,
+ SyncAcquireFailed = 5,
+ SyncRelease = 6
+}
+
+enum MemcpyFlags:uint8 (bit_flags) {
+ Async = 0
+}
+
+enum MemcpyKind:uint8 {
+ Unknown = 0,
+ HtoD = 1,
+ DtoH = 2,
+ HtoA = 3,
+ AtoH = 4,
+ AtoA = 5,
+ AtoD = 6,
+ DtoA = 7,
+ DtoD = 8,
+ HtoH = 9,
+ PtoP = 10
+}
+
+enum MemoryKind:uint8 {
+ Unknown = 0,
+ Pageable = 1,
+ Pinned = 2,
+ Device = 3,
+ Array = 4,
+ Managed = 5,
+ DeviceStatic = 6,
+ ManagedStatic = 7
+}
+
+enum MemsetFlags:uint8 (bit_flags) {
+ Async = 0
+}
+
+enum OverheadKind:uint8 {
+ Unknown = 0,
+ DriverCompiler = 1,
+ CUptiBufferFlush = 2,
+ CUptiInstrumentation = 3,
+ CUptiResource = 4
+}
+
+enum PartitionedGlobalCacheConfig:uint8 {
+ Unknown = 0,
+ NotSupported = 1,
+ Off = 2,
+ On = 3
+}
+
+enum ShmemLimitConfig:uint8 {
+ Default = 0,
+ Optin = 1
+}
+
+// Leading message of every profile stream; identifies the format.
+table ProfileHeader {
+ magic:string; // expected to be "spark-rapids profile"
+ version:uint32; // profile format version; the converter accepts only 1
+ writer_version:string; // version of the profiler that wrote the file
+}
+
+// A batch of activity records; each vector may be absent (null) when the
+// batch contains no records of that kind.
+table ActivityRecords {
+ api:[ApiActivity];
+ device:[DeviceActivity];
+ dropped:[DroppedRecords];
+ kernel:[KernelActivity];
+ marker:[MarkerActivity];
+ marker_data:[MarkerData];
+ memcpy:[MemcpyActivity];
+ memset:[MemsetActivity];
+ overhead:[OverheadActivity];
+}
+
+table ApiActivity {
+ kind:ApiKind = Runtime;
+ cbid:uint32;
+ start:uint64;
+ end:uint64;
+ process_id:uint32;
+ thread_id:uint32;
+ correlation_id:uint32;
+ return_value:uint32 = 0;
+}
+
+table DeviceActivity {
+ global_memory_bandwidth:uint64;
+ global_memory_size:uint64;
+ constant_memory_size:uint32;
+ l2_cache_size:uint32;
+ num_threads_per_warp:uint32;
+ core_clock_rate:uint32;
+ num_memcpy_engines:uint32;
+ num_multiprocessors:uint32;
+ max_ipc:uint32;
+ max_warps_per_multiprocessor:uint32;
+ max_blocks_per_multiprocessor:uint32;
+ max_shared_memory_per_multiprocessor:uint32;
+ max_registers_per_multiprocessor:uint32;
+ max_registers_per_block:uint32;
+ max_shared_memory_per_block:uint32;
+ max_threads_per_block:uint32;
+ max_block_dim_x:uint32;
+ max_block_dim_y:uint32;
+ max_block_dim_z:uint32;
+ max_grid_dim_x:uint32;
+ max_grid_dim_y:uint32;
+ max_grid_dim_z:uint32;
+ compute_capability_major:uint32;
+ compute_capability_minor:uint32;
+ id:uint32;
+ ecc_enabled:uint32;
+ name:string;
+}
+
+table DroppedRecords {
+ num_dropped:uint64;
+}
+
+table KernelActivity {
+ requested:uint8;
+ executed:uint8;
+ shared_memory_config:uint8;
+ registers_per_thread:uint16;
+ partitioned_global_cache_requested:PartitionedGlobalCacheConfig;
+ partitioned_global_cache_executed:PartitionedGlobalCacheConfig;
+ start:uint64;
+ end:uint64;
+ completed:uint64 = 0;
+ device_id:uint32;
+ context_id:uint32;
+ stream_id:uint32;
+ grid_x:int32;
+ grid_y:int32;
+ grid_z:int32;
+ block_x:int32;
+ block_y:int32;
+ block_z:int32;
+ static_shared_memory:int32;
+ dynamic_shared_memory:int32;
+ local_memory_per_thread:uint32;
+ local_memory_total:uint32;
+ correlation_id:uint32;
+ grid_id:int64;
+ name:string;
+ queued:uint64 = 0;
+ submitted:uint64 = 0;
+ launch_type:LaunchType = Regular;
+ is_shared_memory_carveout_requested:uint8;
+ shared_memory_carveout_requested:uint8;
+ shared_memory_executed:uint32;
+ graph_node_id:uint64 = 0;
+ shmem_limit_config:ShmemLimitConfig = Default;
+ graph_id:uint32 = 0;
+ //access_policy_window:???;
+ channel_id:uint32;
+ channel_type:ChannelType;
+ cluster_x:uint32;
+ cluster_y:uint32;
+ cluster_z:uint32;
+ cluster_scheduling_policy:uint32;
+ local_memory_total_v2:uint64;
+}
+
+table MarkerActivity {
+ flags:MarkerFlags = Start;
+ timestamp:uint64;
+ id:int32;
+ object_id:ActivityObjectId;
+ name:string;
+ domain:string;
+}
+
+table MarkerData {
+ flags:MarkerFlags = Start;
+ id:int32;
+ //payload_kind:MetricValueKind;
+ //payload:MetricValue;
+ color:uint32;
+ category:uint32;
+}
+
+table MemcpyActivity {
+ copy_kind:MemcpyKind;
+ src_kind:MemoryKind;
+ dst_kind:MemoryKind;
+ flags:MemcpyFlags;
+ bytes:uint64;
+ start:uint64;
+ end:uint64;
+ device_id:uint32;
+ context_id:uint32;
+ stream_id:uint32;
+ correlation_id:uint32;
+ runtime_correlation_id:uint32;
+ graph_node_id:uint64 = 0;
+ graph_id:uint32 = 0;
+ channel_id:uint32;
+ channel_type:ChannelType;
+}
+
+table MemsetActivity {
+ value:uint32;
+ bytes:uint64;
+ start:uint64;
+ end:uint64;
+ device_id:uint32;
+ context_id:uint32;
+ stream_id:uint32;
+ correlation_id:uint32;
+ flags:MemsetFlags;
+ memory_kind:MemoryKind;
+ graph_node_id:uint64 = 0;
+ graph_id:uint32 = 0;
+ channel_id:uint32;
+ channel_type:ChannelType;
+}
+
+table OverheadActivity {
+ overhead_kind:OverheadKind;
+ object_id:ActivityObjectId;
+ start:uint64;
+ end:uint64;
+}
+
+root_type ActivityRecords;
diff --git a/src/main/java/com/nvidia/spark/rapids/jni/Profiler.java b/src/main/java/com/nvidia/spark/rapids/jni/Profiler.java
new file mode 100644
index 0000000000..86d5b0edde
--- /dev/null
+++ b/src/main/java/com/nvidia/spark/rapids/jni/Profiler.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids.jni;
+
+import ai.rapids.cudf.NativeDepsLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** Profiler that collects CUDA and NVTX events for the current process. */
+/**
+ * Profiler that collects CUDA and NVTX events for the current process.
+ * All methods are synchronized on the class because the lifecycle state is a
+ * single static writer reference shared by init/start/stop/shutdown.
+ */
+public class Profiler {
+  private static final long DEFAULT_WRITE_BUFFER_SIZE = 1024 * 1024;
+  private static final int DEFAULT_FLUSH_PERIOD_MILLIS = 0;
+  // Non-null iff the profiler has been initialized. Guarded by the class lock.
+  private static DataWriter writer = null;
+
+  /**
+   * Initialize the profiler in a standby state. The start method must be called after this
+   * to start collecting profiling data.
+   * @param w data writer for writing profiling data
+   */
+  public static void init(DataWriter w) {
+    init(w, DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_FLUSH_PERIOD_MILLIS);
+  }
+
+  /**
+   * Initialize the profiler in a standby state. The start method must be called after this
+   * to start collecting profiling data.
+   * @param w data writer for writing profiling data
+   * @param writeBufferSize size of host memory buffer to use for collecting profiling data.
+   *                        Recommended to be between 1-8 MB in size to balance callback
+   *                        overhead with latency.
+   * @param flushPeriodMillis time period in milliseconds to explicitly flush collected
+   *                          profiling data to the writer. A value <= 0 will disable explicit
+   *                          flushing.
+   */
+  // Fix: synchronized to avoid a check-then-act race on the static writer
+  // when two threads call init (or init racing shutdown) concurrently.
+  public static synchronized void init(DataWriter w, long writeBufferSize, int flushPeriodMillis) {
+    if (writer == null) {
+      File libPath;
+      try {
+        libPath = NativeDepsLoader.loadNativeDep("profilerjni", true);
+      } catch (IOException e) {
+        throw new RuntimeException("Error loading profiler library", e);
+      }
+      nativeInit(libPath.getAbsolutePath(), w, writeBufferSize, flushPeriodMillis);
+      writer = w;
+    } else {
+      throw new IllegalStateException("Already initialized");
+    }
+  }
+
+  /**
+   * Shutdown the profiling session. Flushes collected profiling data to the writer and
+   * closes the writer. Safe to call when the profiler was never initialized.
+   */
+  public static synchronized void shutdown() {
+    if (writer != null) {
+      nativeShutdown();
+      try {
+        writer.close();
+      } catch (Exception e) {
+        throw new RuntimeException("Error closing writer", e);
+      } finally {
+        // Always return to the uninitialized state, even if close failed.
+        writer = null;
+      }
+    }
+  }
+
+  /**
+   * Start collecting profiling data. Safe to call if profiling data is already being collected.
+   * @throws IllegalStateException if the profiler has not been initialized
+   */
+  public static synchronized void start() {
+    if (writer != null) {
+      nativeStart();
+    } else {
+      throw new IllegalStateException("Profiler not initialized");
+    }
+  }
+
+  /**
+   * Stop collecting profiling data. Safe to call if the profiler is initialized but not
+   * actively collecting data.
+   * @throws IllegalStateException if the profiler has not been initialized
+   */
+  public static synchronized void stop() {
+    if (writer != null) {
+      nativeStop();
+    } else {
+      throw new IllegalStateException("Profiler not initialized");
+    }
+  }
+
+  private static native void nativeInit(String libPath, DataWriter writer,
+                                        long writeBufferSize, int flushPeriodMillis);
+
+  private static native void nativeStart();
+
+  private static native void nativeStop();
+
+  private static native void nativeShutdown();
+
+  /** Interface for profiler data writers */
+  public interface DataWriter extends AutoCloseable {
+    /**
+     * Called by the profiler to write a block of profiling data. Profiling data is written
+     * in a size-prefixed flatbuffer format. See profiler.fbs for the schema.
+     * @param data profiling data to be written
+     */
+    void write(ByteBuffer data);
+  }
+}