Skip to content

Commit

Permalink
[Transform] improve tracing for TransformSurvivesUpgradeIT (#69854)
Browse files Browse the repository at this point in the history
report number of deleted state docs in order to help tracing issues

relates #68646
  • Loading branch information
Hendrik Muhs authored Mar 4, 2021
1 parent 7837124 commit 36f3ea9
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ public void testDeleteOldTransformStoredDocuments() throws Exception {
is(true)
);

assertAsync(listener -> transformConfigManager.deleteOldTransformStoredDocuments(transformId, listener), true, null, null);
assertAsync(listener -> transformConfigManager.deleteOldTransformStoredDocuments(transformId, listener), 1L, null, null);

client().admin().indices().refresh(new RefreshRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)).actionGet();
assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<
}

@Override
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Long> listener) {
DeleteByQueryRequest deleteByQueryRequest = createDeleteByQueryRequest();
deleteByQueryRequest.indices(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
Expand All @@ -209,7 +209,7 @@ public void deleteOldTransformStoredDocuments(String transformId, ActionListener
);
return;
}
listener.onResponse(true);
listener.onResponse(response.getDeleted());
}, listener::onFailure)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ void updateTransformConfiguration(
* This deletes stored state/stats documents for the given transformId that are contained in old index versions.
*
* @param transformId The transform ID referenced by the documents
* @param listener listener to alert on completion
* @param listener listener to alert on completion, returning the number of deleted docs
*/
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener);
void deleteOldTransformStoredDocuments(String transformId, ActionListener<Long> listener);

/**
* This deletes stored checkpoint documents for the given transformId, based on number and age.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,12 @@ private void doSaveState(TransformState state, ActionListener<Void> listener) {

// Only do this clean up once, if it succeeded, no reason to do the query again.
if (oldStatsCleanedUp.compareAndSet(false, true)) {
transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap(nil -> {
logger.trace("[{}] deleted old transform stats and state document", getJobId());
transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap(deletedDocs -> {
logger.trace(
"[{}] deleted old transform stats and state document, deleted: [{}] documents",
getJobId(),
deletedDocs
);
listener.onResponse(null);
}, e -> {
String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<
}

@Override
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
transformStoredDocs.remove(transformId);
listener.onResponse(true);
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Long> listener) {
listener.onResponse(transformStoredDocs.remove(transformId) == null ? 0L : 1L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAss
Response response = client().performRequest(getStatsDocsRequest);
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = entityAsMap(response);
assertEquals(1, XContentMapValues.extractValue("hits.total.value", responseBody));
assertEquals("expected only 1 hit, got: " + responseBody, 1, XContentMapValues.extractValue("hits.total.value", responseBody));
responseAssertion.accept(responseBody);
}, 60, TimeUnit.SECONDS);
}
Expand Down

0 comments on commit 36f3ea9

Please sign in to comment.