Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary Table instances after contiguous split #1593

Merged
merged 5 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 5 additions & 35 deletions sql-plugin/src/main/format/ShuffleMetadataResponse.fbs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, NVIDIA CORPORATION.
// Copyright (c) 2019-2021, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,46 +16,16 @@ include "ShuffleCommon.fbs";

namespace com.nvidia.spark.rapids.format;

/// Metadata about cuDF Columns
table ColumnMeta {
/// number of nulls in the column or -1 if unknown
null_count: long;

/// number of rows in the column
row_count: long;

/// offset of the column's data buffer
data_offset: long;

/// length of the column's data buffer
data_length: long;

/// offset of the column's validity buffer
validity_offset: long;

/// offset of the column's offsets buffer
offsets_offset: long;

/// child column metadata
children: [ColumnMeta];

/// ordinal of DType enum
dtype_id: int;

/// DType scale for decimal types
dtype_scale: int;
}

/// Metadata about cuDF tables
table TableMeta {
/// metadata about the table buffer
/// metadata about the data encoding
buffer_meta: BufferMeta;

/// metadata for each column in the table buffer
column_metas: [ColumnMeta];

/// number of rows in the table
row_count: long;

/// opaque metadata describing the packed table schema and data layout
packed_meta: [byte];
abellina marked this conversation as resolved.
Show resolved Hide resolved
}

