
Issue #2900 Cassandra implementation of payload offload including rollback support #3142

Merged Jan 11, 2022 · 26 commits
3c0e507
issue 2900 payload offloading with cassandra
punktilious Nov 29, 2021
ea90f26
issue 2900 hash based partitioning strategy for Cassandra
punktilious Nov 30, 2021
838bba9
issue 2900 cassandra payload mvp
punktilious Dec 1, 2021
6093afc
Merge remote-tracking branch 'origin/HEAD' into issue-2900-ra
punktilious Dec 1, 2021
971ce72
issue 2900 new persistence deleteWithMeta to support offloading
punktilious Dec 6, 2021
f4aa267
issue 2900 payload offload server integration test support
punktilious Dec 10, 2021
dc688fd
issue 2900 payload offload erase operation support
punktilious Dec 11, 2021
575e7b6
issue #2900 tie RDBMS and offload records with resourcePayloadKey UUID
punktilious Dec 14, 2021
9753dc2
issue #2900 read RESOURCE_PAYLOAD_KEY from RDBMS to support offload f…
punktilious Dec 14, 2021
bbbf677
issue #2900 fixed vread query and erase using resource_payload_key
punktilious Dec 15, 2021
f769df0
issue #2900 delete offloaded payloads when tx rolls back
punktilious Dec 16, 2021
5d42c3b
issue #2900 merged with main
punktilious Dec 16, 2021
18162f4
issue #3136 initial sketch of payload persistence with Azure blob
punktilious Dec 21, 2021
76713c2
issue #2900 payload offload merge with latest main
punktilious Dec 23, 2021
1e9c404
issue #2900 payload offload allow uncompressed storage
punktilious Dec 23, 2021
b719eba
issue #2900 fixed InputOutputByteStream resize bug in write(int b)
punktilious Dec 23, 2021
314d954
issue #2900 fixed parameter markers
punktilious Dec 23, 2021
74bd715
issue #2900 support erase operation when payload offload is configured
punktilious Dec 24, 2021
a748731
issue #2900 fixed insert into erased_resources for Db2 mt_id
punktilious Dec 24, 2021
d8adefd
issue #2900 fixed insert into erased_resources for Db2 mt_id
punktilious Dec 24, 2021
f562006
issue #2900 code review and fix cassandra schema to support reconcili…
punktilious Jan 7, 2022
f9e0b6d
Merge remote-tracking branch 'origin/HEAD' into issue-2900-ra
punktilious Jan 7, 2022
61fb2e1
issue #2900 reconciliation between cassandra offload store and RDBMS
punktilious Jan 7, 2022
eac2bb5
issue #2900 add isOffloadingSupported to FHIRPersistence
punktilious Jan 7, 2022
bcf2bd5
issue #2900 do not include cassandra offload in fhir-server
punktilious Jan 10, 2022
d82d693
issue #2900 updated readmes and final review comments
punktilious Jan 11, 2022
1 change: 1 addition & 0 deletions build/security/asoc.sh
@@ -16,6 +16,7 @@ mkdir -p ${WORKSPACE}/build/security/logs/output/
find ${WORKSPACE} -iname 'fhir-*.jar' -not -iname 'fhir*-tests.jar' -not -iname 'fhir*-test-*.jar' -not -iname 'fhir-persistence-schema-*-cli.jar' -not -iname 'fhir-swagger-generator-*-cli.jar' \
-not -iname 'fhir-examples-*.jar' -not -name 'fhir-bulkdata-webapp-*-client.jar' -not -iname 'fhir*-ig-*.jar' -not -iname 'fhir-bucket-*-cli.jar' -not -path '*/target/fhir-server-webapp-*' \
-not -iname 'fhir-operation-cqf-*-shaded.jar' -not -iname 'fhir-operation-cpg-*-shaded.jar' -not -iname 'fhir-term-graph-loader-*-cli.jar' \
-not -path '*/target/fhir-persistence-cassandra-app*' \
-not -path '*/target/fhir-bulkdata*' -exec cp -f {} ${WORKSPACE}/build/security/logs/tmp \;

cd ${WORKSPACE}/build/security/logs/
71 changes: 71 additions & 0 deletions fhir-bucket/README.md
@@ -229,6 +229,14 @@ java -jar "${JAR}" \

This tracking database can be shared with the instance used by FHIR, but for proper performance testing it should be on a separate host. The standard schema for the tables is FHIRBUCKET.

Schema creation does not have to be performed separately. If you wish to create the schema as part of the main program run, specify

```
--bootstrap-schema
```

on the command line. The schema creation and update process is idempotent. Changes are only applied when required, and concurrency is managed correctly to ensure only one instance makes changes if multiple instances of fhir-bucket are run simultaneously.
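In fhir-bucket the single-writer guarantee is coordinated through the tracking database itself. As a rough standalone illustration of the general pattern only (not fhir-bucket's actual mechanism; `BootstrapGuard` and the lock-file approach are invented for this sketch), an exclusive lock can gate the idempotent schema step so concurrent instances skip it:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: one instance acquires the lock and applies
// the (idempotent) schema changes; any concurrent instance gets null from
// tryLock() and simply skips the bootstrap step.
public class BootstrapGuard {
    public static boolean tryBootstrap(Path lockFile) throws IOException {
        try (FileChannel ch = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = ch.tryLock();
            if (lock == null) {
                return false; // another instance is already bootstrapping
            }
            try {
                // apply idempotent schema creation/update here
                return true;
            } finally {
                lock.release();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tryBootstrap(Path.of("schema-test.lock")));
    }
}
```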

The preferred approach is to use the new `--bootstrap-schema` option when running the main workload, in which case the `--create-schema` activity isn't required.


@@ -247,6 +255,7 @@ java -jar "${JAR}" \
--db-properties db2.properties \
--cos-properties cos.properties \
--fhir-properties fhir.properties \
--bootstrap-schema \
--bucket example-bucket \
--tenant-name example-tenant \
--file-type NDJSON \
@@ -275,11 +284,73 @@ To run using PostgreSQL, change the relevant arguments to:
...
```

The `--immediate-local` option can be used to load files from a local file-system without the need for a FHIRBUCKET database or connection to COS:

```
java \
-Djava.util.logging.config.file=logging.properties \
-jar "${JAR}" \
--db-type postgresql \
--db-properties db.properties \
--fhir-properties fhir.properties \
--tenant-name your-tenant-name \
--file-type JSON \
--max-concurrent-fhir-requests 0 \
--max-concurrent-json-files 0 \
--max-concurrent-ndjson-files 0 \
--connection-pool-size 10 \
--immediate-local \
--scan-local-dir /path/to/synthea/data
```

Because `--immediate-local` does not use a FHIRBUCKET database, there is no tracking of the logical ids generated by the IBM FHIR Server. This means it is not possible to run the interop workload against this data.

To track the logical ids, you can provide a FHIRBUCKET database configuration along with the `--scan-local-dir` argument, but do not specify `--immediate-local`:

```
java \
-Djava.util.logging.config.file=logging.properties \
-jar "${JAR}" \
--db-type postgresql \
--db-properties db.properties \
--fhir-properties fhir.properties \
--tenant-name your-tenant-name \
--file-type JSON \
--max-concurrent-fhir-requests 0 \
--max-concurrent-json-files 0 \
--max-concurrent-ndjson-files 0 \
--connection-pool-size 10 \
--scan-local-dir /path/to/synthea/data
```

Once the directory scanning is complete, the program can be terminated. To load the files now registered in the FHIRBUCKET database, run the following:

```
java \
-Djava.util.logging.config.file=logging.properties \
-jar "${JAR}" \
--db-type postgresql \
--db-properties db.properties \
--fhir-properties fhir.properties \
--tenant-name your-tenant-name \
--file-type JSON \
--no-scan \
--max-concurrent-fhir-requests 40 \
--max-concurrent-json-files 10 \
--max-concurrent-ndjson-files 0 \
--connection-pool-size 40 \
--scan-local-dir /path/to/synthea/data
```

Note that the `--scan-local-dir [path-name]` option must still be provided.


| Property Name | Description |
| -------------------- | -----|
| `--bootstrap-schema` | Creates a new schema or updates an existing one as an initial step, then continues with the main workload (the program does not exit after the schema operations complete). Simplifies cloud deployment scenarios by avoiding the need for a separate schema-update job, and ensures only one instance updates the schema at a time. Do not specify `--create-schema` when using this option. |
| `--db-type type` | where `type` is one of: db2, derby, postgresql. Specifies the type of database to use for the FHIRBUCKET tracking data. |
| `--create-schema` | Creates a new schema or updates an existing one, then exits without running the main workload. |
| `--schema-name` | The custom schema used for FHIRBUCKET tracking data. The default is `FHIRBUCKET`.|
| `--tenant-name fhir-tenant-name` | The IBM FHIR Server tenant name|
| `--cos-properties properties-file` | Connection properties file for COS |
@@ -54,14 +54,14 @@
import com.ibm.fhir.operation.bulkdata.model.type.OperationFields;
import com.ibm.fhir.operation.bulkdata.model.type.StorageType;
import com.ibm.fhir.persistence.FHIRPersistence;
import com.ibm.fhir.persistence.FHIRPersistenceSupport;
import com.ibm.fhir.persistence.InteractionStatus;
import com.ibm.fhir.persistence.SingleResourceResult;
import com.ibm.fhir.persistence.context.FHIRPersistenceContext;
import com.ibm.fhir.persistence.context.FHIRPersistenceContextFactory;
import com.ibm.fhir.persistence.context.FHIRPersistenceEvent;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.persistence.payload.PayloadPersistenceHelper;
import com.ibm.fhir.persistence.util.FHIRPersistenceUtil;
import com.ibm.fhir.validation.exception.FHIRValidationException;

@@ -305,7 +305,7 @@ public OperationOutcome conditionalFingerprintUpdate(ImportTransientUserData chu
SingleResourceResult<? extends Resource> oldResourceResult = persistence.read(context, resource.getClass(), logicalId);
Resource oldResource = oldResourceResult.getResource();

final com.ibm.fhir.model.type.Instant lastUpdated = PayloadPersistenceHelper.getCurrentInstant();
final com.ibm.fhir.model.type.Instant lastUpdated = FHIRPersistenceSupport.getCurrentInstant();
final int newVersionNumber = oldResource != null && oldResource.getMeta() != null && oldResource.getMeta().getVersionId() != null
? Integer.parseInt(oldResource.getMeta().getVersionId().getValue()) + 1 : 1;
resource = FHIRPersistenceUtil.copyAndSetResourceMetaFields(resource, logicalId, newVersionNumber, lastUpdated);
@@ -888,7 +888,8 @@ private void initProperties(Properties props) throws Exception {

setHostnameVerificationEnabled(Boolean.parseBoolean(getProperty(PROPNAME_HOSTNAME_VERIFICATION_ENABLED, "true")));

setHttpTimeout(Integer.parseUnsignedInt(getProperty(PROPNAME_HTTP_TIMEOUT, "60000")));
// Use a default that's longer than the default Liberty transaction timeout
setHttpTimeout(Integer.parseUnsignedInt(getProperty(PROPNAME_HTTP_TIMEOUT, "130000")));

setTenantId(getProperty(PROPNAME_TENANT_ID, null));
} catch (Throwable t) {
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2019, 2020
* (C) Copyright IBM Corp. 2019, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
@@ -53,4 +53,13 @@ public void setAutoCreate(boolean create) {
public boolean isAutoCreate() {
return "Y".equals(this.properties.getProperty(CREATE_KEY));
}

@Override
public String getDefaultSchema() {
String result = super.getDefaultSchema();
if (result == null) {
result = "APP";
}
return result;
}
}
@@ -0,0 +1,189 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.database.utils.pool;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

import com.ibm.fhir.database.utils.api.IConnectionProvider;
import com.ibm.fhir.database.utils.api.IDatabaseAdapter;
import com.ibm.fhir.database.utils.api.IDatabaseTranslator;
import com.ibm.fhir.database.utils.api.ITransaction;
import com.ibm.fhir.database.utils.api.ITransactionProvider;
import com.ibm.fhir.database.utils.common.JdbcConnectionProvider;
import com.ibm.fhir.database.utils.db2.Db2Adapter;
import com.ibm.fhir.database.utils.db2.Db2PropertyAdapter;
import com.ibm.fhir.database.utils.db2.Db2Translator;
import com.ibm.fhir.database.utils.derby.DerbyAdapter;
import com.ibm.fhir.database.utils.derby.DerbyPropertyAdapter;
import com.ibm.fhir.database.utils.derby.DerbyTranslator;
import com.ibm.fhir.database.utils.model.DbType;
import com.ibm.fhir.database.utils.postgres.PostgresAdapter;
import com.ibm.fhir.database.utils.postgres.PostgresPropertyAdapter;
import com.ibm.fhir.database.utils.postgres.PostgresTranslator;
import com.ibm.fhir.database.utils.transaction.SimpleTransactionProvider;

/**
* Support class for managing connections to a database for utility apps
*/
public class DatabaseSupport implements IConnectionProvider, ITransactionProvider {
private static final int DEFAULT_CONNECTION_POOL_SIZE = 10;
private final Properties dbProperties;
private final DbType dbType;

// The translator for the configured database type
private IDatabaseTranslator translator;

// The adapter configured for the type of database we're using
private IDatabaseAdapter adapter;

// Connection pool used to work alongside the transaction provider
private PoolConnectionProvider connectionPool;

// Simple transaction service for use outside of JEE
private ITransactionProvider transactionProvider;

private int connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE;

/**
* Public constructor
* @param dbProperties
* @param dbType
*/
public DatabaseSupport(Properties dbProperties, DbType dbType) {
this.dbProperties = dbProperties;
this.dbType = dbType;
}

/**
* Build the database configuration from the configured properties
*/
public void init() {
switch (this.dbType) {
case DB2:
configureForDb2();
break;
case DERBY:
configureForDerby();
break;
case POSTGRESQL:
configureForPostgresql();
break;
default:
throw new IllegalStateException("Unsupported database type: " + this.dbType);
}
}

/**
* Set up the connection pool and transaction provider for connecting to a Derby
* database
*/
private void configureForDerby() {
DerbyPropertyAdapter propertyAdapter = new DerbyPropertyAdapter(dbProperties);

this.translator = new DerbyTranslator();
IConnectionProvider cp = new JdbcConnectionProvider(this.translator, propertyAdapter);
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.connectionPool.setCloseOnAnyError();
this.adapter = new DerbyAdapter(connectionPool);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

/**
* Set up the connection pool and transaction provider for connecting to a DB2
* database
*/
private void configureForDb2() {

this.translator = new Db2Translator();
try {
Class.forName(translator.getDriverClassName());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}

Db2PropertyAdapter propertyAdapter = new Db2PropertyAdapter(dbProperties);
IConnectionProvider cp = new JdbcConnectionProvider(translator, propertyAdapter);
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.adapter = new Db2Adapter(connectionPool);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

/**
* Set up the connection pool and transaction provider for connecting to a PostgreSQL
* database
*/
private void configureForPostgresql() {
this.translator = new PostgresTranslator();
try {
Class.forName(translator.getDriverClassName());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}

PostgresPropertyAdapter propertyAdapter = new PostgresPropertyAdapter(dbProperties);
IConnectionProvider cp = new JdbcConnectionProvider(translator, propertyAdapter);
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.adapter = new PostgresAdapter(connectionPool);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

/**
* Get the configured database adapter
* @return
*/
public IDatabaseAdapter getDatabaseAdapter() {
if (this.adapter == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}
return this.adapter;
}

/**
* Get the IDatabaseTranslator for the configured database type
* @return
*/
public IDatabaseTranslator getTranslator() {
if (this.translator == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}
return this.translator;
}

@Override
public Connection getConnection() throws SQLException {
if (this.connectionPool == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}

return this.connectionPool.getConnection();
}

@Override
public ITransaction getTransaction() {
if (this.transactionProvider == null) {
throw new IllegalStateException("DatabaseSupport not initialized");
}
return this.transactionProvider.getTransaction();
}

@Override
public void commitTransaction() throws SQLException {
connectionPool.commitTransaction();
}

@Override
public void rollbackTransaction() throws SQLException {
connectionPool.rollbackTransaction();
}

@Override
public void describe(String prefix, StringBuilder cfg, String key) {
connectionPool.describe(prefix, cfg, key);
}
}
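Note the accessor pattern in `DatabaseSupport`: every getter fails fast with `IllegalStateException` until `init()` has been called. A minimal standalone sketch of the same pattern (class and field names here are illustrative, not the project's):

```java
// Standalone sketch of the guarded-accessor pattern used by DatabaseSupport:
// accessors throw IllegalStateException until init() has configured the object.
public class LazyInitExample {
    private String adapter; // stands in for IDatabaseAdapter

    public void init() {
        // in DatabaseSupport this is where the translator, pool and
        // transaction provider are built for the configured DbType
        this.adapter = "postgres-adapter";
    }

    public String getAdapter() {
        if (adapter == null) {
            throw new IllegalStateException("not initialized");
        }
        return adapter;
    }

    public static void main(String[] args) {
        LazyInitExample support = new LazyInitExample();
        try {
            support.getAdapter();
        } catch (IllegalStateException e) {
            System.out.println("fail-fast before init");
        }
        support.init();
        System.out.println(support.getAdapter());
    }
}
```

The benefit is that a forgotten `init()` call surfaces immediately at the first accessor, rather than as a `NullPointerException` deep inside a database call.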
@@ -120,6 +120,18 @@ public void addColumn(String source, String name) {
public void addColumn(String source, String name, Alias alias) {
selectList.addColumn(source, name, alias);
}

/**
* Add a value column to the select list, for example:
* addColumn("5", alias("RESOURCE_TYPE_ID")) can be used for:
* SELECT foo, 5 AS RESOURCE_TYPE_ID
* FROM ...
* @param columnValue
* @param alias
*/
public void addColumn(String columnValue, Alias alias) {
selectList.addColumn(columnValue, alias);
}
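The javadoc example above can be seen end-to-end with a tiny standalone renderer (this sketch is not the project's query DSL; `SimpleSelect` is a made-up stand-in showing what a literal "value column" contributes to the rendered SQL):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch (not the project's Select/SelectAdapter classes) showing
// how a literal value with an alias lands in the generated select list.
public class SimpleSelect {
    private final List<String> columns = new ArrayList<>();
    private final String table;

    public SimpleSelect(String table) {
        this.table = table;
    }

    // Plain column reference
    public SimpleSelect addColumn(String name) {
        columns.add(name);
        return this;
    }

    // Literal value with an alias, e.g. addColumn("5", "RESOURCE_TYPE_ID")
    public SimpleSelect addColumn(String value, String alias) {
        columns.add(value + " AS " + alias);
        return this;
    }

    public String render() {
        return "SELECT " + String.join(", ", columns) + " FROM " + table;
    }

    public static void main(String[] args) {
        String sql = new SimpleSelect("logical_resources")
                .addColumn("foo")
                .addColumn("5", "RESOURCE_TYPE_ID")
                .render();
        System.out.println(sql);
        // prints: SELECT foo, 5 AS RESOURCE_TYPE_ID FROM logical_resources
    }
}
```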

/**
* Add a table item to the from-clause
*/

@@ -61,6 +61,19 @@ public SelectAdapter addColumn(String source, String name, Alias alias) {
this.select.addColumn(source, name, alias);
return this;
}

/**
* Add a column value with a given alias. Can be used to add literals in
* the select list
* @param columnValue
* @param alias
* @return this adapter, to allow call chaining
*/
public SelectAdapter addColumn(String columnValue, Alias alias) {
this.select.addColumn(columnValue, alias);
return this;
}

/**
* Create a from clause for this select statement
*/