From 1bd64eb8cdafd20812e9579c7da32dc366388ef2 Mon Sep 17 00:00:00 2001 From: Tapac Date: Wed, 26 Jun 2019 22:37:09 +0300 Subject: [PATCH] coroutines rework --- .../ThreadLocalTransactionManager.kt | 16 --- .../sql/transactions/TransactionApi.kt | 16 +++ .../transactions/experimental/Suspended.kt | 97 +++++++++++++------ .../sql/tests/shared/CoroutineTests.kt | 19 ++-- 4 files changed, 93 insertions(+), 55 deletions(-) diff --git a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/ThreadLocalTransactionManager.kt b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/ThreadLocalTransactionManager.kt index 30f9770a53..20e81bfa87 100644 --- a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/ThreadLocalTransactionManager.kt +++ b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/ThreadLocalTransactionManager.kt @@ -76,22 +76,6 @@ fun transaction(transactionIsolation: Int, repetitionAttempts: Int, db: Data } } -private fun TransactionInterface.rollbackLoggingException(log: (Exception) -> Unit){ - try { - rollback() - } catch (e: Exception){ - log(e) - } -} - -private inline fun TransactionInterface.closeLoggingException(log: (Exception) -> Unit){ - try { - close() - } catch (e: Exception){ - log(e) - } -} - fun inTopLevelTransaction(transactionIsolation: Int, repetitionAttempts: Int, db: Database? = null, statement: Transaction.() -> T): T { var repetitions = 0 diff --git a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/TransactionApi.kt b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/TransactionApi.kt index 25eeb35e0b..c7ec7b7ad0 100644 --- a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/TransactionApi.kt +++ b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/TransactionApi.kt @@ -90,4 +90,20 @@ interface TransactionManager { fun isInitialized() = managers.first != NotInitializedManager } +} + +internal fun TransactionInterface.rollbackLoggingException(log: (Exception) -> Unit){ + try { + rollback() + } catch (e: Exception){ + log(e) + } +} + +internal inline fun TransactionInterface.closeLoggingException(log: (Exception) -> Unit){ + try { + close() + } catch (e: Exception){ + log(e) + } } \ No newline at end of file diff --git a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt index a1be05b2bd..ce8b14ddc9 100644 --- a/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt +++ b/exposed/src/main/kotlin/org/jetbrains/exposed/sql/transactions/experimental/Suspended.kt @@ -1,66 +1,99 @@ package org.jetbrains.exposed.sql.transactions.experimental import kotlinx.coroutines.* +import kotlinx.coroutines.selects.SelectClause1 +import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.sync.withLock -import org.jetbrains.exposed.sql.Database -import org.jetbrains.exposed.sql.Transaction -import org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManager -import org.jetbrains.exposed.sql.transactions.TransactionManager -import org.jetbrains.exposed.sql.transactions.transaction +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.transactions.* +import org.jetbrains.exposed.sql.transactions.rollbackLoggingException +import java.lang.Exception +import java.util.* import kotlin.coroutines.CoroutineContext suspend fun suspendedTransaction(context: CoroutineContext? = null, db: Database? = null, statement: suspend Transaction.() -> T): T { - return suspendedTransactionAsync(context, db, statement = statement).await().result + return suspendedTransactionAsync(context, db, statement = statement).await() } suspend fun Transaction.suspendedTransaction(context: CoroutineContext? = null, statement: suspend Transaction.() -> T): T { - return suspendedTransactionAsyncInternal(context, db, currentTransaction = this, statement = statement).await().result + val suspendedTransactionAsyncInternal = suspendedTransactionAsyncInternal(context, db, currentTransaction = this, statement = statement) + return suspendedTransactionAsyncInternal.await() } -class TransactionResult(internal val transaction: Transaction, val result:T) +class TransactionResult(internal val transaction: Transaction, + internal val deferred: Deferred, + private val selectClause: SelectClause1, + internal var closeTransaction : Boolean = true) : Deferred by deferred, SelectClause1 by selectClause { -suspend fun Deferred>.andThen(statement: suspend Transaction.(T) -> R) : Deferred> { - val firstResult = this.await() - return suspendedTransactionAsync { - with(firstResult.transaction) { - suspendedMutex.withLock { - statement(firstResult.result) + override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle { + if (closeTransaction) { + try { + transaction.commit() + transaction.closeLoggingException { exposedLogger.warn("Transaction close failed: ${it.message}. Statement: ${transaction.currentStatement}", it) } + } catch (e: Exception) { + transaction.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${transaction.currentStatement}", it) } + throw e } } + return deferred.invokeOnCompletion(handler) + } +} + +suspend fun TransactionResult.andThen(statement: suspend Transaction.(T) -> R) : TransactionResult { + val currentAsync = this + return suspendedTransactionAsyncInternal(currentTransaction = transaction) { + currentAsync.closeTransaction = false + statement(currentAsync.deferred.await()) } } suspend fun suspendedTransactionAsync(context: CoroutineContext? = null, db: Database? = null, - useOuterTransactionIfAccessible: Boolean = true, statement: suspend Transaction.() -> T) : Deferred> { + useOuterTransactionIfAccessible: Boolean = true, statement: suspend Transaction.() -> T) : TransactionResult { val currentTransaction = TransactionManager.currentOrNull().takeIf { useOuterTransactionIfAccessible } return suspendedTransactionAsyncInternal(context, db, currentTransaction, statement) } -suspend private fun suspendedTransactionAsyncInternal(context: CoroutineContext? = null, db: Database? = null, - currentTransaction: Transaction?, statement: suspend Transaction.() -> T) : Deferred> = coroutineScope { - val manager = db?.let { TransactionManager.managerFor(it) } ?: TransactionManager.manager +private suspend fun suspendedTransactionAsyncInternal(context: CoroutineContext? = null, db: Database? = null, + currentTransaction: Transaction?, statement: suspend Transaction.() -> T) : TransactionResult = coroutineScope { + val manager = (currentTransaction?.db ?: db)?.let { TransactionManager.managerFor(it) } + ?: TransactionManager.manager val threadLocalManager = manager as? ThreadLocalTransactionManager if (currentTransaction != null) { val scope = (if (threadLocalManager != null) { (context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(currentTransaction) } else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement(manager) - async(scope) { - currentTransaction.suspendedMutex.withLock { - TransactionResult(currentTransaction, currentTransaction.statement()) - } - } - } else + val asyncLazy = asyncTxInvocation(scope, currentTransaction, statement) + TransactionResult(currentTransaction, asyncLazy, asyncLazy as SelectClause1, false) + } else { + val tx = manager.newTransaction(manager.defaultIsolationLevel) + val scope = (if (threadLocalManager != null) { + (context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(tx) + } else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement() + val asyncLazy = asyncTxInvocation(scope, tx, statement) + TransactionResult(tx, asyncLazy, asyncLazy as SelectClause1, false) + } +} - transaction(manager.defaultIsolationLevel, manager.defaultRepetitionAttempts, db) { - val tx = this - val scope = (if (threadLocalManager != null) { - (context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(tx) - } else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement() - async(scope) { - tx.suspendedMutex.withLock { - TransactionResult(tx, tx.statement()) +private fun CoroutineScope.asyncTxInvocation(scope: CoroutineContext, tx: Transaction, statement: suspend Transaction.() -> T): Deferred { + return async(context = scope, start = CoroutineStart.LAZY) { + tx.suspendedMutex.withLock { + try { + tx.statement() + } catch (e: Throwable) { + tx.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${tx.currentStatement}", it) } + throw e + } finally { + try { + tx.currentStatement?.let { + it.close() + tx.currentStatement = null + } + tx.closeExecutedStatements() + } catch (e: Exception) { + exposedLogger.warn("Statements close failed", e) } } + } } } \ No newline at end of file diff --git a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt index d6c819f303..715b4bf23d 100644 --- a/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt +++ b/exposed/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/CoroutineTests.kt @@ -1,8 +1,7 @@ package org.jetbrains.exposed.sql.tests.shared -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* +import kotlinx.coroutines.debug.junit4.CoroutinesTimeout import org.jetbrains.exposed.sql.SchemaUtils import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.insert @@ -13,16 +12,23 @@ import org.jetbrains.exposed.sql.tests.h2.H2Tests import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync import org.jetbrains.exposed.test.utils.RepeatableTest +import org.junit.Rule import org.junit.Test +import kotlin.test.assertEquals class CoroutineTests : DatabaseTestsBase() { + + @Rule + @JvmField + val timeout = CoroutinesTimeout.seconds(10) + @Test @RepeatableTest(10) fun suspendedTx() { withDb { runBlocking { SchemaUtils.create(H2Tests.Testing) - val launchResult = suspendedTransaction { + val launchResult = suspendedTransaction(Dispatchers.IO) { H2Tests.Testing.insert{} launch(Dispatchers.Default) { @@ -63,11 +69,10 @@ class CoroutineTests : DatabaseTestsBase() { H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.single()[H2Tests.Testing.id] } - launchResult.await().result.join() - assertEquals(1, result.await().result) + launchResult.await() + assertEquals(1, result.await()) SchemaUtils.drop(H2Tests.Testing) } } } - } \ No newline at end of file