Skip to content

Commit

Permalink
Merge pull request #3142 from IBM/issue-2900-ra
Browse files Browse the repository at this point in the history
Issue #2900 Cassandra implementation of payload offload including rollback support
  • Loading branch information
lmsurpre authored Jan 11, 2022
2 parents e8b6217 + d82d693 commit 65ad63c
Show file tree
Hide file tree
Showing 125 changed files with 4,712 additions and 1,021 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ The IBM FHIR Server is modular and extensible. The following tables provide an o
|fhir-audit|Audit-related interfaces and implementations including 1) a No-op AuditLogService and 2) an AuditLogService that writes audit events to Apache Kafka in the Cloud Auditing Data Federation (CADF) JSON format|false|
|fhir-search|Utilities for working with the FHIR search specification|false|
|fhir-persistence|Interfaces, helpers, and tests for implementing a persistence layer or persistence interceptors for the IBM FHIR Server|false|
|fhir-persistence-schema|Classes for deploying and updating the IBM FHIR Server relational database schema|false|
|fhir-persistence-jdbc|A relational FHIRPersistence implementation that uses JDBC to store and query FHIR resources|false|
|fhir-persistence-scout|A scale out persistence layer to store and query FHIR resources *experimental* |false|
|fhir-persistence-cos|Decorates the fhir-persistence-jdbc module with the ability to offload payload storage to IBM Cloud Object Storage *experimental* |false|
|fhir-persistence-cassandra|Decorates the fhir-persistence-jdbc module with the ability to offload payload storage to Cassandra *experimental* |false|
|fhir-persistence-blob|Decorates the fhir-persistence-jdbc module with the ability to offload payload storage to Azure Blob *experimental* |false|
|fhir-provider|JAX-RS Providers for FHIR XML and JSON and related patch formats|false|
|fhir-server|JAX-RS resources and related classes for implementing the FHIR REST API and extended operations|false|
|fhir-server-webapp|A web application that packages the fhir-server with a set of built-in extended operations|false|
Expand Down Expand Up @@ -135,6 +135,8 @@ The IBM FHIR Server is modular and extensible. The following tables provide an o
|fhir-install|Packaging and installation scripts for creating the fhir-distribution zip and the corresponding IBM FHIR Server Docker image|false|
|fhir-benchmark|Java Microbenchmark Harness (JMH) tests for measuring read/write/validation performance for the IBM FHIR Server and the HL7 FHIR Java Reference Implementation|false|
|fhir-bucket|Scans cloud object storage buckets and uploads data using the FHIR REST API|false|
|fhir-persistence-schema|Classes for deploying and updating the IBM FHIR Server relational database schema|false|
|fhir-persistence-cassandra-app|CLI utility application supporting payload storage to Cassandra *experimental* |false|

### Contributing to the IBM FHIR Server
The IBM FHIR Server is under active development. To help develop the server, clone or download the project and build it using Maven.
Expand Down
1 change: 1 addition & 0 deletions build/security/asoc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mkdir -p ${WORKSPACE}/build/security/logs/output/
find ${WORKSPACE} -iname 'fhir-*.jar' -not -iname 'fhir*-tests.jar' -not -iname 'fhir*-test-*.jar' -not -iname 'fhir-persistence-schema-*-cli.jar' -not -iname 'fhir-swagger-generator-*-cli.jar' \
-not -iname 'fhir-examples-*.jar' -not -name 'fhir-bulkdata-webapp-*-client.jar' -not -iname 'fhir*-ig-*.jar' -not -iname 'fhir-bucket-*-cli.jar' -not -path '*/target/fhir-server-webapp-*' \
-not -iname 'fhir-operation-cqf-*-shaded.jar' -not -iname 'fhir-operation-cpg-*-shaded.jar' -not -iname 'fhir-term-graph-loader-*-cli.jar' \
-not -path '*/target/fhir-persistence-cassandra-app*' \
-not -path '*/target/fhir-bulkdata*' -exec cp -f {} ${WORKSPACE}/build/security/logs/tmp \;

