Skip to content

Commit

Permalink
FMWK-622 Add support for multi-record transactions (#79)
Browse files Browse the repository at this point in the history
* FMWK-622 Add support for multi-record transactions

* reduce network timeout in transaction test

* update transaction isolation methods in connection

* commit any active transaction when switching to auto-commit mode

* Update examples.md with information about transactions

* Update README.md with transactions

* Link to the SC documentation.

* update the Java client to version 9.0.2

---------

Co-authored-by: Ronen Botzer <[email protected]>
  • Loading branch information
reugn and rbotzer authored Dec 8, 2024
1 parent f459615 commit 5726334
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 70 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Packages documentation can be found [here](https://javadoc.io/doc/com.aerospike/
* TRUNCATE TABLE
* CREATE INDEX
* DROP INDEX
* Transactions

See [examples](docs/examples.md) of SQL.

Expand Down
35 changes: 35 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,38 @@ CREATE INDEX port_idx ON port_list (port);
```sql
DROP INDEX port_idx ON port_list;
```

## Transactions
**Note:** Wrapping multiple commands in a transaction requires Aerospike Database version 8.0+. Requires the namespace to be configured up with [`strong-consistency true`](https://aerospike.com/docs/server/operations/configure/consistency).

[JDBC transactions](https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html) are started by setting auto-commit to false, which acts as an implicit `BEGIN`. Every subsequent command is part of the transaction until a
commit or rollback are issued. A new transaction begins automatically after either is executed. Switching back to auto-commit will rollback an uncommitted transaction.

In a data browser like DBeaver, the UI has buttons to control switching the auto-commit on and off, along with commit and rollback buttons.

```sql
-- Switch to manual (begins the transaction)
SELECT * FROM port_list WHERE __key="ntp";
UPDATE port_list SET port=124 where __key="ntp";
UPDATE port_list SET port=162 where __key="snmp";
--COMMIT

--Switch to auto
SELECT * FROM port_list WHERE __key IN ("ntp", "snmp");

-- Switch to manual (begins the transaction)
UPDATE port_list SET port=123 where __key="ntp";
UPDATE port_list SET port=161 where __key="snmp";
--Rollback

-- Switch to auto
SELECT * FROM port_list WHERE __key IN ("ntp", "snmp");

-- Switch to manual (begins the transaction)
UPDATE port_list SET port=123 where __key="ntp";
UPDATE port_list SET port=161 where __key="snmp";
--Commit

-- Switch to auto
SELECT * FROM port_list WHERE __key IN ("ntp", "snmp");
```
1 change: 1 addition & 0 deletions docs/params.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ Consider setting a custom value if really necessary.
| metadataCacheTtlSeconds | 3600 | Database metadata cache TTL in seconds |
| schemaBuilderMaxRecords | 1000 | The number of records to be used to build the table schema |
| showRecordMetadata | false | Add record metadata columns (__digest, __ttl, __gen) |
| txnTimeoutSeconds | 10 | Multi-record transaction timeout in seconds |
53 changes: 3 additions & 50 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<maven-surefire-plugin.version>3.5.0</maven-surefire-plugin.version>
<nexus-staging-maven-plugin.version>1.6.14</nexus-staging-maven-plugin.version>
<maven-javadoc-plugin.version>2.9.1</maven-javadoc-plugin.version>
<maven-javadoc-plugin.version>3.11.1</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>

<aerospike-client.version>8.1.4</aerospike-client.version>
<aerospike-client.version>9.0.2</aerospike-client.version>
<netty.version>4.1.114.Final</netty.version>
<jackson.version>2.18.1</jackson.version>
<calcite.version>1.38.0</calcite.version>
Expand Down Expand Up @@ -152,52 +152,6 @@
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>${nexus-staging-maven-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-source-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>${maven-gpg-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
</plugin>
</plugins>
</pluginManagement>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -255,8 +209,7 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
<reportOutputDirectory>${basedir}</reportOutputDirectory>
<doclint>all,-missing</doclint>
<doctitle>A JDBC driver for the Aerospike database</doctitle>
<show>public</show>
<splitindex>true</splitindex>
Expand Down
82 changes: 74 additions & 8 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.aerospike.jdbc;

import com.aerospike.client.AbortStatus;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.CommitStatus;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Txn;
import com.aerospike.client.policy.Policy;
import com.aerospike.jdbc.model.DriverConfiguration;
import com.aerospike.jdbc.sql.SimpleWrapper;
Expand Down Expand Up @@ -42,6 +46,8 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private volatile Map<String, Class<?>> typeMap = emptyMap();
private volatile int holdability = HOLD_CURSORS_OVER_COMMIT;
private volatile boolean closed;
private boolean autoCommit = true;
private Txn txn;

public AerospikeConnection(String url, Properties props) {
logger.info("Init AerospikeConnection");
Expand Down Expand Up @@ -77,24 +83,70 @@ public String nativeSQL(String sql) throws SQLException {
@Override
public boolean getAutoCommit() throws SQLException {
checkClosed();
return true;
return autoCommit;
}

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
checkClosed();
// no-op
if (this.autoCommit == autoCommit) {
return;
}
if (!this.autoCommit) {
commit();
}
this.autoCommit = autoCommit;
logger.fine(() -> format("setAutoCommit = %b", autoCommit));
}

/**
* Requires Aerospike Server 8.0+.
*
* @throws SQLException if the transaction commit fails.
*/
@Override
public void commit() throws SQLException {
checkClosed();
// no-op
if (autoCommit) {
throw new SQLException("Connection is in auto-commit mode");
}
if (txn == null) {
logger.info("No active transaction to commit");
return;
}
try {
CommitStatus status = client.commit(txn);
logger.info(() -> format("MRT %d commit status: %s", txn.getId(), status));
} catch (AerospikeException e) {
throw new SQLException(e);
} finally {
txn = null;
}
}

/**
* Requires Aerospike Server 8.0+.
*
* @throws SQLException if the transaction rollback fails.
*/
@Override
public void rollback() throws SQLException {
throw new SQLFeatureNotSupportedException("rollback is not supported");
checkClosed();
if (autoCommit) {
throw new SQLException("Connection is in auto-commit mode");
}
if (txn == null) {
logger.info("No active transaction to rollback");
return;
}
try {
AbortStatus status = client.abort(txn);
logger.info(() -> format("MRT %d rollback status: %s", txn.getId(), status));
} catch (AerospikeException e) {
throw new SQLException(e);
} finally {
txn = null;
}
}

@Override
Expand Down Expand Up @@ -143,15 +195,14 @@ public void setCatalog(String catalog) throws SQLException {
@Override
public int getTransactionIsolation() throws SQLException {
checkClosed();
return TRANSACTION_NONE;
return TRANSACTION_SERIALIZABLE;
}

@Override
public void setTransactionIsolation(int level) throws SQLException {
checkClosed();
if (level != TRANSACTION_NONE) {
throw new SQLFeatureNotSupportedException(format("Aerospike does not support transactions," +
" so the only valid value here is TRANSACTION_NONE=%d", TRANSACTION_NONE));
if (level != TRANSACTION_SERIALIZABLE) {
throw new SQLException(format("Unsupported transaction isolation level: %d", level));
}
}

Expand All @@ -168,11 +219,13 @@ public void clearWarnings() throws SQLException {
}

@Override
@SuppressWarnings("MagicConstant")
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return createStatement(resultSetType, resultSetConcurrency, holdability);
}

@Override
@SuppressWarnings("MagicConstant")
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
throws SQLException {
return prepareStatement(sql, resultSetType, resultSetConcurrency, holdability);
Expand Down Expand Up @@ -239,6 +292,7 @@ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
checkClosed();
checkTxn();
validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability);
return new AerospikeStatement(client, this);
}
Expand All @@ -247,10 +301,18 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
int resultSetHoldability) throws SQLException {
checkClosed();
checkTxn();
validateResultSetParameters(resultSetType, resultSetConcurrency, resultSetHoldability);
return new AerospikePreparedStatement(client, this, sql);
}

private void checkTxn() {
if (!autoCommit && txn == null) {
txn = new Txn();
txn.setTimeout(config.getDriverPolicy().getTxnTimeoutSeconds());
}
}

private void validateResultSetParameters(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
if (resultSetType != TYPE_FORWARD_ONLY) {
Expand Down Expand Up @@ -428,4 +490,8 @@ public AerospikeVersion getAerospikeVersion() {
public IAerospikeClient getClient() {
return client;
}

public Txn getTxn() {
return txn;
}
}
11 changes: 6 additions & 5 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.sql.Connection.TRANSACTION_NONE;
import static java.sql.Connection.TRANSACTION_SERIALIZABLE;
import static java.sql.Types.*;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -340,7 +341,7 @@ public boolean supportsMultipleResultSets() {

@Override
public boolean supportsMultipleTransactions() {
return false;
return true;
}

@Override
Expand Down Expand Up @@ -655,17 +656,17 @@ public int getMaxUserNameLength() {

@Override
public int getDefaultTransactionIsolation() {
return TRANSACTION_NONE;
return TRANSACTION_SERIALIZABLE;
}

@Override
public boolean supportsTransactions() {
return false;
return true;
}

@Override
public boolean supportsTransactionIsolationLevel(int level) {
return TRANSACTION_NONE == level;
return TRANSACTION_NONE == level || TRANSACTION_SERIALIZABLE == level;
}

@Override
Expand All @@ -675,7 +676,7 @@ public boolean supportsDataDefinitionAndDataManipulationTransactions() {

@Override
public boolean supportsDataManipulationTransactionsOnly() {
return false;
return true;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/aerospike/jdbc/AerospikeStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ protected AerospikeQuery parseQuery(String sql, List<Object> sqlParameters) thro
if (query.getCatalog() == null) {
query.setCatalog(catalog);
}
query.setTxn(connection.getTxn());
return query;
}

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.aerospike.jdbc.model;

import com.aerospike.client.Txn;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import com.aerospike.jdbc.predicate.QueryPredicate;
Expand Down Expand Up @@ -43,6 +44,8 @@ public class AerospikeQuery {
private List<Object> values;
private List<String> columns;

private Txn txn;

public AerospikeQuery() {
this.queryType = QueryType.UNKNOWN;
}
Expand Down Expand Up @@ -147,6 +150,14 @@ public void setColumns(List<String> columns) {
this.columns = columns;
}

public Txn getTxn() {
return txn;
}

public void setTxn(Txn txn) {
this.txn = txn;
}

public String[] columnBins() {
String[] binNames = columns.stream()
.filter(c -> !Objects.equals(c, ASTERISK))
Expand Down
Loading

0 comments on commit 5726334

Please sign in to comment.