Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent run of DB Migrations with MySql and MariaDB fails (need to use named locks) #108

Merged
merged 3 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ebean-migration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<!-- Note: Using ojdbc10 here will affect loading other drivers on jdk8,
because the driverManager will stop on first load error and stops loading
other drivers -->
<artifactId>ojdbc8</artifactId>
<version>19.12.0.0</version>
<scope>test</scope>
</dependency>

<!--
mvn install:install-file -Dfile=/some/path/to/ojdbc7.jar -DgroupId=oracle \
-DartifactId=oracle-jdbc -Dversion=7.0 -Dpackaging=jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ static String normalise(Connection connection) {
return readPostgres(connection);
} else if (productName.contains(MYSQL)) {
return MYSQL;
} else if (productName.contains(MARIADB)) {
return MARIADB;
} else if (productName.contains(ORACLE)) {
return ORACLE;
} else if (productName.contains("microsoft")) {
Expand Down Expand Up @@ -70,6 +72,12 @@ private static String readPostgres(Connection connection) {
@Nonnull
static MigrationPlatform platform(String platformName) {
switch (platformName) {
case MYSQL:
case MARIADB:
return new MigrationPlatform.MySql();
case ORACLE:
case H2:
return new MigrationPlatform.LogicalLock();
case POSTGRES:
return new MigrationPlatform.Postgres();
case SQLSERVER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public interface DbPlatformNames {
String SQLSERVER = "sqlserver";
String SQLITE = "sqlite";
String POSTGRES = "postgres";
String MARIADB = "mariadb";
String MYSQL = "mysql";
String ORACLE = "oracle";
String DB2 = "db2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ protected void run(Connection connection, boolean checkStateMode) {

MigrationTable table = new MigrationTable(migrationConfig, connection, checkStateMode, platform);
table.createIfNeededAndLock();

runMigrations(resources, table, checkStateMode);
connection.commit();

table.runNonTransactional();
try {
runMigrations(resources, table, checkStateMode);
connection.commit();
table.runNonTransactional();
} finally {
table.unlockMigrationTable();
}

} catch (MigrationException e) {
rollback(connection);
Expand All @@ -131,7 +133,6 @@ protected void run(Connection connection, boolean checkStateMode) {
private void runMigrations(LocalMigrationResources resources, MigrationTable table, boolean checkStateMode) throws SQLException {
// get the migrations in version order
List<LocalMigrationResource> localVersions = resources.getVersions();

if (table.isEmpty()) {
LocalMigrationResource initVersion = getInitVersion();
if (initVersion != null) {
Expand All @@ -141,7 +142,6 @@ private void runMigrations(LocalMigrationResources resources, MigrationTable tab
return;
}
}

log.info("Local migrations:{} existing migrations:{} checkState:{}", localVersions.size(), table.size(), checkStateMode);
checkMigrations = table.runAll(localVersions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


import io.ebean.ddlrunner.DdlDetect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.sql.Connection;
Expand All @@ -16,6 +18,8 @@
*/
public class MigrationPlatform {

private static final Logger log = LoggerFactory.getLogger("io.ebean.DDL");

private static final String BASE_SELECT_ID = "select id from ";
private static final String BASE_SELECT_ALL = "select id, mtype, mstatus, mversion, mcomment, mchecksum, run_on, run_by, run_time from ";

Expand All @@ -31,18 +35,50 @@ DdlDetect ddlDetect() {
return DdlDetect.NONE;
}

void unlockMigrationTable(String sqlTable, Connection connection) throws SQLException {
// do nothing by default for select for update case
}

/**
* Lock the migration table. The base implementation uses row locking but lock table would be preferred when available.
*/
void lockMigrationTable(String sqlTable, Connection connection) throws SQLException {
for (int attempt = 0; attempt < 5; attempt++) {
if (lockRows(sqlTable, connection) > 0) {
// successfully holding row locks
return;
}
backoff(attempt);
}
throw new IllegalStateException("Failed to obtain row locks on migration table due to it being empty?");
}

private static void backoff(int attempt) {
try {
if (attempt % 100 == 0) {
log.warn("In backoff loop attempting to obtain lock on DBMigration table ...");
} else {
log.trace("in backoff loop obtaining lock...");
}
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while trying to obtain lock on migration table", e);
}
}

private int lockRows(String sqlTable, Connection connection) throws SQLException {
int rowCount = 0;
final String selectSql = sqlSelectForUpdate(sqlTable);
try (PreparedStatement query = connection.prepareStatement(selectSql)) {
try (ResultSet resultSet = query.executeQuery()) {
while (resultSet.next()) {
resultSet.getInt(1);
rowCount++;
}
}
}
return rowCount;
}

/**
Expand Down Expand Up @@ -78,6 +114,50 @@ String sqlSelectForReading(String table) {
return BASE_SELECT_ALL + table + forUpdateSuffix;
}

public static class LogicalLock extends MigrationPlatform {

@Override
void lockMigrationTable(String sqlTable, Connection connection) throws SQLException {
int attempts = 0;
while (!obtainLogicalLock(sqlTable, connection)) {
backoff(++attempts);
}
log.trace("obtained logical lock");
}

@Override
void unlockMigrationTable(String sqlTable, Connection connection) throws SQLException {
releaseLogicalLock(sqlTable, connection);
connection.commit();
}

private boolean obtainLogicalLock(String sqlTable, Connection connection) throws SQLException {
try (PreparedStatement query = connection.prepareStatement("update " + sqlTable + " set mcomment=? where id=? and mcomment=?")) {
query.setString(1, "locked");
query.setInt(2, 0);
query.setString(3, "<init>");
if (query.executeUpdate() == 1) {
connection.commit();
return true;
} else {
connection.rollback();
return false;
}
}
}

private void releaseLogicalLock(String sqlTable, Connection connection) throws SQLException {
String sql = "update " + sqlTable + " set mcomment='<init>' where id=0";
try (PreparedStatement query = connection.prepareStatement(sql)) {
if (query.executeUpdate() != 1) {
log.error("Failed to release logical lock. Please review why [" + sql + "] didn't update the row?");
} else {
log.trace("released logical lock");
}
}
}
}

public static class Postgres extends MigrationPlatform {

@Override
Expand All @@ -93,6 +173,38 @@ void lockMigrationTable(String sqlTable, Connection connection) throws SQLExcept
}
}

/**
* MySql and MariaDB need to use named locks due to implicit commits with DDL.
*/
public static class MySql extends MigrationPlatform {

@Override
void lockMigrationTable(String sqlTable, Connection connection) throws SQLException {
int attempts = 0;
while (!obtainNamedLock(connection)) {
backoff(++attempts);
}
}

private boolean obtainNamedLock(Connection connection) throws SQLException {
try (PreparedStatement query = connection.prepareStatement("select get_lock('ebean_migration', 10)")) {
try (ResultSet resultSet = query.executeQuery()) {
if (resultSet.next()) {
return resultSet.getInt(1) == 1;
}
}
}
return false;
}

@Override
void unlockMigrationTable(String sqlTable, Connection connection) throws SQLException {
try (PreparedStatement query = connection.prepareStatement("select release_lock('ebean_migration')")) {
query.execute();
}
}
}

public static class SqlServer extends MigrationPlatform {

public SqlServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,15 @@ private ScriptTransform createScriptTransform(MigrationConfig config) {
*/
public void createIfNeededAndLock() throws SQLException, IOException {
if (!tableExists()) {
createTable();
try {
createTable();
} catch (SQLException e) {
if (tableExists()) {
log.info("Ignoring error during table creation, as an other process may have created the table");
} else {
throw e;
}
}
}
obtainLockWithWait();
readExistingMigrations();
Expand All @@ -166,6 +174,13 @@ private void obtainLockWithWait() throws SQLException {
platform.lockMigrationTable(sqlTable, connection);
}

/**
* Release a lock on the migration table (MySql, MariaDB only).
*/
public void unlockMigrationTable() throws SQLException {
platform.unlockMigrationTable(sqlTable, connection);
}

/**
* Read the migration table with details on what migrations have run.
* This must execute after we have completed the wait for the lock on
Expand All @@ -181,6 +196,7 @@ private void readExistingMigrations() throws SQLException {
private void createTable() throws IOException, SQLException {
scriptRunner.runScript(createTableDdl(), "create migration table");
createInitMetaRow().executeInsert(connection, insertSql);
connection.commit();
}

/**
Expand Down
Loading