Skip to content

Commit

Permalink
Issue #3437: distributed schema using shard_key
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Arnold <[email protected]>
  • Loading branch information
punktilious committed May 5, 2022
1 parent 10df2e8 commit 68819ab
Show file tree
Hide file tree
Showing 125 changed files with 5,212 additions and 2,184 deletions.
14 changes: 10 additions & 4 deletions fhir-bucket/src/main/java/com/ibm/fhir/bucket/app/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@
import com.ibm.fhir.database.utils.api.IDatabaseAdapter;
import com.ibm.fhir.database.utils.api.IDatabaseTranslator;
import com.ibm.fhir.database.utils.api.ILeaseManagerConfig;
import com.ibm.fhir.database.utils.api.ISchemaAdapter;
import com.ibm.fhir.database.utils.api.ITransaction;
import com.ibm.fhir.database.utils.api.ITransactionProvider;
import com.ibm.fhir.database.utils.api.SchemaApplyContext;
import com.ibm.fhir.database.utils.api.UniqueConstraintViolationException;
import com.ibm.fhir.database.utils.common.JdbcConnectionProvider;
import com.ibm.fhir.database.utils.common.PlainSchemaAdapter;
import com.ibm.fhir.database.utils.db2.Db2Adapter;
import com.ibm.fhir.database.utils.db2.Db2PropertyAdapter;
import com.ibm.fhir.database.utils.db2.Db2Translator;
Expand Down Expand Up @@ -148,6 +150,9 @@ public class Main {
// The adapter configured for the type of database we're using
private IDatabaseAdapter adapter;

// The (plain) schema adapter which wraps the database adapter
private ISchemaAdapter schemaAdapter;

// The number of threads to use for the schema creation step
private int createSchemaThreads = 1;

Expand Down Expand Up @@ -671,6 +676,7 @@ public void setupDerbyRepository() {
this.connectionPool = new PoolConnectionProvider(cp, connectionPoolSize);
this.connectionPool.setCloseOnAnyError();
this.adapter = new DerbyAdapter(connectionPool);
this.schemaAdapter = new PlainSchemaAdapter(adapter);
this.transactionProvider = new SimpleTransactionProvider(connectionPool);
}

Expand Down Expand Up @@ -734,7 +740,7 @@ protected VersionHistoryService createVersionHistoryService() {
// Create the version history table if it doesn't yet exist
try (ITransaction tx = transactionProvider.getTransaction()) {
try {
CreateVersionHistory.createTableIfNeeded(schemaName, this.adapter);
CreateVersionHistory.createTableIfNeeded(schemaName, this.schemaAdapter);
} catch (Exception x) {
logger.log(Level.SEVERE, "failed to create version history table", x);
tx.setRollbackOnly();
Expand Down Expand Up @@ -763,8 +769,8 @@ public void bootstrapDb() {
try (ITransaction tx = transactionProvider.getTransaction()) {
try {
adapter.createSchema(schemaName);
CreateControl.createTableIfNeeded(schemaName, adapter);
CreateWholeSchemaVersion.createTableIfNeeded(schemaName, adapter);
CreateControl.createTableIfNeeded(schemaName, schemaAdapter);
CreateWholeSchemaVersion.createTableIfNeeded(schemaName, schemaAdapter);
success = true;
} catch (Exception x) {
logger.log(Level.SEVERE, "failed to create schema management tables", x);
Expand Down Expand Up @@ -825,7 +831,7 @@ private void buildSchema() {
ExecutorService pool = Executors.newFixedThreadPool(this.createSchemaThreads);
ITaskCollector collector = taskService.makeTaskCollector(pool);
SchemaApplyContext context = SchemaApplyContext.getDefault();
pdm.collect(collector, adapter, context, this.transactionProvider, vhs);
pdm.collect(collector, schemaAdapter, context, this.transactionProvider, vhs);

// FHIR in the hole!
logger.info("Starting schema updates");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import static com.ibm.fhir.bucket.persistence.SchemaConstants.VERSION;

import com.ibm.fhir.bucket.app.Main;
import com.ibm.fhir.database.utils.api.IDatabaseAdapter;
import com.ibm.fhir.database.utils.api.ISchemaAdapter;
import com.ibm.fhir.database.utils.api.SchemaApplyContext;
import com.ibm.fhir.database.utils.model.Generated;
import com.ibm.fhir.database.utils.model.PhysicalDataModel;
Expand Down Expand Up @@ -304,9 +304,11 @@ protected void addResourceBundleErrors(PhysicalDataModel pdm) {

/**
 * Apply the model to the database. Will generate the DDL and execute it.
 * @param adapter the schema adapter used to issue the generated DDL to the target database
 * @param context the schema apply context, passed through to {@code pdm.apply}
 * @param pdm the physical data model describing the schema objects to create
 */
protected void applyModel(IDatabaseAdapter adapter, SchemaApplyContext context, PhysicalDataModel pdm) {
protected void applyModel(ISchemaAdapter adapter, SchemaApplyContext context, PhysicalDataModel pdm) {
pdm.apply(adapter, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@
import com.ibm.fhir.bucket.persistence.ResourceRec;
import com.ibm.fhir.bucket.persistence.ResourceTypeRec;
import com.ibm.fhir.bucket.persistence.ResourceTypesReader;
import com.ibm.fhir.database.utils.api.ISchemaAdapter;
import com.ibm.fhir.database.utils.api.ITransaction;
import com.ibm.fhir.database.utils.api.ITransactionProvider;
import com.ibm.fhir.database.utils.common.JdbcTarget;
import com.ibm.fhir.database.utils.common.PlainSchemaAdapter;
import com.ibm.fhir.database.utils.derby.DerbyAdapter;
import com.ibm.fhir.database.utils.derby.DerbyConnectionProvider;
import com.ibm.fhir.database.utils.derby.DerbyMaster;
Expand Down Expand Up @@ -369,7 +371,8 @@ protected VersionHistoryService createVersionHistoryService() throws SQLExceptio
try {
JdbcTarget target = new JdbcTarget(c);
DerbyAdapter derbyAdapter = new DerbyAdapter(target);
CreateVersionHistory.createTableIfNeeded(ADMIN_SCHEMA_NAME, derbyAdapter);
ISchemaAdapter schemaAdapter = new PlainSchemaAdapter(derbyAdapter);
CreateVersionHistory.createTableIfNeeded(ADMIN_SCHEMA_NAME, schemaAdapter);
c.commit();
} catch (SQLException x) {
logger.log(Level.SEVERE, "failed to create version history table", x);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public PartitionPlan mapPartitions() throws Exception {
List<String> target = new ArrayList<>();
try {
for (String resourceType : resourceTypes) {
List<ResourceChangeLogRecord> resourceResults = fhirPersistence.changes(1, null, null, null,
List<ResourceChangeLogRecord> resourceResults = fhirPersistence.changes(null, 1, null, null, null,
Arrays.asList(resourceType), false, HistorySortOrder.NONE);

// Early Exit Logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public PartitionPlan mapPartitions() throws Exception {
List<String> target = new ArrayList<>();
try {
for (String resourceType : resourceTypes) {
List<ResourceChangeLogRecord> resourceResults = fhirPersistence.changes(1, null, null, null, Arrays.asList(resourceType), false, HistorySortOrder.NONE);
List<ResourceChangeLogRecord> resourceResults = fhirPersistence.changes(null, 1, null, null, null, Arrays.asList(resourceType), false, HistorySortOrder.NONE);

// Early Exit Logic
if (!resourceResults.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class FHIRConfiguration {
// Core server properties
public static final String PROPERTY_ORIGINAL_REQUEST_URI_HEADER_NAME = "fhirServer/core/originalRequestUriHeaderName";
public static final String PROPERTY_TENANT_ID_HEADER_NAME = "fhirServer/core/tenantIdHeaderName";
public static final String PROPERTY_SHARD_KEY_HEADER_NAME = "fhirServer/core/shardKeyHeaderName";
public static final String PROPERTY_DATASTORE_ID_HEADER_NAME = "fhirServer/core/datastoreIdHeaderName";
public static final String PROPERTY_DEFAULT_TENANT_ID = "fhirServer/core/defaultTenantId";
public static final String PROPERTY_DEFAULT_PRETTY_PRINT = "fhirServer/core/defaultPrettyPrint";
Expand Down Expand Up @@ -109,6 +110,12 @@ public class FHIRConfiguration {
public static final String PROPERTY_NATS_KEYSTORE = "fhirServer/notifications/nats/keystoreLocation";
public static final String PROPERTY_NATS_KEYSTORE_PW = "fhirServer/notifications/nats/keystorePassword";

// Configuration properties for the Kafka-based async index service
public static final String PROPERTY_REMOTE_INDEX_SERVICE_TYPE = "fhirServer/remoteIndexService/type";
public static final String PROPERTY_KAFKA_INDEX_SERVICE_TOPICNAME = "fhirServer/remoteIndexService/kafka/topicName";
public static final String PROPERTY_KAFKA_INDEX_SERVICE_CONNECTIONPROPS = "fhirServer/remoteIndexService/kafka/connectionProperties";
public static final String PROPERTY_KAFKA_INDEX_SERVICE_MODE = "fhirServer/remoteIndexService/kafka/mode";

// Operations config properties
public static final String PROPERTY_OPERATIONS_EVERYTHING = "fhirServer/operations/everything";
public static final String PROPERTY_OPERATIONS_EVERYTHING_INCLUDE_TYPES = "includeTypes";
Expand All @@ -132,6 +139,7 @@ public class FHIRConfiguration {
public static final String DEFAULT_TENANT_ID_HEADER_NAME = "X-FHIR-TENANT-ID";
public static final String DEFAULT_DATASTORE_ID_HEADER_NAME = "X-FHIR-DSID";
public static final String DEFAULT_PRETTY_RESPONSE_HEADER_NAME = "X-FHIR-FORMATTED";
public static final String DEFAULT_SHARD_KEY_HEADER_NAME = "X-FHIR-SHARD-KEY";

public static final String FHIR_SERVER_DEFAULT_CONFIG = "config/default/fhir-server-config.json";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class FHIRRequestContext {

// The datastore to be used for this request. Usually "default"
private String dataStoreId;

// An optional shard key passed with the request for use with distributed schemas
private String requestShardKey;

private String requestUniqueId;
private String originalRequestUri;
private Map<String, List<String>> httpHeaders;
Expand Down Expand Up @@ -144,6 +148,24 @@ public void setDataStoreId(String dataStoreId) throws FHIRException {
}
}

/**
 * Set the shard key string value provided by the request.
 * The value is stored as-is; no validation or normalization is applied here.
 * @param k the shard key string value to associate with this request context
 */
public void setRequestShardKey(String k) {
this.requestShardKey = k;
}

/**
 * Get the shard key string value provided by the request. This value is
 * not filtered in any way because the value eventually gets hashed into
 * a short (2-byte) integer before being used.
 * @return the shard key set via {@link #setRequestShardKey(String)}, or null if the request did not provide one
 */
public String getRequestShardKey() {
return this.requestShardKey;
}

/**
* set an Operation Context property
* @param name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.database.utils.api;


/**
 * Immutable carrier describing how a table should be distributed,
 * handed to adapter methods that generate distribution-aware DDL.
 */
public class DistributionContext {

    /** The distribution strategy to apply to a particular table. */
    private final DistributionType distributionType;

    /** Column used for sharding; relevant only when the type is DISTRIBUTED. */
    private final String distributionColumnName;

    /**
     * Public constructor.
     *
     * @param distributionType the distribution strategy for the table
     * @param distributionColumnName the column to distribute by when the
     *        strategy is DISTRIBUTED
     */
    public DistributionContext(DistributionType distributionType, String distributionColumnName) {
        this.distributionType = distributionType;
        this.distributionColumnName = distributionColumnName;
    }

    /**
     * Accessor for the distribution column name.
     *
     * @return the distributionColumnName
     */
    public String getDistributionColumnName() {
        return distributionColumnName;
    }

    /**
     * Accessor for the distribution strategy.
     *
     * @return the distributionType
     */
    public DistributionType getDistributionType() {
        return distributionType;
    }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* (C) Copyright IBM Corp. 2022
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.ibm.fhir.database.utils.api;

/**
 * Enumerates the distribution strategies that may be applied to a table
 * in a distributed schema.
 */
public enum DistributionType {
    /** The table is not distributed at all. */
    NONE,

    /** The table is replicated (a reference table). */
    REFERENCE,

    /** The table is sharded using a designated distribution column. */
    DISTRIBUTED
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ public interface IDatabaseAdapter {
* @param tablespaceName
* @param withs
* @param checkConstraints
* @param distributionRules
* @param distributionContext
*/
public void createTable(String schemaName, String name, String tenantColumnName, List<ColumnBase> columns,
PrimaryKeyDef primaryKey, IdentityDef identity, String tablespaceName, List<With> withs, List<CheckConstraint> checkConstraints,
DistributionRules distributionRules);
DistributionContext distributionContext);

/**
* Apply any distribution rules configured for the named table
* @param schemaName
* @param tableName
* @param distributionRules
* @param distributionContext
*/
public void applyDistributionRules(String schemaName, String tableName, DistributionRules distributionRules);
public void applyDistributionRules(String schemaName, String tableName, DistributionContext distributionContext);

/**
* Add a new column to an existing table
Expand Down Expand Up @@ -167,10 +167,10 @@ public void createTable(String schemaName, String name, String tenantColumnName,
* @param tenantColumnName
* @param indexColumns
* @param includeColumns
* @param distributionRules
* @param distributionContext
*/
public void createUniqueIndex(String schemaName, String tableName, String indexName, String tenantColumnName,
List<OrderedColumnDef> indexColumns, List<String> includeColumns, DistributionRules distributionRules);
List<OrderedColumnDef> indexColumns, List<String> includeColumns, DistributionContext distributionContext);

/**
* Create a unique index
Expand All @@ -179,10 +179,10 @@ public void createUniqueIndex(String schemaName, String tableName, String indexN
* @param indexName
* @param tenantColumnName
* @param indexColumns
* @param distributionRules
* @param distributionContext
*/
public void createUniqueIndex(String schemaName, String tableName, String indexName, String tenantColumnName,
List<OrderedColumnDef> indexColumns, DistributionRules distributionRules);
List<OrderedColumnDef> indexColumns, DistributionContext distributionContext);

/**
*
Expand Down
Loading

0 comments on commit 68819ab

Please sign in to comment.