Add StatePersistence object #13900

Merged
merged 18 commits into from
Jun 23, 2022
Changes from 2 commits
@@ -0,0 +1,10 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StateType.yaml
title: StateType
description: State Types
type: string
enum:
  - global
  - stream
  - legacy
@@ -0,0 +1,26 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StateWrapper.yaml
title: StateWrapper
description: Wrapper around the different types of states
type: object
additionalProperties: false
required:
  - stateType
properties:
  stateType:
    description: The type of the state being wrapped
    "$ref": StateType.yaml
  legacyState:
    description: Legacy State for states that haven't been migrated yet
    type: object
    existingJavaType: com.fasterxml.jackson.databind.JsonNode
  global:
    description: Representation of the shared
    type: object
    existingJavaType: io.airbyte.protocol.models.AirbyteStateMessage
  stateMessages:
    type: array
    items:
      type: object
      existingJavaType: io.airbyte.protocol.models.AirbyteStateMessage
@@ -0,0 +1,280 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.generated.Tables.STATE;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.impl.DSL;

public class StatePersistence {
Contributor

Nit: Add a class JavaDoc. This will be one less issue to fix when we enable PMD/Checkstyle.

Contributor Author

Done

private final ExceptionWrappingDatabase database;

public StatePersistence(final Database database) {
this.database = new ExceptionWrappingDatabase(database);
}

/**
* Get the current State of a Connection
*
* @param connectionId
Contributor

@jdpgrailsdev will our linters scream at us if we have @param, @return, and @throws without a description in them?

* @return
* @throws IOException
*/
public Optional<StateWrapper> getCurrentState(final UUID connectionId) throws IOException {
final List<StateRecord> records = this.database.query(ctx -> getStateRecords(ctx, connectionId));

if (records.isEmpty())
return Optional.empty();

return switch (records.stream().findFirst().get().type) {
Contributor

Should we have some validation that ensures all the records have the same type here?

Contributor Author

I was wondering about that. I will add a check and log an error.

I'm not sure what the desirable behavior is here. If we have an inconsistent state in the DB, throwing an error means that recovering from this state will be hard, since the reset should rely on this same function.
Did we have something in mind to handle this type of error?

Contributor Author

I added a check to make it fail harder when we read or write.
Checking before updating the state should prevent us from landing in a bad state.

Contributor

I think this is worth a team discussion, because it feels like we are heading toward a situation similar to the quarantine, where a connection is blocked and can only be unblocked with manual intervention. Should we make sure that the write clears any other state if there is a conflict? @cgardens @lmossman @jdpgrailsdev WDYT?

Contributor

@benmoriceau @gosusnp looking at the current code, it seems that even if we try to reset the connection, the reset will fail if there are inconsistent state types in the DB. This seems not great, because we should at least be able to fix bad state in the DB with a full connection reset.

This makes me lean toward saying that we should clear out any existing inconsistent state when writing new state, since it seems like we shouldn't cripple the system in that case. Thoughts?

Contributor

@lmossman I think that it might lead to duplicated data in the case of Incremental | Append syncs. And if we introduce an error, all the connections that are running will be fully refreshed, which can be very costly. I would prefer to fail when getting and/or saving the state.
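
The thread above leans toward failing when a connection's rows disagree on their state type. A minimal sketch of such a check follows, assuming a simplified `Row` record and `Type` enum as hypothetical stand-ins for the PR's `StateRecord` and the jOOQ-generated `StateType`; the check that was eventually merged may look different.

```java
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

class StateTypeConsistencyCheck {

  // Hypothetical stand-ins for the generated StateType enum and the private StateRecord.
  enum Type { GLOBAL, STREAM, LEGACY }

  record Row(Type type, String streamName, String namespace) {}

  /**
   * Returns the single state type shared by all rows.
   * Throws if the list is empty or if the rows disagree on their type.
   */
  static Type getConsistentType(final List<Row> rows) throws IOException {
    final Set<Type> types = EnumSet.noneOf(Type.class);
    for (final Row row : rows) {
      types.add(row.type());
    }
    if (types.size() != 1) {
      throw new IOException("Expected exactly one StateType but found: " + types);
    }
    return types.iterator().next();
  }

  public static void main(final String[] args) throws IOException {
    final List<Row> consistent = List.of(
        new Row(Type.STREAM, "users", "public"),
        new Row(Type.STREAM, "orders", "public"));
    System.out.println(getConsistentType(consistent));

    final List<Row> mixed = List.of(
        new Row(Type.STREAM, "users", "public"),
        new Row(Type.LEGACY, null, null));
    try {
      getConsistentType(mixed);
    } catch (final IOException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

The trade-off raised in the thread still applies to this sketch: if a reset relies on the same read path, a check that throws on read also blocks the reset.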

case GLOBAL -> Optional.of(buildGlobalState(records));
case STREAM -> Optional.of(buildStreamState(records));
default -> Optional.of(buildLegacyState(records));
};
}

/**
* Create or update the states described in the StateWrapper
* Null states will be deleted.
*
* @param connectionId
* @param state
* @throws IOException
*/
public void updateOrCreateState(final UUID connectionId, final StateWrapper state) throws IOException {
final Optional<StateWrapper> previousState = getCurrentState(connectionId);
final boolean isMigration = previousState.isPresent() && previousState.get().getStateType() == StateType.LEGACY &&
state.getStateType() != StateType.LEGACY;

// The only case where we allow a state migration is moving from LEGACY.
// We expect any other migration to go through an explicit reset.
if (!isMigration && previousState.isPresent() && previousState.get().getStateType() != state.getStateType()) {
throw new IOException("Unexpected type migration from '" + previousState.get().getStateType() + "' to '" + state.getStateType() +
"'. Migration of StateType need to go through an explicit reset.");
}

this.database.transaction(ctx -> {
if (isMigration) {
clearLegacyState(ctx, connectionId);
}
switch (state.getStateType()) {
case GLOBAL -> saveGlobalState(ctx, connectionId, state.getGlobal().getGlobal());
case STREAM -> saveStreamState(ctx, connectionId, state.getStateMessages());
case LEGACY -> saveLegacyState(ctx, connectionId, state.getLegacyState());
}
return null;
});
}

private void clearLegacyState(final DSLContext ctx, final UUID connectionId) {
writeStateToDb(ctx, connectionId, null, null, StateType.LEGACY, null);
}

private void saveGlobalState(final DSLContext ctx, final UUID connectionId, final AirbyteGlobalState globalState) {
writeStateToDb(ctx, connectionId, null, null, StateType.GLOBAL, globalState.getSharedState());
for (final AirbyteStreamState streamState : globalState.getStreamStates()) {
Contributor

Nit: Any reason not to use a stream/lambda for this? Are we concerned about swallowing any exceptions raised by writeStateToDb?

Contributor Author

This is very procedural in the end, so a for loop felt more straightforward in this case.
As for the exceptions, I'd rather let them bubble up to help debug any errors.
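
To illustrate the exception-handling point, here is a minimal sketch assuming a hypothetical `write` method that throws a checked `IOException`. The PR's `writeStateToDb` declares no checked exception, so this generalizes the concern rather than reproducing it. A plain loop lets the exception propagate unchanged, whereas a lambda passed to `forEach` has to wrap it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

class LoopVersusLambda {

  // Hypothetical writer that fails on a blank stream name.
  static void write(final String streamName) throws IOException {
    if (streamName.isBlank()) {
      throw new IOException("Cannot write state for a blank stream name");
    }
  }

  // Plain loop: the checked exception propagates to the caller as-is.
  static void writeAllWithLoop(final List<String> streamNames) throws IOException {
    for (final String streamName : streamNames) {
      write(streamName);
    }
  }

  // Lambda: Consumer.accept declares no checked exceptions, so the IOException must be wrapped.
  static void writeAllWithLambda(final List<String> streamNames) {
    streamNames.forEach(streamName -> {
      try {
        write(streamName);
      } catch (final IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }

  public static void main(final String[] args) {
    final List<String> names = List.of("users", " ");
    try {
      writeAllWithLoop(names);
    } catch (final IOException e) {
      System.out.println("Loop surfaced: " + e.getClass().getSimpleName());
    }
    try {
      writeAllWithLambda(names);
    } catch (final UncheckedIOException e) {
      System.out.println("Lambda surfaced: " + e.getClass().getSimpleName());
    }
  }
}
```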

writeStateToDb(ctx,
connectionId,
streamState.getStreamDescriptor().getName(),
streamState.getStreamDescriptor().getNamespace(),
StateType.GLOBAL,
streamState.getStreamState());
}
}

private void saveStreamState(final DSLContext ctx, final UUID connectionId, final List<AirbyteStateMessage> stateMessages) {
for (final AirbyteStateMessage stateMessage : stateMessages) {
final AirbyteStreamState streamState = stateMessage.getStream();
writeStateToDb(ctx,
connectionId,
streamState.getStreamDescriptor().getName(),
streamState.getStreamDescriptor().getNamespace(),
StateType.STREAM,
streamState.getStreamState());
}
}

private void saveLegacyState(final DSLContext ctx, final UUID connectionId, final JsonNode state) {
writeStateToDb(ctx, connectionId, null, null, StateType.LEGACY, state);
}

/**
* Performs the actual SQL operation depending on the state
*
* If the state is null, it will delete the row, otherwise do an insert or update on conflict
*/
void writeStateToDb(final DSLContext ctx,
Contributor

nice helper! great code reuse.

final UUID connectionId,
final String streamName,
final String namespace,
final StateType stateType,
final JsonNode state) {
if (state != null) {
boolean hasState = ctx.selectFrom(STATE)
.where(
STATE.CONNECTION_ID.eq(connectionId),
streamName != null ? STATE.STREAM_NAME.eq(streamName) : STATE.STREAM_NAME.isNull(),
namespace != null ? STATE.NAMESPACE.eq(namespace) : STATE.NAMESPACE.isNull())
Contributor

These lines are repeated a few times in this function. Would it make sense to try to encapsulate/DRY this, or do you think that would make this logic less readable?

Contributor Author

I refactored the inline if, which had the added benefit of bringing a bit of documentation about why we need the null-or-equal check in the first place.
As for the where clause itself, I think it is easier to read inline than abstracted away.
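
The null-or-equal check is needed because SQL's `=` never matches `NULL`, so a row without a stream name has to be matched with `IS NULL`. Below is a dependency-free sketch of such a helper that renders the predicate as SQL text with bind placeholders. The name `isNullOrEquals`, the `SqlPredicate` record, and the column names are assumptions for illustration; the PR builds the equivalent jOOQ conditions instead.

```java
import java.util.ArrayList;
import java.util.List;

class NullableMatch {

  // A rendered predicate plus the values to bind to its placeholders.
  record SqlPredicate(String sql, List<Object> bindings) {}

  /** Renders "column = ?" for a non-null value and "column IS NULL" otherwise. */
  static SqlPredicate isNullOrEquals(final String column, final Object value) {
    if (value == null) {
      return new SqlPredicate(column + " IS NULL", List.of());
    }
    return new SqlPredicate(column + " = ?", List.of(value));
  }

  /** Joins predicates with AND and concatenates their bindings in order. */
  static SqlPredicate allOf(final List<SqlPredicate> predicates) {
    final List<String> parts = new ArrayList<>();
    final List<Object> bindings = new ArrayList<>();
    for (final SqlPredicate predicate : predicates) {
      parts.add(predicate.sql());
      bindings.addAll(predicate.bindings());
    }
    return new SqlPredicate(String.join(" AND ", parts), bindings);
  }

  public static void main(final String[] args) {
    // Column names are assumed here, not taken from the actual schema.
    final SqlPredicate where = allOf(List.of(
        isNullOrEquals("connection_id", "c1"),
        isNullOrEquals("stream_name", "users"),
        isNullOrEquals("namespace", null)));
    System.out.println(where.sql());
    System.out.println(where.bindings());
  }
}
```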

.fetch().isNotEmpty();

final JSONB jsonbState = JSONB.valueOf(Jsons.serialize(state));
final OffsetDateTime now = OffsetDateTime.now();

if (!hasState) {
ctx.insertInto(STATE)
.columns(
STATE.ID,
STATE.CREATED_AT,
STATE.UPDATED_AT,
STATE.CONNECTION_ID,
STATE.STREAM_NAME,
STATE.NAMESPACE,
STATE.STATE_,
STATE.TYPE)
.values(
UUID.randomUUID(),
now,
now,
connectionId,
streamName,
namespace,
jsonbState,
convertStateType(stateType))
.execute();

} else {
ctx.update(STATE)
.set(STATE.UPDATED_AT, now)
.set(STATE.STATE_, jsonbState)
.where(
STATE.CONNECTION_ID.eq(connectionId),
streamName != null ? STATE.STREAM_NAME.eq(streamName) : STATE.STREAM_NAME.isNull(),
namespace != null ? STATE.NAMESPACE.eq(namespace) : STATE.NAMESPACE.isNull())
.execute();
}

} else {
// If the state is null, we remove the state instead of keeping a null row
ctx.deleteFrom(STATE)
.where(
STATE.CONNECTION_ID.eq(connectionId),
streamName != null ? STATE.STREAM_NAME.eq(streamName) : STATE.STREAM_NAME.isNull(),
namespace != null ? STATE.NAMESPACE.eq(namespace) : STATE.NAMESPACE.isNull())
.execute();
}
}

/**
* Get the state records from the DB
*
* @param ctx A valid DSL context to use for the query
* @param connectionId the ID of the connection
* @return The StateRecords for the connectionId
*/
private List<StateRecord> getStateRecords(final DSLContext ctx, final UUID connectionId) {
return ctx.select(DSL.asterisk())
.from(STATE)
.where(STATE.CONNECTION_ID.eq(connectionId))
.fetch(getStateRecordMapper())
.stream().toList();
}

/**
* Build Global state
*
* The list of records can contain one global shared state, which is the state without streamName and
* without namespace. The other records should be translated into AirbyteStreamState.
*/
private StateWrapper buildGlobalState(final List<StateRecord> records) {
// Split the global shared state from the other per stream records
final Map<Boolean, List<StateRecord>> partitions = records.stream()
.collect(Collectors.partitioningBy(r -> r.streamName == null && r.namespace == null));

final AirbyteGlobalState globalState = new AirbyteGlobalState()
.withSharedState(partitions.get(Boolean.TRUE).stream().map(r -> r.state).findFirst().orElse(null))
.withStreamStates(partitions.get(Boolean.FALSE).stream().map(this::buildAirbyteStreamState).toList());

final AirbyteStateMessage msg = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
return new StateWrapper().withStateType(StateType.GLOBAL).withGlobal(msg);
}

/**
* Build StateWrapper for a PerStream state
*/
private StateWrapper buildStreamState(final List<StateRecord> records) {
List<AirbyteStateMessage> messages = records.stream().map(
record -> new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(buildAirbyteStreamState(record)))
.toList();
return new StateWrapper().withStateType(StateType.STREAM).withStateMessages(messages);
}

/**
* Build a StateWrapper for Legacy state
*/
private StateWrapper buildLegacyState(final List<StateRecord> records) {
return new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(records.get(0).state);
}

/**
* Convert a StateRecord to an AirbyteStreamState
*/
private AirbyteStreamState buildAirbyteStreamState(final StateRecord record) {
return new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(record.streamName).withNamespace(record.namespace))
.withStreamState(record.state);
}

private static RecordMapper<Record, StateRecord> getStateRecordMapper() {
return record -> new StateRecord(
record.get(STATE.TYPE, io.airbyte.db.instance.configs.jooq.generated.enums.StateType.class),
record.get(STATE.STREAM_NAME, String.class),
record.get(STATE.NAMESPACE, String.class),
Jsons.deserialize(record.get(STATE.STATE_).data()));
}

private static io.airbyte.db.instance.configs.jooq.generated.enums.StateType convertStateType(final StateType stateType) {
return switch (stateType) {
Contributor

we have a helper called Enums.convertTo(...) that should allow you to avoid needing to do this conversion.
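
A name-based conversion of this kind can be sketched as follows. This is a minimal stand-in, not Airbyte's `Enums.convertTo` implementation: the `convertByName` helper and both enums are hypothetical, and the approach assumes the two enums share constant names.

```java
class EnumConversion {

  // Hypothetical stand-ins for the config-model enum and the jOOQ-generated enum.
  enum ConfigStateType { GLOBAL, STREAM, LEGACY }

  enum DbStateType { GLOBAL, STREAM, LEGACY }

  /**
   * Converts an enum constant to the constant with the same name in another enum.
   * Enum.valueOf throws IllegalArgumentException if the target has no such constant.
   */
  static <S extends Enum<S>, T extends Enum<T>> T convertByName(final S source, final Class<T> target) {
    return Enum.valueOf(target, source.name());
  }

  public static void main(final String[] args) {
    final DbStateType converted = convertByName(ConfigStateType.STREAM, DbStateType.class);
    System.out.println(converted);
  }
}
```

Compared with the explicit `switch`, a generic helper removes the per-constant mapping but moves the failure for a missing constant from compile-time review to a runtime exception.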

case LEGACY -> io.airbyte.db.instance.configs.jooq.generated.enums.StateType.LEGACY;
case GLOBAL -> io.airbyte.db.instance.configs.jooq.generated.enums.StateType.GLOBAL;
case STREAM -> io.airbyte.db.instance.configs.jooq.generated.enums.StateType.STREAM;
default -> throw new RuntimeException("Unsupported StateType: " + stateType);
};
}

private record StateRecord(
io.airbyte.db.instance.configs.jooq.generated.enums.StateType type,
String streamName,
String namespace,
JsonNode state) {}

}