cd ${WORKSPACE}/build/security/logs/
Expand Down
71 changes: 71 additions & 0 deletions fhir-bucket/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ java -jar "${JAR}" \

This tracking database can be shared with the instance used by FHIR, but for proper performance testing it should be on a separate host. The standard schema for the tables is FHIRBUCKET.

Schema creation does not have to be performed separately. If you wish to create the schema as part of the main program run, specify

```
--bootstrap-schema
```

on the command line. The schema creation and update process is idempotent. Changes are only applied when required, and concurrency is managed correctly to ensure only one instance makes changes if multiple instances of fhir-bucket are run simultaneously.

The preferred approach is to use the new `--bootstrap-schema` option when running the main workload, in which case the `--create-schema` activity isn't required.


Expand All @@ -247,6 +255,7 @@ java -jar "${JAR}" \
--db-properties db2.properties \
--cos-properties cos.properties \
--fhir-properties fhir.properties \
--bootstrap-schema \
--bucket example-bucket \
--tenant-name example-tenant \
--file-type NDJSON \
Expand Down Expand Up @@ -275,11 +284,73 @@ To run using PostgreSQL, change the relevant arguments to:
...
```

The `--immediate-local` option can be used to load files from a local file-system without the need for a FHIRBUCKET database or connection to COS:

```
java \
-Djava.util.logging.config.file=logging.properties \
-jar "${JAR}" \
--db-type postgresql \
--db-properties db.properties \
--fhir-properties fhir.properties \
--tenant-name your-tenant-name \
--file-type JSON \
--max-concurrent-fhir-requests 0 \
--max-concurrent-json-files 0 \
--max-concurrent-ndjson-files 0 \
--connection-pool-size 10 \
--immediate-local \
--scan-local-dir /path/to/synthea/data
```

Because --immediate-local does not use a FHIRBUCKET database, there is no tracking of the logical ids generated by the IBM FHIR Server. This means it is not possible to run the interop workload against this data.

To track the logical ids, you can provide a FHIRBUCKET database configuration along with the --scan-local-dir argument, but do not specify --immediate-local:

```
java \
-Djava.util.logging.config.file=logging.properties \
-jar "${JAR}" \
--db-type postgresql \
--db-properties db.properties \
--fhir-properties fhir.properties \
--tenant-name your-tenant-name \
--file-type JSON \
--max-concurrent-fhir-requests 0 \
--max-concurrent-json-files 0 \
--max-concurrent-ndjson-files 0 \
--connection-pool-size 10 \
--scan-local-dir /path/to/synthea/data
```

Once the directory scanning is complete, the program can be terminated. To load the files now registered in the FHIRBUCKET database, run the following:

```
java \
-Djava.util.logging.config.file=logging.properties \
-jar "${JAR}" \
--db-type postgresql \
--db-properties db.properties \
--fhir-properties fhir.properties \
--tenant-name your-tenant-name \
--file-type JSON \
--no-scan \
--max-concurrent-fhir-requests 40 \
--max-concurrent-json-files 10 \
--max-concurrent-ndjson-files 0 \
--connection-pool-size 40 \
--scan-local-dir /path/to/synthea/data
```

Note that the --scan-local-dir [path-name] option must still be provided.


| Property Name | Description |
| -------------------- | -----|
| `--bootstrap-schema` | Creates/updates the schema as an initial step before starting the main workload. Simplifies cloud deployment scenarios by avoiding the need for a separate job. Ensures only one instance will try to update the schema at a time. Do not specify `--create-schema` when using this option. |
| `--db-type type` | where `type` is one of: db2, derby, postgresql. Specifies the type of database to use for the FHIRBUCKET tracking data. |
| `--create-schema` | Creates a new or updates an existing database schema. The program will exit after the schema operations have completed.|
| `--bootstrap-schema` | Creates a new or updates an existing database schema. The program will not exit after the schema operations have completed.|
| `--schema-name` | The custom schema used for FHIRBUCKET tracking data. The default is `FHIRBUCKET`.|
| `--tenant-name fhir-tenant-name` | The IBM FHIR Server tenant name|
| `--cos-properties properties-file` | Connection properties file for COS |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@
import com.ibm.fhir.operation.bulkdata.model.type.OperationFields;
import com.ibm.fhir.operation.bulkdata.model.type.StorageType;
import com.ibm.fhir.persistence.FHIRPersistence;
import com.ibm.fhir.persistence.FHIRPersistenceSupport;
import com.ibm.fhir.persistence.InteractionStatus;
import com.ibm.fhir.persistence.SingleResourceResult;
import com.ibm.fhir.persistence.context.FHIRPersistenceContext;
import com.ibm.fhir.persistence.context.FHIRPersistenceContextFactory;
import com.ibm.fhir.persistence.context.FHIRPersistenceEvent;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.persistence.payload.PayloadPersistenceHelper;
import com.ibm.fhir.persistence.util.FHIRPersistenceUtil;
import com.ibm.fhir.validation.exception.FHIRValidationException;

Expand Down Expand Up @@ -305,7 +305,7 @@ public OperationOutcome conditionalFingerprintUpdate(ImportTransientUserData chu
SingleResourceResult<? extends Resource> oldResourceResult = persistence.read(context, resource.getClass(), logicalId);
Resource oldResource = oldResourceResult.getResource();

final com.ibm.fhir.model.type.Instant lastUpdated = PayloadPersistenceHelper.getCurrentInstant();
final com.ibm.fhir.model.type.Instant lastUpdated = FHIRPersistenceSupport.getCurrentInstant();
final int newVersionNumber = oldResource != null && oldResource.getMeta() != null && oldResource.getMeta().getVersionId() != null
? Integer.parseInt(oldResource.getMeta().getVersionId().getValue()) + 1 : 1;
resource = FHIRPersistenceUtil.copyAndSetResourceMetaFields(resource, logicalId, newVersionNumber, lastUpdated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,8 @@ private void initProperties(Properties props) throws Exception {

setHostnameVerificationEnabled(Boolean.parseBoolean(getProperty(PROPNAME_HOSTNAME_VERIFICATION_ENABLED, "true")));

setHttpTimeout(Integer.parseUnsignedInt(getProperty(PROPNAME_HTTP_TIMEOUT, "60000")));
// Use a default that's longer than the default Liberty transaction timeout
setHttpTimeout(Integer.parseUnsignedInt(getProperty(PROPNAME_HTTP_TIMEOUT, "130000")));

setTenantId(getProperty(PROPNAME_TENANT_ID, null));
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2019, 2020
* (C) Copyright IBM Corp. 2019, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -53,4 +53,13 @@ public void setAutoCreate(boolean create) {
public boolean isAutoCreate() {
return "Y".equals(this.properties.getProperty(CREATE_KEY));
}

@Override
public String getDefaultSchema() {
String result = super.getDefaultSchema();
if (result == null) {
result = "APP";
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.database.utils.pool;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import com.ibm.fhir.database.utils.api.IConnectionProvider;
import com.ibm.fhir.database.utils.api.IDatabaseAdapter;
import com.ibm.fhir.database.utils.api.IDatabaseTranslator;
import com.ibm.fhir.database.utils.api.ITransaction;
import com.ibm.fhir.database.utils.api.ITransactionProvider;
import com.ibm.fhir.database.utils.common.JdbcConnectionProvider;
import com.ibm.fhir.database.utils.db2.Db2Adapter;
import com.ibm.fhir.database.utils.db2.Db2PropertyAdapter;
import com.ibm.fhir.database.utils.db2.Db2Translator;
import com.ibm.fhir.database.utils.derby.DerbyAdapter;
import com.ibm.fhir.database.utils.derby.DerbyPropertyAdapter;
import com.ibm.fhir.database.utils.derby.DerbyTranslator;
import com.ibm.fhir.database.utils.model.DbType;
import com.ibm.fhir.database.utils.postgres.PostgresAdapter;
import com.ibm.fhir.database.utils.postgres.PostgresPropertyAdapter;
import com.ibm.fhir.database.utils.postgres.PostgresTranslator;
import com.ibm.fhir.database.utils.transaction.SimpleTransactionProvider;

/**
* Support class for managing connections to a database for utility apps
*/
public class DatabaseSupport implements IConnectionProvider, ITransactionProvider {
private static final int DEFAULT_CONNECTION_POOL_SIZE = 10;
private final Properties dbProperties;
private final DbType dbType;

// The translator for the configured database type
private IDatabaseTranslator translator;

// The adapter configured for the type of database we're using
private IDatabaseAdapter adapter;

// Connection pool used to work alongside the transaction provider
private PoolConnectionProvider connectionPool;

// Simple transaction service for use outside of JEE
private ITransactionProvider transactionProvider;

private int connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE;

/**
* Public constructor
* @param dbProperties
* @param dbType
*/
public DatabaseSupport(Properties dbProperties, DbType dbType) {
this.dbProperties = dbProperties;
this.dbType = dbType;
}

/**
* Build the database configuration from the configured properties
*/
public void init() {
switch (this.dbType) {
case DB2:
configureForDb2();
break;
case DERBY:
configureForDerby();
break;
case POSTGRESQL:
configureForPostgresql();
break;
default:
throw new IllegalStateException("Unsupported database type: " + this.dbType);
}
}

/**
* Set up the connection pool and transaction provider for connecting to a Derby
* database
*/
private void configureForDerby() {
DerbyPropertyAdapter propertyAdapter = new DerbyPropertyAdapter(dbProperties);

this.translator = new DerbyTranslator();
IConnectionProvider cp = new JdbcConnectionProvider(this.translator, propertyAdapter);
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.connectionPool.setCloseOnAnyError();
this.adapter = new DerbyAdapter(connectionPool);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

/**
* Set up the connection pool and transaction provider for connecting to a DB2
* database
*/
private void configureForDb2() {

this.translator = new Db2Translator();
try {
Class.forName(translator.getDriverClassName());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}

Db2PropertyAdapter propertyAdapter = new Db2PropertyAdapter(dbProperties);
IConnectionProvider cp = new JdbcConnectionProvider(translator, propertyAdapter);
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.adapter = new Db2Adapter(connectionPool);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

/**
* Set up the connection pool and transaction provider for connecting to a DB2
* database
*/
private void configureForPostgresql() {
this.translator = new PostgresTranslator();
try {
Class.forName(translator.getDriverClassName());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}

PostgresPropertyAdapter propertyAdapter = new PostgresPropertyAdapter(dbProperties);
IConnectionProvider cp = new JdbcConnectionProvider(translator, propertyAdapter);
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.adapter = new PostgresAdapter(connectionPool);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

/**
* Get the configured database adapter
* @return
*/
public IDatabaseAdapter getDatabaseAdapter() {
if (this.adapter == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}
return this.adapter;
}

/**
* Get the IDatabaseTranslator for the configured database type
* @return
*/
public IDatabaseTranslator getTranslator() {
if (this.translator == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}
return this.translator;
}

@Override
public Connection getConnection() throws SQLException {
if (this.connectionPool == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}

return this.connectionPool.getConnection();
}

@Override
public ITransaction getTransaction() {
if (this.transactionProvider == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}
return this.transactionProvider.getTransaction();
}

@Override
public void commitTransaction() throws SQLException {
connectionPool.commitTransaction();
}

@Override
public void rollbackTransaction() throws SQLException {
connectionPool.rollbackTransaction();
}

@Override
public void describe(String prefix, StringBuilder cfg, String key) {
connectionPool.describe(prefix, cfg, key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ public void addColumn(String source, String name) {
public void addColumn(String source, String name, Alias alias) {
selectList.addColumn(source, name, alias);
}

/**
* Add a value column to the select list, for example:
* addColumn("5", alias("RESOURCE_TYPE_ID")) can be used for:
* SELECT foo, 5 AS RESOURCE_TYPE_ID
* FROM ...
* @param columnValue
* @param alias
*/
public void addColumn(String columnValue, Alias alias) {
selectList.addColumn(columnValue, alias);
}

/**
* Add a table item to the from-clause
Expand Down
Loading

0 comments on commit 65ad63c

Please sign in to comment.