diff --git a/README.md b/README.md index e393094fc..3ee17ae46 100644 --- a/README.md +++ b/README.md @@ -406,7 +406,9 @@ to a running PostgreSQL query. 0 disables this timeout and is the default. To take effect an RMB based module must get it via PostgresClient.getConnectionConfig().getInteger("queryTimeout") and pass it to the RMB method that starts the connection, transaction or query. -The environment variables `DB_HOST_READER` and `DB_PORT_READER` are for the [Read and write database instances setup](#read-and-write-database-instances-setup). +The environment variables `DB_HOST_READER` and `DB_PORT_READER` are for the synchronously replicated [read and write database instances setup](#read-and-write-database-instances-setup). + +The environment variables `DB_HOST_ASYNC_READER` and `DB_PORT_ASYNC_READER` are for the asynchronously replicated read and write database instances setup. `DB_ALLOW_SUPPRESS_OPTIMISTIC_LOCKING` is a timestamp in the format `2022-12-31T23:59:59Z`. Setting it disables optimistic locking when sending a record that contains `"_version":-1` before that time, after that time `"_version":-1` is rejected. This applies only to tables with `failOnConflictUnlessSuppressed`, see below. The timestamp ensures that disabling this option cannot be forgotten. Suppressing optimistic locking is known to lead to data loss in some cases, don't use in production, you have been warned! @@ -415,13 +417,17 @@ The environment variables `DB_HOST_READER` and `DB_PORT_READER` are for the [Rea See the [Environment Variables](https://github.com/folio-org/okapi/blob/master/doc/guide.md#environment-variables) section of the Okapi Guide for more information on how to deploy environment variables to RMB modules via Okapi. ## Read and write database instances setup -A PostgreSQL instance (the write instance) can be replicated for horizontal scaling (scale out), each replica is a read-only standby instance. +A PostgreSQL instance (the write instance) can be replicated for horizontal scaling (scale out). Each replica is a read-only standby instance. RMB supports separating read and write requests. By default the write instance configured with `DB_HOST` and `DB_PORT` environment variables is used for reading as well, but optionally a read instance (or a load balancer for multiple read instances) can be configured by setting its host and port using the `DB_HOST_READER` and `DB_PORT_READER` environment variables. If either of these reader variables are not set then it will default to use the writer instance. -RMB only supports [synchronous standby servers](https://www.postgresql.org/docs/current/warm-standby.html#SYNCHRONOUS-REPLICATION). The write instance must list the `DB_*_READER` instance(s) in its `synchronous_standby_names` configuration. This ensures ACID for both write and read instance. +RMB supports both [synchronous standby servers](https://www.postgresql.org/docs/current/warm-standby.html#SYNCHRONOUS-REPLICATION) and asynchronously replicated standby servers (the default)[https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION]. When using synchronous replication, write instance must list the `DB_HOST_READER` instance(s) in its `synchronous_standby_names` configuration. This ensures ACID for both write and read instance. + +PostgreSQL's default asynchronous replication is supported by RMB by configuring `DB_HOST_ASYNC_READER` and `DB_PORT_ASYNC_READER`. Asynchronous replication is eventually consistent and suitable for read-only applications like reporting, analytics, and data warehousing. To use the async read host in queries, get an instance of `PostgresClient` using `PostgresClientWithAsyncReadConn.getInstance(...)`. If no async read host is configured, it falls back to the sync read host if configured, otherwise it uses the write host. + +AWS RDS does not support synchronous replication. For AWS it is recommended to only use `DB_HOST` and `DB_HOST_ASYNC_READER` in a given deployment. -PostgreSQL's default asynchronous replication is not supported by RMB and will result in errors caused by outdated data the replica may return. Asynchronous replication is eventual consistency suitable for read-only applications like reporting, analytics, and data warehouse. +APIs using the async client should provide a warning in the API documentation that the API uses stale data (for performance reasons). ## Local development server diff --git a/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java b/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java index eaeaf8950..bdab6b498 100644 --- a/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java +++ b/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClient.java @@ -3,23 +3,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import freemarker.template.TemplateException; -import io.netty.handler.ssl.OpenSsl; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.core.net.JdkSSLEngineOptions; -import io.vertx.core.net.OpenSSLEngineOptions; -import io.vertx.core.net.PemTrustOptions; -import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; import io.vertx.pgclient.PgPool; -import io.vertx.pgclient.SslMode; -import io.vertx.sqlclient.PoolOptions; import io.vertx.sqlclient.PreparedStatement; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowIterator; @@ -41,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.regex.Matcher; @@ -75,8 +66,11 @@ public class PostgresClient { public static final String DEFAULT_SCHEMA = "public"; public static final String DEFAULT_JSONB_FIELD_NAME = "jsonb"; public static final int DEFAULT_MAX_POOL_SIZE = 4; - /** default release delay in milliseconds; after this time an idle database connection is closed */ - public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 60000; + + protected static final String MODULE_NAME = getModuleNameValue("getModuleName"); + protected static final String PG_APPLICATION_NAME = MODULE_NAME.replace('_', '-') + "-" + + getModuleNameValue("getModuleVersion"); + protected static final String MAX_SHARED_POOL_SIZE = "maxSharedPoolSize"; static Logger log = LogManager.getLogger(PostgresClient.class); @@ -88,6 +82,17 @@ public class PostgresClient { static final int STREAM_GET_DEFAULT_CHUNK_SIZE = 100; static final ObjectMapper MAPPER = ObjectMapperTool.getMapper(); + @SuppressWarnings("java:S2068") // suppress "Hard-coded credentials are security-sensitive" + // we use it as a key in the config. We use it as a default password only when testing + // using embedded postgres, see getPostgreSQLClientConfig + static final String PASSWORD = "password"; + static final String USERNAME = "username"; + static final String HOST = "host"; + static final String HOST_READER = "host_reader"; + static final String PORT = "port"; + static final String PORT_READER = "port_reader"; + static final String DATABASE = "database"; + /** * True if all tenants of a Vertx share one PgPool, false for having a separate PgPool for * each combination of tenant and Vertx (= each PostgresClient has its own PgPool). @@ -98,14 +103,8 @@ public class PostgresClient { */ private static boolean sharedPgPool; - private static final String MODULE_NAME = getModuleNameValue("getModuleName"); - private static final String PG_APPLICATION_NAME = MODULE_NAME.replace('_', '-') + "-" - + getModuleNameValue("getModuleVersion"); private static final String ID_FIELD = "id"; - private static final String CONNECTION_RELEASE_DELAY = "connectionReleaseDelay"; - private static final String MAX_POOL_SIZE = "maxPoolSize"; - private static final String MAX_SHARED_POOL_SIZE = "maxSharedPoolSize"; private static final String POSTGRES_LOCALHOST_CONFIG = "/postgres-conf.json"; private static PostgresTester postgresTester; @@ -114,24 +113,10 @@ public class PostgresClient { private static final String FROM = " FROM "; private static final String WHERE = " WHERE "; - @SuppressWarnings("java:S2068") // suppress "Hard-coded credentials are security-sensitive" - // we use it as a key in the config. We use it as a default password only when testing - // using embedded postgres, see getPostgreSQLClientConfig - private static final String PASSWORD = "password"; - private static final String USERNAME = "username"; - private static final String HOST = "host"; - private static final String HOST_READER = "host_reader"; - private static final String PORT = "port"; - private static final String PORT_READER = "port_reader"; - private static final String DATABASE = "database"; - private static final String RECONNECT_ATTEMPTS = "reconnectAttempts"; - private static final String RECONNECT_INTERVAL = "reconnectInterval"; - private static final String SERVER_PEM = "server_pem"; private static final String POSTGRES_TESTER = "postgres_tester"; private static final String GET_STAT_METHOD = "get"; private static final String EXECUTE_STAT_METHOD = "execute"; - private static final String PROCESS_RESULTS_STAT_METHOD = "processResults"; private static final String SPACE = " "; @@ -171,6 +156,7 @@ public class PostgresClient { private final Vertx vertx; private JsonObject postgreSQLClientConfig = null; + /** * PgPool client that is initialized with mainly the database writer instance's connection string. */ @@ -182,6 +168,7 @@ public class PostgresClient { private PgPool readClient; private final String tenantId; private final String schemaName; + private PostgresClientInitializer postgresClientInitializer; protected PostgresClient(Vertx vertx, String tenantId) throws Exception { this.tenantId = tenantId; @@ -190,7 +177,6 @@ protected PostgresClient(Vertx vertx, String tenantId) throws Exception { init(); } - /** * test constructor for unit testing */ @@ -317,7 +303,7 @@ static Long getExplainQueryThreshold() { * @param tenantId the tenantId the instance is for * @return the PostgresClient instance, or null on error */ - private static PostgresClient getInstanceInternal(Vertx vertx, String tenantId) { + protected static PostgresClient getInstanceInternal(Vertx vertx, String tenantId) { // assumes a single thread vertx model so no sync needed PostgresClient postgresClient = CONNECTION_POOL.get(vertx, tenantId); try { @@ -433,6 +419,14 @@ void setClient(PgPool client) { this.client = client; } + /** + * Get the {@link PostgresClientInitializer} for this {@link PostgresClient} instance. + * @return A reference to the initializer. + */ + PostgresClientInitializer getPostgresClientInitializer() { + return this.postgresClientInitializer; + } + /** * Close the SQL client of this PostgresClient instance. * This is idempotent: additional close invocations are always successful. @@ -512,78 +506,8 @@ public static void closeAllClients() { PG_POOLS.clear(); PG_POOLS_READER.values().forEach(PgPool::close); PG_POOLS_READER.clear(); - } - static PgConnectOptions createPgConnectOptions(JsonObject sqlConfig, boolean isReader) { - PgConnectOptions pgConnectOptions = new PgConnectOptions(); - pgConnectOptions.addProperty("application_name", PG_APPLICATION_NAME); - - String hostToResolve = HOST; - String portToResolve = PORT; - if (isReader) { - hostToResolve = HOST_READER; - portToResolve = PORT_READER; - } - - String host = sqlConfig.getString(hostToResolve); - if (host != null) { - pgConnectOptions.setHost(host); - } - - Integer port; - port = sqlConfig.getInteger(portToResolve); - - if (port != null) { - pgConnectOptions.setPort(port); - } - - if (isReader && (host == null || port == null)) { - return null; - } - - String username = sqlConfig.getString(USERNAME); - if (username != null) { - pgConnectOptions.setUser(username); - } - String password = sqlConfig.getString(PASSWORD); - if (password != null) { - pgConnectOptions.setPassword(password); - } - String database = sqlConfig.getString(DATABASE); - if (database != null) { - pgConnectOptions.setDatabase(database); - } - Integer reconnectAttempts = sqlConfig.getInteger(RECONNECT_ATTEMPTS); - if (reconnectAttempts != null) { - pgConnectOptions.setReconnectAttempts(reconnectAttempts); - } - Long reconnectInterval = sqlConfig.getLong(RECONNECT_INTERVAL); - if (reconnectInterval != null) { - pgConnectOptions.setReconnectInterval(reconnectInterval); - } - - String serverPem = sqlConfig.getString(SERVER_PEM); - if (serverPem != null) { - pgConnectOptions.setSslMode(SslMode.VERIFY_FULL); - pgConnectOptions.setHostnameVerificationAlgorithm("HTTPS"); - pgConnectOptions.setPemTrustOptions( - new PemTrustOptions().addCertValue(Buffer.buffer(serverPem))); - pgConnectOptions.setEnabledSecureTransportProtocols(Collections.singleton("TLSv1.3")); - if (OpenSSLEngineOptions.isAvailable()) { - pgConnectOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions()); - } else { - pgConnectOptions.setJdkSslEngineOptions(new JdkSSLEngineOptions()); - log.error("Cannot run OpenSSL, using slow JDKSSL. Is netty-tcnative-boringssl-static for windows-x86_64, " - + "osx-x86_64 or linux-x86_64 installed? https://netty.io/wiki/forked-tomcat-native.html " - + "Is libc6-compat installed (if required)? https://github.com/pires/netty-tcnative-alpine"); - } - log.debug("Enforcing SSL encryption for PostgreSQL connections, " - + "requiring TLSv1.3 with server name certificate, " - + "using " + (OpenSSLEngineOptions.isAvailable() ? "OpenSSL " + OpenSsl.versionString() : "JDKSSL")); - } - return pgConnectOptions; - } private void init() throws Exception { @@ -602,38 +526,14 @@ private void init() throws Exception { } logPostgresConfig(); + this.postgresClientInitializer = new PostgresClientInitializer(vertx, postgreSQLClientConfig); if (sharedPgPool) { - client = PG_POOLS.computeIfAbsent(vertx, x -> createPgPool(vertx, postgreSQLClientConfig, false)); - readClient = PG_POOLS_READER.computeIfAbsent(vertx, x -> createPgPool(vertx, postgreSQLClientConfig, true)); + client = PG_POOLS.computeIfAbsent(vertx, x -> postgresClientInitializer.getClient()); + readClient = PG_POOLS_READER.computeIfAbsent(vertx, x -> postgresClientInitializer.getSyncReadClient()); } else { - client = createPgPool(vertx, postgreSQLClientConfig, false); - readClient = createPgPool(vertx, postgreSQLClientConfig, true); + client = postgresClientInitializer.getClient(); + readClient = postgresClientInitializer.getSyncReadClient(); } - - readClient = readClient != null ? readClient : client; - } - - static PgPool createPgPool(Vertx vertx, JsonObject configuration, Boolean isReader) { - PgConnectOptions connectOptions = createPgConnectOptions(configuration, isReader); - - if (connectOptions == null) { - return null; - } - - PoolOptions poolOptions = new PoolOptions(); - poolOptions.setMaxSize( - configuration.getInteger(MAX_SHARED_POOL_SIZE, configuration.getInteger(MAX_POOL_SIZE, DEFAULT_MAX_POOL_SIZE))); - - poolOptions.setIdleTimeoutUnit(TimeUnit.MILLISECONDS); - - if (sharedPgPool) { - poolOptions.setIdleTimeout(0); // The manager fully manages this. - } else { - var connectionReleaseDelay = configuration.getInteger(CONNECTION_RELEASE_DELAY, DEFAULT_CONNECTION_RELEASE_DELAY); - poolOptions.setIdleTimeout(connectionReleaseDelay); - } - - return PgPool.pool(vertx, connectOptions, poolOptions); } /** @@ -3551,7 +3451,7 @@ public Future getReadConnection() { * @see #withTransaction(Function) */ public Future getConnection(PgPool client) { - if (!isSharedPool()) { + if (!sharedPgPool) { return client.getConnection().map(PgConnection.class::cast); } diff --git a/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClientInitializer.java b/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClientInitializer.java new file mode 100644 index 000000000..9100768fb --- /dev/null +++ b/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClientInitializer.java @@ -0,0 +1,171 @@ +package org.folio.rest.persist; + +import io.netty.handler.ssl.OpenSsl; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.JdkSSLEngineOptions; +import io.vertx.core.net.OpenSSLEngineOptions; +import io.vertx.core.net.PemTrustOptions; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgPool; +import io.vertx.pgclient.SslMode; +import io.vertx.sqlclient.PoolOptions; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PostgresClientInitializer { + /** default release delay in milliseconds; after this time an idle database connection is closed */ + public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 60000; + static final String HOST_READER_ASYNC = "host_reader_async"; + static final String PORT_READER_ASYNC = "port_reader_async"; + + private static final Logger LOG = LogManager.getLogger(PostgresClientInitializer.class); + private static final String CONNECTION_RELEASE_DELAY = "connectionReleaseDelay"; + private static final String MAX_POOL_SIZE = "maxPoolSize"; + private static final String RECONNECT_ATTEMPTS = "reconnectAttempts"; + private static final String RECONNECT_INTERVAL = "reconnectInterval"; + private static final String SERVER_PEM = "server_pem"; + + private final PgPool client; + private PgPool syncReadClient; + private PgPool asyncReadClient; + + /** + * Defines the various clients (PgPool instances) based on their configured hosts in any supported combination. + * @param vertx A reference to the current vertex instance. + * @param configuration A reference to the current database configuration. + */ + protected PostgresClientInitializer(Vertx vertx, JsonObject configuration) { + client = createPgPool(vertx, configuration, PostgresClient.HOST, PostgresClient.PORT); + syncReadClient = createPgPool(vertx, configuration, PostgresClient.HOST_READER, PostgresClient.PORT_READER); + asyncReadClient = createPgPool(vertx, configuration, HOST_READER_ASYNC, PORT_READER_ASYNC); + + // If there is no read client defined, then use the r/w client for it. + // If there is no async read client defined, then use the sync read client for it if it exists, + // otherwise all 3 clients are the r/w client. + if (syncReadClient == null) { + syncReadClient = client; + } + if (asyncReadClient == null) { + asyncReadClient = syncReadClient; + } + } + + public PgPool getClient() { + return client; + } + + public PgPool getSyncReadClient() { + return syncReadClient; + } + + public PgPool getAsyncReadClient() { + return asyncReadClient; + } + + private static PgPool createPgPool(Vertx vertx, + JsonObject configuration, + String hostToResolve, + String portToResolve) { + var connectOptions = createPgConnectOptions(configuration, hostToResolve, portToResolve); + + if (connectOptions == null) { + return null; + } + + var poolOptions = new PoolOptions(); + poolOptions.setMaxSize(configuration.getInteger(PostgresClient.MAX_SHARED_POOL_SIZE, + configuration.getInteger(MAX_POOL_SIZE, PostgresClient.DEFAULT_MAX_POOL_SIZE))); + + poolOptions.setIdleTimeoutUnit(TimeUnit.MILLISECONDS); + if (PostgresClient.isSharedPool()) { + poolOptions.setIdleTimeout(0); // The connection manager fully manages this. + } else { + var connectionReleaseDelay = configuration.getInteger(CONNECTION_RELEASE_DELAY, DEFAULT_CONNECTION_RELEASE_DELAY); + poolOptions.setIdleTimeout(connectionReleaseDelay); + } + + return PgPool.pool(vertx, connectOptions, poolOptions); + } + + static PgConnectOptions createPgConnectOptions(JsonObject sqlConfig, String hostToResolve, String portToResolve) { + var pgConnectOptions = new PgConnectOptions(); + pgConnectOptions.addProperty("application_name", PostgresClient.PG_APPLICATION_NAME); + + if (!trySetHostAndPort(pgConnectOptions, sqlConfig, hostToResolve, portToResolve)) { + return null; + } + + var username = sqlConfig.getString(PostgresClient.USERNAME); + if (username != null) { + pgConnectOptions.setUser(username); + } + var password = sqlConfig.getString(PostgresClient.PASSWORD); + if (password != null) { + pgConnectOptions.setPassword(password); + } + var database = sqlConfig.getString(PostgresClient.DATABASE); + if (database != null) { + pgConnectOptions.setDatabase(database); + } + var reconnectAttempts = sqlConfig.getInteger(RECONNECT_ATTEMPTS); + if (reconnectAttempts != null) { + pgConnectOptions.setReconnectAttempts(reconnectAttempts); + } + var reconnectInterval = sqlConfig.getLong(RECONNECT_INTERVAL); + if (reconnectInterval != null) { + pgConnectOptions.setReconnectInterval(reconnectInterval); + } + var serverPem = sqlConfig.getString(SERVER_PEM); + if (serverPem != null) { + setUpSsl(pgConnectOptions, serverPem); + } + return pgConnectOptions; + } + + private static boolean trySetHostAndPort(PgConnectOptions pgConnectOptions, + JsonObject sqlConfig, + String hostToResolve, + String portToResolve) { + var host = sqlConfig.getString(hostToResolve); + if (host != null) { + pgConnectOptions.setHost(host); + } + + Integer port; + port = sqlConfig.getInteger(portToResolve); + + if (port != null) { + pgConnectOptions.setPort(port); + } + + return !isReaderHost(hostToResolve) || (host != null && port != null); + } + + private static void setUpSsl(PgConnectOptions pgConnectOptions, String serverPem) { + pgConnectOptions.setSslMode(SslMode.VERIFY_FULL); + pgConnectOptions.setHostnameVerificationAlgorithm("HTTPS"); + pgConnectOptions.setPemTrustOptions( + new PemTrustOptions().addCertValue(Buffer.buffer(serverPem))); + pgConnectOptions.setEnabledSecureTransportProtocols(Collections.singleton("TLSv1.3")); + if (OpenSSLEngineOptions.isAvailable()) { + pgConnectOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions()); + } else { + pgConnectOptions.setJdkSslEngineOptions(new JdkSSLEngineOptions()); + LOG.error("Cannot run OpenSSL, using slow JDKSSL. Is netty-tcnative-boringssl-static for windows-x86_64, " + + "osx-x86_64 or linux-x86_64 installed? https://netty.io/wiki/forked-tomcat-native.html " + + "Is libc6-compat installed (if required)? https://github.com/pires/netty-tcnative-alpine"); + } + LOG.debug("Enforcing SSL encryption for PostgreSQL connections, " + + "requiring TLSv1.3 with server name certificate, " + + "using {}", (OpenSSLEngineOptions.isAvailable() ? ("OpenSSL " + OpenSsl.versionString()) : "JDKSSL")); + } + + private static boolean isReaderHost(String hostToResolve) { + return hostToResolve.equals(PostgresClient.HOST_READER) || hostToResolve.equals(HOST_READER_ASYNC); + } +} + diff --git a/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClientWithAsyncReadConn.java b/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClientWithAsyncReadConn.java new file mode 100644 index 000000000..40dc089e5 --- /dev/null +++ b/domain-models-runtime/src/main/java/org/folio/rest/persist/PostgresClientWithAsyncReadConn.java @@ -0,0 +1,27 @@ +package org.folio.rest.persist; + +import io.vertx.core.Vertx; + +/** + * Clients which, for performance reasons, would like to take advantage of asynchronous read replication can use + * this class to create their instance of the {@link PostgresClient} using the {@link PostgresClient#getInstance(Vertx)} + * or the {@link PostgresClient#getInstance(Vertx, String)} method. + * Note that the data on the read-only host is asynchronously replicated. This means that it is eventually consistent + * and not ACID. In other words, it is not guaranteed to be up-to-date or synchronized with the read/write host. + * The async read host used by this class is therefore intended to be used in cases where eventually consistent data + * is acceptable, such as reporting. + * APIs using the async client should have a warning in the API documentation that it uses stale data (for performance + * reasons). + */ +public class PostgresClientWithAsyncReadConn extends PostgresClient { + protected PostgresClientWithAsyncReadConn(Vertx vertx, String tenantId) throws Exception { + super(vertx, tenantId); + } + + public static PostgresClient getInstance(Vertx vertx, String tenantId) { + var postgresClient = getInstanceInternal(vertx, tenantId); + var initializer = postgresClient.getPostgresClientInitializer(); + postgresClient.setReaderClient(initializer.getAsyncReadClient()); + return postgresClient; + } +} diff --git a/domain-models-runtime/src/main/java/org/folio/rest/persist/cache/CachedConnectionManager.java b/domain-models-runtime/src/main/java/org/folio/rest/persist/cache/CachedConnectionManager.java index 6a6af1b89..1a9dda2e2 100644 --- a/domain-models-runtime/src/main/java/org/folio/rest/persist/cache/CachedConnectionManager.java +++ b/domain-models-runtime/src/main/java/org/folio/rest/persist/cache/CachedConnectionManager.java @@ -3,6 +3,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.folio.rest.persist.PostgresClient; +import org.folio.rest.persist.PostgresClientInitializer; import org.folio.rest.tools.utils.Envs; import io.vertx.core.Vertx; import io.vertx.core.Future; @@ -24,8 +25,8 @@ public class CachedConnectionManager { private static final Logger LOG = LogManager.getLogger(CachedConnectionManager.class); private static final int MAX_POOL_SIZE = getIntFromEnvOrDefault(Envs.DB_MAXSHAREDPOOLSIZE, PostgresClient.DEFAULT_MAX_POOL_SIZE); - private static final int CONNECTION_RELEASE_DELAY_SECONDS = - getIntFromEnvOrDefault(Envs.DB_CONNECTIONRELEASEDELAY, PostgresClient.DEFAULT_CONNECTION_RELEASE_DELAY); + private static final int CONNECTION_RELEASE_DELAY_SECONDS = getIntFromEnvOrDefault(Envs.DB_CONNECTIONRELEASEDELAY, + PostgresClientInitializer.DEFAULT_CONNECTION_RELEASE_DELAY); private final ConnectionCache connectionCache = new ConnectionCache(); diff --git a/domain-models-runtime/src/main/java/org/folio/rest/tools/utils/Envs.java b/domain-models-runtime/src/main/java/org/folio/rest/tools/utils/Envs.java index 0e554c603..58d251ad3 100644 --- a/domain-models-runtime/src/main/java/org/folio/rest/tools/utils/Envs.java +++ b/domain-models-runtime/src/main/java/org/folio/rest/tools/utils/Envs.java @@ -8,8 +8,10 @@ public enum Envs { DB_HOST, DB_HOST_READER, + DB_HOST_READER_ASYNC, DB_PORT, DB_PORT_READER, + DB_PORT_READER_ASYNC, DB_USERNAME, DB_PASSWORD, DB_DATABASE, @@ -66,6 +68,7 @@ private static Object configValue(Envs envs, String value) { switch (envs) { case DB_PORT: case DB_PORT_READER: + case DB_PORT_READER_ASYNC: case DB_QUERYTIMEOUT: case DB_MAXPOOLSIZE: case DB_MAXSHAREDPOOLSIZE: diff --git a/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientIT.java b/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientIT.java index f27d35bf9..64ea8e351 100644 --- a/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientIT.java +++ b/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientIT.java @@ -458,7 +458,8 @@ public void maxPoolSize(TestContext context) { postgresClient = createA(context, TENANT); JsonObject configuration = postgresClient.getConnectionConfig().copy() .put("maxPoolSize", maxPoolSize); - postgresClient.setClient(PostgresClient.createPgPool(vertx, configuration, false)); + var initializer = new PostgresClientInitializer(Vertx.vertx(), configuration); + postgresClient.setClient(initializer.getClient()); List futures = new ArrayList<>(); for (int i=0; i>future(promise -> diff --git a/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientTest.java b/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientTest.java index a53179e3a..e43cb034a 100644 --- a/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientTest.java +++ b/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientTest.java @@ -8,9 +8,7 @@ import static org.hamcrest.collection.IsMapContaining.hasEntry; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.text.StringContainsInOrder.stringContainsInOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.lang.reflect.Method; @@ -31,7 +29,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgConnection; -import io.vertx.pgclient.PgPool; import io.vertx.pgclient.impl.RowImpl; import io.vertx.sqlclient.Query; import io.vertx.sqlclient.Row; @@ -54,7 +51,6 @@ import org.junit.Before; import org.junit.Test; - public class PostgresClientTest { // See PostgresClientIT.java for the tests that require a postgres database! @@ -66,11 +62,6 @@ public class PostgresClientTest { private static final int DEFAULT_LIMIT = 10; - private static final String HOST = "host"; - private static final String HOST_READER = "host_reader"; - private static final String POST = "post"; - private static final String POST_READER = "post_reader"; - @Before public void initConfig() { oldConfigFilePath = PostgresClient.getConfigFilePath(); @@ -169,7 +160,8 @@ public void getConnectionConfig() throws Exception { @Test public void testPgConnectOptionsEmpty() { JsonObject conf = new JsonObject(); - PgConnectOptions options = PostgresClient.createPgConnectOptions(conf, false); + PgConnectOptions options = + PostgresClientInitializer.createPgConnectOptions(conf, PostgresClient.HOST, PostgresClient.PORT); assertThat("localhost", is(options.getHost())); assertThat(5432, is(options.getPort())); assertThat("user", is(options.getUser())); @@ -195,7 +187,8 @@ public void testPgConnectOptionsFull() throws Exception { "DB_RECONNECTINTERVAL", "2000" )); JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); - PgConnectOptions options = PostgresClient.createPgConnectOptions(conf, false); + PgConnectOptions options = + PostgresClientInitializer.createPgConnectOptions(conf, PostgresClient.HOST, PostgresClient.PORT); assertThat(options.getHost(), is("myhost")); assertThat(options.getPort(), is(5433)); assertThat(options.getUser(), is("myuser")); @@ -228,7 +221,8 @@ public void testPgConnectOptionsWithReaderConfig() throws Exception { "DB_RECONNECTINTERVAL", "2000" )); JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); - PgConnectOptions options = PostgresClient.createPgConnectOptions(conf, true); + PgConnectOptions options = + PostgresClientInitializer.createPgConnectOptions(conf, PostgresClient.HOST_READER, PostgresClient.PORT_READER); assertThat(options.getHost(), is("myhost_reader")); assertThat(options.getPort(), is(12345)); assertThat(options.getUser(), is("myuser")); @@ -257,24 +251,143 @@ public void testPgConnectOptionsWithMissingReaderPort() throws Exception { "DB_RECONNECTINTERVAL", "2000" )); JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); - PgConnectOptions options = PostgresClient.createPgConnectOptions(conf, true); + PgConnectOptions options = PostgresClientInitializer.createPgConnectOptions(conf, + PostgresClientInitializer.HOST_READER_ASYNC, PostgresClientInitializer.PORT_READER_ASYNC); assertNull(options); } finally { // restore defaults Envs.setEnv(System.getenv()); } } - @Test public void testGettingPgPoolWithMissingReaderHost() throws Exception { try { - JsonObject config = new JsonObject(); - config.put("DB_PORT_READER", "5433") - .put("DB_USERNAME", "myuser") - .put("DB_PASSWORD", "mypassword"); + Envs.setEnv(Map.of( + "DB_PORT_READER", "12345", + "DB_HOST", "myhost", + "DB_PORT", "5433", + "DB_USERNAME", "myuser", + "DB_PASSWORD", "mypassword", + "DB_DATABASE", "mydatabase", + "DB_CONNECTIONRELEASEDELAY", "1000", + "DB_RECONNECTATTEMPTS", "3", + "DB_RECONNECTINTERVAL", "2000" + )); + JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); + var initializer = new PostgresClientInitializer(Vertx.vertx(), conf); + assertNotNull(initializer.getClient()); + assertNotNull(initializer.getSyncReadClient()); + assertNotNull(initializer.getAsyncReadClient()); + assertEquals(initializer.getClient(), initializer.getSyncReadClient()); + assertEquals(initializer.getClient(), initializer.getAsyncReadClient()); + } finally { + // restore defaults + Envs.setEnv(System.getenv()); + } + } + + @Test + public void testGettingPgPoolWithNoReader() throws Exception { + try { + Envs.setEnv(Map.of( + "DB_HOST", "myhost", + "DB_PORT", "5433", + "DB_USERNAME", "myuser", + "DB_PASSWORD", "mypassword", + "DB_DATABASE", "mydatabase", + "DB_CONNECTIONRELEASEDELAY", "1000", + "DB_RECONNECTATTEMPTS", "3", + "DB_RECONNECTINTERVAL", "2000" + )); + JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); + var initializer = new PostgresClientInitializer(Vertx.vertx(), conf); + assertNotNull(initializer.getClient()); + assertNotNull(initializer.getSyncReadClient()); + assertNotNull(initializer.getAsyncReadClient()); + assertEquals(initializer.getClient(), initializer.getSyncReadClient()); + assertEquals(initializer.getClient(), initializer.getAsyncReadClient()); + } finally { + // restore defaults + Envs.setEnv(System.getenv()); + } + } + + @Test + public void testGettingPgPoolWithReaderHost() throws Exception { + try { + Envs.setEnv(Map.of( + "DB_PORT_READER", "12345", + "DB_HOST_READER", "myhost_reader", + "DB_HOST", "myhost", + "DB_PORT", "5433", + "DB_USERNAME", "myuser", + "DB_PASSWORD", "mypassword", + "DB_DATABASE", "mydatabase", + "DB_CONNECTIONRELEASEDELAY", "1000", + "DB_RECONNECTATTEMPTS", "3", + "DB_RECONNECTINTERVAL", "2000" + )); + JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); + var initializer = new PostgresClientInitializer(Vertx.vertx(), conf); + assertNotNull(initializer.getClient()); + assertNotNull(initializer.getSyncReadClient()); + assertNotNull(initializer.getAsyncReadClient()); + assertNotEquals(initializer.getClient(), initializer.getSyncReadClient()); + assertNotEquals(initializer.getClient(), initializer.getAsyncReadClient()); + assertEquals(initializer.getSyncReadClient(), initializer.getAsyncReadClient()); + } finally { + // restore defaults + Envs.setEnv(System.getenv()); + } + } + + @Test + public void testGettingPgPoolWithAsyncReaderHostOnly() throws Exception { + try { + Envs.setEnv(Map.of( + "DB_PORT_READER_ASYNC", "12345", + "DB_HOST_READER_ASYNC", "myhost_reader", + "DB_HOST", "myhost", + "DB_PORT", "5433", + "DB_USERNAME", "myuser", + "DB_PASSWORD", "mypassword", + "DB_DATABASE", "mydatabase", + "DB_CONNECTIONRELEASEDELAY", "1000", + "DB_RECONNECTATTEMPTS", "3", + "DB_RECONNECTINTERVAL", "2000" + )); + JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); + var initializer = new PostgresClientInitializer(Vertx.vertx(), conf); + assertNotNull(initializer.getClient()); + assertNotNull(initializer.getSyncReadClient()); + assertNotNull(initializer.getAsyncReadClient()); + assertEquals(initializer.getClient(), initializer.getSyncReadClient()); + assertNotEquals(initializer.getClient(), initializer.getAsyncReadClient()); + } finally { + // restore defaults + Envs.setEnv(System.getenv()); + } + } - PgPool pgPool = PostgresClient.createPgPool(Vertx.vertx(), config, true); - assertNull(pgPool); + @Test + public void testGettingPgPoolWithAllThreeHostTypes() throws Exception { + try { + Envs.setEnv(Map.of( + "DB_PORT_READER_ASYNC", "12345", + "DB_HOST_READER_ASYNC", "myhost_reader_async", + "DB_PORT_READER", "54321", + "DB_HOST_READER", "myhost_reader", + "DB_HOST", "myhost", + "DB_PORT", "5433" + )); + JsonObject conf = new PostgresClient(Vertx.vertx(), "public").getConnectionConfig(); + var initializer = new PostgresClientInitializer(Vertx.vertx(), conf); + assertNotNull(initializer.getClient()); + assertNotNull(initializer.getSyncReadClient()); + assertNotNull(initializer.getAsyncReadClient()); + assertNotEquals(initializer.getClient(), initializer.getSyncReadClient()); + assertNotEquals(initializer.getClient(), initializer.getAsyncReadClient()); + assertNotEquals(initializer.getSyncReadClient(), initializer.getAsyncReadClient()); } finally { // restore defaults Envs.setEnv(System.getenv()); diff --git a/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientWithAsyncReadConnTest.java b/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientWithAsyncReadConnTest.java new file mode 100644 index 000000000..a9aa43c79 --- /dev/null +++ b/domain-models-runtime/src/test/java/org/folio/rest/persist/PostgresClientWithAsyncReadConnTest.java @@ -0,0 +1,36 @@ +package org.folio.rest.persist; + +import io.vertx.core.Vertx; +import org.folio.rest.tools.utils.Envs; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.*; + +public class PostgresClientWithAsyncReadConnTest { + + @Test + public void getInstanceTest() { + Envs.setEnv(Map.of( + "DB_HOST_READER_ASYNC", "myhost_reader_async", + "DB_PORT_READER_ASYNC", "54321", + "DB_HOST_READER", "myhost_reader", + "DB_PORT_READER", "12345", + "DB_HOST", "myhost", + "DB_PORT", "5433", + "DB_USERNAME", "myuser", + "DB_PASSWORD", "mypassword", + "DB_DATABASE", "mydatabase", + "DB_CONNECTIONRELEASEDELAY", "1000" + )); + var vertx = Vertx.vertx(); + var tenant = "testTenant"; + var client = PostgresClientWithAsyncReadConn.getInstance(vertx, tenant); + var initializer = client.getPostgresClientInitializer(); + assertNotNull(client); + assertNotNull(client.getReaderClient()); + assertNotEquals(client.getClient(), client.getReaderClient()); + assertEquals(client.getReaderClient(), initializer.getAsyncReadClient()); + } +}