Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add joins, insertion & search to correlation engine #7771

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
Expand Down Expand Up @@ -102,6 +103,127 @@ public void testCreatingACorrelationRuleWithNoTimestampField() throws IOExceptio
);
}

@SuppressWarnings("unchecked")
public void testCorrelationWithSingleRule() throws IOException {
String windowsIndex = "windows";
Request request = new Request("PUT", "/" + windowsIndex);
request.setJsonEntity(windowsMappings());
client().performRequest(request);

String appLogsIndex = "app_logs";
request = new Request("PUT", "/" + appLogsIndex);
request.setJsonEntity(appLogMappings());
client().performRequest(request);

String correlationRule = windowsToAppLogsCorrelationRule();
request = new Request("POST", "/_correlation/rules");
request.setJsonEntity(correlationRule);
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", windowsIndex));
request.setJsonEntity(sampleWindowsEvent());
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", appLogsIndex));
request.setJsonEntity(sampleAppLogsEvent());
Response response = client().performRequest(request);
String appLogsId = responseAsMap(response).get("_id").toString();

request = new Request("POST", "/_correlation/events");
request.setJsonEntity(prepareCorrelateEventRequest(appLogsIndex, appLogsId));
response = client().performRequest(request);
Map<String, Object> responseAsMap = responseAsMap(response);
Assert.assertEquals(1, ((Map<String, Object>) responseAsMap.get("neighbor_events")).size());
}

@SuppressWarnings("unchecked")
public void testSearchCorrelationsWithSingleRule() throws IOException {
String windowsIndex = "windows";
Request request = new Request("PUT", "/" + windowsIndex);
request.setJsonEntity(windowsMappings());
client().performRequest(request);

String appLogsIndex = "app_logs";
request = new Request("PUT", "/" + appLogsIndex);
request.setJsonEntity(appLogMappings());
client().performRequest(request);

String correlationRule = windowsToAppLogsCorrelationRule();
request = new Request("POST", "/_correlation/rules");
request.setJsonEntity(correlationRule);
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", windowsIndex));
request.setJsonEntity(sampleWindowsEvent());
Response response = client().performRequest(request);
String windowsId = responseAsMap(response).get("_id").toString();

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", appLogsIndex));
request.setJsonEntity(sampleAppLogsEvent());
response = client().performRequest(request);
String appLogsId = responseAsMap(response).get("_id").toString();

request = new Request("POST", "/_correlation/events");
request.setJsonEntity(prepareCorrelateEventRequest(windowsIndex, windowsId, true));
client().performRequest(request);

request = new Request("POST", "/_correlation/events");
request.setJsonEntity(prepareCorrelateEventRequest(appLogsIndex, appLogsId, true));
response = client().performRequest(request);
Map<String, Object> responseAsMap = responseAsMap(response);
Assert.assertEquals(1, ((Map<String, Object>) responseAsMap.get("neighbor_events")).size());

request = new Request("GET", prepareSearchCorrelationsRequest(windowsIndex, windowsId, "winlog.timestamp", 300000L, 5));
response = client().performRequest(request);
responseAsMap = responseAsMap(response);
Assert.assertEquals(1, ((List<Object>) responseAsMap.get("events")).size());
}

private String prepareCorrelateEventRequest(String index, String event) {
return "{\n" + " \"index\": \"" + index + "\",\n" + " \"event\": \"" + event + "\",\n" + " \"store\": false\n" + "}";
}

private String prepareCorrelateEventRequest(String index, String event, Boolean store) {
return "{\n" + " \"index\": \"" + index + "\",\n" + " \"event\": \"" + event + "\",\n" + " \"store\": " + store + "\n" + "}";
}

private String prepareSearchCorrelationsRequest(String index, String event, String timestampField, Long timeWindow, int neighbors) {
return String.format(
Locale.ROOT,
"%s?index=%s&event=%s&timestamp_field=%s&time_window=%s&nearby_events=%s",
"/_correlation/events",
index,
event,
timestampField,
timeWindow,
neighbors
);
}

