[GLUTEN-4241][VL] Add plan node to convert Vanilla spark columnar format data to Velox columnar format data #4818

Closed
boneanxs wants to merge 5 commits

Contributor

boneanxs commented Feb 29, 2024

What changes were proposed in this pull request?

Add a plan node to convert vanilla Spark columnar-format data to Velox columnar-format data.

This PR converts a Spark columnar batch -> Arrow array -> Velox columnar batch. All primitive types (decimal types require facebookincubator/velox#8957) and Map/Array types are supported.

Users can enable it by setting spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar, which is false by default.

(Fixes: #4241)

How was this patch tested?

Added tests

boneanxs marked this pull request as draft February 29, 2024 12:43

#4241

Run Gluten Clickhouse CI

boneanxs marked this pull request as ready for review March 4, 2024 12:42

github-actions bot commented Mar 4, 2024

Run Gluten Clickhouse CI


ArrowSchema cSchema;
ArrowArray arrowArray;
exportToArrow(vector, cSchema, ArrowUtils::getBridgeOptions());
Contributor Author

If I understand correctly, there might be a bug: Velox doesn't handle short decimals correctly when converting arrowArray -> rowVector.

In exportToArrow, we can see that each value has to be widened to int128_t, because Arrow stores the short decimal as a 128-bit value:

rows.apply([&](vector_size_t i) {
  int128_t value = buf.as<int64_t>()[i];
  memcpy(dst + (j++) * sizeof(int128_t), &value, sizeof(int128_t));
});

But importFromArrow handles the buffer directly as int64_t, so this test won't pass:

values = wrapInBufferView(
        arrowArray.buffers[1], arrowArray.length * type->cppSizeInBytes());

cc @PHILO-HE @zhouyuan, please correct me if I'm wrong.
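The width mismatch described in this comment can be reproduced in isolation. The following is a minimal sketch, not Velox code: exportShortDecimals, importAsInt64, and importNarrowing are hypothetical helpers, and __int128 (a GCC/Clang extension) stands in for Velox's int128_t. It only illustrates why reading a buffer of 16-byte values as packed 8-byte values yields wrong results, and what a narrowing read would look like.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Assumption: __int128 (GCC/Clang extension) stands in for Velox's int128_t.
using int128_t = __int128;

// Export side (hypothetical helper): widen each 64-bit unscaled value to
// 128 bits, so every element occupies 16 bytes in the output buffer.
std::vector<uint8_t> exportShortDecimals(const std::vector<int64_t>& values) {
  std::vector<uint8_t> out(values.size() * sizeof(int128_t));
  for (size_t i = 0; i < values.size(); ++i) {
    int128_t wide = values[i];
    std::memcpy(out.data() + i * sizeof(int128_t), &wide, sizeof(int128_t));
  }
  return out;
}

// Mismatched import (hypothetical helper): treats the same buffer as packed
// 8-byte values, so it reads halves of the 16-byte elements.
std::vector<int64_t> importAsInt64(const std::vector<uint8_t>& buf, size_t length) {
  assert(buf.size() >= length * sizeof(int64_t));
  std::vector<int64_t> out(length);
  std::memcpy(out.data(), buf.data(), length * sizeof(int64_t));
  return out;
}

// Matching import (hypothetical helper): reads each 16-byte element and
// narrows it back to 64 bits.
std::vector<int64_t> importNarrowing(const std::vector<uint8_t>& buf, size_t length) {
  assert(buf.size() >= length * sizeof(int128_t));
  std::vector<int64_t> out(length);
  for (size_t i = 0; i < length; ++i) {
    int128_t wide;
    std::memcpy(&wide, buf.data() + i * sizeof(int128_t), sizeof(int128_t));
    out[i] = static_cast<int64_t>(wide);
  }
  return out;
}
```

Round-tripping a few values through exportShortDecimals and importNarrowing returns the input, while importAsInt64 on the same buffer does not. Whether the eventual Velox fix takes this shape is not something this thread establishes.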

Contributor

+1, looks like a bug.

Contributor Author

I see, let me try to fix Velox first.

Member

zhztheplayer left a comment

Thanks! I haven't looked at the PR in detail, but overall the direction looks promising.

Also, would you like to add a little more explanation to the PR description? For example, whether it supports all data types or just some of them, whether it is turned on or off by default, what the config key to turn it on/off is, etc. That will help future users get started with the feature quickly.

Comment on lines 42 to 52
new ValidatorApiImpl().doSchemaValidate(schema).foreach {
  reason =>
    throw new UnsupportedOperationException(
      s"Input schema contains unsupported type when performing columnar" +
        s" to columnar for $schema " + s"due to $reason")
}
Member

It looks a little weird to do validation during execution. Would you like to share the reason for doing this?

Contributor Author

Yeah, I followed RowToVeloxColumnarExec in doing the validation here. You're right, it looks weird to do so during execution. I had the same doubt when I saw RowToVeloxColumnarExec...

Let me fix it by overriding doValidateInternal instead.

Comment on lines 552 to 555
case p: RowToColumnarExecBase if p.child.isInstanceOf[ColumnarToRowExec] =>
  val replacedChild = replaceWithColumnarToColumnar(
    p.child.asInstanceOf[ColumnarToRowExec].child)
  BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToColumnarExec(replacedChild)
Member

The case condition says it matches vanilla-columnar->row->gluten-columnar, but it calls a general interface to do columnar->columnar. Do we need to specialize the interface name to make sure we request exactly the vanilla-columnar->gluten-columnar transition here?

Contributor Author

The case condition here tries to match RowToColumnar -> ColumnarToRow, e.g.:

Filter
  RowToVeloxColumnar
    ColumnarToRow
      FileScan xx

We try to replace the matched RowToVeloxColumnar -> ColumnarToRow pair with ColumnarToColumnar.
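The rewrite described above can be sketched on a toy plan tree. This is a minimal C++ model under stated assumptions, not Gluten's Scala implementation: PlanNode, node, and the string node names are hypothetical stand-ins for Spark plan operators, and only the function name replaceWithColumnarToColumnar is borrowed from the snippet under review.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy plan node: an operator name plus child operators.
struct PlanNode {
  std::string name;
  std::vector<std::shared_ptr<PlanNode>> children;
};

using PlanPtr = std::shared_ptr<PlanNode>;

// Convenience constructor for building small trees.
PlanPtr node(std::string name, std::vector<PlanPtr> children = {}) {
  return std::make_shared<PlanNode>(PlanNode{std::move(name), std::move(children)});
}

// Returns a new tree in which every RowToVeloxColumnar directly above a
// ColumnarToRow is collapsed into a single ColumnarToColumnar node.
PlanPtr replaceWithColumnarToColumnar(const PlanPtr& plan) {
  assert(plan != nullptr);
  const bool isRowToColumnar =
      plan->name == "RowToVeloxColumnar" && plan->children.size() == 1;
  if (isRowToColumnar && plan->children[0]->name == "ColumnarToRow" &&
      plan->children[0]->children.size() == 1) {
    // Skip both transition nodes and keep rewriting below them.
    PlanPtr grandchild = replaceWithColumnarToColumnar(plan->children[0]->children[0]);
    return node("ColumnarToColumnar", {grandchild});
  }
  // No match at this node: rebuild it with rewritten children.
  std::vector<PlanPtr> rewritten;
  for (const PlanPtr& child : plan->children) {
    rewritten.push_back(replaceWithColumnarToColumnar(child));
  }
  return node(plan->name, std::move(rewritten));
}
```

Applied to the Filter / RowToVeloxColumnar / ColumnarToRow / FileScan example, the sketch yields Filter / ColumnarToColumnar / FileScan.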

Member

Thanks for the explanation.

If it's for ColumnarToRow + RowToVeloxColumnar, would it be better to rename the method genColumnarToColumnarExec to genVanillaColumnarToGlutenColumnarExec or something similar?

Member

So it's clearer that we are not doing a Gluten -> Vanilla C2C transition here.

Contributor

FelixYBW commented Mar 5, 2024

Do you convert to Velox format directly, or convert to Arrow first and then to Velox?

Contributor Author

boneanxs commented Mar 5, 2024

> Do you convert to Velox format directly, or convert to Arrow first and then to Velox?

Convert to Arrow first, then to Velox.

Contributor

FelixYBW commented Mar 5, 2024

> > Do you convert to Velox format directly, or convert to Arrow first and then to Velox?
>
> Convert to Arrow first, then to Velox.

Makes sense. We may upstream the Parquet columnar format => Arrow format conversion to Spark.

boneanxs requested a review from zhztheplayer March 7, 2024 03:07
Comment on lines 123 to 128
if (arrowArray != null) {
  arrowArray.release()
  arrowArray.close()
  arrowConverter.reset()
  arrowArray = null
}
Member

It seems the file includes a bunch of manual lifecycle management calls like _.close, _.release, _.reset, etc. Would you like to review them to see if we can eliminate some? I recall that recycleIterator / recyclePayloads can do some of the cleanup automatically.

It's a non-blocking comment, so feel free to do that in another PR. But in any case we should try our best to make the code here easier to maintain for future developers.

Contributor Author

We need to reset arrowArray and arrowConverter for each item we iterate over, so recyclePayload is a reasonable fit, but it only receives the result of next(), which is a ColumnarBatch, so we cannot pass arrowArray to it to be closed.

So instead I added an extra method, releaseArrowBuf, that releases them and reduces duplication.

zhztheplayer previously approved these changes Mar 7, 2024
Member

zhztheplayer left a comment

Overall it looks good to me. Just a non-blocking comment. Thanks!

Contributor

FelixYBW commented Mar 7, 2024

@zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpy calls happen during the conversion? Is there an onheap => offheap copy?

Member

zhztheplayer commented Mar 7, 2024

> @zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpy calls happen during the conversion? Is there an onheap => offheap copy?

@boneanxs Feel free to address these questions as well, thanks.

I believe the patch reused our old ArrowWritableColumnarVector code to write Spark columnar data to native, so there should be a bunch of "onheap => offheap" copies. Ideally we should count exactly how many copies the implementation does. @boneanxs You can also check this part.

What I was worried about is that ArrowWritableColumnarVector has not been under active maintenance for a while, so we should have more tests here, especially for complex data types.

It would also be great if you could share your thoughts about the risk of memory leaks this approach may bring, @boneanxs. Overall the PR's code looks fine to me and we have removed most of the unsafe APIs, but there might still be some. Let's check this part carefully too.

Contributor

FelixYBW commented Mar 8, 2024

Let's document the conversion clearly here. I have an impression that parquet-mr can make use of offheap memory for columnar data. If so, the best case is that we can avoid any memcpy during the conversion (not counting the Arrow => Velox conversion). But we will need some work to avoid it.

If the Parquet scan is onheap, we can't avoid the onheap -> offheap copy, but then we should reuse the offheap buffer.

If we can't be sure how the conversion happens, let's use memcpy wherever we need it and make sure there is 100% no memory leak for now.

Contributor Author

boneanxs commented Mar 8, 2024

> @zhztheplayer can you check how the memory is allocated during the conversion? Where is the Arrow memory allocated? How many memcpy calls happen during the conversion? Is there an onheap => offheap copy?

Hey @FelixYBW, for each columnar batch, before the conversion, this PR allocates offheap memory to perform Spark columnar batch -> Arrow array. It doesn't treat onheap and offheap Spark columnar batches differently; it uses ArrowFieldWriter (which is implemented by Spark) to do this transformation. See ArrowColumnarBatchConverter#write, which is newly added in this PR.

So yes, there will be one memcpy from Spark columnar batch -> Arrow array, no matter whether the Spark columnar batch is on heap or off heap.

On the native side, I simply use importFromArrow to convert the Arrow array to a Velox columnar batch. There will be some memcpy as well for String, timestamp, shortDecimal, etc. (I haven't fully checked).

> What I was worried about is that ArrowWritableColumnarVector has not been under active maintenance for a while, so we should have more tests here, especially for complex data types.

Sorry @zhztheplayer, I might be missing something, do you mean ArrowFieldWriter? This PR uses ArrowFieldWriter to do the conversion. It's widely used by PySpark, so reusing it here should be safe.

> It would also be great if you could share your thoughts about the risk of memory leaks this approach may bring, @boneanxs.

The extra memory acquired here is arrowArray, cSchema, and the Velox columnar batch, and they're all handled by TaskResources and recyclePayload. arrowArray and the Velox columnar batch are released on each iteration, and cSchema is released when the iterator ends. TaskResources is the extra assurance that all allocated memory will be released if the iterator is interrupted abnormally.
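The release policy described here (one resource released per batch, one released when iteration ends, both released even on early exit) maps naturally onto scope-based cleanup. The following is a minimal C++ sketch under stated assumptions, not the PR's Scala code: CArray is a simplified stand-in for an Arrow C Data Interface struct that models only the release callback, and ArrayGuard, makeArray, and convertBatches are hypothetical names.

```cpp
#include <cassert>

// Simplified stand-in for an Arrow C Data Interface struct: only the
// release callback and its private data are modeled.
struct CArray {
  void (*release)(CArray*) = nullptr;
  void* private_data = nullptr;
};

// Hypothetical guard: invokes the release callback at most once, either
// explicitly via reset() or when the guard goes out of scope.
class ArrayGuard {
 public:
  explicit ArrayGuard(CArray array) : array_(array) {}
  ArrayGuard(const ArrayGuard&) = delete;
  ArrayGuard& operator=(const ArrayGuard&) = delete;
  ~ArrayGuard() { reset(); }

  void reset() {
    if (array_.release != nullptr) {
      array_.release(&array_);
    }
  }

  CArray* get() { return &array_; }

 private:
  CArray array_;
};

// Hypothetical producer: tracks live resources in a caller-owned counter.
CArray makeArray(int* liveCount) {
  assert(liveCount != nullptr);
  ++*liveCount;
  CArray array;
  array.private_data = liveCount;
  array.release = [](CArray* self) {
    --*static_cast<int*>(self->private_data);
    self->release = nullptr;  // mark as released so it is not released twice
  };
  return array;
}

// One schema-like resource lives for the whole loop; one array-like
// resource is created and released per batch.
int convertBatches(int numBatches, int* liveCount) {
  ArrayGuard schemaGuard(makeArray(liveCount));  // released when the function returns
  int converted = 0;
  for (int i = 0; i < numBatches; ++i) {
    ArrayGuard batchGuard(makeArray(liveCount));  // released at the end of each iteration
    // ... the batch would be imported from batchGuard.get() here ...
    ++converted;
  }
  return converted;
}
```

After convertBatches returns, the live counter is back to its starting value, including when the loop exits early through an exception, because the guards' destructors still run during stack unwinding.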

> I have an impression that parquet-mr can make use of offheap memory for columnar data. If so, the best case is that we can avoid any memcpy during the conversion

I'm not sure whether parquet-mr can directly use offheap memory, but Spark can export Parquet data as an offheap columnar batch when spark.sql.columnVector.offheap.enabled is enabled. Directly reusing an offheap columnar batch as an Arrow array is not supported in this PR, though (by the way, can we do so? Not sure about this).

I can also run benchmarks comparing against VanillaColumnarBatchToRow->RowToVeloxColumnar if necessary. I think at least for memcpy, RowToVeloxColumnar also does an extra copy even if the row might come from an offheap ColumnarBatch.

@zhztheplayer
Member

> This PR uses ArrowFieldWriter to do the conversion. It's widely used by PySpark, so reusing it here should be safe.

Ahh, then it's fine. Some of the code is very similar to Gluten's ArrowWritableColumnarVector, which had copied some code from vanilla Spark, so my intuition led me astray. Sorry for the mistake.

Contributor

FelixYBW commented Mar 8, 2024

> does an extra copy even if the row might from an off

Thank you for the explanation. You may try enabling spark.sql.columnVector.offheap.enabled.

An onheap to offheap memcpy is more expensive than offheap to offheap. Do you know how ArrowFieldWriter allocates memory? Is it from the unsafe API or direct memory?

@zhztheplayer Do you know?

Member

zhztheplayer commented Apr 10, 2024

I think it's OK to have it disabled by default.

@boneanxs Can you add a CI case for the feature to run TPC-H / TPC-DS tests?

Example:

run-tpc-test-ubuntu-randomkill:

You can use gluten-it arg --extra-conf=... to enable C2C during testing.

Also if -s=30.0 (SF 30) is too large, we can make it -s=1.0.

And sorry for the late response. Let's get this merged ASAP.

@FelixYBW
Contributor

Oh, I just noticed the PR is still open and has many conflicts. @boneanxs would you like to continue?

@boneanxs
Contributor Author

Hey @FelixYBW @zhztheplayer, yeah, I'm willing to continue this PR. After the last comment, I ran some benchmarks in my local environment and found no obvious improvement compared with VanillaColumnar->row->VeloxColumnar. (I also noticed a compatibility issue, since this PR relies on Spark's Arrow version, which conflicts with Gluten's.) I'm still checking why the performance didn't improve, but haven't found the time recently. I will update here once I'm done with other things.

@zhouyuan
Contributor

@boneanxs hi, could you please help rebase? There was a big commit (renaming io.glutenproject to org.apache.gluten) when we migrated to the Apache repo.

thanks
-yuan

github-actions bot commented Jul 4, 2024

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions bot added the stale label Jul 4, 2024
@zhztheplayer
Member

comment to keep the PR open as it could be a valuable topic

github-actions bot removed the stale label Jul 26, 2024

github-actions bot commented Sep 9, 2024

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions bot added the stale label Sep 9, 2024

This PR was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it is still valid. Thanks.

github-actions bot closed this Sep 19, 2024