Skip to content

Commit

Permalink
Align behaviors about time zone
Browse files Browse the repository at this point in the history
- Add `preserveInstants`, `connectionTimeZone` and `forceConnectionTimeZoneToSession`
- Default `connectionTimeZone` to "SERVER" to preserve behavior as before
- Mark `serverZoneId` as deprecated. It will notice users to use `connectionTimeZone` instead
  • Loading branch information
mirromutth committed Feb 19, 2024
1 parent 98741f2 commit 2c87dd5
Show file tree
Hide file tree
Showing 25 changed files with 675 additions and 186 deletions.
2 changes: 1 addition & 1 deletion r2dbc-mysql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,4 @@
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ public final class ConnectionContext implements CodecContext {

private final int localInfileBufferSize;

private final boolean preserveInstants;

@Nullable
private ZoneId serverZoneId;
private ZoneId timeZone;

/**
* Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or
Expand All @@ -60,12 +62,18 @@ public final class ConnectionContext implements CodecContext {
@Nullable
private volatile Capability capability = null;

ConnectionContext(ZeroDateOption zeroDateOption, @Nullable Path localInfilePath,
int localInfileBufferSize, @Nullable ZoneId serverZoneId) {
ConnectionContext(
ZeroDateOption zeroDateOption,
@Nullable Path localInfilePath,
int localInfileBufferSize,
boolean preserveInstants,
@Nullable ZoneId timeZone
) {
this.zeroDateOption = requireNonNull(zeroDateOption, "zeroDateOption must not be null");
this.localInfilePath = localInfilePath;
this.localInfileBufferSize = localInfileBufferSize;
this.serverZoneId = serverZoneId;
this.preserveInstants = preserveInstants;
this.timeZone = timeZone;
}

/**
Expand Down Expand Up @@ -101,27 +109,33 @@ public CharCollation getClientCollation() {
}

@Override
public ZoneId getServerZoneId() {
if (serverZoneId == null) {
public boolean isPreserveInstants() {
return preserveInstants;
}

@Override
public ZoneId getTimeZone() {
if (timeZone == null) {
throw new IllegalStateException("Server timezone have not initialization");
}
return serverZoneId;
return timeZone;
}

@Override
public boolean isMariaDb() {
return capability.isMariaDb() || serverVersion.isMariaDb();
public boolean isTimeZoneInitialized() {
return timeZone != null;
}

boolean shouldSetServerZoneId() {
return serverZoneId == null;
@Override
public boolean isMariaDb() {
Capability capability = this.capability;
return (capability != null && capability.isMariaDb()) || serverVersion.isMariaDb();
}

void setServerZoneId(ZoneId serverZoneId) {
if (this.serverZoneId != null) {
void setTimeZone(ZoneId timeZone) {
if (this.timeZone != null) {
throw new IllegalStateException("Server timezone have been initialized");
}
this.serverZoneId = serverZoneId;
this.timeZone = timeZone;
}

@Override
Expand Down
115 changes: 40 additions & 75 deletions r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Lifecycle;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.Readable;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -64,12 +65,6 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private static final String PING_MARKER = "/* ping */";

private static final String ZONE_PREFIX_POSIX = "posix/";

private static final String ZONE_PREFIX_RIGHT = "right/";

private static final int PREFIX_LENGTH = 6;

private static final ServerVersion MARIA_11_1_1 = ServerVersion.create(11, 1, 1, true);

private static final ServerVersion MYSQL_8_0_3 = ServerVersion.create(8, 0, 3);
Expand Down Expand Up @@ -333,7 +328,8 @@ public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
requireNonNull(isolationLevel, "isolationLevel must not be null");

// Set subsequent transaction isolation level.
return QueryFlow.executeVoid(client, "SET SESSION TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
return QueryFlow.executeVoid(client,
"SET SESSION TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> {
this.sessionLevel = isolationLevel;
if (!this.isInTransaction()) {
Expand Down Expand Up @@ -436,7 +432,7 @@ public Mono<Void> setStatementTimeout(Duration timeout) {
final ServerVersion serverVersion = context.getServerVersion();
final long timeoutMs = timeout.toMillis();
final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0
: "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs;
: "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs;

// mariadb: https://mariadb.com/kb/en/aborting-statements/
// mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/
Expand Down Expand Up @@ -485,10 +481,10 @@ static Mono<MySqlConnection> init(
Mono<MySqlConnection> connection = initSessionVariables(client, sessionVariables)
.then(loadSessionVariables(client, codecs, context))
.map(data -> {
ZoneId serverZoneId = data.serverZoneId;
if (serverZoneId != null) {
logger.debug("Set server time zone to {} from init query", serverZoneId);
context.setServerZoneId(serverZoneId);
ZoneId timeZone = data.timeZone;
if (timeZone != null) {
logger.debug("Got server time zone {} from loading session variables", timeZone);
context.setTimeZone(timeZone);
}

return new MySqlConnection(client, context, codecs, data.level, data.lockWaitTimeout,
Expand Down Expand Up @@ -531,21 +527,21 @@ private static Mono<Void> initSessionVariables(Client client, List<String> sessi
return QueryFlow.executeVoid(client, query.toString());
}

private static Mono<InitData> loadSessionVariables(
private static Mono<SessionData> loadSessionVariables(
Client client, Codecs codecs, ConnectionContext context
) {
StringBuilder query = new StringBuilder(160)
.append("SELECT ")
.append(transactionIsolationColumn(context))
.append(",@@innodb_lock_wait_timeout AS l,@@version_comment AS v");

Function<MySqlResult, Flux<InitData>> handler;
Function<MySqlResult, Flux<SessionData>> handler;

if (context.shouldSetServerZoneId()) {
query.append(",@@system_time_zone AS s,@@time_zone AS t");
handler = MySqlConnection::fullInit;
if (context.isTimeZoneInitialized()) {
handler = r -> convertSessionData(r, false);
} else {
handler = MySqlConnection::init;
query.append(",@@system_time_zone AS s,@@time_zone AS t");
handler = r -> convertSessionData(r, true);
}

return new TextSimpleStatement(client, codecs, context, query.toString())
Expand All @@ -569,70 +565,39 @@ private static Mono<Void> initDatabase(Client client, String database) {
});
}

private static Flux<InitData> init(MySqlResult r) {
return r.map((row, meta) -> new InitData(convertIsolationLevel(row.get(0, String.class)),
convertLockWaitTimeout(row.get(1, Long.class)),
row.get(2, String.class), null));
}

private static Flux<InitData> fullInit(MySqlResult r) {
return r.map((row, meta) -> {
IsolationLevel level = convertIsolationLevel(row.get(0, String.class));
long lockWaitTimeout = convertLockWaitTimeout(row.get(1, Long.class));
String product = row.get(2, String.class);
String systemTimeZone = row.get(3, String.class);
String timeZone = row.get(4, String.class);
ZoneId zoneId;

if (timeZone == null || timeZone.isEmpty() || "SYSTEM".equalsIgnoreCase(timeZone)) {
if (systemTimeZone == null || systemTimeZone.isEmpty()) {
logger.warn("MySQL does not return any timezone, trying to use system default timezone");
zoneId = ZoneId.systemDefault();
} else {
zoneId = convertZoneId(systemTimeZone);
}
} else {
zoneId = convertZoneId(timeZone);
}
private static Flux<SessionData> convertSessionData(MySqlResult r, boolean timeZone) {
return r.map(readable -> {
IsolationLevel level = convertIsolationLevel(readable.get(0, String.class));
long lockWaitTimeout = convertLockWaitTimeout(readable.get(1, Long.class));
String product = readable.get(2, String.class);

return new InitData(level, lockWaitTimeout, product, zoneId);
return new SessionData(level, lockWaitTimeout, product, timeZone ? readZoneId(readable) : null);
});
}

/**
* Creates a {@link ZoneId} from MySQL timezone result, or fallback to system default timezone if not
* found.
*
* @param id the ID/name of MySQL timezone.
* @return the {@link ZoneId}.
*/
private static ZoneId convertZoneId(String id) {
String realId;
private static ZoneId readZoneId(Readable readable) {
String systemTimeZone = readable.get(3, String.class);
String timeZone = readable.get(4, String.class);

if (id.startsWith(ZONE_PREFIX_POSIX) || id.startsWith(ZONE_PREFIX_RIGHT)) {
realId = id.substring(PREFIX_LENGTH);
if (timeZone == null || timeZone.isEmpty() || "SYSTEM".equalsIgnoreCase(timeZone)) {
if (systemTimeZone == null || systemTimeZone.isEmpty()) {
logger.warn("MySQL does not return any timezone, trying to use system default timezone");
return ZoneId.systemDefault().normalized();
} else {
return convertZoneId(systemTimeZone);
}
} else {
realId = id;
return convertZoneId(timeZone);
}
}

private static ZoneId convertZoneId(String id) {
try {
switch (realId) {
case "Factory":
// It seems like UTC.
return ZoneOffset.UTC;
case "America/Nuuk":
// America/Godthab is the same as America/Nuuk, with DST.
return ZoneId.of("America/Godthab");
case "ROC":
// It is equal to +08:00.
return ZoneId.of("+8");
}

return ZoneId.of(realId, ZoneId.SHORT_IDS);
return StringUtils.parseZoneId(id);
} catch (DateTimeException e) {
logger.warn("The server timezone is unknown <{}>, trying to use system default timezone", id, e);

return ZoneId.systemDefault();
return ZoneId.systemDefault().normalized();
}
}

Expand Down Expand Up @@ -691,7 +656,7 @@ private static String transactionIsolationColumn(ConnectionContext context) {
"@@transaction_isolation AS i" : "@@tx_isolation AS i";
}

private static class InitData {
private static class SessionData {

private final IsolationLevel level;

Expand All @@ -701,14 +666,14 @@ private static class InitData {
private final String product;

@Nullable
private final ZoneId serverZoneId;
private final ZoneId timeZone;

private InitData(IsolationLevel level, long lockWaitTimeout, @Nullable String product,
@Nullable ZoneId serverZoneId) {
private SessionData(IsolationLevel level, long lockWaitTimeout, @Nullable String product,
@Nullable ZoneId timeZone) {
this.level = level;
this.lockWaitTimeout = lockWaitTimeout;
this.product = product;
this.serverZoneId = serverZoneId;
this.timeZone = timeZone;
}
}
}
Loading

0 comments on commit 2c87dd5

Please sign in to comment.