[NSE-383] Release SMJ input data immediately after being used (#387)
* [NSE-383] Release SMJ input data immediately after being used

Also, add a switch option spark.oap.sql.columnar.sortmergejoin.lazyread to the
Spark config (see the usage sketch below).

* rebase

* rebase
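
A minimal usage sketch for the new switch, assuming the columnar plugin is already on the classpath and enabled; the builder boilerplate, app name, and master below are illustrative, not part of this commit. The lazyread option defaults to false and is meant to be used alongside the existing spark.oap.sql.columnar.sortmergejoin switch:

    // Illustrative configuration: turn on columnar SMJ and the new lazy-read path.
    // Both keys are read by ColumnarPluginConfig; lazyread defaults to "false".
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("smj-lazyread-example")   // illustrative app name
      .master("local[*]")                // illustrative master
      .config("spark.oap.sql.columnar.sortmergejoin", "true")
      .config("spark.oap.sql.columnar.sortmergejoin.lazyread", "true")
      .getOrCreate()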
zhztheplayer authored Jul 23, 2021
1 parent fe6c798 commit 6b12a93
Showing 21 changed files with 806 additions and 79 deletions.
@@ -19,6 +19,7 @@

import com.intel.oap.ColumnarPluginConfig;
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.Spiller;
import org.apache.arrow.dataset.jni.NativeSerializedRecordBatchIterator;
import org.apache.arrow.memory.ArrowBuf;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -143,6 +144,12 @@ public ArrowRecordBatch[] evaluate(ArrowRecordBatch recordBatch) throws RuntimeE
return evaluate(recordBatch, null);
}

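// Feeds the native evaluator an iterator of serialized record batches so the
// native side can pull input on demand instead of receiving every batch up front.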
public void evaluate(NativeSerializedRecordBatchIterator batchItr)
throws RuntimeException, IOException {
jniWrapper.nativeEvaluateWithIterator(nativeHandler,
batchItr);
}

/**
* Evaluate input data using the built native function, and output as recordBatch.
*/
@@ -17,6 +17,8 @@

package com.intel.oap.vectorized;

import org.apache.arrow.dataset.jni.NativeSerializedRecordBatchIterator;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.spark.memory.MemoryConsumer;

import java.io.IOException;
@@ -147,6 +149,15 @@ native ArrowRecordBatchBuilder[] nativeEvaluate(long nativeHandler, int numRows,

native ArrowRecordBatchBuilder[] nativeEvaluate2(long nativeHandler, byte[] bytes) throws RuntimeException;

/**
* Evaluate the expressions represented by the nativeHandler on a record batch
* iterator. Throws an exception in case of errors.
*
* @param nativeHandler handle to the native expression evaluator
* @param batchItr an iterator instance carrying input record batches
*/
native void nativeEvaluateWithIterator(long nativeHandler,
NativeSerializedRecordBatchIterator batchItr) throws RuntimeException;

/**
* Get native kernel signature by the nativeHandler.
*
@@ -30,6 +30,7 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
def getCpu(): Boolean = {
val source = scala.io.Source.fromFile("/proc/cpuinfo")
val lines = try source.mkString finally source.close()
return true
//TODO(): check CPU flags to enable/disable AVX512
if (lines.contains("GenuineIntel")) {
return true
@@ -79,6 +80,9 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
val enableColumnarSortMergeJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.sortmergejoin", "true").toBoolean && enableCpu

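// enable or disable lazy reading of SMJ build-side input (off by default); when enabled,
// input batches are streamed to the native cached relation kernel through an iterator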
val enableColumnarSortMergeJoinLazyRead: Boolean =
conf.getConfString("spark.oap.sql.columnar.sortmergejoin.lazyread", "false").toBoolean

// enable or disable columnar union
val enableColumnarUnion: Boolean =
conf.getConfString("spark.oap.sql.columnar.union", "true").toBoolean && enableCpu
@@ -17,12 +17,15 @@

package com.intel.oap.execution

import java.util.concurrent.TimeUnit.NANOSECONDS

import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.expression._
import com.intel.oap.vectorized.{BatchIterator, ExpressionEvaluator, _}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
@@ -33,10 +36,13 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.arrow.dataset.jni.NativeSerializedRecordBatchIterator
import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch

case class ColumnarCodegenContext(inputSchema: Schema, outputSchema: Schema, root: TreeNode) {}

trait ColumnarCodegenSupport extends SparkPlan {
@@ -73,6 +79,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I

val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
val enableColumnarSortMergeJoinLazyRead = ColumnarPluginConfig.getConf.enableColumnarSortMergeJoinLazyRead

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -290,6 +297,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
var idx = 0
var curRDD = inputRDDs()(0)
while (idx < buildPlans.length) {

val curPlan = buildPlans(idx)._1
val parentPlan = buildPlans(idx)._2

@@ -408,15 +416,46 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
Lists.newArrayList(expression),
outputSchema,
true)
while (depIter.hasNext) {
val dep_cb = depIter.next()
if (dep_cb.numRows > 0) {
(0 until dep_cb.numCols).toList.foreach(i =>
dep_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain())
buildRelationBatchHolder += dep_cb
val dep_rb = ConverterUtils.createArrowRecordBatch(dep_cb)
cachedRelationKernel.evaluate(dep_rb)
ConverterUtils.releaseArrowRecordBatch(dep_rb)

if (enableColumnarSortMergeJoinLazyRead) {
// Used as an ABI to avoid serializing the buffer data
val serializedItr: NativeSerializedRecordBatchIterator = {
new NativeSerializedRecordBatchIterator {

override def hasNext: Boolean = {
depIter.hasNext
}

override def next(): Array[Byte] = {
val dep_cb = depIter.next()
if (dep_cb.numRows > 0) {
val dep_rb = ConverterUtils.createArrowRecordBatch(dep_cb)
serialize(dep_rb)
} else {
throw new IllegalStateException()
}
}

private def serialize(batch: ArrowRecordBatch) = {
UnsafeRecordBatchSerializer.serializeUnsafe(batch)
}

override def close(): Unit = {
}
}
}
cachedRelationKernel.evaluate(serializedItr)
} else {
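// Eager path: pin each batch's column buffers, keep the batch alive in
// buildRelationBatchHolder for the lifetime of the join, and feed it to the
// cached relation kernel immediately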
while (depIter.hasNext) {
val dep_cb = depIter.next()
if (dep_cb.numRows > 0) {
(0 until dep_cb.numCols).toList.foreach(i =>
dep_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain())
buildRelationBatchHolder += dep_cb
val dep_rb = ConverterUtils.createArrowRecordBatch(dep_cb)
cachedRelationKernel.evaluate(dep_rb)
ConverterUtils.releaseArrowRecordBatch(dep_rb)
}
}
}
dependentKernels += cachedRelationKernel
@@ -570,7 +609,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
def close = {
closed = true
pipelineTime += (eval_elapse + build_elapse) / 1000000
buildRelationBatchHolder.foreach(_.close)
buildRelationBatchHolder.foreach(_.close) // fixing: ref cnt goes negative
dependentKernels.foreach(_.close)
dependentKernelIterators.foreach(_.close)
nativeKernel.close
@@ -52,14 +52,15 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
.set("spark.unsafe.exceptionOnMemoryLeak", "false")
.set("spark.network.io.preferDirectBufs", "false")
.set("spark.sql.sources.useV1SourceList", "arrow,parquet")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
return conf
}


override def beforeAll(): Unit = {
super.beforeAll()
LogManager.getRootLogger.setLevel(Level.WARN)
val tGen = new TPCDSTableGen(spark, 0.01D, TPCDS_WRITE_PATH)
val tGen = new TPCDSTableGen(spark, 0.1D, TPCDS_WRITE_PATH)
tGen.gen()
tGen.createTables()
runner = new TPCRunner(spark, TPCDS_QUERIES_RESOURCE)
@@ -91,6 +92,10 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
runner.runTPCQuery("q67", 1, true)
}

test("smj query") {
runner.runTPCQuery("q1", 1, true)
}

test("window function with non-decimal input") {
val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_category_id)" +
" OVER (PARTITION BY i_class_id) FROM item LIMIT 1000")
@@ -59,7 +59,7 @@ class TPCHSuite extends QueryTest with SharedSparkSession {
.set("spark.executor.heartbeatInterval", "3600000")
.set("spark.network.timeout", "3601s")
.set("spark.oap.sql.columnar.preferColumnar", "true")
.set("spark.oap.sql.columnar.sortmergejoin", "true")
.set("spark.sql.columnar.codegen.hashAggregate", "false")
.set("spark.sql.columnar.sort", "true")
.set("spark.sql.columnar.window", "true")
@@ -96,13 +96,21 @@ class TPCHSuite extends QueryTest with SharedSparkSession {
}
}

ignore("q12 SMJ failure") {
test("q12 SMJ") {
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1"),
("spark.oap.sql.columnar.sortmergejoin", "true")) {
runner.runTPCQuery("q12", 1, true)
}
}

test("q12 SMJ lazy") {
withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1"),
("spark.oap.sql.columnar.sortmergejoin", "true"),
("spark.oap.sql.columnar.sortmergejoin.lazyread", "true")) {
runner.runTPCQuery("q12", 1, true)
}
}

private def runMemoryUsageTest(exclusions: Array[String] = Array[String](), comment: String = ""): Unit = {
val enableTPCHTests = Option(System.getenv("ENABLE_TPCH_TESTS"))
if (!enableTPCHTests.exists(_.toBoolean)) {
@@ -153,6 +153,13 @@ class ArrowComputeCodeGenerator : public CodeGenerator {
return status;
}

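// Iterator-based evaluate: ownership of the input iterator is moved into the visitor,
// allowing upstream record batches to be pulled and released lazily during evaluation.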
arrow::Status evaluate(arrow::RecordBatchIterator in) override {
for (auto visitor : visitor_list_) {
TIME_MICRO_OR_RAISE(eval_elapse_time_, visitor->Eval(std::move(in)));
}
return arrow::Status::OK();
};

arrow::Status evaluate(const std::shared_ptr<arrow::Array>& selection_in,
const std::shared_ptr<arrow::RecordBatch>& in,
std::vector<std::shared_ptr<arrow::RecordBatch>>* out) {
@@ -523,6 +523,13 @@ arrow::Status ExprVisitor::Eval(std::shared_ptr<arrow::RecordBatch>& in) {
return arrow::Status::OK();
}

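// Iterator input path: record the input type and stash the iterator before the
// common Eval() dispatch; the visitor implementation decides how to consume it.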
arrow::Status ExprVisitor::Eval(arrow::RecordBatchIterator in) {
input_type_ = ArrowComputeInputType::Iterator;
in_iterator_ = std::move(in);
RETURN_NOT_OK(Eval());
return arrow::Status::OK();
}

arrow::Status ExprVisitor::Eval() {
if (return_type_ != ArrowComputeResultType::None) {
#ifdef DEBUG_LEVEL_2
@@ -19,6 +19,7 @@

#include <arrow/compute/api.h>
#include <arrow/status.h>
#include <arrow/util/iterator.h>
#include <gandiva/node.h>
#include <gandiva/node_visitor.h>

@@ -40,6 +41,7 @@ class ExprVisitorImpl;

using ExprVisitorMap = std::unordered_map<std::string, std::shared_ptr<ExprVisitor>>;
using ArrayList = std::vector<std::shared_ptr<arrow::Array>>;
enum class ArrowComputeInputType { Legacy, Iterator };
enum class ArrowComputeResultType { Array, Batch, BatchList, BatchIterator, None };
enum class BuilderVisitorNodeType { FunctionNode, FieldNode };

@@ -153,6 +155,7 @@ class ExprVisitor : public std::enable_shared_from_this<ExprVisitor> {
arrow::Status Eval(const std::shared_ptr<arrow::Array>& selection_in,
const std::shared_ptr<arrow::RecordBatch>& in);
arrow::Status Eval(std::shared_ptr<arrow::RecordBatch>& in);
arrow::Status Eval(arrow::RecordBatchIterator in);
arrow::Status Eval();
std::string GetSignature() { return signature_; }
arrow::Status SetMember(const std::shared_ptr<arrow::RecordBatch>& ms);
@@ -191,6 +194,8 @@ class ExprVisitor : public std::enable_shared_from_this<ExprVisitor> {
std::shared_ptr<arrow::Array> in_selection_array_;
std::shared_ptr<arrow::RecordBatch> in_record_batch_;
std::vector<std::shared_ptr<arrow::RecordBatch>> in_record_batch_holder_;
ArrowComputeInputType input_type_ = ArrowComputeInputType::Legacy;
arrow::RecordBatchIterator in_iterator_;
std::vector<std::shared_ptr<arrow::Field>> ret_fields_;

// For dual input kernels like probe
@@ -601,11 +601,15 @@ class CachedRelationVisitorImpl : public ExprVisitorImpl {
arrow::Status Eval() override {
switch (p_->dependency_result_type_) {
case ArrowComputeResultType::None: {
std::vector<std::shared_ptr<arrow::Array>> col_list;
for (auto col : p_->in_record_batch_->columns()) {
col_list.push_back(col);
if (p_->input_type_ == ArrowComputeInputType::Iterator) {
RETURN_NOT_OK(kernel_->Evaluate(std::move(p_->in_iterator_)));
} else {
std::vector<std::shared_ptr<arrow::Array>> col_list;
for (auto col : p_->in_record_batch_->columns()) {
col_list.push_back(col);
}
RETURN_NOT_OK(kernel_->Evaluate(col_list));
}
RETURN_NOT_OK(kernel_->Evaluate(col_list));
} break;
default:
return arrow::Status::NotImplemented(
@@ -16,6 +16,8 @@
*/

#pragma once
#include <arrow/util/iterator.h>

#include "codegen/common/result_iterator.h"
#include "precompile/array.h"

@@ -37,6 +39,10 @@ class CodeGenBase {
return arrow::Status::NotImplemented(
"CodeGenBase Evaluate is an abstract interface.");
}
virtual arrow::Status Evaluate(arrow::RecordBatchIterator in) {
return arrow::Status::NotImplemented(
"CodeGenBase Evaluate is an abstract interface.");
}
virtual arrow::Status Evaluate(const ArrayList& in, const ArrayList& projected_batch) {
return arrow::Status::NotImplemented(
"CodeGenBase Evaluate is an abstract interface.");