diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java index 7e19cbe873e..ed242e55fd6 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionAdapter.java @@ -6,6 +6,7 @@ package com.ibm.fhir.persistence.jdbc.connection; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,8 +48,8 @@ public class FHIRUserTransactionAdapter implements FHIRPersistenceTransaction { // support nesting by tracking the number of begin/end requests private int startCount; - // A handler to be called after a transaction has been rolled back - private final Runnable rolledBackHandler; + // A handler to be called after a transaction has completed + private final Consumer afterTransactionHandler; /** * Public constructor @@ -56,16 +57,16 @@ public class FHIRUserTransactionAdapter implements FHIRPersistenceTransaction { * @param syncRegistry * @param cache * @param transactionDataKey - * @param rolledBackHandler + * @param afterTransactionHandler */ public FHIRUserTransactionAdapter(UserTransaction tx, TransactionSynchronizationRegistry syncRegistry, FHIRPersistenceJDBCCache cache, - String transactionDataKey, Runnable rolledBackHandler) { + String transactionDataKey, Consumer afterTransactionHandler) { this.userTransaction = tx; this.syncRegistry = syncRegistry; this.cache = cache; this.transactionDataKey = transactionDataKey; startedByThis = false; - this.rolledBackHandler = rolledBackHandler; + this.afterTransactionHandler = afterTransactionHandler; } /** @@ -91,7 +92,7 @@ public void begin() throws FHIRPersistenceException { // On starting a new transaction, we need to register a callback so that // the cache is informed when the transaction commits it can promote thread-local // ids to the shared caches. - syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.rolledBackHandler)); + syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.afterTransactionHandler)); } catch (Exception x) { log.log(Level.SEVERE, "failed to start transaction", x); @@ -105,7 +106,7 @@ public void begin() throws FHIRPersistenceException { // On starting a bulk transaction, we need to register a callback so that // the cache is informed when the transaction commits it can promote thread-local // ids to the shared caches. - syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.rolledBackHandler)); + syncRegistry.registerInterposedSynchronization(new CacheTransactionSync(this.syncRegistry, this.cache, this.transactionDataKey, this.afterTransactionHandler)); // transaction is already active, so this is a nested request this.startCount++; diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionFactory.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionFactory.java index 1d8ee594537..84f000b985f 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionFactory.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/connection/FHIRUserTransactionFactory.java @@ -1,11 +1,13 @@ /* - * (C) Copyright IBM Corp. 2020, 2021 + * (C) Copyright IBM Corp. 2020, 2022 * * SPDX-License-Identifier: Apache-2.0 */ package com.ibm.fhir.persistence.jdbc.connection; +import java.util.function.Consumer; + import javax.transaction.TransactionSynchronizationRegistry; import javax.transaction.UserTransaction; @@ -26,7 +28,7 @@ public class FHIRUserTransactionFactory implements FHIRTransactionFactory { private final String transactionDataKey; - private final Runnable rolledBackHandler; + private final Consumer afterTransactionHandler; /** * Public constructor @@ -35,19 +37,19 @@ public class FHIRUserTransactionFactory implements FHIRTransactionFactory { * @param syncReg * @param cache * @param transactionDataKey - * @param rolledBackHandler + * @param afterTransactionHandler */ public FHIRUserTransactionFactory(UserTransaction tx, TransactionSynchronizationRegistry syncReg, FHIRPersistenceJDBCCache cache, String transactionDataKey, - Runnable rolledBackHandler) { + Consumer afterTransactionHandler) { this.userTransaction = tx; this.syncRegistry = syncReg; this.cache = cache; this.transactionDataKey = transactionDataKey; - this.rolledBackHandler = rolledBackHandler; + this.afterTransactionHandler = afterTransactionHandler; } @Override public FHIRPersistenceTransaction create() { - return new FHIRUserTransactionAdapter(userTransaction, syncRegistry, cache, transactionDataKey, rolledBackHandler); + return new FHIRUserTransactionAdapter(userTransaction, syncRegistry, cache, transactionDataKey, afterTransactionHandler); } } diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/CacheTransactionSync.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/CacheTransactionSync.java index cc00ef81b7e..aec38f113bf 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/CacheTransactionSync.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/CacheTransactionSync.java @@ -1,11 +1,12 @@ /* - * (C) Copyright IBM Corp. 2020 + * (C) Copyright IBM Corp. 2020, 2022 * * SPDX-License-Identifier: Apache-2.0 */ package com.ibm.fhir.persistence.jdbc.impl; +import java.util.function.Consumer; import java.util.logging.Logger; import javax.transaction.Status; @@ -33,8 +34,8 @@ public class CacheTransactionSync implements Synchronization { private final String transactionDataKey; - // A callback when we hit a rollback - private final Runnable rolledBackHandler; + // Called after the transaction completes (true == committed; false == rolled back) + private final Consumer afterTransactionHandler; /** * Public constructor @@ -42,14 +43,14 @@ public class CacheTransactionSync implements Synchronization { * @param txSyncRegistry * @param cache * @param transactionDataKey - * @param rolledBackHandler + * @param afterTransactionHandler */ public CacheTransactionSync(TransactionSynchronizationRegistry txSyncRegistry, FHIRPersistenceJDBCCache cache, String transactionDataKey, - Runnable rolledBackHandler) { + Consumer afterTransactionHandler) { this.txSyncRegistry = txSyncRegistry; this.cache = cache; this.transactionDataKey = transactionDataKey; - this.rolledBackHandler = rolledBackHandler; + this.afterTransactionHandler = afterTransactionHandler; } @Override @@ -71,13 +72,16 @@ public void beforeCompletion() { public void afterCompletion(int status) { if (status == Status.STATUS_COMMITTED) { cache.transactionCommitted(); + if (afterTransactionHandler != null) { + afterTransactionHandler.accept(Boolean.TRUE); + } } else { // probably a rollback, so throw away everything logger.info("Transaction failed - afterCompletion(status = " + status + ")"); cache.transactionRolledBack(); - if (rolledBackHandler != null) { - rolledBackHandler.run(); + if (afterTransactionHandler != null) { + afterTransactionHandler.accept(Boolean.FALSE); } } } diff --git a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java index e7943357e97..e4c5d608eba 100644 --- a/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java +++ b/fhir-persistence-jdbc/src/main/java/com/ibm/fhir/persistence/jdbc/impl/FHIRPersistenceJDBCImpl.java @@ -232,6 +232,9 @@ public class FHIRPersistenceJDBCImpl implements FHIRPersistence, SchemaNameSuppl // A list of payload persistence responses in case we have a rollback to clean up private final List payloadPersistenceResponses = new ArrayList<>(); + // A list of EraseResourceRec referencing offload resource records to erase if the current transaction commits + private final List eraseResourceRecs = new ArrayList<>(); + /** * Constructor for use when running as web application in WLP. * @throws Exception @@ -267,12 +270,13 @@ public FHIRPersistenceJDBCImpl(FHIRPersistenceJDBCCache cache, FHIRPayloadPersis boolean enableReadOnlyReplicas = fhirConfig.getBooleanProperty(FHIRConfiguration.PROPERTY_JDBC_ENABLE_READ_ONLY_REPLICAS, Boolean.FALSE); this.connectionStrategy = new FHIRDbTenantDatasourceConnectionStrategy(trxSynchRegistry, buildActionChain(), enableReadOnlyReplicas); - this.transactionAdapter = new FHIRUserTransactionAdapter(userTransaction, trxSynchRegistry, cache, TXN_DATA_KEY, () -> handleRollback()); + this.transactionAdapter = new FHIRUserTransactionAdapter(userTransaction, trxSynchRegistry, cache, TXN_DATA_KEY, (committed) -> transactionCompleted(committed)); // Use of legacy whole-system search parameters disabled by default this.legacyWholeSystemSearchParamsEnabled = fhirConfig.getBooleanProperty(PROPERTY_SEARCH_ENABLE_LEGACY_WHOLE_SYSTEM_SEARCH_PARAMS, false); + log.exiting(CLASSNAME, METHODNAME); } @@ -369,6 +373,11 @@ public SingleResourceResult create(FHIRPersistenceContex try (Connection connection = openConnection()) { doCachePrefill(connection); + if (context.getOffloadResponse() != null) { + // Remember this payload offload response as part of the current transaction + this.payloadPersistenceResponses.add(context.getOffloadResponse()); + } + // This create() operation is only called by a REST create. If the given resource // contains an id, then for R4 we need to ignore it and replace it with our // system-generated value. For the update-or-create scenario, see update(). @@ -560,6 +569,12 @@ public SingleResourceResult update(FHIRPersistenceContex try (Connection connection = openConnection()) { doCachePrefill(connection); + + if (context.getOffloadResponse() != null) { + // Remember this payload offload response as part of the current transaction + this.payloadPersistenceResponses.add(context.getOffloadResponse()); + } + ResourceDAO resourceDao = makeResourceDAO(connection); ParameterDAO parameterDao = makeParameterDAO(connection); @@ -1027,6 +1042,12 @@ public void delete(FHIRPersistenceContext context, Class try (Connection connection = openConnection()) { doCachePrefill(connection); + + if (context.getOffloadResponse() != null) { + // Remember this payload offload response as part of the current transaction + this.payloadPersistenceResponses.add(context.getOffloadResponse()); + } + ResourceDAO resourceDao = makeResourceDAO(connection); // Create a new Resource DTO instance to represent the deletion marker. @@ -1518,15 +1539,14 @@ private void fetchPayloadsForDTOList(List> re resourceTypeId = getResourceTypeId(resourceType); } - // If a specific version of a resource has been deleted using $erase, it - // is possible for the result here to be null. + // If the resource has been deleted, no payload has been stored so there's no + // need to try and fetch it CompletableFuture> cf; if (!resourceDTO.isDeleted()) { // Trigger the read and stash the async response cf = payloadPersistence.readResourceAsync(resourceType, rowResourceTypeName, resourceTypeId, resourceDTO.getLogicalId(), resourceDTO.getVersionId(), resourceDTO.getResourcePayloadKey(), resourceDTO.getLastUpdated().toInstant(), elements); } else { - // Payload never stored for deletion markers, so there's no resource to read. Just - // knock up a new ResourceResult to represent the deleted resource in the result list + // Knock up a new ResourceResult to represent the deleted resource in the result list ResourceResult.Builder builder = new ResourceResult.Builder<>(); builder.logicalId(resourceDTO.getLogicalId()); builder.resourceTypeName(rowResourceTypeName); @@ -2724,31 +2744,51 @@ private ParameterTransactionDataImpl getTransactionDataForDatasource(String data } /** - * Callback from TransactionData when a transaction has been rolled back - * @param payloadPersistenceResponses an immutable list of {@link PayloadPersistenceResponse} + * Callback from TransactionData when a transaction has ended (after commit or rollback) + * @param committed true if the transaction completed, or false if it rolled back */ - private void handleRollback() { - if (payloadPersistenceResponses.size() > 0 && payloadPersistence == null) { - throw new IllegalStateException("handleRollback called but payloadPersistence is not configured"); - } - // try to delete each of the payload objects we've stored - // because the transaction has been rolled back - log.fine("starting rollback handling for PayloadPersistenceResponse data"); - for (PayloadPersistenceResponse ppr: payloadPersistenceResponses) { - try { - log.fine(() -> "tx rollback - deleting payload: " + ppr.toString()); - payloadPersistence.deletePayload(ppr.getResourceTypeName(), ppr.getResourceTypeId(), - ppr.getLogicalId(), ppr.getVersionId(), ppr.getResourcePayloadKey()); - } catch (Exception x) { - // Nothing more we can do other than log the issue. Any rows we can't process - // here (e.g. network outage) will be orphaned. These orphaned rows - // will be removed by the reconciliation process which scans the payload - // persistence repository and looks for missing RDBMS records. - log.log(Level.SEVERE, "rollback failed to delete payload: " + ppr.toString(), x); + private void transactionCompleted(Boolean committed) { + if (committed) { + // See if we have any erase resources to clean up + for (ErasedResourceRec err: this.eraseResourceRecs) { + try { + erasePayload(err); + } catch (Exception x) { + // The transaction has already committed, so we don't want to fail + // the request. This is a server-side issue now so all we can do is + // log. + log.log(Level.SEVERE, "failed to erase offload payload for '" + + err.toString() + + "'. Run reconciliation to ensure this record is removed.", x); + } + } + } else { + // Try to delete each of the payload objects we've stored in this + // transaction because the transaction has been rolled back + if (payloadPersistenceResponses.size() > 0 && payloadPersistence == null) { + throw new IllegalStateException("handleRollback called but payloadPersistence is not configured"); + } + + log.fine("starting rollback handling for PayloadPersistenceResponse data"); + for (PayloadPersistenceResponse ppr: payloadPersistenceResponses) { + try { + log.fine(() -> "tx rollback - deleting payload: " + ppr.toString()); + payloadPersistence.deletePayload(ppr.getResourceTypeName(), ppr.getResourceTypeId(), + ppr.getLogicalId(), ppr.getVersionId(), ppr.getResourcePayloadKey()); + } catch (Exception x) { + // Nothing more we can do other than log the issue. Any rows we can't process + // here (e.g. network outage) will be orphaned. These orphaned rows + // will be removed by the reconciliation process which scans the payload + // persistence repository and looks for missing RDBMS records. + log.log(Level.SEVERE, "rollback failed to delete payload: " + ppr.toString(), x); + } } } + // important to clear this list after each transaction because batch bundles + // use the same FHIRPersistenceJDBCImpl instance for each entry payloadPersistenceResponses.clear(); + eraseResourceRecs.clear(); } /** @@ -2786,7 +2826,8 @@ public void onCommit(Collection records, Collection "Getting storePayload() async result for: " + ppr.toString()); @@ -2926,9 +2967,9 @@ public ResourceEraseRecord erase(EraseDTO eraseDto) throws FHIRPersistenceExcept */ private void erasePayloads(EraseResourceDAO dao, long erasedResourceGroupId) throws FHIRPersistenceException { List recs = dao.getErasedResourceRecords(erasedResourceGroupId); - for (ErasedResourceRec rec: recs) { - erasePayload(rec); - } + + // Stash this list so that we can do the erase after the transaction commits + eraseResourceRecs.addAll(recs); // If the above loop completed without throwing an exception, we can safely // remove all the records in the group. If an exception was thrown (because @@ -3000,8 +3041,9 @@ public PayloadPersistenceResponse storePayload(Resource resource, String logical // Delegate the serialization and any compression to the FHIRPayloadPersistence implementation PayloadPersistenceResponse response = payloadPersistence.storePayload(resourceTypeName, resourceTypeId, logicalId, newVersionNumber, resourcePayloadKey, resource); - // register the response object so that we can clean up in case of a rollback later - this.payloadPersistenceResponses.add(response); + // We don't record the response in the payloadPersistenceResponses list yet because + // for batch bundles, there are multiple transactions and the list should contain + // only those responses relevant to the current transaction return response; } else { // Offloading not supported by the plain JDBC persistence implementation, so return null