Skip to content

Commit

Permalink
coroutines rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Tapac committed Jun 26, 2019
1 parent 6dcce56 commit 1bd64eb
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,6 @@ fun <T> transaction(transactionIsolation: Int, repetitionAttempts: Int, db: Data
}
}

private fun TransactionInterface.rollbackLoggingException(log: (Exception) -> Unit){
try {
rollback()
} catch (e: Exception){
log(e)
}
}

private inline fun TransactionInterface.closeLoggingException(log: (Exception) -> Unit){
try {
close()
} catch (e: Exception){
log(e)
}
}

fun <T> inTopLevelTransaction(transactionIsolation: Int, repetitionAttempts: Int, db: Database? = null, statement: Transaction.() -> T): T {
var repetitions = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,20 @@ interface TransactionManager {

fun isInitialized() = managers.first != NotInitializedManager
}
}

internal fun TransactionInterface.rollbackLoggingException(log: (Exception) -> Unit){
try {
rollback()
} catch (e: Exception){
log(e)
}
}

internal inline fun TransactionInterface.closeLoggingException(log: (Exception) -> Unit){
try {
close()
} catch (e: Exception){
log(e)
}
}
Original file line number Diff line number Diff line change
@@ -1,66 +1,99 @@
package org.jetbrains.exposed.sql.transactions.experimental

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.sync.withLock
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManager
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.*
import org.jetbrains.exposed.sql.transactions.rollbackLoggingException
import java.lang.Exception
import java.util.*
import kotlin.coroutines.CoroutineContext

suspend fun <T> suspendedTransaction(context: CoroutineContext? = null, db: Database? = null, statement: suspend Transaction.() -> T): T {
return suspendedTransactionAsync(context, db, statement = statement).await().result
return suspendedTransactionAsync(context, db, statement = statement).await()
}

suspend fun <T> Transaction.suspendedTransaction(context: CoroutineContext? = null, statement: suspend Transaction.() -> T): T {
return suspendedTransactionAsyncInternal(context, db, currentTransaction = this, statement = statement).await().result
val suspendedTransactionAsyncInternal = suspendedTransactionAsyncInternal(context, db, currentTransaction = this, statement = statement)
return suspendedTransactionAsyncInternal.await()
}

class TransactionResult<T>(internal val transaction: Transaction, val result:T)
class TransactionResult<T>(internal val transaction: Transaction,
internal val deferred: Deferred<T>,
private val selectClause: SelectClause1<T>,
internal var closeTransaction : Boolean = true) : Deferred<T> by deferred, SelectClause1<T> by selectClause {

suspend fun <T, R> Deferred<TransactionResult<T>>.andThen(statement: suspend Transaction.(T) -> R) : Deferred<TransactionResult<R>> {
val firstResult = this.await()
return suspendedTransactionAsync {
with(firstResult.transaction) {
suspendedMutex.withLock {
statement(firstResult.result)
override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
if (closeTransaction) {
try {
transaction.commit()
transaction.closeLoggingException { exposedLogger.warn("Transaction close failed: ${it.message}. Statement: ${transaction.currentStatement}", it) }
} catch (e: Exception) {
transaction.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${transaction.currentStatement}", it) }
throw e
}
}
return deferred.invokeOnCompletion(handler)
}
}

suspend fun <T, R> TransactionResult<T>.andThen(statement: suspend Transaction.(T) -> R) : TransactionResult<R> {
val currentAsync = this
return suspendedTransactionAsyncInternal(currentTransaction = transaction) {
currentAsync.closeTransaction = false
statement(currentAsync.deferred.await())
}
}

suspend fun <T> suspendedTransactionAsync(context: CoroutineContext? = null, db: Database? = null,
useOuterTransactionIfAccessible: Boolean = true, statement: suspend Transaction.() -> T) : Deferred<TransactionResult<T>> {
useOuterTransactionIfAccessible: Boolean = true, statement: suspend Transaction.() -> T) : TransactionResult<T> {
val currentTransaction = TransactionManager.currentOrNull().takeIf { useOuterTransactionIfAccessible }
return suspendedTransactionAsyncInternal(context, db, currentTransaction, statement)
}

suspend private fun <T> suspendedTransactionAsyncInternal(context: CoroutineContext? = null, db: Database? = null,
currentTransaction: Transaction?, statement: suspend Transaction.() -> T) : Deferred<TransactionResult<T>> = coroutineScope {
val manager = db?.let { TransactionManager.managerFor(it) } ?: TransactionManager.manager
private suspend fun <T> suspendedTransactionAsyncInternal(context: CoroutineContext? = null, db: Database? = null,
currentTransaction: Transaction?, statement: suspend Transaction.() -> T) : TransactionResult<T> = coroutineScope {
val manager = (currentTransaction?.db ?: db)?.let { TransactionManager.managerFor(it) }
?: TransactionManager.manager
val threadLocalManager = manager as? ThreadLocalTransactionManager

if (currentTransaction != null) {
val scope = (if (threadLocalManager != null) {
(context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(currentTransaction)
} else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement(manager)
async(scope) {
currentTransaction.suspendedMutex.withLock {
TransactionResult(currentTransaction, currentTransaction.statement())
}
}
} else
val asyncLazy = asyncTxInvocation(scope, currentTransaction, statement)
TransactionResult(currentTransaction, asyncLazy, asyncLazy as SelectClause1<T>, false)
} else {
val tx = manager.newTransaction(manager.defaultIsolationLevel)
val scope = (if (threadLocalManager != null) {
(context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(tx)
} else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement()
val asyncLazy = asyncTxInvocation(scope, tx, statement)
TransactionResult(tx, asyncLazy, asyncLazy as SelectClause1<T>, false)
}
}

transaction(manager.defaultIsolationLevel, manager.defaultRepetitionAttempts, db) {
val tx = this
val scope = (if (threadLocalManager != null) {
(context ?: coroutineContext) + threadLocalManager.threadLocal.asContextElement(tx)
} else (context ?: coroutineContext)) + TransactionManager.currentThreadManager.asContextElement()
async(scope) {
tx.suspendedMutex.withLock {
TransactionResult(tx, tx.statement())
private fun <T> CoroutineScope.asyncTxInvocation(scope: CoroutineContext, tx: Transaction, statement: suspend Transaction.() -> T): Deferred<T> {
return async(context = scope, start = CoroutineStart.LAZY) {
tx.suspendedMutex.withLock {
try {
tx.statement()
} catch (e: Throwable) {
tx.rollbackLoggingException { exposedLogger.warn("Transaction rollback failed: ${it.message}. Statement: ${tx.currentStatement}", it) }
throw e
} finally {
try {
tx.currentStatement?.let {
it.close()
tx.currentStatement = null
}
tx.closeExecutedStatements()
} catch (e: Exception) {
exposedLogger.warn("Statements close failed", e)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.jetbrains.exposed.sql.tests.shared

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.insert
Expand All @@ -13,16 +12,23 @@ import org.jetbrains.exposed.sql.tests.h2.H2Tests
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync
import org.jetbrains.exposed.test.utils.RepeatableTest
import org.junit.Rule
import org.junit.Test
import kotlin.test.assertEquals

class CoroutineTests : DatabaseTestsBase() {

@Rule
@JvmField
val timeout = CoroutinesTimeout.seconds(10)

@Test @RepeatableTest(10)
fun suspendedTx() {
withDb {
runBlocking {
SchemaUtils.create(H2Tests.Testing)

val launchResult = suspendedTransaction {
val launchResult = suspendedTransaction(Dispatchers.IO) {
H2Tests.Testing.insert{}

launch(Dispatchers.Default) {
Expand Down Expand Up @@ -63,11 +69,10 @@ class CoroutineTests : DatabaseTestsBase() {
H2Tests.Testing.select { H2Tests.Testing.id.eq(1) }.single()[H2Tests.Testing.id]
}

launchResult.await().result.join()
assertEquals(1, result.await().result)
launchResult.await()
assertEquals(1, result.await())
SchemaUtils.drop(H2Tests.Testing)
}
}
}

}

0 comments on commit 1bd64eb

Please sign in to comment.