[CELEBORN-1413] Support Spark 4.0 #2813

Closed · wants to merge 5 commits
6 changes: 5 additions & 1 deletion .github/workflows/maven.yml
@@ -133,8 +133,12 @@ jobs:
   run: |
     SPARK_BINARY_VERSION=${{ matrix.spark }}
     SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
+    SPARK_MODULE_NAME=$SPARK_MAJOR_VERSION
+    if [[ $SPARK_MAJOR_VERSION == "3" ]]; then
+      SPARK_MODULE_NAME="3-4"
+    fi
     PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
-    TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
+    TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MODULE_NAME},client-spark/spark-${SPARK_MAJOR_VERSION}-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
     build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
     build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
 - name: Upload test log
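The workflow now derives the client source-module name from the Spark major version, because the Spark 3 and Spark 4 clients share one source module after this PR. A minimal sketch of that mapping in Java (illustrative only; `sparkModuleName` is a hypothetical helper, not part of the PR):

```java
// Mirrors the shell logic above: "${SPARK_BINARY_VERSION%%.*}" strips everything
// from the first dot, and major version 3 maps to the shared "3-4" module.
static String sparkModuleName(String sparkBinaryVersion) {
  String major = sparkBinaryVersion.split("\\.", 2)[0]; // "3.5" -> "3"
  return "3".equals(major) ? "3-4" : major;
}
// sparkModuleName("3.5") -> "3-4" (the renamed client-spark/spark-3-4 module)
// sparkModuleName("4.0") -> "4"
```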
6 changes: 2 additions & 4 deletions …/SparkCommonUtils.java
@@ -19,14 +19,12 @@

 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
-import org.apache.spark.scheduler.DAGScheduler;
 
 public class SparkCommonUtils {
   public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException {
+    int DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4;
     int maxStageAttempts =
-        conf.getInt(
-            "spark.stage.maxConsecutiveAttempts",
-            DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS());
+        conf.getInt("spark.stage.maxConsecutiveAttempts", DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS);
     // In Spark 2, the parameter is referred to as MAX_TASK_FAILURES, while in Spark 3, it has been
     // changed to TASK_MAX_FAILURES. The default value for both is consistently set to 4.
     int maxTaskAttempts = conf.getInt("spark.task.maxFailures", 4);
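Inlining the default (4) removes the reference to Spark's internal DAGScheduler object, whose DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS accessor is not a stable API across Spark versions. A hedged usage sketch (the config values are examples; the exact bounds the method enforces are not shown in this hunk):

```java
// Example only: validateAttemptConfig reads the stage/task retry settings
// (both default to 4 when unset) and throws IllegalArgumentException when
// they fall outside what Celeborn supports.
SparkConf conf = new SparkConf()
    .set("spark.stage.maxConsecutiveAttempts", "4")
    .set("spark.task.maxFailures", "4");
SparkCommonUtils.validateAttemptConfig(conf);
```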
4 changes: 2 additions & 2 deletions client-spark/spark-3/pom.xml
@@ -24,9 +24,9 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
+  <artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
   <packaging>jar</packaging>
-  <name>Celeborn Client for Spark 3</name>
+  <name>Celeborn Client for Spark 3 and 4</name>
 
   <dependencies>
     <dependency>
2 changes: 1 addition & 1 deletion client-spark/spark-3-columnar-common/pom.xml
@@ -31,7 +31,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
-      <artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
+      <artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
2 changes: 1 addition & 1 deletion client-spark/spark-3-columnar-shuffle/pom.xml
@@ -49,7 +49,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
-      <artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
+      <artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
2 changes: 1 addition & 1 deletion client-spark/spark-3-shaded/pom.xml
@@ -31,7 +31,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
-      <artifactId>celeborn-client-spark-3_${scala.binary.version}</artifactId>
+      <artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
   </dependencies>
68 changes: 68 additions & 0 deletions client-spark/spark-4-columnar-shuffle/pom.xml
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-parent_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>celeborn-spark-4-columnar-shuffle_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Celeborn Client for Spark 4 Columnar Shuffle</name>

<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-spark-3-columnar-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-client-spark-3-4_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
163 changes: 163 additions & 0 deletions …/ColumnarHashBasedShuffleWriter.java
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.celeborn;

import java.io.IOException;

import scala.Product2;

import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.annotation.Private;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.UnsafeRowSerializer;
import org.apache.spark.sql.execution.columnar.CelebornBatchBuilder;
import org.apache.spark.sql.execution.columnar.CelebornColumnarBatchBuilder;
import org.apache.spark.sql.execution.columnar.CelebornColumnarBatchCodeGenBuild;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;

@Private
public class ColumnarHashBasedShuffleWriter<K, V, C> extends HashBasedShuffleWriter<K, V, C> {

private static final Logger logger =
LoggerFactory.getLogger(ColumnarHashBasedShuffleWriter.class);

private final int stageId;
private final int shuffleId;
private final CelebornBatchBuilder[] celebornBatchBuilders;
private final StructType schema;
private final Serializer depSerializer;
private final boolean isColumnarShuffle;
private final int columnarShuffleBatchSize;
private final boolean columnarShuffleCodeGenEnabled;
private final boolean columnarShuffleDictionaryEnabled;
private final double columnarShuffleDictionaryMaxFactor;

public ColumnarHashBasedShuffleWriter(
int shuffleId,
CelebornShuffleHandle<K, V, C> handle,
TaskContext taskContext,
CelebornConf conf,
ShuffleClient client,
ShuffleWriteMetricsReporter metrics,
SendBufferPool sendBufferPool)
throws IOException {
super(shuffleId, handle, taskContext, conf, client, metrics, sendBufferPool);
columnarShuffleBatchSize = conf.columnarShuffleBatchSize();
columnarShuffleCodeGenEnabled = conf.columnarShuffleCodeGenEnabled();
columnarShuffleDictionaryEnabled = conf.columnarShuffleDictionaryEnabled();
columnarShuffleDictionaryMaxFactor = conf.columnarShuffleDictionaryMaxFactor();
ShuffleDependency<?, ?, ?> shuffleDependency = handle.dependency();
this.stageId = taskContext.stageId();
this.shuffleId = shuffleDependency.shuffleId();
this.schema = CustomShuffleDependencyUtils.getSchema(shuffleDependency);
this.depSerializer = handle.dependency().serializer();
this.celebornBatchBuilders =
new CelebornBatchBuilder[handle.dependency().partitioner().numPartitions()];
this.isColumnarShuffle = schema != null && CelebornBatchBuilder.supportsColumnarType(schema);
}

@Override
protected void fastWrite0(scala.collection.Iterator iterator)
throws IOException, InterruptedException {
if (isColumnarShuffle) {
logger.info("Fast columnar write of columnar shuffle {} for stage {}.", shuffleId, stageId);
fastColumnarWrite0(iterator);
} else {
super.fastWrite0(iterator);
}
}

private void fastColumnarWrite0(scala.collection.Iterator iterator) throws IOException {
final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;

SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
final UnsafeRow row = record._2();

if (celebornBatchBuilders[partitionId] == null) {
CelebornBatchBuilder columnBuilders;
if (columnarShuffleCodeGenEnabled && !columnarShuffleDictionaryEnabled) {
columnBuilders =
new CelebornColumnarBatchCodeGenBuild().create(schema, columnarShuffleBatchSize);
} else {
columnBuilders =
new CelebornColumnarBatchBuilder(
schema,
columnarShuffleBatchSize,
columnarShuffleDictionaryMaxFactor,
columnarShuffleDictionaryEnabled);
}
columnBuilders.newBuilders();
celebornBatchBuilders[partitionId] = columnBuilders;
}

celebornBatchBuilders[partitionId].writeRow(row);
if (celebornBatchBuilders[partitionId].getRowCnt() >= columnarShuffleBatchSize) {
byte[] arr = celebornBatchBuilders[partitionId].buildColumnBytes();
pushGiantRecord(partitionId, arr, arr.length);
if (dataSize != null) {
dataSize.add(arr.length);
}
celebornBatchBuilders[partitionId].newBuilders();
}
tmpRecordsWritten++;
}
}

@Override
protected void closeWrite() throws IOException {
if (canUseFastWrite() && isColumnarShuffle) {
closeColumnarWrite();
} else {
super.closeWrite();
}
}

private void closeColumnarWrite() throws IOException {
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) depSerializer);
for (int i = 0; i < celebornBatchBuilders.length; i++) {
final CelebornBatchBuilder builders = celebornBatchBuilders[i];
if (builders != null && builders.getRowCnt() > 0) {
byte[] buffers = builders.buildColumnBytes();
if (dataSize != null) {
dataSize.add(buffers.length);
}
mergeData(i, buffers, 0, buffers.length);
// free buffer
celebornBatchBuilders[i] = null;
}
}
}

@VisibleForTesting
public boolean isColumnarShuffle() {
return isColumnarShuffle;
}
}
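The writer buffers rows per reduce partition and serializes a batch once it reaches columnarShuffleBatchSize rows: full batches are pushed eagerly via pushGiantRecord during the write loop, while partially filled batches left at close time go through mergeData. A simplified sketch of that flush discipline (illustrative; `BatchFlushSketch`, `push`, and `merge` are stand-ins, not Celeborn API):

```java
import java.util.function.BiConsumer;

// Stand-ins: `push` plays the role of pushGiantRecord (eager flush of a full
// batch) and `merge` plays the role of mergeData (remainder at closeWrite).
class BatchFlushSketch {
  static void drive(int[] rowsPerPartition, int batchSize,
      BiConsumer<Integer, Integer> push, BiConsumer<Integer, Integer> merge) {
    for (int p = 0; p < rowsPerPartition.length; p++) {
      int pending = 0;
      for (int r = 0; r < rowsPerPartition[p]; r++) {
        if (++pending >= batchSize) { // batch is full: flush eagerly
          push.accept(p, pending);
          pending = 0;
        }
      }
      if (pending > 0) merge.accept(p, pending); // remainder flushed at close
    }
  }
}
```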
62 changes: 62 additions & 0 deletions …/CelebornColumnarShuffleReader.scala
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.celeborn

import org.apache.spark.{ShuffleDependency, TaskContext}
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.shuffle.ShuffleReadMetricsReporter
import org.apache.spark.sql.execution.UnsafeRowSerializer
import org.apache.spark.sql.execution.columnar.{CelebornBatchBuilder, CelebornColumnarBatchSerializer}

import org.apache.celeborn.common.CelebornConf

class CelebornColumnarShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
startPartition: Int,
endPartition: Int,
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter,
shuffleIdTracker: ExecutorShuffleIdTracker)
extends CelebornShuffleReader[K, C](
handle,
startPartition,
endPartition,
startMapIndex,
endMapIndex,
context,
conf,
metrics,
shuffleIdTracker) {

override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): SerializerInstance = {
val schema = CustomShuffleDependencyUtils.getSchema(dep)
if (schema != null && CelebornBatchBuilder.supportsColumnarType(schema)) {
logInfo(s"Creating column batch serializer of columnar shuffle ${dep.shuffleId}.")
val dataSize = SparkUtils.getDataSize(dep.serializer.asInstanceOf[UnsafeRowSerializer])
new CelebornColumnarBatchSerializer(
schema,
conf.columnarShuffleOffHeapEnabled,
dataSize).newInstance()
} else {
super.newSerializerInstance(dep)
}
}
}
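The reader applies the same gate as the writer: columnar deserialization is used only when the shuffle dependency carries a schema whose field types CelebornBatchBuilder supports; anything else falls back to the parent's row-based serializer. A hypothetical enablement sketch follows; the shuffle-manager class name matches the Celeborn Spark client, but the columnar config key is an assumption inferred from the CelebornConf accessors used above, so verify it against the docs for your Celeborn version:

```java
// Hypothetical configuration sketch (the columnar key name is an assumption):
SparkConf conf = new SparkConf()
    .set("spark.shuffle.manager", "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
    .set("spark.celeborn.columnarShuffle.enabled", "true"); // assumed key
```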