private String windowsToAppLogsCorrelationRule() {
return "{\n"
+ " \"name\": \"windows to app logs\",\n"
+ " \"correlate\": [\n"
+ " {\n"
+ " \"index\": \"windows\",\n"
+ " \"query\": \"host.hostname:EC2AMAZ*\",\n"
+ " \"timestampField\": \"winlog.timestamp\",\n"
+ " \"tags\": [\n"
+ " \"windows\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"index\": \"app_logs\",\n"
+ " \"query\": \"endpoint:\\\\/customer_records.txt\",\n"
+ " \"timestampField\": \"timestamp\",\n"
+ " \"tags\": [\n"
+ " \"others_application\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}";
}

private String sampleCorrelationRule() {
return "{\n"
+ " \"name\": \"s3 to app logs\",\n"
Expand Down Expand Up @@ -151,4 +273,115 @@ private String sampleCorrelationRuleWithNoTimestamp() {
private String matchIdQuery(String id) {
return "{\n" + " \"query\" : {\n" + " \"match\":{\n" + " \"_id\": \"" + id + "\"\n" + " }\n" + " }\n" + "}";
}

private String windowsMappings() {
return "{"
+ " \"settings\": {"
+ " \"number_of_shards\": 1"
+ " },"
+ " \"mappings\": {"
+ " \"properties\": {\n"
+ " \"server.user.hash\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.event_id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"host.hostname\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"windows.message\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.provider_name\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.event_data.ServiceName\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.timestamp\": {\n"
+ " \"type\": \"long\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
}

private String appLogMappings() {
return "{"
+ " \"settings\": {"
+ " \"number_of_shards\": 1"
+ " },"
+ " \"mappings\": {"
+ " \"properties\": {\n"
+ " \"http_method\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"endpoint\": {\n"
+ " \"type\": \"text\",\n"
+ " \"analyzer\": \"whitespace\""
+ " },\n"
+ " \"keywords\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"timestamp\": {\n"
+ " \"type\": \"long\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
}

private String sampleWindowsEvent() {
return "{\n"
+ " \"EventTime\": \"2020-02-04T14:59:39.343541+00:00\",\n"
+ " \"host.hostname\": \"EC2AMAZEPO7HKA\",\n"
+ " \"Keywords\": \"9223372036854775808\",\n"
+ " \"SeverityValue\": 2,\n"
+ " \"Severity\": \"INFO\",\n"
+ " \"winlog.event_id\": 22,\n"
+ " \"SourceName\": \"Microsoft-Windows-Sysmon\",\n"
+ " \"ProviderGuid\": \"{5770385F-C22A-43E0-BF4C-06F5698FFBD9}\",\n"
+ " \"Version\": 5,\n"
+ " \"TaskValue\": 22,\n"
+ " \"OpcodeValue\": 0,\n"
+ " \"RecordNumber\": 9532,\n"
+ " \"ExecutionProcessID\": 1996,\n"
+ " \"ExecutionThreadID\": 2616,\n"
+ " \"Channel\": \"Microsoft-Windows-Sysmon/Operational\",\n"
+ " \"winlog.event_data.SubjectDomainName\": \"NTAUTHORITY\",\n"
+ " \"AccountName\": \"SYSTEM\",\n"
+ " \"UserID\": \"S-1-5-18\",\n"
+ " \"AccountType\": \"User\",\n"
+ " \"windows.message\": \"Dns query:\\r\\nRuleName: \\r\\nUtcTime: 2020-02-04 14:59:38.349\\r\\nProcessGuid: {b3c285a4-3cda-5dc0-0000-001077270b00}\\r\\nProcessId: 1904\\r\\nQueryName: EC2AMAZ-EPO7HKA\\r\\nQueryStatus: 0\\r\\nQueryResults: 172.31.46.38;\\r\\nImage: C:\\\\Program Files\\\\nxlog\\\\nxlog.exe\",\n"
+ " \"Category\": \"Dns query (rule: DnsQuery)\",\n"
+ " \"Opcode\": \"Info\",\n"
+ " \"UtcTime\": \"2020-02-04 14:59:38.349\",\n"
+ " \"ProcessGuid\": \"{b3c285a4-3cda-5dc0-0000-001077270b00}\",\n"
+ " \"ProcessId\": \"1904\",\n"
+ " \"QueryName\": \"EC2AMAZ-EPO7HKA\",\n"
+ " \"QueryStatus\": \"0\",\n"
+ " \"QueryResults\": \"172.31.46.38;\",\n"
+ " \"Image\": \"C:\\\\Program Files\\\\nxlog\\\\regsvr32.exe\",\n"
+ " \"EventReceivedTime\": \"2020-02-04T14:59:40.780905+00:00\",\n"
+ " \"SourceModuleName\": \"in\",\n"
+ " \"SourceModuleType\": \"im_msvistalog\",\n"
+ " \"CommandLine\": \"eachtest\",\n"
+ " \"Initiated\": \"true\",\n"
+ " \"winlog.timestamp\": "
+ System.currentTimeMillis()
+ "\n"
+ "}";
}

private String sampleAppLogsEvent() {
return "{\n"
+ " \"endpoint\": \"/customer_records.txt\",\n"
+ " \"http_method\": \"POST\",\n"
+ " \"keywords\": \"PermissionDenied\",\n"
+ " \"timestamp\": "
+ System.currentTimeMillis()
+ "\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@
import org.opensearch.plugin.correlation.core.index.mapper.CorrelationVectorFieldMapper;
import org.opensearch.plugin.correlation.core.index.mapper.VectorFieldMapper;
import org.opensearch.plugin.correlation.core.index.query.CorrelationQueryBuilder;
import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction;
import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction;
import org.opensearch.plugin.correlation.events.resthandler.RestIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.resthandler.RestSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.transport.TransportSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportStoreCorrelationAction;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.resthandler.RestIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.transport.TransportIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings;
import org.opensearch.plugin.correlation.utils.CorrelationIndices;
import org.opensearch.plugin.correlation.utils.CorrelationRuleIndices;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.EnginePlugin;
Expand Down Expand Up @@ -67,9 +76,12 @@
* events-correlation-engine rules uri
*/
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/rules";
public static final String CORRELATION_EVENTS_BASE_URI = PLUGINS_BASE_URI + "/events";

private CorrelationRuleIndices correlationRuleIndices;

private CorrelationIndices correlationIndices;

/**
* Default constructor
*/
Expand All @@ -90,7 +102,8 @@
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
return List.of(correlationRuleIndices);
correlationIndices = new CorrelationIndices(client, clusterService, clusterService.getSettings());
return List.of(correlationRuleIndices, correlationIndices);

Check warning on line 106 in plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java#L105-L106

Added lines #L105 - L106 were not covered by tests
}

@Override
Expand All @@ -103,7 +116,7 @@
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestIndexCorrelationRuleAction());
return List.of(new RestIndexCorrelationRuleAction(), new RestSearchCorrelatedEventsAction(), new RestIndexCorrelationAction());

Check warning on line 119 in plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java#L119

Added line #L119 was not covered by tests
}

