Fix style in pr #2630 (#2644)

Merged · 1 commit · Dec 4, 2024
43 changes: 29 additions & 14 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java
@@ -16,20 +16,29 @@
 package com.nvidia.spark.rapids.jni.kudo;
 
-import ai.rapids.cudf.*;
+import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
+import static java.util.Objects.requireNonNull;
+
+import ai.rapids.cudf.BufferType;
+import ai.rapids.cudf.Cuda;
+import ai.rapids.cudf.HostColumnVector;
+import ai.rapids.cudf.JCudfSerialization;
+import ai.rapids.cudf.Schema;
+import ai.rapids.cudf.Table;
 import com.nvidia.spark.rapids.jni.Pair;
 import com.nvidia.spark.rapids.jni.schema.Visitors;
 
-import java.io.*;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
-import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
-import static java.util.Objects.requireNonNull;
-
 /**
  * This class is used to serialize/deserialize a table using the Kudo format.
  *
@@ -148,8 +157,9 @@
 public class KudoSerializer {
 
   private static final byte[] PADDING = new byte[64];
-  private static final BufferType[] ALL_BUFFER_TYPES = new BufferType[]{BufferType.VALIDITY, BufferType.OFFSET,
-      BufferType.DATA};
+  private static final BufferType[] ALL_BUFFER_TYPES =
+      new BufferType[] {BufferType.VALIDITY, BufferType.OFFSET,
+          BufferType.DATA};
 
   static {
     Arrays.fill(PADDING, (byte) 0);
@@ -176,7 +186,7 @@ public KudoSerializer(Schema schema) {
   * @param numRows number of rows to write
   * @return number of bytes written
   */
-  WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) {
+  WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) {
     HostColumnVector[] columns = null;
     try {
       columns = IntStream.range(0, table.getNumberOfColumns())
@@ -199,10 +209,12 @@ WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffs
   * Write partition of an array of {@link HostColumnVector} to an output stream.
   * See {@link #writeToStreamWithMetrics(HostColumnVector[], OutputStream, int, int)} for more
   * details.
+   *
+   * @return number of bytes written
   */
-  public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) {
-    return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes();
+  public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset,
+      int numRows) {
+    return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes();
   }
 
  /**
@@ -218,7 +230,8 @@ public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowO
   * @param numRows number of rows to write
   * @return number of bytes written
   */
-  public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) {
+  public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out,
+      int rowOffset, int numRows) {
     ensure(numRows > 0, () -> "numRows must be > 0, but was " + numRows);
     ensure(columns.length > 0, () -> "columns must not be empty, for row count only records " +
         "please call writeRowCountToStream");
@@ -296,9 +309,11 @@ public Pair<Table, MergeMetrics> mergeToTable(List<KudoTable> kudoTables) throws
     }
   }
 
-  private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, int numRows) throws Exception {
+  private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset,
+      int numRows) throws Exception {
     WriteMetrics metrics = new WriteMetrics();
-    KudoTableHeaderCalc headerCalc = new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount);
+    KudoTableHeaderCalc headerCalc =
+        new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount);
     withTime(() -> Visitors.visitColumns(columns, headerCalc), metrics::addCalcHeaderTime);
     KudoTableHeader header = headerCalc.getHeader();
     long currentTime = System.nanoTime();
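The withTime(...) call in this hunk times the header calculation and feeds the elapsed nanoseconds into the metrics object. A sketch of a helper with that shape, assumed purely from the call site (the real one is a static method on KudoSerializer, statically imported elsewhere in this PR):

// Sketch only: assumed shape of withTime from the call site
// withTime(() -> ..., metrics::addCalcHeaderTime).
import java.util.function.LongConsumer;

final class TimingSketch {
  static void withTime(Runnable block, LongConsumer addTime) {
    long start = System.nanoTime();
    block.run();
    addTime.accept(System.nanoTime() - start); // report elapsed nanos
  }
}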
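For context on the API whose formatting this PR touches, here is a minimal, hypothetical usage sketch of the public write path. The KudoSerializer(Schema) constructor and writeToStream(HostColumnVector[], OutputStream, int, int) come from the diff above; Schema.builder() and HostColumnVector.fromInts(...) are standard cudf Java API; the class name and data are made up for illustration.

import ai.rapids.cudf.DType;
import ai.rapids.cudf.HostColumnVector;
import ai.rapids.cudf.Schema;
import com.nvidia.spark.rapids.jni.kudo.KudoSerializer;

import java.io.ByteArrayOutputStream;

public class KudoWriteSketch {
  public static void main(String[] args) {
    // One INT32 column named "id".
    Schema schema = Schema.builder().column(DType.INT32, "id").build();
    KudoSerializer serializer = new KudoSerializer(schema);
    try (HostColumnVector ids = HostColumnVector.fromInts(1, 2, 3, 4)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      // Serialize rows [1, 3) of the partition; returns bytes written.
      long written = serializer.writeToStream(
          new HostColumnVector[] {ids}, out, 1, 2);
      System.out.println("bytes written: " + written);
    }
  }
}

Note that numRows must be greater than zero and columns must be non-empty; per the precondition messages in the diff, row-count-only records go through writeRowCountToStream instead.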
(second changed file in this PR; filename not captured in this view)
@@ -17,7 +17,6 @@
 package com.nvidia.spark.rapids.jni.kudo;
 
 import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment;
-import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.withTime;
 
 import ai.rapids.cudf.BufferType;
 import ai.rapids.cudf.DType;