-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML Data Frame] Persist and restore checkpoint and position #41942
Conversation
Pinging @elastic/ml-core |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.startObject(); | ||
builder.field(DataFrameField.ID.getPreferredName(), id); | ||
builder.field(STATE_FIELD.getPreferredName(), transformState, params); | ||
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params); | ||
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params); | ||
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe off-topic: what about adding the version here (only if internal storage is true), note: we can do that as separate PR if you prefer.
listener.onResponse(stats); | ||
}, | ||
e -> { | ||
if (e.getClass() == IndexNotFoundException.class) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
related to #42176
The change to how stats are persisted required a corresponding change to the index mappings and I updated how the usage stats are aggregated |
IndexerState currentState = state.updateAndGet(previousState -> { | ||
if (previousState == IndexerState.INDEXING) { | ||
return IndexerState.STOPPING; | ||
} else if (previousState == IndexerState.STARTED) { | ||
onStop(); | ||
wasStartedAndSetStopped.set(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onStop
is now persisting state so it must be called after the state has been updated.
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName()) | ||
.field(TYPE, LONG) | ||
.startObject(DataFrameTransformStateAndStats.CHECKPOINTING_INFO_FIELD.getPreferredName()) | ||
.field(ENABLED, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added explicit mappings for the state and stats objects but not the checkpoints. Maintaining the mappings is a burden and the tests may not fail after they have change, I think this is sufficient
6ed0fd3
to
7267343
Compare
run elasticsearch-ci/default-distro |
Persist and restore Data frame's current checkpoint and position
* master: (176 commits) Avoid unnecessary persistence of retention leases (elastic#42299) [ML][TEST] Fix limits in AutodetectMemoryLimitIT (elastic#42279) [ML Data Frame] Persist and restore checkpoint and position (elastic#41942) mute failing filerealm hash caching tests (elastic#42304) Safer Wait for Snapshot Success in ClusterPrivilegeTests (elastic#40943) Remove 7.0.2 (elastic#42282) Revert "Remove 7.0.2 (elastic#42282)" [DOCS] Copied note on slicing support to Slicing section. Closes 26114 (elastic#40426) Remove 7.0.2 (elastic#42282) Mute all ml_datafeed_crud rolling upgrade tests Move the FIPS configuration back to the build plugin (elastic#41989) Remove stray back tick that's messing up table format (elastic#41705) Add missing comma in code section (elastic#41678) add 7.1.1 and 6.8.1 versions (elastic#42253) Use spearate testkit dir for each run (elastic#42013) Add experimental and warnings to vector functions (elastic#42205) Fix version in tests since elastic#41906 was merged Bump version in BWC check after backport Prevent in-place downgrades and invalid upgrades (elastic#41731) Mute date_histo interval bwc test ...
* master: (82 commits) Fix off-by-one error in an index shard test Cleanup Redundant BlobStoreFormat Class (elastic#42195) remove backcompat handling of 6.2.x versions (elastic#42044) Mute testDelayedOperationsBeforeAndAfterRelocated Execute actions under permit in primary mode only (elastic#42241) Mute another transforms_stats yaml test Deprecate support for chained multi-fields. (elastic#41926) Mute transforms_stats yaml test Make unwrapCorrupt Check Suppressed Ex. (elastic#41889) Remove Dead Code from Azure Repo Plugin (elastic#42178) Reorganize Painless doc structure (elastic#42303) Avoid unnecessary persistence of retention leases (elastic#42299) [ML][TEST] Fix limits in AutodetectMemoryLimitIT (elastic#42279) [ML Data Frame] Persist and restore checkpoint and position (elastic#41942) mute failing filerealm hash caching tests (elastic#42304) Safer Wait for Snapshot Success in ClusterPrivilegeTests (elastic#40943) Remove 7.0.2 (elastic#42282) Revert "Remove 7.0.2 (elastic#42282)" [DOCS] Copied note on slicing support to Slicing section. Closes 26114 (elastic#40426) Remove 7.0.2 (elastic#42282) ...
…41942) Persist and restore Data frame's current checkpoint and position
Rather than persisting just
DataFrameIndexerTransformStats
(contains counts of indexed docs etc)DataFrameTransformStateAndStats
is now written. This contains aDataFrameIndexerTransformStats
and also aDataFrameTransformState
which in turn has thecheckpoint
andposition
map.A
DataFrameTransformStateAndStats
is returned by GET _stats so there is no change to the API response.When starting a previously stopped data frame the current checkpoint and position are restored from the persisted
DataFrameTransformStateAndStats
. A relocated data frame where the persistent task has moved to a new node restores the checkpoint and position from the p. task state. Re-starting a failed data frame does not set the checkpoint or position as the p. task is is extant and the state up to date.WIP: This is closely related to #41752 at that PR changes to stop to complete the persistent task which means this change is required to restart the data frame from the correct position. I will implement persistingDataFrameTransformStateAndStats
to the index on stop once #41752 is merged.UPDATED #41752 is merged