-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-17688: [C++][Java][FlightRPC] Substrait, transaction, cancellation for Flight SQL #13492
Conversation
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename pull request title in the following format?
or
See also: |
format/FlightSql.proto
Outdated
FLIGHT_SQL_TRANSACTION_SUPPORT_UNKNOWN = 0; | ||
// No supprot | ||
FLIGHT_SQL_TRANSACTION_SUPPORT_NONE = 1; | ||
// Transactions, but not savepoints |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the difference between transcations and save points? Are there docs someplace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format/FlightSql.proto
Outdated
option (experimental) = true; | ||
|
||
// The serialized substrait.Plan to create a prepared statement for. | ||
bytes plan = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't been keeping up but is serialized plan sufficient, or are yaml plugins necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The root plan contains extension definitions, and I believe they're intended to be self-contained as a result, but I'll seek some clarification: https://github.com/substrait-io/substrait/blob/1080f06298d8e50abcd6acfaa6c425326a7e0579/proto/substrait/plan.proto#L24-L45
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're expecting the Protobuf serialized plan here, would it make more sense to just import the substrait proto definition and reference the object directly rather than having to serialize the plan and then stick the bytes inside another serialized protobuf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to keep code dependencies on other Protobuf messages out because Windows/Protobuf has issues with those when they're in different DLLs (as they are with Flight/Flight SQL, and as they would be here). As seen below with the CancelQuery message I already ran into linking issues and I think they're insurmountable unless protoc itself is modified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the wire, the actual encoding is the same either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lidavidm Fair enough, that makes sense.
format/FlightSql.proto
Outdated
* The result of a "BeginTransaction" action. | ||
* | ||
* The transaction/savepoint can be manipulated with the "EndTransaction" | ||
* action, or automatically via server timeout. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should server timeout be something that the client has the option of specifying? Can it be introspected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose to follow prepared statements in this regard. I'll add a SqlInfo value to retrieve the timeout.
format/FlightSql.proto
Outdated
*/ | ||
message ActionBeginTransactionRequest { | ||
// Create a savepoint within the identified transaction. Only supported if | ||
// FLIGHT_SQL_TRANSACTION is FLIGHT_SQL_TRANSACTION_SUPPORT_SAVEPOINT. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this misplaced should it be above name?
message ActionBeginTransactionRequest { | ||
// Create a savepoint within the identified transaction. Only supported if | ||
// FLIGHT_SQL_TRANSACTION is FLIGHT_SQL_TRANSACTION_SUPPORT_SAVEPOINT. | ||
bytes transaction_id = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is transaction_id optional here for a new transaction? it seems like in most cases this should be server assigned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully it's clearer now that it's split. The ID is always server assigned. To begin a savepoint, you must provide the ID of the transaction it falls under.
* Request message for the "EndTransaction" action. | ||
* Commit or rollback a transaction, or release/rollback a savepoint within a transaction. | ||
*/ | ||
message ActionEndTransactionRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are the semantics relative to savepoint here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I clarified the docstring, though I'm not sure what you're referring to here.
// Roll back the transaction or to a savepoint. | ||
END_TRANSACTION_ROLLBACK = 2; | ||
} | ||
// Opaque handle for the transaction on the server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transaction + savepoint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, updated docstring.
format/FlightSql.proto
Outdated
@@ -1475,6 +1549,35 @@ message CommandStatementQuery { | |||
|
|||
// The SQL syntax. | |||
string query = 1; | |||
// Include the query as part of this transaction (by default queries are auto-committed). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from an RPC optimization standpoint, it seems that maybe we want a way of specifying this should be considered the first action inside of a transaction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like maybe there should be something like:
message TransactionDetails {
oneof transaction {
ActionBeginTranscation begin_transaction = 1;
bytes existing_transaction_id = 2;
}
}
that can be included.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intent was to mimic prepared statements: the server assigns the transaction ID and gives it to the client. So here there's not a great way to return the transaction ID to the client. It would be good if FlightInfo could gain an app_metadata field for such things (since Tickets are not meant to be client-introspected).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose we could use schema metadata for that, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems app_metadata would probably be generally useful. It seems like a hack to force the info onto the schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll propose that separately when I get a chance (and I'll see if I can set aside some time to help with the small-result optimization stuff)
format/FlightSql.proto
Outdated
* For distributed queries (ones that use GetFlightInfo->DoGet), this lets a | ||
* single client explicitly cancel work across all clients, given server support. | ||
*/ | ||
message ActionCancelQuery { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does there need to be a response associated with this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would it contain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could state. CANCELLED, NOT_CANCELLABLE, ALREADY_DONE, CANCELLING. Not sure this is useful, so feel free to ignore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we declare the command idempotent, that could be useful. I suppose the server could just synchronously block until the query is cancelled but then it'd be unclear how to recover from a transient failure.
format/FlightSql.proto
Outdated
*/ | ||
message ActionCancelQuery { | ||
// The result of the GetFlightInfo RPC that initated the query. | ||
FlightInfo info = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be some sort of ID instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would it be? Since most of the data inside FlightInfo isn't meant to be introspected, and we haven't specified what the contents of Ticket should be, there's no consistent 'query ID' concept right now. So I chose FlightInfo since presumably that has all information the server needs to identify the query.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this be another place where app_metadata would be useful? Otherwise, it seems like the only implementation path for the server would be to introspect one of the tickets? (it would seem lfight descriptors might very commonly be non-unique). This might be fine, I just want to confirm my understanding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - if we had FlightInfo.app_metadata, we could return an explicit cancellation token. But given the server is generating the tickets, it should be OK for the server to also introspect them, so long as the client doesn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I think introspecting the tickets for now should be fine.
format/FlightSql.proto
Outdated
* Explicitly cancel a running query. | ||
* | ||
* For distributed queries (ones that use GetFlightInfo->DoGet), this lets a | ||
* single client explicitly cancel work across all clients, given server support. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are there semantics that need to be considered relative to transcations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll update it to specify that the transaction is not rolled back/this is only to terminate reading of the result set (CC @jduo is that in line with what you were thinking?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what I was thinking in terms of read, but also think it should terminate a running write (same as in ODBC/JDBC).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks - updated. The semantics around cancelling a write are a little unclear, I suppose in autocommit mode it gets committed, and otherwise the client can commit or rollback.
120d649
to
8326857
Compare
Generally seems OK to me. |
format/FlightSql.proto
Outdated
@@ -89,6 +90,31 @@ enum SqlInfo { | |||
*/ | |||
FLIGHT_SQL_SERVER_READ_ONLY = 3; | |||
|
|||
/* | |||
* Retrieves a boolean value indicating whether the Flight SQL Server supports executing Substrait plans. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So "Flight SQL" is generic enough that it might support things other than actual SQL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the naming is unfortunate, but there's no real reason why we need to be tied to SQL specifically, and no reason why many of the existing concepts can't map to Substrait.
format/FlightSql.proto
Outdated
FLIGHT_SQL_TRANSACTION = 5; | ||
|
||
/* | ||
* Retrieves an int32 indicating the timeout for prepared statement handles. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Er... which unit is that? Seconds? Can we make it a real/float instead, if protobuf allows that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it milliseconds, unless we prefer floating point seconds?
FLIGHT_SQL_TRANSACTION_SUPPORT_NONE = 1; | ||
// Transactions, but not savepoints. | ||
// A savepoint is a mark within a transaction that can be individually | ||
// rolled back to. Not all databases support savepoints. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my understanding, savepoints are for two-phase commits, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, not in the distributed systems sense, a savepoint is really just a nested transaction. I included it for parity with JDBC but it's not essential.
format/FlightSql.proto
Outdated
message ActionBeginTransactionRequest { | ||
// The transaction to which a savepoint belongs, if applicable. | ||
// | ||
// To begin a transaction, leave this field empty. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... should there be two separate commands for starting a transaction and a savepoint, so that they can take different parameters?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split into separate commands (though the response messasge and EndTransaction are still shared)
format/FlightSql.proto
Outdated
|
||
// A serialized substrait.Plan | ||
bytes plan = 1; | ||
// Include the query as part of this transaction (by default queries are auto-committed). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the "default" happen if transaction_id
is left unset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so, perhaps replace "by default" with "if unset, " for clarity?
format/FlightSql.proto
Outdated
/* | ||
* Retrieves an int32 indicating the timeout (in milliseconds) for prepared statement handles. | ||
* | ||
* If 0, there is no timeout. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we specify that the timeout is refreshed when the handle is "used"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, we should probably provide any semantics necessary for when a timeout is refreshed.
format/FlightSql.proto
Outdated
@@ -1475,6 +1577,35 @@ message CommandStatementQuery { | |||
|
|||
// The SQL syntax. | |||
string query = 1; | |||
// Include the query as part of this transaction (if unset, the query is auto-committed). | |||
bytes transaction_id = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These transaction_id
fields should be explicitly marked optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
technically, by definition in proto3 all fields are optional and it's unnecessary to explicitly mark any as such.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flight SQL uses explicit optional elsewhere though (just to make it clear what is expected), so Flight SQL actually has a higher protoc minimum version as a result
message ActionBeginTransactionRequest { | ||
// Create a savepoint within the identified transaction. Only supported if | ||
// FLIGHT_SQL_TRANSACTION is FLIGHT_SQL_TRANSACTION_SUPPORT_SAVEPOINT. | ||
bytes transaction_id = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully it's clearer now that it's split. The ID is always server assigned. To begin a savepoint, you must provide the ID of the transaction it falls under.
fcaadae
to
eae9ee1
Compare
I've been pushing various tweaks to the spec while implementing it in C++. One thing I will need to change: transaction IDs should be supplied when creating a prepared statement, not when executing them (since generally existing APIs associate the statement with a particular connection). |
There's now implementations in C++ and Java. TODOs:
|
bb77bbd
to
78453af
Compare
Hmm, Windows builds fail because of a similar issue to #13434 - Protobuf and DLLs don't interact well, since you can't get The easiest thing might be to just punt on CancelQuery for now. Or else, it would have to be message ActionCancelQueryRequest {
// XXX(ARROW-16902): A serialized FlightInfo
bytes info = 1;
} and then rely on |
@lidavidm Since this is a draft, are you looking for a detailed review or more for general opinions? |
The Protobuf definitions deserve more scrutiny; for the code, I'm just looking for general opinions. |
83241ca
to
6431071
Compare
@lidavidm I don't have time for a second look, unless you really need it :-) |
No worries, just want to make sure! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments for clarification
format/FlightSql.proto
Outdated
/* | ||
* Retrieves an int32 indicating the timeout (in milliseconds) for prepared statement handles. | ||
* | ||
* If 0, there is no timeout. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, we should probably provide any semantics necessary for when a timeout is refreshed.
format/FlightSql.proto
Outdated
// Unknown/not indicated | ||
SQL_SUPPORTED_TRANSACTION_UNKNOWN = 0; | ||
// No support | ||
SQL_SUPPORTED_TRANSACTION_NONE = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a functional difference here? Or are we just including an "Unknown" so that it will be the default? In most cases developers are likely to treat "unknown" the same as "none" when it comes to transaction support: (ie. don't try calling them)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really - I guess in that case let's just fold them together since there's no point. (I suppose normally in protobuf you'd distinguish the two, but that doesn't apply here.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, I agree that in protobuf you'd distinguish the two in many cases, but my typical litmus test is if a consumer of the protobuf would treat the "unknown" case differently than the "none" case. And if there isn't any functional difference, it's not necessary to distinguish them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup - I ended up folding them together
format/FlightSql.proto
Outdated
option (experimental) = true; | ||
|
||
// The serialized substrait.Plan to create a prepared statement for. | ||
bytes plan = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're expecting the Protobuf serialized plan here, would it make more sense to just import the substrait proto definition and reference the object directly rather than having to serialize the plan and then stick the bytes inside another serialized protobuf?
// XXX(ARROW-16902): this must be a serialized FlightInfo, but is | ||
// rendered as bytes because Protobuf does not really support one | ||
// DLL using Protobuf definitions from another DLL. | ||
bytes info = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a serialized flightinfo? or should it actually be a Ticket?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be a FlightInfo because I assume the server needs the information of all endpoints in order to fully cancel a query. Also in the event that we do update FlightInfo with an application metadata field, it would automatically get passed back to the server
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, that makes sense then.
format/FlightSql.proto
Outdated
CANCEL_RESULT_CANCELLED = 1; | ||
CANCEL_RESULT_CANCELLING = 2; | ||
CANCEL_RESULT_NOT_CANCELLABLE = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the server returns CANCEL_RESULT_CANCELLING
is a client supposed to poll with subsequent cancel requests until it receives CANCELLED
? Or is there a different way to determine when the cancellation is completed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was the intent. I'll document the variants.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated - is this clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup much better! thanks!
265f37a
to
0ac1408
Compare
Updated again, since @jvanstraten pointed out that the server may want to know the client's Substrait release version since otherwise it may be unclear how to interpret the plan (even if it parses properly). Also, adds some validation for SqlInfo values to the integration test + adds SqlInfo values so the server can report Substrait version support. |
@@ -61,7 +61,8 @@ add_arrow_lib(arrow_flight_sql | |||
STATIC_LINK_LIBS | |||
arrow_flight_static | |||
PRIVATE_INCLUDES | |||
"${Protobuf_INCLUDE_DIRS}") | |||
"${Protobuf_INCLUDE_DIRS}" | |||
"${CMAKE_CURRENT_BINARY_DIR}/../") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... what is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I think I needed it originally, but maybe now that we aren't referencing Protobuf files from each other it's not needed anymore - removed
"local_files": { | ||
"items": [ | ||
{ | ||
"uri_file": "file://FILENAME_PLACEHOLDER", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should perhaps use a URI placeholder and produce the proper URI programmatically?
"uri_file": "file://FILENAME_PLACEHOLDER", | |
"uri_file": "URI_PLACEHOLDER", |
ASSERT_OK(server_->Init(options)); | ||
|
||
std::stringstream ss; | ||
ss << "grpc://localhost:" << server_->port(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps expose a server_->connect_uri()
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying not to add new methods (MakeAceroServer just returns the base FlightSqlServerBase) and a server doesn't necessarily know what its 'public' address is (even here, we bind to 0.0.0.0 but assume that it's accessible on 'localhost')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and a server doesn't necessarily know what its 'public' address is (even here, we bind to 0.0.0.0 but assume that it's accessible on 'localhost')
Hmm... if you mean the server might reside behind a NAT, sure, but at least from a local point of view it should know on which addresses (plural, ideally :-)) it is reachable (perhaps only localhost, or perhaps one interface, or perhaps all/many interfaces...).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to use server_->location()
and bind to localhost
instead of 0.0.0.0
(should be OK/preferable in test code anyways)
TEST_F(TestAcero, GetSqlInfo) { | ||
ASSERT_OK_AND_ASSIGN( | ||
auto flight_info, | ||
client_->GetSqlInfo({}, { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you perhaps make parameters more explicit?
client_->GetSqlInfo({}, { | |
client_->GetSqlInfo(/*abc=*/{}, /*def=*/{ |
SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_TRANSACTION, | ||
})); | ||
ASSERT_OK_AND_ASSIGN(auto reader, | ||
client_->DoGet({}, flight_info->endpoints()[0].ticket)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here (at least for the first one :-)).
@@ -1416,14 +1477,47 @@ message ActionCreatePreparedStatementRequest { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you plan to remove the "experimental" markers like the above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not in this RFC, but we should do it in the near future (perhaps after the JDBC driver has seen some use)
* Request message for the "CreatePreparedSubstraitPlan" action on a Flight SQL enabled backend. | ||
*/ | ||
message ActionCreatePreparedSubstraitPlanRequest { | ||
option (experimental) = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a particular rationale for adding this option in some message definitions but not all of them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I just missed it :) I'll fix that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments, I'll update this soon. (I guess these are mostly minor changes and we shouldn't need to restart the vote?)
|
||
/* | ||
* Retrieves a string value indicating the maximum supported Substrait version, or null | ||
* if Substrait is not supported. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand, the backwards compatibility story is not yet worked out, so in the future, this may be less useful, but for now, it's the only way to reliably determine whether a plan can really be executed.
FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT = 100; | ||
|
||
/* | ||
* Retrieves an int32 indicating the timeout (in milliseconds) for transactions, since transactions are not tied to a connection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe @jduo can chime in; this timeout existed implicitly before, but something here is necessary because unlike JDBC/ODBC which can tie these to the lifetime of an actual connection, Flight SQL makes fewer assumptions about state being tied to the gRPC connection (which are more disposable).
@@ -1416,14 +1477,47 @@ message ActionCreatePreparedStatementRequest { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not in this RFC, but we should do it in the near future (perhaps after the JDBC driver has seen some use)
* Request message for the "CreatePreparedSubstraitPlan" action on a Flight SQL enabled backend. | ||
*/ | ||
message ActionCreatePreparedSubstraitPlanRequest { | ||
option (experimental) = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I just missed it :) I'll fix that.
format/FlightSql.proto
Outdated
* Retrieves a boolean value indicating whether the Flight SQL Server supports executing | ||
* Substrait plans. | ||
*/ | ||
FLIGHT_SQL_SERVER_SUBSTRAIT = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I think that makes sense, though current clients are going to assume SQL support.
FLIGHT_SQL_SERVER_CANCEL = 8; | ||
|
||
/* | ||
* Retrieves an int32 indicating the timeout (in milliseconds) for prepared statement handles. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll update this.
Yes, definitely. Also, sorry, the review contains C++ comments that I did some days/weeks ago but had forgotten to submit apparently :-S |
58f748e
to
5823b75
Compare
Rebased + updated (minus the timeout since we'd have to change the result set schema for GetSqlInfo to add floating point) |
a8ff5fb
to
1a8af54
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a quick look at the C++ parts again.
if (!chunk.data) break; | ||
|
||
const UInt32Array& info_name = | ||
static_cast<const UInt32Array&>(*chunk.data->column(0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use checked_cast
here and below?
namespace acero_example { | ||
|
||
namespace { | ||
class GetSchemaSinkNodeConsumer : public compute::SinkNodeConsumer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to add docstrings/comments explaining each non-trivial helper class here.
ARROW_ASSIGN_OR_RAISE(auto ticket, CreateStatementQueryTicket(command.plan.plan)); | ||
std::vector<FlightEndpoint> endpoints{ | ||
FlightEndpoint{Ticket{std::move(ticket)}, /*locations=*/{}}}; | ||
ARROW_ASSIGN_OR_RAISE( | ||
auto info, FlightInfo::Make(*output_schema, descriptor, std::move(endpoints), | ||
/*total_records=*/-1, /*total_bytes=*/-1)); | ||
return std::unique_ptr<FlightInfo>(new FlightInfo(std::move(info))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but the logic to create the FlightInfo
from an encoded substrait plan could perhaps be factored out in a dedicated helper method? (since GetFlightInfoPreparedStatement
has the same logic inside)
sqlite3* db_; | ||
std::map<std::string, std::shared_ptr<SqliteStatement>> prepared_statements_; | ||
std::string db_uri_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit
std::string db_uri_; | |
const std::string db_uri_; |
std::map<std::string, std::shared_ptr<SqliteStatement>> prepared_statements_; | ||
std::string db_uri_; | ||
std::mutex mutex_; | ||
std::unordered_map<std::string, std::shared_ptr<SqliteStatement>> prepared_statements_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that accesses to prepared_statements_
are never mutex-protected, is it right?
|
||
ARROW_RETURN_NOT_OK(ExecuteSql(new_db, "BEGIN TRANSACTION")); | ||
|
||
open_transactions_[handle] = new_db; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be mutex-protected?
Status status; | ||
if (request.action == ActionEndTransactionRequest::kCommit) { | ||
status = ExecuteSql(it->second, "COMMIT"); | ||
} else { | ||
status = ExecuteSql(it->second, "ROLLBACK"); | ||
} | ||
sqlite3_close(it->second); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how efficient you want this to be, but you might release the lock around these lines (and call open_transactions_.erase
before?).
Updated, thanks Antoine! |
274a3d8
to
6c48fdd
Compare
CI failures here are addressed/fixed elsewhere |
Benchmark runs are scheduled for baseline = d571e93 and contender = 3ce4014. 3ce4014 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
['Python', 'R'] benchmarks have high level of regressions. |
…ion for Flight SQL (apache#13492) "[VOTE] Substrait for Flight SQL" https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl Authored-by: David Li <[email protected]> Signed-off-by: David Li <[email protected]>
…ion for Flight SQL (apache#13492) "[VOTE] Substrait for Flight SQL" https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl Authored-by: David Li <[email protected]> Signed-off-by: David Li <[email protected]>
This PR bumps Apache Arrow version from 9.0.0 to 10.0.0. Main changes related to PyAmber: ## Java/Scala side: - JDBC Driver for Arrow Flight SQL ([13800](apache/arrow#13800)) - Initial implementation of immutable Table API ([14316](apache/arrow#14316)) - Substrait, transaction, cancellation for Flight SQL ([13492](apache/arrow#13492)) - Read Arrow IPC, CSV, and ORC files by NativeDatasetFactory ([13811](apache/arrow#13811), [13973](apache/arrow#13973), [14182](apache/arrow#14182)) - Add utility to bind Arrow data to JDBC parameters ([13589](apache/arrow#13589)) ## Python side: - The batch_readahead and fragment_readahead arguments for scanning Datasets are exposed in Python ([ARROW-17299](https://issues.apache.org/jira/browse/ARROW-17299)). - ExtensionArrays can now be created from a storage array through the pa.array(..) constructor ([ARROW-17834](https://issues.apache.org/jira/browse/ARROW-17834)). - Converting ListArrays containing ExtensionArray values to numpy or pandas works by falling back to the storage array ([ARROW-17813](https://issues.apache.org/jira/browse/ARROW-17813)). - Casting Tables to a new schema now honors the nullability flag in the target schema ([ARROW-16651](https://issues.apache.org/jira/browse/ARROW-16651)).
"[VOTE] Substrait for Flight SQL"
https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl