
Commit

issue #3136 handle multiple transactions for batch bundles when offload is enabled

Signed-off-by: Robin Arnold <[email protected]>
punktilious committed Mar 29, 2022
1 parent 8d2a74f commit b000c79
Showing 4 changed files with 101 additions and 52 deletions.
@@ -6,6 +6,7 @@

package com.ibm.fhir.persistence.jdbc.connection;

import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,25 +48,25 @@ public class FHIRUserTransactionAdapter implements FHIRPersistenceTransaction {
// support nesting by tracking the number of begin/end requests
private int startCount;

// A handler to be called after a transaction has been rolled back
private final Runnable rolledBackHandler;
// A handler to be called after a transaction has completed
private final Consumer<Boolean> afterTransactionHandler;

/**
* Public constructor
* @param tx
* @param syncRegistry
* @param cache
* @param transactionDataKey
* @param rolledBackHandler
* @param afterTransactionHandler
*/
public FHIRUserTransactionAdapter(UserTransaction tx, TransactionSynchronizationRegistry syncRegistry, FHIRPersistenceJDBCCache cache,
String transactionDataKey, Runnable rolledBackHandler) {
String transactionDataKey, Consumer<Boolean> afterTransactionHandler) {
this.userTransaction = tx;
this.syncRegistry = syncRegistry;
this.cache = cache;
this.transactionDataKey = transactionDataKey;
startedByThis = false;
this.rolledBackHandler = rolledBackHandler;
this.afterTransactionHandler = afterTransactionHandler;
}

/**
@@ -91,7 +92,7 @@ public void begin() throws FHIRPersistenceException {
// On starting a new transaction, we need to register a callback so that
// the cache is informed when the transaction commits and can promote thread-local
// ids to the shared caches.
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.rolledBackHandler));
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.afterTransactionHandler));

} catch (Exception x) {
log.log(Level.SEVERE, "failed to start transaction", x);
@@ -105,7 +106,7 @@ public void begin() throws FHIRPersistenceException {
// On starting a bulk transaction, we need to register a callback so that
// the cache is informed when the transaction commits and can promote thread-local
// ids to the shared caches.
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.rolledBackHandler));
syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.afterTransactionHandler));

// transaction is already active, so this is a nested request
this.startCount++;
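
The begin() comments above describe why the synchronization is registered: ids created inside a transaction are staged thread-locally and only promoted to the shared caches once the transaction commits. Below is a minimal, hypothetical sketch of that staging pattern; the class itself is illustrative, and only the transactionCommitted()/transactionRolledBack() method names mirror the FHIRPersistenceJDBCCache calls that appear later in this diff.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TxScopedCacheSketch {
    private final Map<String, Integer> shared = new ConcurrentHashMap<>();
    private final ThreadLocal<Map<String, Integer>> staged = ThreadLocal.withInitial(HashMap::new);

    void put(String name, int id) {
        staged.get().put(name, id);          // visible only to the current transaction
    }

    Integer get(String name) {
        Integer id = staged.get().get(name); // prefer values created in this transaction
        return id != null ? id : shared.get(name);
    }

    void transactionCommitted() {
        shared.putAll(staged.get());         // promote thread-local ids to the shared cache
        staged.remove();
    }

    void transactionRolledBack() {
        staged.remove();                     // discard everything staged in this transaction
    }

    public static void main(String[] args) {
        TxScopedCacheSketch cache = new TxScopedCacheSketch();
        cache.put("Patient", 42);
        System.out.println(cache.get("Patient")); // 42, still thread-local
        cache.transactionCommitted();
        System.out.println(cache.get("Patient")); // 42, now in the shared map
    }
}
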
@@ -1,11 +1,13 @@
/*
* (C) Copyright IBM Corp. 2020, 2021
* (C) Copyright IBM Corp. 2020, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.persistence.jdbc.connection;

import java.util.function.Consumer;

import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.UserTransaction;

@@ -26,7 +28,7 @@ public class FHIRUserTransactionFactory implements FHIRTransactionFactory {

private final String transactionDataKey;

private final Runnable rolledBackHandler;
private final Consumer<Boolean> afterTransactionHandler;

/**
* Public constructor
@@ -35,19 +37,19 @@ public class FHIRUserTransactionFactory implements FHIRTransactionFactory {
* @param syncReg
* @param cache
* @param transactionDataKey
* @param rolledBackHandler
* @param afterTransactionHandler
*/
public FHIRUserTransactionFactory(UserTransaction tx, TransactionSynchronizationRegistry syncReg, FHIRPersistenceJDBCCache cache, String transactionDataKey,
Runnable rolledBackHandler) {
Consumer<Boolean> afterTransactionHandler) {
this.userTransaction = tx;
this.syncRegistry = syncReg;
this.cache = cache;
this.transactionDataKey = transactionDataKey;
this.rolledBackHandler = rolledBackHandler;
this.afterTransactionHandler = afterTransactionHandler;
}

@Override
public FHIRPersistenceTransaction create() {
return new FHIRUserTransactionAdapter(userTransaction, syncRegistry, cache, transactionDataKey, rolledBackHandler);
return new FHIRUserTransactionAdapter(userTransaction, syncRegistry, cache, transactionDataKey, afterTransactionHandler);
}
}
@@ -1,11 +1,12 @@
/*
* (C) Copyright IBM Corp. 2020
* (C) Copyright IBM Corp. 2020, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.persistence.jdbc.impl;

import java.util.function.Consumer;
import java.util.logging.Logger;

import javax.transaction.Status;
@@ -33,23 +34,23 @@ public class CacheTransactionSync implements Synchronization {

private final String transactionDataKey;

// A callback when we hit a rollback
private final Runnable rolledBackHandler;
// Called after the transaction completes (true == committed; false == rolled back)
private final Consumer<Boolean> afterTransactionHandler;

/**
* Public constructor
*
* @param txSyncRegistry
* @param cache
* @param transactionDataKey
* @param rolledBackHandler
* @param afterTransactionHandler
*/
public CacheTransactionSync(TransactionSynchronizationRegistry txSyncRegistry, FHIRPersistenceJDBCCache cache, String transactionDataKey,
Runnable rolledBackHandler) {
Consumer<Boolean> afterTransactionHandler) {
this.txSyncRegistry = txSyncRegistry;
this.cache = cache;
this.transactionDataKey = transactionDataKey;
this.rolledBackHandler = rolledBackHandler;
this.afterTransactionHandler = afterTransactionHandler;
}

@Override
@@ -71,13 +72,16 @@ public void beforeCompletion() {
public void afterCompletion(int status) {
if (status == Status.STATUS_COMMITTED) {
cache.transactionCommitted();
if (afterTransactionHandler != null) {
afterTransactionHandler.accept(Boolean.TRUE);
}
} else {
// probably a rollback, so throw away everything
logger.info("Transaction failed - afterCompletion(status = " + status + ")");
cache.transactionRolledBack();

if (rolledBackHandler != null) {
rolledBackHandler.run();
if (afterTransactionHandler != null) {
afterTransactionHandler.accept(Boolean.FALSE);
}
}
}
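
For reference, the JTA contract this class implements: the transaction manager calls afterCompletion(int status) on every registered Synchronization once the outcome is known, and the status is tested against javax.transaction.Status.STATUS_COMMITTED. The standalone sketch below (an illustration, not the project's CacheTransactionSync) shows how that integer status maps onto the new Consumer<Boolean> handler; it assumes the JTA API (javax.transaction) is on the classpath and simply simulates the two outcomes in main.

import java.util.function.Consumer;

import javax.transaction.Status;
import javax.transaction.Synchronization;

public class AfterCompletionSketch implements Synchronization {

    // Called with TRUE when the transaction committed, FALSE otherwise
    private final Consumer<Boolean> afterTransactionHandler;

    public AfterCompletionSketch(Consumer<Boolean> afterTransactionHandler) {
        this.afterTransactionHandler = afterTransactionHandler;
    }

    @Override
    public void beforeCompletion() {
        // nothing to do in this sketch
    }

    @Override
    public void afterCompletion(int status) {
        if (afterTransactionHandler != null) {
            // only STATUS_COMMITTED counts as a commit; any other outcome is reported as false
            afterTransactionHandler.accept(status == Status.STATUS_COMMITTED);
        }
    }

    public static void main(String[] args) {
        AfterCompletionSketch sync = new AfterCompletionSketch(
                committed -> System.out.println(committed ? "committed" : "rolled back"));
        sync.afterCompletion(Status.STATUS_COMMITTED);  // prints "committed"
        sync.afterCompletion(Status.STATUS_ROLLEDBACK); // prints "rolled back"
    }
}
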
@@ -232,6 +232,9 @@ public class FHIRPersistenceJDBCImpl implements FHIRPersistence, SchemaNameSuppl
// A list of payload persistence responses in case we have a rollback to clean up
private final List<PayloadPersistenceResponse> payloadPersistenceResponses = new ArrayList<>();

// A list of EraseResourceRec referencing offload resource records to erase if the current transaction commits
private final List<ErasedResourceRec> eraseResourceRecs = new ArrayList<>();

/**
* Constructor for use when running as web application in WLP.
* @throws Exception
@@ -267,12 +270,13 @@ public FHIRPersistenceJDBCImpl(FHIRPersistenceJDBCCache cache, FHIRPayloadPersis
boolean enableReadOnlyReplicas = fhirConfig.getBooleanProperty(FHIRConfiguration.PROPERTY_JDBC_ENABLE_READ_ONLY_REPLICAS, Boolean.FALSE);
this.connectionStrategy = new FHIRDbTenantDatasourceConnectionStrategy(trxSynchRegistry, buildActionChain(), enableReadOnlyReplicas);

this.transactionAdapter = new FHIRUserTransactionAdapter(userTransaction, trxSynchRegistry, cache, TXN_DATA_KEY, () -> handleRollback());
this.transactionAdapter = new FHIRUserTransactionAdapter(userTransaction, trxSynchRegistry, cache, TXN_DATA_KEY, (committed) -> transactionCompleted(committed));

// Use of legacy whole-system search parameters disabled by default
this.legacyWholeSystemSearchParamsEnabled =
fhirConfig.getBooleanProperty(PROPERTY_SEARCH_ENABLE_LEGACY_WHOLE_SYSTEM_SEARCH_PARAMS, false);


log.exiting(CLASSNAME, METHODNAME);
}

@@ -369,6 +373,11 @@ public <T extends Resource> SingleResourceResult<T> create(FHIRPersistenceContex
try (Connection connection = openConnection()) {
doCachePrefill(connection);

if (context.getOffloadResponse() != null) {
// Remember this payload offload response as part of the current transaction
this.payloadPersistenceResponses.add(context.getOffloadResponse());
}

// This create() operation is only called by a REST create. If the given resource
// contains an id, then for R4 we need to ignore it and replace it with our
// system-generated value. For the update-or-create scenario, see update().
@@ -560,6 +569,12 @@ public <T extends Resource> SingleResourceResult<T> update(FHIRPersistenceContex

try (Connection connection = openConnection()) {
doCachePrefill(connection);

if (context.getOffloadResponse() != null) {
// Remember this payload offload response as part of the current transaction
this.payloadPersistenceResponses.add(context.getOffloadResponse());
}

ResourceDAO resourceDao = makeResourceDAO(connection);
ParameterDAO parameterDao = makeParameterDAO(connection);

@@ -1027,6 +1042,12 @@ public <T extends Resource> void delete(FHIRPersistenceContext context, Class<T>

try (Connection connection = openConnection()) {
doCachePrefill(connection);

if (context.getOffloadResponse() != null) {
// Remember this payload offload response as part of the current transaction
this.payloadPersistenceResponses.add(context.getOffloadResponse());
}

ResourceDAO resourceDao = makeResourceDAO(connection);

// Create a new Resource DTO instance to represent the deletion marker.
@@ -1518,15 +1539,14 @@ private void fetchPayloadsForDTOList(List<ResourceResult<? extends Resource>> re
resourceTypeId = getResourceTypeId(resourceType);
}

// If a specific version of a resource has been deleted using $erase, it
// is possible for the result here to be null.
// If the resource has been deleted, no payload has been stored so there's no
// need to try and fetch it
CompletableFuture<ResourceResult<? extends Resource>> cf;
if (!resourceDTO.isDeleted()) {
// Trigger the read and stash the async response
cf = payloadPersistence.readResourceAsync(resourceType, rowResourceTypeName, resourceTypeId, resourceDTO.getLogicalId(), resourceDTO.getVersionId(), resourceDTO.getResourcePayloadKey(), resourceDTO.getLastUpdated().toInstant(), elements);
} else {
// Payload never stored for deletion markers, so there's no resource to read. Just
// knock up a new ResourceResult to represent the deleted resource in the result list
// Knock up a new ResourceResult to represent the deleted resource in the result list
ResourceResult.Builder<? extends Resource> builder = new ResourceResult.Builder<>();
builder.logicalId(resourceDTO.getLogicalId());
builder.resourceTypeName(rowResourceTypeName);
@@ -2724,31 +2744,51 @@ private ParameterTransactionDataImpl getTransactionDataForDatasource(String data
}

/**
* Callback from TransactionData when a transaction has been rolled back
* @param payloadPersistenceResponses an immutable list of {@link PayloadPersistenceResponse}
* Callback from TransactionData when a transaction has ended (after commit or rollback)
* @param committed true if the transaction completed, or false if it rolled back
*/
private void handleRollback() {
if (payloadPersistenceResponses.size() > 0 && payloadPersistence == null) {
throw new IllegalStateException("handleRollback called but payloadPersistence is not configured");
}
// try to delete each of the payload objects we've stored
// because the transaction has been rolled back
log.fine("starting rollback handling for PayloadPersistenceResponse data");
for (PayloadPersistenceResponse ppr: payloadPersistenceResponses) {
try {
log.fine(() -> "tx rollback - deleting payload: " + ppr.toString());
payloadPersistence.deletePayload(ppr.getResourceTypeName(), ppr.getResourceTypeId(),
ppr.getLogicalId(), ppr.getVersionId(), ppr.getResourcePayloadKey());
} catch (Exception x) {
// Nothing more we can do other than log the issue. Any rows we can't process
// here (e.g. network outage) will be orphaned. These orphaned rows
// will be removed by the reconciliation process which scans the payload
// persistence repository and looks for missing RDBMS records.
log.log(Level.SEVERE, "rollback failed to delete payload: " + ppr.toString(), x);
private void transactionCompleted(Boolean committed) {
if (committed) {
// See if we have any erase resources to clean up
for (ErasedResourceRec err: this.eraseResourceRecs) {
try {
erasePayload(err);
} catch (Exception x) {
// The transaction has already committed, so we don't want to fail
// the request. This is a server-side issue now so all we can do is
// log.
log.log(Level.SEVERE, "failed to erase offload payload for '"
+ err.toString()
+ "'. Run reconciliation to ensure this record is removed.", x);
}
}
} else {
// Try to delete each of the payload objects we've stored in this
// transaction because the transaction has been rolled back
if (payloadPersistenceResponses.size() > 0 && payloadPersistence == null) {
throw new IllegalStateException("handleRollback called but payloadPersistence is not configured");
}

log.fine("starting rollback handling for PayloadPersistenceResponse data");
for (PayloadPersistenceResponse ppr: payloadPersistenceResponses) {
try {
log.fine(() -> "tx rollback - deleting payload: " + ppr.toString());
payloadPersistence.deletePayload(ppr.getResourceTypeName(), ppr.getResourceTypeId(),
ppr.getLogicalId(), ppr.getVersionId(), ppr.getResourcePayloadKey());
} catch (Exception x) {
// Nothing more we can do other than log the issue. Any rows we can't process
// here (e.g. network outage) will be orphaned. These orphaned rows
// will be removed by the reconciliation process which scans the payload
// persistence repository and looks for missing RDBMS records.
log.log(Level.SEVERE, "rollback failed to delete payload: " + ppr.toString(), x);
}
}
}

// important to clear this list after each transaction because batch bundles
// use the same FHIRPersistenceJDBCImpl instance for each entry
payloadPersistenceResponses.clear();
eraseResourceRecs.clear();
}

/**
@@ -2786,7 +2826,8 @@ public void onCommit(Collection<ResourceTokenValueRec> records, Collection<Resou
throw fx;
}

// At this stage we also need to check that any (async) payload offload operations have completed
// At this stage we also need to check that any (async) payload offload operations related
// to the current transaction have completed
for (PayloadPersistenceResponse ppr: this.payloadPersistenceResponses) {
try {
log.fine(() -> "Getting storePayload() async result for: " + ppr.toString());
@@ -2926,9 +2967,9 @@ public ResourceEraseRecord erase(EraseDTO eraseDto) throws FHIRPersistenceExcept
*/
private void erasePayloads(EraseResourceDAO dao, long erasedResourceGroupId) throws FHIRPersistenceException {
List<ErasedResourceRec> recs = dao.getErasedResourceRecords(erasedResourceGroupId);
for (ErasedResourceRec rec: recs) {
erasePayload(rec);
}

// Stash this list so that we can do the erase after the transaction commits
eraseResourceRecs.addAll(recs);

// If the above loop completed without throwing an exception, we can safely
// remove all the records in the group. If an exception was thrown (because
@@ -3000,8 +3041,9 @@ public PayloadPersistenceResponse storePayload(Resource resource, String logical
// Delegate the serialization and any compression to the FHIRPayloadPersistence implementation
PayloadPersistenceResponse response = payloadPersistence.storePayload(resourceTypeName, resourceTypeId, logicalId, newVersionNumber, resourcePayloadKey, resource);

// register the response object so that we can clean up in case of a rollback later
this.payloadPersistenceResponses.add(response);
// We don't record the response in the payloadPersistenceResponses list yet because
// for batch bundles, there are multiple transactions and the list should contain
// only those responses relevant to the current transaction
return response;
} else {
// Offloading not supported by the plain JDBC persistence implementation, so return null
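
Taken together, the FHIRPersistenceJDBCImpl changes give each transaction of a batch bundle its own offload bookkeeping: create/update/delete record the context's offload response, transactionCompleted(committed) either erases the $erase-marked payloads on commit or deletes the payloads offloaded in that transaction on rollback, and both lists are cleared so the next bundle entry starts clean. The self-contained sketch below illustrates that lifecycle under those assumptions; the names are stand-ins, not the actual PayloadPersistenceResponse plumbing.

import java.util.ArrayList;
import java.util.List;

public class BatchBundleLifecycleSketch {

    private final List<String> currentTxPayloadKeys = new ArrayList<>();

    // Called from create/update/delete when the context carries an offload response
    void rememberOffloadResponse(String payloadKey) {
        currentTxPayloadKeys.add(payloadKey);
    }

    // Plays the role of the afterTransactionHandler registered with the adapter
    void transactionCompleted(boolean committed) {
        if (!committed) {
            // rollback: payloads offloaded in this transaction are now orphans
            for (String key : currentTxPayloadKeys) {
                System.out.println("rollback - delete offloaded payload " + key);
            }
        }
        // Clear after every transaction: batch bundles reuse this instance,
        // so the list must only ever hold the current transaction's responses.
        currentTxPayloadKeys.clear();
    }

    public static void main(String[] args) {
        BatchBundleLifecycleSketch impl = new BatchBundleLifecycleSketch();

        // bundle entry 1: its transaction rolls back
        impl.rememberOffloadResponse("patient-1/v1");
        impl.transactionCompleted(false);

        // bundle entry 2: starts with an empty list and commits cleanly
        impl.rememberOffloadResponse("patient-2/v1");
        impl.transactionCompleted(true);
    }
}
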
