diff --git a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/patient/PatientExportPartitionMapper.java b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/patient/PatientExportPartitionMapper.java index 7dd241e918c..7b82fdf013b 100644 --- a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/patient/PatientExportPartitionMapper.java +++ b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/patient/PatientExportPartitionMapper.java @@ -6,6 +6,7 @@ package com.ibm.fhir.bulkdata.jbatch.export.patient; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -20,11 +21,17 @@ import javax.enterprise.context.Dependent; import javax.inject.Inject; +import com.ibm.fhir.bulkdata.export.system.resource.SystemExportResourceHandler; import com.ibm.fhir.bulkdata.jbatch.context.BatchContextAdapter; import com.ibm.fhir.operation.bulkdata.config.ConfigurationAdapter; import com.ibm.fhir.operation.bulkdata.config.ConfigurationFactory; import com.ibm.fhir.operation.bulkdata.model.type.BulkDataContext; import com.ibm.fhir.operation.bulkdata.model.type.OperationFields; +import com.ibm.fhir.persistence.FHIRPersistence; +import com.ibm.fhir.persistence.HistorySortOrder; +import com.ibm.fhir.persistence.ResourceChangeLogRecord; +import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper; +import com.ibm.fhir.persistence.helper.FHIRTransactionHelper; import com.ibm.fhir.search.compartment.CompartmentHelper; @Dependent @@ -61,13 +68,36 @@ public PartitionPlan mapPartitions() throws Exception { ConfigurationAdapter adapter = ConfigurationFactory.getInstance(); adapter.registerRequestContext(ctx.getTenantId(), ctx.getDatastoreId(), ctx.getIncomingUrl()); + // Note we're already running inside a transaction (started by the JavaBatch framework) + // so this txn will just wrap it...the commit won't happen until the checkpoint + SystemExportResourceHandler handler = new SystemExportResourceHandler(); + FHIRPersistenceHelper fhirPersistenceHelper = new FHIRPersistenceHelper(handler.getSearchHelper()); + FHIRPersistence fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation(); + FHIRTransactionHelper txn = new FHIRTransactionHelper(fhirPersistence.getTransaction()); + txn.begin(); + + // Check resourceType needs to be processed + List target = new ArrayList<>(); + try { + for (String resourceType : resourceTypes) { + List resourceResults = fhirPersistence.changes(1, null, null, null, Arrays.asList(resourceType), false, HistorySortOrder.NONE); + + // Early Exit Logic + if (!resourceResults.isEmpty()) { + target.add(resourceType); + } + } + } finally { + txn.end(); + } + PartitionPlanImpl pp = new PartitionPlanImpl(); - pp.setPartitions(resourceTypes.size()); - pp.setThreads(Math.min(adapter.getCoreMaxPartitions(), resourceTypes.size())); - Properties[] partitionProps = new Properties[resourceTypes.size()]; + pp.setPartitions(target.size()); + pp.setThreads(Math.min(adapter.getCoreMaxPartitions(), target.size())); + Properties[] partitionProps = new Properties[target.size()]; int propCount = 0; - for (String resourceType : resourceTypes) { + for (String resourceType : target) { Properties p = new Properties(); p.setProperty(OperationFields.PARTITION_RESOURCETYPE, resourceType); partitionProps[propCount++] = p; diff --git a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/system/SystemExportPartitionMapper.java b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/system/SystemExportPartitionMapper.java index 0341de17d78..817f49feae4 100644 --- a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/system/SystemExportPartitionMapper.java +++ b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/export/system/SystemExportPartitionMapper.java @@ -6,6 +6,7 @@ package com.ibm.fhir.bulkdata.jbatch.export.system; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -20,10 +21,17 @@ import javax.enterprise.context.Dependent; import javax.inject.Inject; +import com.ibm.fhir.bulkdata.export.system.resource.SystemExportResourceHandler; import com.ibm.fhir.bulkdata.jbatch.context.BatchContextAdapter; +import com.ibm.fhir.operation.bulkdata.config.ConfigurationAdapter; import com.ibm.fhir.operation.bulkdata.config.ConfigurationFactory; import com.ibm.fhir.operation.bulkdata.model.type.BulkDataContext; import com.ibm.fhir.operation.bulkdata.model.type.OperationFields; +import com.ibm.fhir.persistence.FHIRPersistence; +import com.ibm.fhir.persistence.HistorySortOrder; +import com.ibm.fhir.persistence.ResourceChangeLogRecord; +import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper; +import com.ibm.fhir.persistence.helper.FHIRTransactionHelper; /** * Generates the {@link PartitionPlan} describing how the system export work is @@ -51,16 +59,43 @@ public PartitionPlan mapPartitions() throws Exception { BulkDataContext ctx = ctxAdapter.getStepContextForExportPartitionMapper(); + // Register the context to get the right configuration. + ConfigurationAdapter adapter = ConfigurationFactory.getInstance(); + adapter.registerRequestContext(ctx.getTenantId(), ctx.getDatastoreId(), ctx.getIncomingUrl()); + // We know these are real resource types. List resourceTypes = Arrays.asList(ctx.getFhirResourceTypes().split("\\s*,\\s*")); + // Note we're already running inside a transaction (started by the JavaBatch framework) + // so this txn will just wrap it...the commit won't happen until the checkpoint + SystemExportResourceHandler handler = new SystemExportResourceHandler(); + FHIRPersistenceHelper fhirPersistenceHelper = new FHIRPersistenceHelper(handler.getSearchHelper()); + FHIRPersistence fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation(); + FHIRTransactionHelper txn = new FHIRTransactionHelper(fhirPersistence.getTransaction()); + txn.begin(); + + // Check resourceType needs to be processed + List target = new ArrayList<>(); + try { + for (String resourceType : resourceTypes) { + List resourceResults = fhirPersistence.changes(1, null, null, null, Arrays.asList(resourceType), false, HistorySortOrder.NONE); + + // Early Exit Logic + if (!resourceResults.isEmpty()) { + target.add(resourceType); + } + } + } finally { + txn.end(); + } + PartitionPlanImpl pp = new PartitionPlanImpl(); - pp.setPartitions(resourceTypes.size()); - pp.setThreads(Math.min(ConfigurationFactory.getInstance().getCoreMaxPartitions(), resourceTypes.size())); - Properties[] partitionProps = new Properties[resourceTypes.size()]; + pp.setPartitions(target.size()); + pp.setThreads(Math.min(ConfigurationFactory.getInstance().getCoreMaxPartitions(), target.size())); + Properties[] partitionProps = new Properties[target.size()]; int propCount = 0; - for (String resourceType : resourceTypes) { + for (String resourceType : target) { Properties p = new Properties(); p.setProperty(OperationFields.PARTITION_RESOURCETYPE, resourceType); partitionProps[propCount++] = p; diff --git a/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config-db2.json b/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config-db2.json index 2eec22c382b..076d102fd9e 100644 --- a/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config-db2.json +++ b/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config-db2.json @@ -142,7 +142,7 @@ }, "pageSize": 100, "batchIdEncryptionKey": "change-password", - "maxPartitions": 3, + "maxPartitions": 5, "maxInputs": 5 }, "storageProviders": { diff --git a/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config.json b/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config.json index 9dfc5b6648c..d247d7a2748 100644 --- a/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config.json +++ b/fhir-server-webapp/src/main/liberty/config/config/default/fhir-server-config.json @@ -156,7 +156,7 @@ }, "pageSize": 100, "batchIdEncryptionKey": "change-password", - "maxPartitions": 3, + "maxPartitions": 5, "maxInputs": 5, "maxChunkReadTime": "90000", "systemExportImpl": "fast",