/// Flat buffer for Rapids UCX Shuffle Metadata Response
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -614,9 +614,9 @@ public final int numNulls() {
public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
long sum = 0;
if (batch.numCols() > 0) {
if (batch.column(0) instanceof GpuCompressedColumnVector) {
GpuCompressedColumnVector gccv = (GpuCompressedColumnVector) batch.column(0);
sum += gccv.getBuffer().getLength();
if (batch.column(0) instanceof WithTableBuffer) {
WithTableBuffer wtb = (WithTableBuffer) batch.column(0);
sum += wtb.getTableBuffer().getLength();
} else {
for (int i = 0; i < batch.numCols(); i++) {
sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
revans2 marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import ai.rapids.cudf.ContiguousTable;
import ai.rapids.cudf.DeviceMemoryBuffer;
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

Expand All @@ -28,6 +29,7 @@
/** GPU column vector carved from a single buffer, like those from cudf's contiguousSplit. */
public final class GpuColumnVectorFromBuffer extends GpuColumnVector {
private final DeviceMemoryBuffer buffer;
private final TableMeta tableMeta;

/**
* Get a ColumnarBatch from a set of columns in a contiguous table. This differs from the
Expand All @@ -43,7 +45,8 @@ public final class GpuColumnVectorFromBuffer extends GpuColumnVector {
public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colTypes) {
DeviceMemoryBuffer buffer = contigTable.getBuffer();
Table table = contigTable.getTable();
return from(table, buffer, colTypes);
TableMeta meta = MetaUtils.buildTableMeta(0, contigTable);
return from(table, buffer, meta, colTypes);
}

/**
Expand All @@ -55,10 +58,12 @@ public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colType
*
* @param table a table with columns at offsets of `buffer`
* @param buffer a device buffer that packs data for columns in `table`
* @param meta metadata describing the table layout and schema
* @param colTypes the types the columns should have.
* @return batch of GpuColumnVectorFromBuffer instances derived from the table and buffer
*/
public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataType[] colTypes) {
public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer,
TableMeta meta, DataType[] colTypes) {
assert table != null : "Table cannot be null";
assert GpuColumnVector.typeConversionAllowed(table, colTypes) :
"Type conversion is not allowed from " + table + " to " + Arrays.toString(colTypes);
Expand All @@ -72,7 +77,7 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp
for (int i = 0; i < numColumns; ++i) {
ColumnVector v = table.getColumn(i);
DataType type = colTypes[i];
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer);
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer, meta);
}
return new ColumnarBatch(columns, (int) rows);
} catch (Exception e) {
Expand All @@ -99,11 +104,13 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp
* @param type the spark data type for this column
* @param cudfColumn a ColumnVector instance
* @param buffer the buffer to hold
* @param meta the metadata describing the buffer layout
*/
public GpuColumnVectorFromBuffer(DataType type, ColumnVector cudfColumn,
DeviceMemoryBuffer buffer) {
DeviceMemoryBuffer buffer, TableMeta meta) {
super(type, cudfColumn);
this.buffer = buffer;
this.tableMeta = meta;
}

/**
Expand All @@ -114,4 +121,13 @@ public GpuColumnVectorFromBuffer(DataType type, ColumnVector cudfColumn,
public DeviceMemoryBuffer getBuffer() {
return buffer;
}

/**
* Get the metadata describing the data layout in the buffer,
* shared between columns of the original `ContiguousTable`
* @return opaque metadata
*/
public TableMeta getTableMeta() {
return tableMeta;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,151 +16,66 @@

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import ai.rapids.cudf.DeviceMemoryBuffer;
import com.nvidia.spark.rapids.format.ColumnMeta;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import static org.apache.spark.sql.types.DataTypes.NullType;

/**
* A GPU column vector that has been compressed. The columnar data within cannot
jlowe marked this conversation as resolved.
Show resolved Hide resolved
* be accessed directly. This class primarily serves the role of tracking the
* compressed data and table metadata so it can be decompressed later.
*/
public final class GpuCompressedColumnVector extends GpuColumnVectorBase {
public final class GpuCompressedColumnVector extends GpuColumnVectorBase
implements WithTableBuffer {
private static final String BAD_ACCESS_MSG = "Column is compressed";

private final DeviceMemoryBuffer buffer;
private final TableMeta tableMeta;

/**
* Build a columnar batch from a compressed table.
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
*/
public static ColumnarBatch from(CompressedTable compressedTable, DataType[] colTypes) {
return from(compressedTable.buffer(), compressedTable.meta(), colTypes);
public static ColumnarBatch from(CompressedTable compressedTable) {
return from(compressedTable.buffer(), compressedTable.meta());
}

public static boolean isBatchCompressed(ColumnarBatch batch) {
if (batch.numCols() == 0) {
if (batch.numCols() != 1) {
gerashegalov marked this conversation as resolved.
Show resolved Hide resolved
return false;
} else {
return batch.column(0) instanceof GpuCompressedColumnVector;
}
}

/**
* This should only ever be called from an assertion.
*/
private static boolean typeConversionAllowed(ColumnMeta columnMeta, DataType colType) {
DType dt = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale());
if (!dt.isNestedType()) {
return GpuColumnVector.getNonNestedRapidsType(colType).equals(dt);
}
if (colType instanceof MapType) {
MapType mType = (MapType) colType;
// list of struct of key/value
if (!(dt.equals(DType.LIST))) {
return false;
}
ColumnMeta structCm = columnMeta.children(0);
if (structCm.dtypeId() != DType.STRUCT.getTypeId().getNativeId()) {
return false;
}
if (structCm.childrenLength() != 2) {
return false;
}
ColumnMeta keyCm = structCm.children(0);
if (!typeConversionAllowed(keyCm, mType.keyType())) {
return false;
}
ColumnMeta valCm = structCm.children(1);
return typeConversionAllowed(valCm, mType.valueType());
} else if (colType instanceof ArrayType) {
if (!(dt.equals(DType.LIST))) {
return false;
}
ColumnMeta tmp = columnMeta.children(0);
return typeConversionAllowed(tmp, ((ArrayType) colType).elementType());
} else if (colType instanceof StructType) {
if (!(dt.equals(DType.STRUCT))) {
return false;
}
StructType st = (StructType) colType;
final int numChildren = columnMeta.childrenLength();
if (numChildren != st.size()) {
return false;
}
for (int childIndex = 0; childIndex < numChildren; childIndex++) {
ColumnMeta tmp = columnMeta.children(childIndex);
StructField entry = ((StructType) colType).apply(childIndex);
if (!typeConversionAllowed(tmp, entry.dataType())) {
return false;
}
}
return true;
} else if (colType instanceof BinaryType) {
if (!(dt.equals(DType.LIST))) {
return false;
}
ColumnMeta tmp = columnMeta.children(0);
return tmp.dtypeId() == DType.INT8.getTypeId().getNativeId() ||
tmp.dtypeId() == DType.UINT8.getTypeId().getNativeId();
} else {
// Unexpected type
return false;
}
}

/**
* Build a columnar batch from a compressed data buffer and specified table metadata
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
*/
public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer,
TableMeta tableMeta,
DataType[] colTypes) {
public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, TableMeta tableMeta) {
long rows = tableMeta.rowCount();
if (rows != (int) rows) {
int batchRows = (int) rows;
if (rows != batchRows) {
throw new IllegalStateException("Cannot support a batch larger that MAX INT rows");
}

ColumnMeta columnMeta = new ColumnMeta();
int numColumns = tableMeta.columnMetasLength();
assert numColumns == colTypes.length : "Size mismatch on types";
ColumnVector[] columns = new ColumnVector[numColumns];
try {
for (int i = 0; i < numColumns; ++i) {
tableMeta.columnMetas(columnMeta, i);
DataType type = colTypes[i];
assert typeConversionAllowed(columnMeta, type) : "Type conversion is not allowed from " +
columnMeta + " to " + type + " at index " + i;
compressedBuffer.incRefCount();
columns[i] = new GpuCompressedColumnVector(type, compressedBuffer, tableMeta);
}
} catch (Throwable t) {
for (int i = 0; i < numColumns; ++i) {
if (columns[i] != null) {
columns[i].close();
}
}
throw t;
}

return new ColumnarBatch(columns, (int) rows);
ColumnVector column = new GpuCompressedColumnVector(compressedBuffer, tableMeta);
return new ColumnarBatch(new ColumnVector[] { column }, batchRows);
}

private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) {
super(type);
private GpuCompressedColumnVector(DeviceMemoryBuffer buffer, TableMeta tableMeta) {
super(NullType);
this.buffer = buffer;
this.tableMeta = tableMeta;
// reference the buffer so it remains valid for the duration of this column
this.buffer.incRefCount();
}

public DeviceMemoryBuffer getBuffer() {
@Override
public DeviceMemoryBuffer getTableBuffer() {
return buffer;
}

Expand All @@ -175,11 +90,11 @@ public void close() {

@Override
public boolean hasNull() {
throw new IllegalStateException("column vector is compressed");
throw new IllegalStateException(BAD_ACCESS_MSG);
}

@Override
public int numNulls() {
throw new IllegalStateException("column vector is compressed");
throw new IllegalStateException(BAD_ACCESS_MSG);
}
}
Loading