Skip to content

Commit

Permalink
entire custom logtype implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sbcd90 committed Jul 18, 2023
1 parent 485b2a4 commit 10f5bf1
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,20 +294,24 @@ public void onResponse(SearchResponse response) {
bulkRequest.add(indexRequest);
}

bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
logger.info("Indexing [" + bulkRequest.numberOfActions() + "] customLogTypes");
client.bulk(
bulkRequest,
ActionListener.delegateFailure(listener, (l, r) -> {
if (r.hasFailures()) {
logger.error("Custom LogType Bulk Index had failures:\n ", r.buildFailureMessage());
listener.onFailure(new IllegalStateException(r.buildFailureMessage()));
} else {
logger.info("Loaded [" + r.getItems().length + "] customLogType docs successfully!");
listener.onResponse(null);
}
})
);
if (bulkRequest.numberOfActions() > 0) {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
logger.info("Indexing [" + bulkRequest.numberOfActions() + "] customLogTypes");
client.bulk(
bulkRequest,
ActionListener.delegateFailure(listener, (l, r) -> {
if (r.hasFailures()) {
logger.error("Custom LogType Bulk Index had failures:\n ", r.buildFailureMessage());
listener.onFailure(new IllegalStateException(r.buildFailureMessage()));
} else {
logger.info("Loaded [" + r.getItems().length + "] customLogType docs successfully!");
listener.onResponse(null);
}
})
);
} else {
listener.onResponse(null);
}
} catch (URISyntaxException | IOException e) {
listener.onFailure(e);
}
Expand Down Expand Up @@ -480,13 +484,13 @@ public void onFailure(Exception e) {
listener,
(delegatedListener, unused) -> {
isConfigIndexInitialized = true;
delegatedListener.onResponse(null);
doIndexLogTypeMetadata(listener);
})
);
}));
} else {
if (isConfigIndexInitialized) {
listener.onResponse(null);
doIndexLogTypeMetadata(listener);
return;
}
loadBuiltinLogTypes(ActionListener.delegateFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class TransportIndexCustomLogTypeAction extends HandledTransportAction<In

private final CustomLogTypeIndices customLogTypeIndices;

private final LogTypeService logTypeService;

private volatile Boolean filterByEnabled;

private volatile TimeValue indexTimeout;
Expand All @@ -80,6 +82,7 @@ public TransportIndexCustomLogTypeAction(TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
CustomLogTypeIndices customLogTypeIndices,
LogTypeService logTypeService,
Settings settings,
ThreadPool threadPool) {
super(IndexCustomLogTypeAction.NAME, transportService, actionFilters, IndexCustomLogTypeRequest::new);
Expand All @@ -88,6 +91,7 @@ public TransportIndexCustomLogTypeAction(TransportService transportService,
this.threadPool = threadPool;
this.settings = settings;
this.customLogTypeIndices = customLogTypeIndices;
this.logTypeService = logTypeService;
this.filterByEnabled = SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES.get(this.settings);
this.indexTimeout = SecurityAnalyticsSettings.INDEX_TIMEOUT.get(this.settings);

Expand Down Expand Up @@ -326,48 +330,58 @@ public void onFailure(Exception e) {
}
});
} else {
MaxAggregationBuilder queryBuilder = AggregationBuilders.max("agg").field("tags.correlation_id");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(queryBuilder);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(LogTypeService.LOG_TYPE_INDEX);
searchRequest.source(searchSourceBuilder);

client.search(searchRequest, new ActionListener<>() {
logTypeService.ensureConfigIndexIsInitialized(new ActionListener<Void>() {
@Override
public void onResponse(SearchResponse response) {
if (response.isTimedOut()) {
onFailures(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
return;
}

try {
Max agg = response.getAggregations().get("agg");
int value = Double.valueOf(agg.getValue()).intValue();
request.getCustomLogType().setTags(Map.of("correlation_id", value+1));
IndexRequest indexRequest = new IndexRequest(LogTypeService.LOG_TYPE_INDEX)
.setRefreshPolicy(request.getRefreshPolicy())
.source(request.getCustomLogType().toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(indexTimeout);

client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse response) {
if (response.status() != RestStatus.CREATED) {
onFailures(new OpenSearchStatusException(String.format(Locale.getDefault(), "Log Type with id %s cannot be updated", logTypeId), RestStatus.INTERNAL_SERVER_ERROR));
}
request.getCustomLogType().setId(response.getId());
onOperation(response, request.getCustomLogType());
public void onResponse(Void unused) {
MaxAggregationBuilder queryBuilder = AggregationBuilders.max("agg").field("tags.correlation_id");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(queryBuilder);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(LogTypeService.LOG_TYPE_INDEX);
searchRequest.source(searchSourceBuilder);

client.search(searchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse response) {
if (response.isTimedOut()) {
onFailures(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
return;
}

@Override
public void onFailure(Exception e) {
onFailures(e);
try {
Max agg = response.getAggregations().get("agg");
int value = Double.valueOf(agg.getValue()).intValue();
request.getCustomLogType().setTags(Map.of("correlation_id", value+1));
IndexRequest indexRequest = new IndexRequest(LogTypeService.LOG_TYPE_INDEX)
.setRefreshPolicy(request.getRefreshPolicy())
.source(request.getCustomLogType().toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.timeout(indexTimeout);

client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse response) {
if (response.status() != RestStatus.CREATED) {
onFailures(new OpenSearchStatusException(String.format(Locale.getDefault(), "Log Type with id %s cannot be updated", logTypeId), RestStatus.INTERNAL_SERVER_ERROR));
}
request.getCustomLogType().setId(response.getId());
onOperation(response, request.getCustomLogType());
}

@Override
public void onFailure(Exception e) {
onFailures(e);
}
});
} catch (IOException ex) {
onFailures(ex);
}
});
} catch (IOException ex) {
onFailures(ex);
}
}

@Override
public void onFailure(Exception e) {
onFailures(e);
}
});
}

@Override
Expand Down

0 comments on commit 10f5bf1

Please sign in to comment.