Skip to content

Commit

Permalink
fix NPE in usage retrieval if task state is null
Browse files Browse the repository at this point in the history
  • Loading branch information
Hendrik Muhs committed Feb 28, 2020
1 parent 442e895 commit 0808be3
Showing 1 changed file with 71 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,36 @@ public class TransformUsageTransportAction extends XPackUsageFeatureTransportAct
private final Client client;

@Inject
public TransformUsageTransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Settings settings, XPackLicenseState licenseState, Client client) {
super(XPackUsageFeatureAction.TRANSFORM.name(), transportService, clusterService,
threadPool, actionFilters, indexNameExpressionResolver);
public TransformUsageTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Settings settings,
XPackLicenseState licenseState,
Client client
) {
super(
XPackUsageFeatureAction.TRANSFORM.name(),
transportService,
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
);
this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings);
this.licenseState = licenseState;
this.client = client;
}

@Override
protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state,
ActionListener<XPackUsageFeatureResponse> listener) {
protected void masterOperation(
Task task,
XPackUsageRequest request,
ClusterState state,
ActionListener<XPackUsageFeatureResponse> listener
) {
boolean available = licenseState.isTransformAllowed();
if (enabled == false) {
var usage = new TransformFeatureSetUsage(available, enabled, Collections.emptyMap(), new TransformIndexerStats());
Expand All @@ -75,61 +92,66 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
}

PersistentTasksCustomMetaData taskMetadata = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(state);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> transformTasks = taskMetadata == null ?
Collections.emptyList() :
taskMetadata.findTasks(TransformTaskParams.NAME, (t) -> true);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> transformTasks = taskMetadata == null
? Collections.emptyList()
: taskMetadata.findTasks(TransformTaskParams.NAME, (t) -> true);
final int taskCount = transformTasks.size();
final Map<String, Long> transformsCountByState = new HashMap<>();
for(PersistentTasksCustomMetaData.PersistentTask<?> transformTask : transformTasks) {
TransformState transformState = (TransformState)transformTask.getState();
transformsCountByState.merge(transformState.getTaskState().value(), 1L, Long::sum);
for (PersistentTasksCustomMetaData.PersistentTask<?> transformTask : transformTasks) {
TransformState transformState = (TransformState) transformTask.getState();
TransformTaskState taskState = transformState.getTaskState();
if (taskState != null) {
transformsCountByState.merge(taskState.value(), 1L, Long::sum);
}
}

ActionListener<TransformIndexerStats> totalStatsListener = ActionListener.wrap(
statSummations -> {
var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, statSummations);
listener.onResponse(new XPackUsageFeatureResponse(usage));
},
listener::onFailure
);
ActionListener<TransformIndexerStats> totalStatsListener = ActionListener.wrap(statSummations -> {
var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, statSummations);
listener.onResponse(new XPackUsageFeatureResponse(usage));
}, listener::onFailure);

ActionListener<SearchResponse> totalTransformCountListener = ActionListener.wrap(
transformCountSuccess -> {
if (transformCountSuccess.getShardFailures().length > 0) {
logger.error("total transform count search returned shard failures: {}",
Arrays.toString(transformCountSuccess.getShardFailures()));
}
long totalTransforms = transformCountSuccess.getHits().getTotalHits().value;
if (totalTransforms == 0) {
var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState,
new TransformIndexerStats());
listener.onResponse(new XPackUsageFeatureResponse(usage));
return;
}
transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
ActionListener<SearchResponse> totalTransformCountListener = ActionListener.wrap(transformCountSuccess -> {
if (transformCountSuccess.getShardFailures().length > 0) {
logger.error(
"total transform count search returned shard failures: {}",
Arrays.toString(transformCountSuccess.getShardFailures())
);
}
long totalTransforms = transformCountSuccess.getHits().getTotalHits().value;
if (totalTransforms == 0) {
var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, new TransformIndexerStats());
listener.onResponse(new XPackUsageFeatureResponse(usage));
return;
}
transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
}, transformCountFailure -> {
if (transformCountFailure instanceof ResourceNotFoundException) {
TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
},
transformCountFailure -> {
if (transformCountFailure instanceof ResourceNotFoundException) {
TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
} else {
listener.onFailure(transformCountFailure);
}
} else {
listener.onFailure(transformCountFailure);
}
);
});

SearchRequest totalTransformCount = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
SearchRequest totalTransformCount = client.prepareSearch(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME))))
.setQuery(
QueryBuilders.constantScoreQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME))
)
)
.request();

ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ClientHelper.TRANSFORM_ORIGIN,
totalTransformCount,
totalTransformCountListener,
client::search);
client::search
);
}
}

0 comments on commit 0808be3

Please sign in to comment.