Skip to content

Commit

Permalink
Fix style in last pr #2630 (#2644)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Dec 4, 2024
1 parent 3a0b458 commit 692623e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
43 changes: 29 additions & 14 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,29 @@

package com.nvidia.spark.rapids.jni.kudo;

import ai.rapids.cudf.*;
import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
import static java.util.Objects.requireNonNull;

import ai.rapids.cudf.BufferType;
import ai.rapids.cudf.Cuda;
import ai.rapids.cudf.HostColumnVector;
import ai.rapids.cudf.JCudfSerialization;
import ai.rapids.cudf.Schema;
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.jni.Pair;
import com.nvidia.spark.rapids.jni.schema.Visitors;

import java.io.*;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
import static java.util.Objects.requireNonNull;

/**
* This class is used to serialize/deserialize a table using the Kudo format.
*
Expand Down Expand Up @@ -148,8 +157,9 @@
public class KudoSerializer {

private static final byte[] PADDING = new byte[64];
private static final BufferType[] ALL_BUFFER_TYPES = new BufferType[]{BufferType.VALIDITY, BufferType.OFFSET,
BufferType.DATA};
private static final BufferType[] ALL_BUFFER_TYPES =
new BufferType[] {BufferType.VALIDITY, BufferType.OFFSET,
BufferType.DATA};

static {
Arrays.fill(PADDING, (byte) 0);
Expand All @@ -176,7 +186,7 @@ public KudoSerializer(Schema schema) {
* @param numRows number of rows to write
* @return number of bytes written
*/
WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) {
WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) {
HostColumnVector[] columns = null;
try {
columns = IntStream.range(0, table.getNumberOfColumns())
Expand All @@ -199,10 +209,12 @@ WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffs
* Write partition of an array of {@link HostColumnVector} to an output stream.
* See {@link #writeToStreamWithMetrics(HostColumnVector[], OutputStream, int, int)} for more
* details.
*
* @return number of bytes written
*/
public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) {
return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes();
public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset,
int numRows) {
return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes();
}

/**
Expand All @@ -218,7 +230,8 @@ public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowO
* @param numRows number of rows to write
* @return number of bytes written
*/
public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) {
public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out,
int rowOffset, int numRows) {
ensure(numRows > 0, () -> "numRows must be > 0, but was " + numRows);
ensure(columns.length > 0, () -> "columns must not be empty, for row count only records " +
"please call writeRowCountToStream");
Expand Down Expand Up @@ -296,9 +309,11 @@ public Pair<Table, MergeMetrics> mergeToTable(List<KudoTable> kudoTables) throws
}
}

private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, int numRows) throws Exception {
private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset,
int numRows) throws Exception {
WriteMetrics metrics = new WriteMetrics();
KudoTableHeaderCalc headerCalc = new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount);
KudoTableHeaderCalc headerCalc =
new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount);
withTime(() -> Visitors.visitColumns(columns, headerCalc), metrics::addCalcHeaderTime);
KudoTableHeader header = headerCalc.getHeader();
long currentTime = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.nvidia.spark.rapids.jni.kudo;

import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment;
import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.withTime;

import ai.rapids.cudf.BufferType;
import ai.rapids.cudf.DType;
Expand Down

0 comments on commit 692623e

Please sign in to comment.