@Override
Expand Down Expand Up @@ -132,11 +145,21 @@

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class));
return List.of(

Check warning on line 148 in plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java#L148

Added line #L148 was not covered by tests
new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class),
new ActionPlugin.ActionHandler<>(IndexCorrelationAction.INSTANCE, TransportIndexCorrelationAction.class),
new ActionPlugin.ActionHandler<>(StoreCorrelationAction.INSTANCE, TransportStoreCorrelationAction.class),
new ActionPlugin.ActionHandler<>(SearchCorrelatedEventsAction.INSTANCE, TransportSearchCorrelatedEventsAction.class)
);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING, EventsCorrelationSettings.CORRELATION_TIME_WINDOW);
return List.of(
EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING,
EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS,
EventsCorrelationSettings.CORRELATION_TIME_WINDOW,
EventsCorrelationSettings.REQUEST_TIMEOUT
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionType;

/**
* Transport Action for indexing correlations
*
* @opensearch.internal
*/
public class IndexCorrelationAction extends ActionType<IndexCorrelationResponse> {

/**
* Instance of IndexCorrelationAction
*/
public static final IndexCorrelationAction INSTANCE = new IndexCorrelationAction();
/**
* Name of IndexCorrelationAction
*/
public static final String NAME = "cluster:admin/index/correlation/events";

private IndexCorrelationAction() {
super(NAME, IndexCorrelationResponse::new);
}
}
Loading
Loading