-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fault Tolerant Execution for PostgreSQL and MySQL connectors #14445
Conversation
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
Writes appear atomic in these connectors, because they are publish in Did you mean "Support fault tolerant execution when writing to PostgreSQL or MySQL connector" ? |
@findepi yes - sorry for the confusion. my brain was pinned to the language of the original issue assigned to me, and even though it felt like the writes already were atomic, i was just going along with the language. i've adjusted the verbiage in the PR, though i now want to change the name of the |
1961526
to
19486e8
Compare
19486e8
to
88b4988
Compare
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSinkProvider.java
Outdated
Show resolved
Hide resolved
@mwd410 tests are failing. Please make sure there are no change-relevant tests, fixing those may make shape of PR significantly. |
plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java
Outdated
Show resolved
Hide resolved
plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java
Outdated
Show resolved
Hide resolved
plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/FaultTolerantJdbcClient.java
Outdated
Show resolved
Hide resolved
plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/FaultTolerantJdbcClient.java
Outdated
Show resolved
Hide resolved
plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/mapping/DefaultIdentifierMapping.java
Outdated
Show resolved
Hide resolved
c8639e8
to
e6d855e
Compare
e6d855e
to
90bb1dc
Compare
8e03bf9
to
f58c304
Compare
@@ -928,17 +928,9 @@ protected String createTableSql(RemoteTableName remoteTableName, List<String> co | |||
} | |||
|
|||
@Override | |||
protected String buildInsertSql(ConnectorSession session, RemoteTableName targetTable, RemoteTableName sourceTable, List<String> columnNames) | |||
protected String postProcessInsertTableClause(ConnectorSession session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass source sql statement as arguemnt and return postporcessed one (with suffix appended)
@@ -735,7 +743,7 @@ public void rollback() | |||
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode) | |||
{ | |||
verify(!((JdbcTableHandle) tableHandle).isSynthetic(), "Not a table reference: %s", tableHandle); | |||
if (retryMode != NO_RETRIES) { | |||
if (retryMode != NO_RETRIES && !jdbcClient.supportsFaultTolerantExecution(session)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe move this checks inside client itself. Now you are checking here, and there doing same check again inside to decide which code flow to follow - it is confusing.
Also please emit different error message if connector does not support retries at all and when connetor supports retries but NON_TRANSACTIONAL_INSERT
is set. Currently we would get This connector does not support query retries
for latter case and it is not a good error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also then maybe we do not need to expose supportsFaultTolerantExecution()
from JdbcClient
@@ -698,7 +699,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle | |||
@Override | |||
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode) | |||
{ | |||
if (retryMode != NO_RETRIES) { | |||
if (retryMode != NO_RETRIES && !jdbcClient.supportsFaultTolerantExecution(session)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as for beginInsert
@@ -844,16 +1013,25 @@ public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle | |||
} | |||
} | |||
|
|||
@Override | |||
public boolean supportsFaultTolerantExecution(ConnectorSession session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given internal uses of this method it should rather be "shouldUseFaultTolerantExecution"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. One comment really.
1dfee0f
to
173e7c7
Compare
438a844
to
689a121
Compare
689a121
to
e09c4a2
Compare
public BaseJdbcClient( | ||
BaseJdbcConfig config, | ||
String identifierQuote, | ||
ConnectionFactory connectionFactory, | ||
QueryBuilder queryBuilder, | ||
IdentifierMapping identifierMapping) | ||
{ | ||
this( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deprecate the old constructor and remove all usages of it.
Description
This PR primarily introduces fault tolerant writes for Postgres and MySQL. To do this, the write process is changed thusly:
ConstructorPageSinkId
to trino-spi. The implementation of this isPageSinkId
.long
identifier. We choselong
instead of a string for the fast joins.fromTaskId
inPageSinkId
), is calculated thusly:id
partitionId
- this affords us ~16 million partitionsattemptId
- this affords us 256 attemptstrino_page_sink_id
column to that temporary table.finish
, will return its identifier in the fragment slice passed back to the coordinator.insert into ... select from
, we do a semi-join to that helper table created in step 7, thereby including only the rows inserted by successful tasks.Non-technical explanation
This guarantees the ability to only include data from successful tasks. There is no additional necessary cleanup of data from failed tasks. This PR avoids changing any behavior of existing JDBC clients, including in 3rd party plugins, unless you opt-in by extending
FaultTolerantJdbcClient
. I have opted in for thePostgreSqlClient
, and theMySqlClient
.Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text:
PostgreSQL and MySQL connectors now fault tolerant and only include data from successful tasks, automatically discarding any data from retried tasks.