-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add StatePersistence object #13900
Add StatePersistence object #13900
Changes from all commits
7707a97
85195ba
657e83e
34665b5
ed53852
9e7d7ec
f39edeb
40656ce
d53b3ec
3526c5d
2ed47b9
d14cee1
fbaedfc
b96463f
6fc0bee
d0f5621
fdf61d4
de2f2f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,323 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.config.persistence; | ||
|
||
import static io.airbyte.db.instance.configs.jooq.generated.Tables.STATE; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.enums.Enums; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.config.StateType; | ||
import io.airbyte.config.StateWrapper; | ||
import io.airbyte.db.Database; | ||
import io.airbyte.db.ExceptionWrappingDatabase; | ||
import io.airbyte.protocol.models.AirbyteGlobalState; | ||
import io.airbyte.protocol.models.AirbyteStateMessage; | ||
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; | ||
import io.airbyte.protocol.models.AirbyteStreamState; | ||
import io.airbyte.protocol.models.StreamDescriptor; | ||
import java.io.IOException; | ||
import java.time.OffsetDateTime; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.stream.Collectors; | ||
import org.jooq.Condition; | ||
import org.jooq.DSLContext; | ||
import org.jooq.Field; | ||
import org.jooq.JSONB; | ||
import org.jooq.Record; | ||
import org.jooq.RecordMapper; | ||
import org.jooq.impl.DSL; | ||
|
||
/** | ||
* State Persistence | ||
* | ||
* Handle persisting States to the Database. | ||
* | ||
* Supports migration from Legacy to Global or Stream. Other type migrations need to go through a | ||
* reset. (an exception will be thrown) | ||
*/ | ||
public class StatePersistence { | ||
|
||
/** Database wrapper through which this class issues all of its queries and transactions. */
private final ExceptionWrappingDatabase database;

/**
 * Creates a state persistence layer on top of the given database.
 *
 * @param database the database holding the state table; it is wrapped in an
 *        {@link ExceptionWrappingDatabase} before being used
 */
public StatePersistence(final Database database) {
  this.database = new ExceptionWrappingDatabase(database);
}
|
||
/** | ||
* Get the current State of a Connection | ||
* | ||
* @param connectionId | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jdpgrailsdev will our linters scream at us if we have |
||
* @return | ||
* @throws IOException | ||
*/ | ||
public Optional<StateWrapper> getCurrentState(final UUID connectionId) throws IOException { | ||
final List<StateRecord> records = this.database.query(ctx -> getStateRecords(ctx, connectionId)); | ||
|
||
if (records.isEmpty()) { | ||
return Optional.empty(); | ||
} | ||
|
||
return switch (getStateType(connectionId, records)) { | ||
case GLOBAL -> Optional.of(buildGlobalState(records)); | ||
case STREAM -> Optional.of(buildStreamState(records)); | ||
default -> Optional.of(buildLegacyState(records)); | ||
}; | ||
} | ||
|
||
/** | ||
* Create or update the states described in the StateWrapper. Null states will be deleted. | ||
* | ||
* The only state migrations supported are going from a Legacy state to either a Global or Stream | ||
* state. Other state type migrations should go through an explicit reset. An exception will be | ||
* thrown to prevent the system from getting into a bad state. | ||
* | ||
* @param connectionId | ||
* @param state | ||
* @throws IOException | ||
*/ | ||
public void updateOrCreateState(final UUID connectionId, final StateWrapper state) throws IOException { | ||
final Optional<StateWrapper> previousState = getCurrentState(connectionId); | ||
final boolean isMigration = previousState.isPresent() && previousState.get().getStateType() == StateType.LEGACY && | ||
cgardens marked this conversation as resolved.
Show resolved
Hide resolved
|
||
state.getStateType() != StateType.LEGACY; | ||
|
||
// The only case where we allow a state migration is moving from LEGACY. | ||
// We expect any other migration to go through an explicit reset. | ||
if (!isMigration && previousState.isPresent() && previousState.get().getStateType() != state.getStateType()) { | ||
cgardens marked this conversation as resolved.
Show resolved
Hide resolved
|
||
throw new IllegalStateException("Unexpected type migration from '" + previousState.get().getStateType() + "' to '" + state.getStateType() + | ||
"'. Migration of StateType need to go through an explicit reset."); | ||
} | ||
|
||
this.database.transaction(ctx -> { | ||
if (isMigration) { | ||
clearLegacyState(ctx, connectionId); | ||
} | ||
switch (state.getStateType()) { | ||
case GLOBAL -> saveGlobalState(ctx, connectionId, state.getGlobal().getGlobal()); | ||
case STREAM -> saveStreamState(ctx, connectionId, state.getStateMessages()); | ||
case LEGACY -> saveLegacyState(ctx, connectionId, state.getLegacyState()); | ||
} | ||
return null; | ||
}); | ||
} | ||
|
||
private static void clearLegacyState(final DSLContext ctx, final UUID connectionId) { | ||
writeStateToDb(ctx, connectionId, null, null, StateType.LEGACY, null); | ||
} | ||
|
||
private static void saveGlobalState(final DSLContext ctx, final UUID connectionId, final AirbyteGlobalState globalState) { | ||
writeStateToDb(ctx, connectionId, null, null, StateType.GLOBAL, globalState.getSharedState()); | ||
for (final AirbyteStreamState streamState : globalState.getStreamStates()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Any reason not to use a stream/lambda for this? Are we concerned about swallowing any exceptions raised by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is very procedural in the end, a for loop felt more straightforward in this case. |
||
writeStateToDb(ctx, | ||
connectionId, | ||
streamState.getStreamDescriptor().getName(), | ||
streamState.getStreamDescriptor().getNamespace(), | ||
StateType.GLOBAL, | ||
streamState.getStreamState()); | ||
} | ||
} | ||
|
||
private static void saveStreamState(final DSLContext ctx, final UUID connectionId, final List<AirbyteStateMessage> stateMessages) { | ||
for (final AirbyteStateMessage stateMessage : stateMessages) { | ||
final AirbyteStreamState streamState = stateMessage.getStream(); | ||
writeStateToDb(ctx, | ||
connectionId, | ||
streamState.getStreamDescriptor().getName(), | ||
streamState.getStreamDescriptor().getNamespace(), | ||
StateType.STREAM, | ||
streamState.getStreamState()); | ||
} | ||
} | ||
|
||
private static void saveLegacyState(final DSLContext ctx, final UUID connectionId, final JsonNode state) { | ||
writeStateToDb(ctx, connectionId, null, null, StateType.LEGACY, state); | ||
} | ||
|
||
/** | ||
* Performs the actual SQL operation depending on the state | ||
* | ||
* If the state is null, it will delete the row, otherwise do an insert or update on conflict | ||
*/ | ||
static void writeStateToDb(final DSLContext ctx, | ||
final UUID connectionId, | ||
final String streamName, | ||
final String namespace, | ||
final StateType stateType, | ||
final JsonNode state) { | ||
if (state != null) { | ||
final boolean hasState = ctx.selectFrom(STATE) | ||
.where( | ||
STATE.CONNECTION_ID.eq(connectionId), | ||
isNullOrEquals(STATE.STREAM_NAME, streamName), | ||
isNullOrEquals(STATE.NAMESPACE, namespace)) | ||
.fetch().isNotEmpty(); | ||
|
||
final JSONB jsonbState = JSONB.valueOf(Jsons.serialize(state)); | ||
final OffsetDateTime now = OffsetDateTime.now(); | ||
|
||
if (!hasState) { | ||
ctx.insertInto(STATE) | ||
.columns( | ||
STATE.ID, | ||
STATE.CREATED_AT, | ||
STATE.UPDATED_AT, | ||
STATE.CONNECTION_ID, | ||
STATE.STREAM_NAME, | ||
STATE.NAMESPACE, | ||
STATE.STATE_, | ||
STATE.TYPE) | ||
.values( | ||
UUID.randomUUID(), | ||
now, | ||
now, | ||
connectionId, | ||
streamName, | ||
namespace, | ||
jsonbState, | ||
Enums.convertTo(stateType, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.class)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. usually when i use this, I throw in a unit test that looks like this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like it, added. |
||
.execute(); | ||
|
||
} else { | ||
ctx.update(STATE) | ||
.set(STATE.UPDATED_AT, now) | ||
.set(STATE.STATE_, jsonbState) | ||
.where( | ||
STATE.CONNECTION_ID.eq(connectionId), | ||
isNullOrEquals(STATE.STREAM_NAME, streamName), | ||
isNullOrEquals(STATE.NAMESPACE, namespace)) | ||
.execute(); | ||
} | ||
|
||
} else { | ||
// If the state is null, we remove the state instead of keeping a null row | ||
ctx.deleteFrom(STATE) | ||
.where( | ||
STATE.CONNECTION_ID.eq(connectionId), | ||
isNullOrEquals(STATE.STREAM_NAME, streamName), | ||
isNullOrEquals(STATE.NAMESPACE, namespace)) | ||
.execute(); | ||
} | ||
} | ||
|
||
/** | ||
* Helper function to handle null or equal case for the optional strings | ||
* | ||
* We need to have an explicit check for null values because NULL != "str" is NULL, not a boolean. | ||
* | ||
* @param field the targeted field | ||
* @param value the value to check | ||
* @return The Condition that performs the desired check | ||
*/ | ||
private static Condition isNullOrEquals(final Field<String> field, final String value) { | ||
return value != null ? field.eq(value) : field.isNull(); | ||
} | ||
|
||
/** | ||
* Get the StateType for a given list of StateRecords | ||
* | ||
* @param connectionId The connectionId of the records, used to add more debugging context if an | ||
* error is detected | ||
* @param records The list of StateRecords to process, must not be empty | ||
* @return the StateType of the records | ||
* @throws IllegalStateException If StateRecords have inconsistent types | ||
*/ | ||
private static io.airbyte.db.instance.configs.jooq.generated.enums.StateType getStateType( | ||
final UUID connectionId, | ||
final List<StateRecord> records) { | ||
final Set<io.airbyte.db.instance.configs.jooq.generated.enums.StateType> types = | ||
records.stream().map(r -> r.type).collect(Collectors.toSet()); | ||
if (types.size() == 1) { | ||
return types.stream().findFirst().get(); | ||
} | ||
|
||
throw new IllegalStateException("Inconsistent StateTypes for connectionId " + connectionId + | ||
" (" + String.join(", ", types.stream().map(stateType -> stateType.getLiteral()).toList()) + ")"); | ||
} | ||
|
||
/** | ||
* Get the state records from the DB | ||
* | ||
* @param ctx A valid DSL context to use for the query | ||
* @param connectionId the ID of the connection | ||
* @return The StateRecords for the connectionId | ||
*/ | ||
private static List<StateRecord> getStateRecords(final DSLContext ctx, final UUID connectionId) { | ||
return ctx.select(DSL.asterisk()) | ||
.from(STATE) | ||
.where(STATE.CONNECTION_ID.eq(connectionId)) | ||
.fetch(getStateRecordMapper()) | ||
.stream().toList(); | ||
} | ||
|
||
/** | ||
* Build Global state | ||
* | ||
* The list of records can contain one global shared state that is the state without streamName and | ||
* without namespace The other records should be translated into AirbyteStreamState | ||
*/ | ||
private static StateWrapper buildGlobalState(final List<StateRecord> records) { | ||
// Split the global shared state from the other per stream records | ||
final Map<Boolean, List<StateRecord>> partitions = records.stream() | ||
.collect(Collectors.partitioningBy(r -> r.streamName == null && r.namespace == null)); | ||
|
||
final AirbyteGlobalState globalState = new AirbyteGlobalState() | ||
.withSharedState(partitions.get(Boolean.TRUE).stream().map(r -> r.state).findFirst().orElse(null)) | ||
.withStreamStates(partitions.get(Boolean.FALSE).stream().map(StatePersistence::buildAirbyteStreamState).toList()); | ||
|
||
final AirbyteStateMessage msg = new AirbyteStateMessage() | ||
.withType(AirbyteStateType.GLOBAL) | ||
.withGlobal(globalState); | ||
return new StateWrapper().withStateType(StateType.GLOBAL).withGlobal(msg); | ||
} | ||
|
||
/** | ||
* Build StateWrapper for a PerStream state | ||
*/ | ||
private static StateWrapper buildStreamState(final List<StateRecord> records) { | ||
final List<AirbyteStateMessage> messages = records.stream().map( | ||
record -> new AirbyteStateMessage() | ||
.withType(AirbyteStateType.STREAM) | ||
.withStream(buildAirbyteStreamState(record))) | ||
.toList(); | ||
return new StateWrapper().withStateType(StateType.STREAM).withStateMessages(messages); | ||
} | ||
|
||
/** | ||
* Build a StateWrapper for Legacy state | ||
*/ | ||
private static StateWrapper buildLegacyState(final List<StateRecord> records) { | ||
return new StateWrapper() | ||
.withStateType(StateType.LEGACY) | ||
.withLegacyState(records.get(0).state); | ||
} | ||
|
||
/** | ||
* Convert a StateRecord to an AirbyteStreamState | ||
*/ | ||
private static AirbyteStreamState buildAirbyteStreamState(final StateRecord record) { | ||
return new AirbyteStreamState() | ||
.withStreamDescriptor(new StreamDescriptor().withName(record.streamName).withNamespace(record.namespace)) | ||
.withStreamState(record.state); | ||
} | ||
|
||
private static RecordMapper<Record, StateRecord> getStateRecordMapper() { | ||
return record -> new StateRecord( | ||
record.get(STATE.TYPE, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.class), | ||
record.get(STATE.STREAM_NAME, String.class), | ||
record.get(STATE.NAMESPACE, String.class), | ||
Jsons.deserialize(record.get(STATE.STATE_).data())); | ||
} | ||
|
||
private record StateRecord( | ||
io.airbyte.db.instance.configs.jooq.generated.enums.StateType type, | ||
String streamName, | ||
String namespace, | ||
JsonNode state) {} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Add a class JavaDoc. This will be one less issue to fix when we enable PMD/Checkstyle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done