[FEA] Appending host columnar data into ColumnBuilder by batch #4565

Closed
sperlingxx opened this issue Jan 19, 2022 · 7 comments
Assignees: sperlingxx
Labels: improve, performance (A performance related task/issue), wontfix (This will not be worked on)

Comments

@sperlingxx (Collaborator) commented Jan 19, 2022

While optimizing HostColumnVector.ColumnBuilder (rapidsai/cudf#10025), I realized that it is quite inefficient to append a large amount of data into the off-heap buffer row by row. Yet we perform this append operation a huge number of times in HostColumnarToGpu, because we cannot access the Spark ColumnVector in a columnar manner.

I propose an alternative approach which may accelerate this process: cache the ColumnVector data in an array buffer, and append the buffer into ColumnBuilder whenever the buffer is full.

For fixed-width types without nulls, it is relatively easy to implement the batch appending:

  // Stage up to `batchSize` values in an on-heap array, flushing each full
  // chunk (and the final partial chunk) through `appender`.
  private def batchAppending[T: ClassTag](size: Int,
      batchSize: Int,
      @inline accessor: Int => T,
      @inline appender: Array[T] => Unit): Unit = {
    var i = 0 // index into the source column
    var j = 0 // index into the staging buffer
    val buffer = Array.ofDim[T](batchSize min size)
    while (i < size) {
      buffer(j) = accessor(i)
      i += 1
      if (j == batchSize - 1) {
        appender(buffer) // buffer is full: flush the whole chunk
        j = 0
      } else {
        j += 1
      }
    }
    if (j > 0) {
      appender(buffer.slice(0, j)) // flush the trailing partial chunk
    }
  }
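
For illustration, a call site might look roughly like the sketch below, where cv is assumed to be a non-nullable LongType Spark ColumnVector, rows is the row count of the batch, builder is a cuDF HostColumnVector.ColumnBuilder, and appendArray is the bulk method proposed further down in this issue (it does not exist in cuDF today):

  // Sketch only: chunked copy of a non-nullable LongType column.
  // cv.getLong(i) is the standard Spark ColumnVector accessor; builder.appendArray
  // is the bulk append proposed below, not an existing cuDF method.
  batchAppending[Long](
    size = rows,
    batchSize = 1024,
    accessor = i => cv.getLong(i),
    appender = chunk => builder.appendArray(chunk))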

For nullable fixed-width types, we also need to maintain a bit set as the validity buffer. For now, the function batchAppendingWithNull works under the assumption that it is called only once, because we cannot guarantee that input sizes are multiples of 8, so a subsequent append could overwrite the previously written validity bits. Perhaps we can address this by recording the last byte of each append.

  // Same as batchAppending, but also stages a validity bitmap: one bit per row,
  // initialized to all 1s (valid), with a null clearing bit (j & 7) of byte (j >> 3).
  private def batchAppendingWithNull[T: ClassTag](size: Int,
      batchSize: Int,
      @inline accessor: Int => T,
      @inline nullPredicate: Int => Boolean,
      @inline appender: (Array[T], Array[Byte]) => Unit): Unit = {
    require(batchSize % 8 == 0)
    var i = 0
    var j = 0
    val initial = 0xFF.toByte
    val bitMask = Array.tabulate(8)(i => ~(1 << i))
    val nullBuffer = Array.fill[Byte](batchSize >> 3)(initial)
    val buffer = Array.ofDim[T](batchSize)
    while (i < size) {
      if (nullPredicate(i)) {
        nullBuffer(j >> 3) = (nullBuffer(j >> 3) & bitMask(j & 7)).toByte
      } else {
        buffer(j) = accessor(i)
      }
      i += 1
      if (j == batchSize - 1) {
        appender(buffer, nullBuffer)
        j = 0
        // reset the validity buffer to all-valid before staging the next chunk
        nullBuffer.indices.foreach { k =>
          if (nullBuffer(k) != initial) {
            nullBuffer(k) = initial
          }
        }
      } else {
        j += 1
      }
    }
    if (j > 0) {
      // flush the trailing partial chunk plus the validity bytes that cover it
      appender(buffer.take(j), nullBuffer.take((j >> 3) + 1))
    }
  }
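
To make the alignment problem concrete, here is an illustration only, using the same bit layout the code above assumes (row j maps to bit (j & 7) of byte (j >> 3), and a null clears the bit):

  // Illustration only: where a row's validity bit lives, per the bit math above.
  def validityByte(row: Int): Int = row >> 3 // byte index in the validity buffer
  def validityBit(row: Int): Int = row & 7   // bit position within that byte
  // A first append of 12 rows fills byte 0 and the low 4 bits of byte 1 (the high
  // 4 bits keep their initial 1s). A second append would build its own mask starting
  // at bit 0 and write it at byte offset (currentIndex >> 3) = 1, clobbering those
  // 4 bits -- hence the single-call restriction, unless the last partial byte is
  // remembered and merged on the next call.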

On the cuDF side, we need to implement appendArray and appendNullableArray methods:

    public ColumnBuilder appendArray(long[] values) {
      assert type.isBackedByLong();
      // make room for values.length more rows in the off-heap data buffer
      growFixedWidthBuffersAndRows(values.length);
      // bulk-copy the values at the byte offset of the current row
      data.setLongs(currentIndex << bitShiftBySize, values, 0, values.length);
      currentIndex += values.length;
      return this;
    }

    public ColumnBuilder appendNullableArray(long[] values, byte[] validMask) {
      assert type.isBackedByLong();
      growFixedWidthBuffersAndRows(values.length);
      data.setLongs(currentIndex << bitShiftBySize, values, 0, values.length);
      growValidBuffer();
      // copy the caller-built validity bytes; assumes currentIndex is byte-aligned (a multiple of 8)
      valid.setBytes(currentIndex >> 3, validMask, 0, validMask.length);
      currentIndex += values.length;
      return this;
    }
@sperlingxx added the question and ? - Needs Triage labels Jan 19, 2022
@sperlingxx changed the title [FEA] Read → [FEA] Appending host columnar data into ColumnBuilder by batch Jan 19, 2022
@sperlingxx added the improve label and removed the question label Jan 20, 2022
@sperlingxx self-assigned this Jan 20, 2022
@sperlingxx (Collaborator, Author) commented:

Hi @revans2 @jlowe, I would like to hear your opinions on this idea. Thank you!

@sperlingxx added the performance label Jan 20, 2022
@jlowe (Member) commented Jan 20, 2022

> cache the ColumnVector data in an array buffer, and append the buffer into ColumnBuilder whenever the buffer is full.

If this really is faster (do you have empirical evidence?), why not put this logic in ColumnBuilder directly? ColumnBuilder could buffer rows as they are added, and when we get to some determined size (ideally a multiple of 8 rows to simplify validity handling), it adds those rows to the off-heap buffer in bulk. When the caller finally calls .build() then it can push the last remaining data in the buffer. With this approach we don't have to track down everyone using ColumnBuilder to speed them up. It also nicely avoids the problem of the caller attempting to add a batch that isn't a multiple of 8, since ColumnBuilder controls when batches are pushed off-heap.

Then the discussion pivots to memory usage, which increases with this approach. It would be good to plot a curve of performance vs. array size to see if there's a "sweet spot," ideally at a relatively small buffer size.

If we're willing to throw more memory at it, we could also avoid off-heap reallocations by tracking multiple array buffers and then when build() is called we know how big the off-heap buffer needs to be. Downside is we are doubling the memory cost of building the column. Maybe there's a hybrid approach where we buffer data up to some limit, hoping the caller calls build() before we are forced to push the memory off-heap.

Anyway, we need some hard numbers on how this idea performs across a variety of types and buffer sizes so we can make informed decisions. The first step is to prototype the easy case of fixed-width types and see how it performs.
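
As an illustration of the buffering jlowe describes, here is a minimal sketch for a non-nullable long column. The class name and the flushOffHeap placeholder are hypothetical; they stand in for the real ColumnBuilder internals and the bulk off-heap copy:

  // Minimal sketch, not the actual cuDF ColumnBuilder: rows are staged on-heap
  // and pushed off-heap in bulk whenever the staging array fills or build() is called.
  class BufferedLongBuilder(stageRows: Int = 1024) {
    require(stageRows % 8 == 0, "keep the staging size a multiple of 8 rows")
    private val stage = new Array[Long](stageRows)
    private var staged = 0

    def append(value: Long): this.type = {
      stage(staged) = value
      staged += 1
      if (staged == stageRows) flush()
      this
    }

    def build(): Unit = {
      if (staged > 0) flush() // push the remaining rows before finalizing
      // ... hand off the off-heap buffers to HostColumnVector as today ...
    }

    private def flush(): Unit = {
      flushOffHeap(stage, staged) // bulk copy `staged` rows off-heap in one call
      staged = 0
    }

    // Placeholder for the bulk copy into the off-heap data buffer
    // (e.g. the appendArray method proposed earlier in this issue).
    private def flushOffHeap(values: Array[Long], count: Int): Unit = ()
  }

Because the staging size is a multiple of 8, every flush except the last one in build() stays byte-aligned for validity handling, which matches the simplification described above.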

@revans2 (Collaborator) commented Jan 20, 2022

In Spark CPU the producer of the ColumnarBatch owns it and any memory backing it. There is no API to tell the producer that the data was not fully processed. The Parquet reader and others actually explicitly reuse the ColumnVector memory for each batch that they produce, because it improves performance. So if we want to hold onto that data and call next again to get more data, we would have to make a copy of the original input data. But we are already doing that when we write it into the off-heap memory.

@sperlingxx (Collaborator, Author) commented Jan 24, 2022

Hi @jlowe, I ran some benchmarks locally with the batch appending code shown above. The result was frustrating: row-wise appending beat batch-wise appending at every batch size I tried (16, 64, 256, 1024, 4096, 8192). This indicates that the batch size itself doesn't affect the performance. All batch appending attempts were significantly slower than the improved row appending:

  private def rowAppending[T: ClassTag](size: Int,
      @inline accessor: Int => T,
      @inline appender: T => Unit): Unit = {
    var i = 0
    while (i < size) {
      val data = accessor(i)
      appender(data)
      i += 1
    }
  }

  private def batchAppending[T: ClassTag](size: Int,
      batchSize: Int,
      @inline accessor: Int => T,
      @inline appender: Array[T] => Unit): Unit = {
    var i = 0
    var j = 0
    val buffer = Array.ofDim[T](batchSize min size)
    while (i < size) {
      buffer(j) = accessor(i)
      i += 1
      if (j == batchSize - 1) {
        appender(buffer)
        j = 0
      } else {
        j += 1
      }
    }
    if (j > 0) {
      appender(buffer.slice(0, j))
    }
  }

I tested with non-nullable fixed-width columns, each containing 1 billion rows.

@revans2 (Collaborator) commented Jan 24, 2022

The performance is going to be impacted by the number of columns much more than it is by the length of the columns. You also have to think about memory access patterns to guess at the performance, assuming that main memory is going to be the bottleneck.

When reading a single fixed width column the CPU will read an entire cache line of data around the part we are accessing. Because we are reading the data sequentially it means we can get all of the other reads on the same cache line essentially for free. Some CPUs can even see the read pattern and read data ahead. So depending on the CPU/etc we should be able to read in the data at close to memory speeds and write it back out at close to memory speeds.

When copying to an array first, even if the array fits in the cache, the array is on the heap. We should still be able to read the source data at main memory speeds, as in the first case, but we are going to write it at least twice: once to the array, which will write through the cache to main memory, and then again to copy it to the final off-heap location. If the array is large enough that it does not fit in the cache, then we are reading and writing the data twice, so I would expect it to be roughly twice as slow.
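
As a rough back-of-the-envelope illustration (assuming 8-byte elements and a staging array that does not stay cache-resident): the direct path moves about 16 bytes per value (8 read from the source vector, 8 written off-heap), while the staged path moves about 32 bytes per value (8 read, 8 written to the on-heap array, then 8 read back and 8 written off-heap), which is consistent with the expectation of roughly twice as slow.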

When we start to get into multiple columns, they will fight with each other over space in the cache. The more columns you have the more likely it is that one column will read data into the cache, but then it will be evicted by another column before the other parts of the same cache line can be used. This will effectively slow down the performance until each entry is read from main memory and written to main memory separately.

This is why in the copy code we try to copy a single column at a time. Once we hit arrays, the APIs don't let us get at the underlying columnar array data. If they did, we could copy it columnar to columnar at hopefully close to main memory speeds.

@sperlingxx (Collaborator, Author) commented:

@revans2 Thank you for such a detailed elaboration.

@sameerz (Collaborator) commented Jan 25, 2022

@sperlingxx closing unless you have other thoughts.

@sameerz closed this as completed Jan 25, 2022
@sameerz added the wontfix label and removed the ? - Needs Triage label Jan 25, 2022