Skip to content

Commit

Permalink
Don’t update CS
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed May 28, 2019
1 parent ea7f790 commit 47eb26a
Showing 1 changed file with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final Map<String, Object> initialPosition;
private final IndexerState initialIndexerState;

private final SetOnce<DataFrameIndexer> indexer = new SetOnce<>();
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();

private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
Expand Down Expand Up @@ -125,7 +125,7 @@ public Status getStatus() {
return getState();
}

private DataFrameIndexer getIndexer() {
private ClientDataFrameIndexer getIndexer() {
return indexer.get();
}

Expand Down Expand Up @@ -236,7 +236,10 @@ public synchronized void stop() {
return;
}

getIndexer().stop();
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().saveState(state, () -> getIndexer().onStop());
}
}

@Override
Expand Down Expand Up @@ -531,6 +534,11 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
return;
}

saveState(indexerState, next);
}

public void saveState(IndexerState indexerState, Runnable next){

final DataFrameTransformState state = new DataFrameTransformState(
transformTask.taskState.get(),
indexerState,
Expand All @@ -542,28 +550,18 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi

// Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
next.run();
}
);

transformTask.persistStateToClusterState(state, updateClusterStateListener);
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
}

@Override
Expand Down

0 comments on commit 47eb26a

Please sign in to comment.