[SPARK-2650][SQL] Try to partially fix SPARK-2650 by adjusting initial buffer size and reducing memory allocation #1769

Closed (wants to merge 2 commits)
@@ -118,7 +118,7 @@ private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
 private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
 
 private[sql] object ColumnBuilder {
-  val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
+  val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
 
   private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
     if (orig.remaining >= size) {
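
The only change in the ColumnBuilder companion object is the default initial buffer size: the old value 10 * 1024 * 104 (roughly 1 MB, and it reads like a typo for 10 * 1024 * 1024) is replaced by a plain 1 MiB, so each column builder starts small and grows only when needed. A minimal, self-contained sketch of that grow-on-demand pattern (an illustration only, not the Spark implementation of ensureFreeSpace):

import java.nio.{ByteBuffer, ByteOrder}

// Illustrative only: start from a small buffer (e.g. 1024 * 1024) and grow on demand.
// A small initial size costs at most a few copies; a large one wastes memory for
// every column that stays small.
def growIfNeeded(orig: ByteBuffer, size: Int): ByteBuffer = {
  if (orig.remaining >= size) {
    orig                                   // enough head-room, reuse as-is
  } else {
    val newCapacity = math.max(orig.capacity * 2, orig.capacity + size)
    val grown = ByteBuffer.allocate(newCapacity).order(ByteOrder.nativeOrder)
    orig.flip()                            // switch the old buffer to read mode
    grown.put(orig)                        // carry over the bytes written so far
  }
}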
@@ -36,9 +36,9 @@ import org.apache.spark.sql.Row
  * }}}
  */
 private[sql] trait NullableColumnBuilder extends ColumnBuilder {
-  private var nulls: ByteBuffer = _
+  protected var nulls: ByteBuffer = _
+  protected var nullCount: Int = _
   private var pos: Int = _
-  private var nullCount: Int = _
 
   abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
     nulls = ByteBuffer.allocate(1024)
@@ -78,4 +78,9 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
     buffer.rewind()
     buffer
   }
+
+  protected def buildNonNulls(): ByteBuffer = {
+    nulls.limit(nulls.position()).rewind()
+    super.build()
+  }
 }
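
In NullableColumnBuilder, nulls and nullCount become protected so that a subclass can write the null header itself, and the new buildNonNulls() returns the underlying value buffer without stitching the null header in front of it; nulls.limit(nulls.position()).rewind() simply flips the null-positions buffer from write mode to read mode before delegating to super.build(). A small illustration of that limit-then-rewind idiom (made-up values, runnable in a Scala REPL):

import java.nio.ByteBuffer

// Illustration only: limit(position) followed by rewind() is the classic
// "flip for reading" idiom used by buildNonNulls() above.
val nulls = ByteBuffer.allocate(1024)
nulls.putInt(3).putInt(7)              // pretend rows 3 and 7 were null
nulls.limit(nulls.position()).rewind() // only the 8 written bytes are now readable
assert(nulls.remaining == 8)
assert(nulls.getInt() == 3 && nulls.getInt() == 7)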
@@ -46,8 +46,6 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   this: NativeColumnBuilder[T] with WithCompressionSchemes =>
 
-  import CompressionScheme._
-
   var compressionEncoders: Seq[Encoder[T]] = _
 
   abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
@@ -81,28 +79,32 @@
     }
   }
 
-  abstract override def build() = {
-    val rawBuffer = super.build()
+  override def build() = {
+    val nonNullBuffer = buildNonNulls()
+    val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
 
-    val headerSize = columnHeaderSize(rawBuffer)
+    // Header = column type ID + null count + null positions
+    val headerSize = 4 + 4 + nulls.limit()
     val compressedSize = if (encoder.compressedSize == 0) {
-      rawBuffer.limit - headerSize
+      nonNullBuffer.remaining()
     } else {
       encoder.compressedSize
     }
 
-    // Reserves 4 bytes for compression scheme ID
     val compressedBuffer = ByteBuffer
+      // Reserves 4 bytes for compression scheme ID
      .allocate(headerSize + 4 + compressedSize)
      .order(ByteOrder.nativeOrder)
-
-    copyColumnHeader(rawBuffer, compressedBuffer)
+      // Write the header
+      .putInt(typeId)
+      .putInt(nullCount)
+      .put(nulls)
 
     logInfo(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
-    encoder.compress(rawBuffer, compressedBuffer, columnType)
+    encoder.compress(nonNullBuffer, compressedBuffer, columnType)
   }
 }
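
The rewritten build() no longer materializes the fully assembled buffer from super.build() and then copies its header into a second allocation. Instead it takes the value buffer from buildNonNulls(), reads its type ID, computes the header size directly from the now-protected nulls buffer, and writes the header straight into the freshly allocated compressedBuffer. The resulting layout, expressed as a hypothetical size helper (the names here are illustrative and not part of the patch):

// Layout written by the new build():
//   [column type ID: 4 bytes][null count: 4 bytes][null positions: 4 * nullCount bytes]
//   [compression scheme ID: 4 bytes][compressed non-null payload]
def estimatedBufferSize(nullCount: Int, compressedSize: Int): Int = {
  val headerSize = 4 + 4 + 4 * nullCount // matches 4 + 4 + nulls.limit() above
  headerSize + 4 + compressedSize        // plus scheme ID and payload
}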
@@ -67,22 +67,6 @@ private[sql] object CompressionScheme {
       s"Unrecognized compression scheme type ID: $typeId"))
   }
 
-  def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
-    // Writes column type ID
-    to.putInt(from.getInt())
-
-    // Writes null count
-    val nullCount = from.getInt()
-    to.putInt(nullCount)
-
-    // Writes null positions
-    var i = 0
-    while (i < nullCount) {
-      to.putInt(from.getInt())
-      i += 1
-    }
-  }
-
   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
     val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
     val nullCount = header.getInt(4)
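
With the header now written exactly once inside CompressibleColumnBuilder.build(), copyColumnHeader has no remaining callers and is deleted; only columnHeaderSize stays. For reference, a hedged sketch of how a reader could walk that header (this mirrors columnHeaderSize, which reads the null count at byte offset 4, but is not code from the patch):

import java.nio.{ByteBuffer, ByteOrder}

// Illustrative reader: parse the column header produced by the new build().
def readColumnHeader(columnBuffer: ByteBuffer): (Int, Seq[Int]) = {
  val buf = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
  val typeId = buf.getInt()                        // bytes 0..3: column type ID
  val nullCount = buf.getInt()                     // bytes 4..7: number of nulls
  val nullPositions = Seq.fill(nullCount)(buf.getInt())
  (typeId, nullPositions)
}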
@@ -42,4 +42,3 @@ object TestCompressibleColumnBuilder {
     builder
   }
 }
-