RMB-975: Define DB_HOST_ASYNC_READER, DB_PORT_ASYNC_READER (#1162)
Defines an asynchronous reader client, a drop-in replacement for the existing PostgresClient, that developers can use for read operations where eventual consistency is acceptable (reporting, etc.).
steveellis authored Jul 19, 2024
1 parent 2fded21 commit 24e2b7b
Showing 9 changed files with 419 additions and 161 deletions.
14 changes: 10 additions & 4 deletions README.md
@@ -406,7 +406,9 @@ to a running PostgreSQL query. 0 disables this timeout and is the default.
To take effect, an RMB-based module must get it via PostgresClient.getConnectionConfig().getInteger("queryTimeout")
and pass it to the RMB method that starts the connection, transaction or query.

The environment variables `DB_HOST_READER` and `DB_PORT_READER` are for the [Read and write database instances setup](#read-and-write-database-instances-setup).
The environment variables `DB_HOST_READER` and `DB_PORT_READER` are for the synchronously replicated [read and write database instances setup](#read-and-write-database-instances-setup).

The environment variables `DB_HOST_ASYNC_READER` and `DB_PORT_ASYNC_READER` are for the asynchronously replicated read and write database instances setup.

`DB_ALLOW_SUPPRESS_OPTIMISTIC_LOCKING` is a timestamp in the format `2022-12-31T23:59:59Z`. Setting it disables optimistic locking when sending a record that contains `"_version":-1` before that time; after that time `"_version":-1` is rejected. This applies only to tables with `failOnConflictUnlessSuppressed`, see below. The timestamp ensures that disabling this option cannot be forgotten. Suppressing optimistic locking is known to lead to data loss in some cases; don't use it in production, you have been warned!
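
For illustration only (not part of this diff), such a record might be built as a Vert.x `JsonObject`; the id and name values below are made up:

```java
// Hypothetical payload: "_version": -1 asks RMB to suppress optimistic locking,
// which is honored only before the DB_ALLOW_SUPPRESS_OPTIMISTIC_LOCKING timestamp.
JsonObject record = new JsonObject()
    .put("id", "0a1b2c3d-0000-4000-8000-000000000000")
    .put("name", "example record")
    .put("_version", -1);
```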

Expand All @@ -415,13 +417,17 @@ The environment variables `DB_HOST_READER` and `DB_PORT_READER` are for the [Rea
See the [Environment Variables](https://github.com/folio-org/okapi/blob/master/doc/guide.md#environment-variables) section of the Okapi Guide for more information on how to deploy environment variables to RMB modules via Okapi.

## Read and write database instances setup
A PostgreSQL instance (the write instance) can be replicated for horizontal scaling (scale out), each replica is a read-only standby instance.
A PostgreSQL instance (the write instance) can be replicated for horizontal scaling (scale out). Each replica is a read-only standby instance.

RMB supports separating read and write requests. By default the write instance configured with the `DB_HOST` and `DB_PORT` environment variables is also used for reading, but optionally a read instance (or a load balancer for multiple read instances) can be configured by setting its host and port with the `DB_HOST_READER` and `DB_PORT_READER` environment variables. If either of these reader variables is not set, RMB falls back to the write instance for reading.

RMB only supports [synchronous standby servers](https://www.postgresql.org/docs/current/warm-standby.html#SYNCHRONOUS-REPLICATION). The write instance must list the `DB_*_READER` instance(s) in its `synchronous_standby_names` configuration. This ensures ACID for both write and read instance.
RMB supports both [synchronous standby servers](https://www.postgresql.org/docs/current/warm-standby.html#SYNCHRONOUS-REPLICATION) and [asynchronously replicated standby servers](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION) (PostgreSQL's default). When using synchronous replication, the write instance must list the `DB_HOST_READER` instance(s) in its `synchronous_standby_names` configuration. This ensures ACID for both the write and read instances.

PostgreSQL's default asynchronous replication is supported by RMB by configuring `DB_HOST_ASYNC_READER` and `DB_PORT_ASYNC_READER`. Asynchronous replication is eventually consistent and suitable for read-only applications like reporting, analytics, and data warehousing. To use the async read host in queries, get an instance of `PostgresClient` using `PostgresClientWithAsyncReadConn.getInstance(...)`. If no async read host is configured, it falls back to the sync read host (if configured); otherwise it uses the write host.
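
A minimal usage sketch (illustrative, not part of this diff; it assumes the `getInstance(vertx, tenantId)` signature and `PostgresClient`'s Future-based `select` API, and the table name is a placeholder):

```java
// Obtain the async reader client; it falls back to the sync reader or the write
// instance when DB_HOST_ASYNC_READER/DB_PORT_ASYNC_READER are not configured.
PostgresClient asyncReader = PostgresClientWithAsyncReadConn.getInstance(vertx, tenantId);

// Eventually consistent read, acceptable for reporting-style queries.
asyncReader.select("SELECT jsonb FROM diku_mod_example.item LIMIT 10")
    .onSuccess(rows -> rows.forEach(row -> log.info(row.getValue("jsonb"))))
    .onFailure(e -> log.error("async read failed", e));
```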

AWS RDS does not support synchronous replication. For AWS it is recommended to only use `DB_HOST` and `DB_HOST_ASYNC_READER` in a given deployment.

PostgreSQL's default asynchronous replication is not supported by RMB and will result in errors caused by outdated data that the replica may return. Asynchronous replication is eventually consistent and suitable for read-only applications like reporting, analytics, and data warehousing.
APIs using the async client should state in their API documentation that the API may return stale data (for performance reasons).

## Local development server

org/folio/rest/persist/PostgresClient.java
@@ -3,23 +3,15 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import freemarker.template.TemplateException;
import io.netty.handler.ssl.OpenSsl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgPool;
import io.vertx.pgclient.SslMode;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedStatement;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
@@ -41,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -75,8 +66,11 @@ public class PostgresClient {
public static final String DEFAULT_SCHEMA = "public";
public static final String DEFAULT_JSONB_FIELD_NAME = "jsonb";
public static final int DEFAULT_MAX_POOL_SIZE = 4;
/** default release delay in milliseconds; after this time an idle database connection is closed */
public static final int DEFAULT_CONNECTION_RELEASE_DELAY = 60000;

protected static final String MODULE_NAME = getModuleNameValue("getModuleName");
protected static final String PG_APPLICATION_NAME = MODULE_NAME.replace('_', '-') + "-"
+ getModuleNameValue("getModuleVersion");
protected static final String MAX_SHARED_POOL_SIZE = "maxSharedPoolSize";

static Logger log = LogManager.getLogger(PostgresClient.class);

@@ -88,6 +82,17 @@ public class PostgresClient {
static final int STREAM_GET_DEFAULT_CHUNK_SIZE = 100;
static final ObjectMapper MAPPER = ObjectMapperTool.getMapper();

@SuppressWarnings("java:S2068") // suppress "Hard-coded credentials are security-sensitive"
// we use it as a key in the config. We use it as a default password only when testing
// using embedded postgres, see getPostgreSQLClientConfig
static final String PASSWORD = "password";
static final String USERNAME = "username";
static final String HOST = "host";
static final String HOST_READER = "host_reader";
static final String PORT = "port";
static final String PORT_READER = "port_reader";
static final String DATABASE = "database";
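
// Illustration only, not part of this commit: a configuration object using the keys
// above, roughly what /postgres-conf.json supplies for local development (all values
// here are made-up examples):
//   new JsonObject()
//       .put(HOST, "localhost").put(PORT, 5432)
//       .put(HOST_READER, "replica.example.org").put(PORT_READER, 5432)
//       .put(USERNAME, "folio_admin").put(PASSWORD, "folio_admin")
//       .put(DATABASE, "okapi_modules");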

/**
* True if all tenants of a Vertx share one PgPool, false for having a separate PgPool for
* each combination of tenant and Vertx (= each PostgresClient has its own PgPool).
@@ -98,14 +103,8 @@ public class PostgresClient {
*/
private static boolean sharedPgPool;

private static final String MODULE_NAME = getModuleNameValue("getModuleName");
private static final String PG_APPLICATION_NAME = MODULE_NAME.replace('_', '-') + "-"
+ getModuleNameValue("getModuleVersion");
private static final String ID_FIELD = "id";

private static final String CONNECTION_RELEASE_DELAY = "connectionReleaseDelay";
private static final String MAX_POOL_SIZE = "maxPoolSize";
private static final String MAX_SHARED_POOL_SIZE = "maxSharedPoolSize";
private static final String POSTGRES_LOCALHOST_CONFIG = "/postgres-conf.json";

private static PostgresTester postgresTester;
@@ -114,24 +113,10 @@ public class PostgresClient {
private static final String FROM = " FROM ";
private static final String WHERE = " WHERE ";

@SuppressWarnings("java:S2068") // suppress "Hard-coded credentials are security-sensitive"
// we use it as a key in the config. We use it as a default password only when testing
// using embedded postgres, see getPostgreSQLClientConfig
private static final String PASSWORD = "password";
private static final String USERNAME = "username";
private static final String HOST = "host";
private static final String HOST_READER = "host_reader";
private static final String PORT = "port";
private static final String PORT_READER = "port_reader";
private static final String DATABASE = "database";
private static final String RECONNECT_ATTEMPTS = "reconnectAttempts";
private static final String RECONNECT_INTERVAL = "reconnectInterval";
private static final String SERVER_PEM = "server_pem";
private static final String POSTGRES_TESTER = "postgres_tester";

private static final String GET_STAT_METHOD = "get";
private static final String EXECUTE_STAT_METHOD = "execute";

private static final String PROCESS_RESULTS_STAT_METHOD = "processResults";

private static final String SPACE = " ";
@@ -171,6 +156,7 @@ public class PostgresClient {

private final Vertx vertx;
private JsonObject postgreSQLClientConfig = null;

/**
* PgPool client that is initialized with mainly the database writer instance's connection string.
*/
@@ -182,6 +168,7 @@ public class PostgresClient {
private PgPool readClient;
private final String tenantId;
private final String schemaName;
private PostgresClientInitializer postgresClientInitializer;

protected PostgresClient(Vertx vertx, String tenantId) throws Exception {
this.tenantId = tenantId;
@@ -190,7 +177,6 @@ protected PostgresClient(Vertx vertx, String tenantId) throws Exception {
init();
}


/**
* test constructor for unit testing
*/
@@ -317,7 +303,7 @@ static Long getExplainQueryThreshold() {
* @param tenantId the tenantId the instance is for
* @return the PostgresClient instance, or null on error
*/
private static PostgresClient getInstanceInternal(Vertx vertx, String tenantId) {
protected static PostgresClient getInstanceInternal(Vertx vertx, String tenantId) {
// assumes a single thread vertx model so no sync needed
PostgresClient postgresClient = CONNECTION_POOL.get(vertx, tenantId);
try {
@@ -433,6 +419,14 @@ void setClient(PgPool client) {
this.client = client;
}

/**
* Get the {@link PostgresClientInitializer} for this {@link PostgresClient} instance.
* @return A reference to the initializer.
*/
PostgresClientInitializer getPostgresClientInitializer() {
return this.postgresClientInitializer;
}

/**
* Close the SQL client of this PostgresClient instance.
* This is idempotent: additional close invocations are always successful.
@@ -512,78 +506,8 @@ public static void closeAllClients() {
PG_POOLS.clear();
PG_POOLS_READER.values().forEach(PgPool::close);
PG_POOLS_READER.clear();

}

static PgConnectOptions createPgConnectOptions(JsonObject sqlConfig, boolean isReader) {
PgConnectOptions pgConnectOptions = new PgConnectOptions();
pgConnectOptions.addProperty("application_name", PG_APPLICATION_NAME);

String hostToResolve = HOST;
String portToResolve = PORT;
if (isReader) {
hostToResolve = HOST_READER;
portToResolve = PORT_READER;
}

String host = sqlConfig.getString(hostToResolve);
if (host != null) {
pgConnectOptions.setHost(host);
}

Integer port;
port = sqlConfig.getInteger(portToResolve);

if (port != null) {
pgConnectOptions.setPort(port);
}

if (isReader && (host == null || port == null)) {
return null;
}

String username = sqlConfig.getString(USERNAME);
if (username != null) {
pgConnectOptions.setUser(username);
}
String password = sqlConfig.getString(PASSWORD);
if (password != null) {
pgConnectOptions.setPassword(password);
}
String database = sqlConfig.getString(DATABASE);
if (database != null) {
pgConnectOptions.setDatabase(database);
}
Integer reconnectAttempts = sqlConfig.getInteger(RECONNECT_ATTEMPTS);
if (reconnectAttempts != null) {
pgConnectOptions.setReconnectAttempts(reconnectAttempts);
}
Long reconnectInterval = sqlConfig.getLong(RECONNECT_INTERVAL);
if (reconnectInterval != null) {
pgConnectOptions.setReconnectInterval(reconnectInterval);
}

String serverPem = sqlConfig.getString(SERVER_PEM);
if (serverPem != null) {
pgConnectOptions.setSslMode(SslMode.VERIFY_FULL);
pgConnectOptions.setHostnameVerificationAlgorithm("HTTPS");
pgConnectOptions.setPemTrustOptions(
new PemTrustOptions().addCertValue(Buffer.buffer(serverPem)));
pgConnectOptions.setEnabledSecureTransportProtocols(Collections.singleton("TLSv1.3"));
if (OpenSSLEngineOptions.isAvailable()) {
pgConnectOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions());
} else {
pgConnectOptions.setJdkSslEngineOptions(new JdkSSLEngineOptions());
log.error("Cannot run OpenSSL, using slow JDKSSL. Is netty-tcnative-boringssl-static for windows-x86_64, "
+ "osx-x86_64 or linux-x86_64 installed? https://netty.io/wiki/forked-tomcat-native.html "
+ "Is libc6-compat installed (if required)? https://github.com/pires/netty-tcnative-alpine");
}
log.debug("Enforcing SSL encryption for PostgreSQL connections, "
+ "requiring TLSv1.3 with server name certificate, "
+ "using " + (OpenSSLEngineOptions.isAvailable() ? "OpenSSL " + OpenSsl.versionString() : "JDKSSL"));
}
return pgConnectOptions;
}

private void init() throws Exception {

@@ -602,38 +526,14 @@ private void init() throws Exception {
}
logPostgresConfig();

this.postgresClientInitializer = new PostgresClientInitializer(vertx, postgreSQLClientConfig);
if (sharedPgPool) {
client = PG_POOLS.computeIfAbsent(vertx, x -> createPgPool(vertx, postgreSQLClientConfig, false));
readClient = PG_POOLS_READER.computeIfAbsent(vertx, x -> createPgPool(vertx, postgreSQLClientConfig, true));
client = PG_POOLS.computeIfAbsent(vertx, x -> postgresClientInitializer.getClient());
readClient = PG_POOLS_READER.computeIfAbsent(vertx, x -> postgresClientInitializer.getSyncReadClient());
} else {
client = createPgPool(vertx, postgreSQLClientConfig, false);
readClient = createPgPool(vertx, postgreSQLClientConfig, true);
client = postgresClientInitializer.getClient();
readClient = postgresClientInitializer.getSyncReadClient();
}

readClient = readClient != null ? readClient : client;
}

static PgPool createPgPool(Vertx vertx, JsonObject configuration, Boolean isReader) {
PgConnectOptions connectOptions = createPgConnectOptions(configuration, isReader);

if (connectOptions == null) {
return null;
}

PoolOptions poolOptions = new PoolOptions();
poolOptions.setMaxSize(
configuration.getInteger(MAX_SHARED_POOL_SIZE, configuration.getInteger(MAX_POOL_SIZE, DEFAULT_MAX_POOL_SIZE)));

poolOptions.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);

if (sharedPgPool) {
poolOptions.setIdleTimeout(0); // The manager fully manages this.
} else {
var connectionReleaseDelay = configuration.getInteger(CONNECTION_RELEASE_DELAY, DEFAULT_CONNECTION_RELEASE_DELAY);
poolOptions.setIdleTimeout(connectionReleaseDelay);
}

return PgPool.pool(vertx, connectOptions, poolOptions);
}

/**
@@ -3551,7 +3451,7 @@ public Future<PgConnection> getReadConnection() {
* @see #withTransaction(Function)
*/
public Future<PgConnection> getConnection(PgPool client) {
if (!isSharedPool()) {
if (!sharedPgPool) {
return client.getConnection().map(PgConnection.class::cast);
}
