Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watcher: Store username on watch execution #31873

Merged
merged 13 commits into from
Jul 16, 2018
Merged
4 changes: 3 additions & 1 deletion x-pack/docs/en/rest-api/watcher/execute-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ This is an example of the output:
"type": "index"
}
]
}
},
"user": "test_admin" <4>
}
}
--------------------------------------------------
Expand All @@ -281,6 +282,7 @@ This is an example of the output:
<1> The id of the watch record as it would be stored in the `.watcher-history` index.
<2> The watch record document as it would be stored in the `.watcher-history` index.
<3> The watch execution results.
<4> The user used to execute the watch.

You can set a different execution mode for every action by associating the mode
name with the action id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,17 @@ static Authentication deserializeHeaderAndPutInContext(String header, ThreadCont
throws IOException, IllegalArgumentException {
assert ctx.getTransient(AuthenticationField.AUTHENTICATION_KEY) == null;

Authentication authentication = decode(header);
ctx.putTransient(AuthenticationField.AUTHENTICATION_KEY, authentication);
return authentication;
}

public static Authentication decode(String header) throws IOException {
byte[] bytes = Base64.getDecoder().decode(header);
StreamInput input = StreamInput.wrap(bytes);
Version version = Version.readVersion(input);
input.setVersion(version);
Authentication authentication = new Authentication(input);
ctx.putTransient(AuthenticationField.AUTHENTICATION_KEY, authentication);
return authentication;
return new Authentication(input);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.condition.Condition;
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
Expand All @@ -18,6 +20,7 @@
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -43,6 +46,7 @@ public abstract class WatchExecutionContext {
private Transform.Result transformResult;
private ConcurrentMap<String, ActionWrapperResult> actionsResults = ConcurrentCollections.newConcurrentMap();
private String nodeId;
private String user;

public WatchExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.id = new Wid(watchId, executionTime);
Expand Down Expand Up @@ -85,6 +89,7 @@ public Watch watch() {
public final void ensureWatchExists(CheckedSupplier<Watch, Exception> supplier) throws Exception {
if (watch == null) {
watch = supplier.get();
user = WatchExecutionContext.getUsernameFromWatch(watch);
}
}

Expand Down Expand Up @@ -137,6 +142,11 @@ public String getNodeId() {
return nodeId;
}

/**
* @return The user that executes the watch, which will be stored in the watch history
*/
public String getUser() { return user; }

public void start() {
assert phase == ExecutionPhase.AWAITS_EXECUTION;
relativeStartTime = System.nanoTime();
Expand Down Expand Up @@ -243,4 +253,19 @@ public WatchRecord finish() {
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
}

/**
* Given a watch, this extracts and decodes the relevant auth header and returns the principal of the user that is
* executing the watch.
*/
public static String getUsernameFromWatch(Watch watch) throws IOException {
if (watch != null && watch.status() != null && watch.status().getHeaders() != null) {
String header = watch.status().getHeaders().get(AuthenticationField.AUTHENTICATION_KEY);
if (header != null) {
Authentication auth = Authentication.decode(header);
return auth.getUser().principal();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ public abstract class WatchRecord implements ToXContentObject {
private static final ParseField METADATA = new ParseField("metadata");
private static final ParseField EXECUTION_RESULT = new ParseField("result");
private static final ParseField EXCEPTION = new ParseField("exception");
private static final ParseField USER = new ParseField("user");

protected final Wid id;
protected final Watch watch;
private final String nodeId;
protected final TriggerEvent triggerEvent;
protected final ExecutionState state;
private final String user;

// only emitted to xcontent in "debug" mode
protected final Map<String, Object> vars;
Expand All @@ -60,7 +62,7 @@ public abstract class WatchRecord implements ToXContentObject {

private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, Map<String, Object> vars, ExecutableInput input,
ExecutableCondition condition, Map<String, Object> metadata, Watch watch, WatchExecutionResult executionResult,
String nodeId) {
String nodeId, String user) {
this.id = id;
this.triggerEvent = triggerEvent;
this.state = state;
Expand All @@ -71,15 +73,16 @@ private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, Map
this.executionResult = executionResult;
this.watch = watch;
this.nodeId = nodeId;
this.user = user;
}

private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, String nodeId) {
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null, null, nodeId);
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null, null, nodeId, null);
}

private WatchRecord(WatchRecord record, ExecutionState state) {
this(record.id, record.triggerEvent, state, record.vars, record.input, record.condition, record.metadata, record.watch,
record.executionResult, record.nodeId);
record.executionResult, record.nodeId, record.user);
}

private WatchRecord(WatchExecutionContext context, ExecutionState state) {
Expand All @@ -88,12 +91,13 @@ private WatchRecord(WatchExecutionContext context, ExecutionState state) {
context.watch() != null ? context.watch().condition() : null,
context.watch() != null ? context.watch().metadata() : null,
context.watch(),
null, context.getNodeId());
null, context.getNodeId(), context.getUser());
}

private WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {
this(context.id(), context.triggerEvent(), getState(executionResult), context.vars(), context.watch().input(),
context.watch().condition(), context.watch().metadata(), context.watch(), executionResult, context.getNodeId());
context.watch().condition(), context.watch().metadata(), context.watch(), executionResult, context.getNodeId(),
context.getUser());
}

public static ExecutionState getState(WatchExecutionResult executionResult) {
Expand Down Expand Up @@ -152,6 +156,9 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
builder.field(NODE.getPreferredName(), nodeId);
builder.field(STATE.getPreferredName(), state.id());

if (user != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

builder.field(USER.getPreferredName(), user);
}
if (watch != null && watch.status() != null) {
builder.field(STATUS.getPreferredName(), watch.status(), params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ public final class WatcherIndexTemplateRegistryField {
// version 6: upgrade to ES 6, removal of _status field
// version 7: add full exception stack traces for better debugging
// version 8: fix slack attachment property not to be dynamic, causing field type issues
// version 9: add a user field defining which user executed the watch
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "8";
public static final String INDEX_TEMPLATE_VERSION = "9";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
public static final String WATCHES_TEMPLATE_NAME = ".watches";
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
"messages": {
"type": "text"
},
"user": {
"type": "text"
},
"exception" : {
"type" : "object",
"enabled" : false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
import org.elasticsearch.xpack.core.security.user.User;
import org.elasticsearch.xpack.core.watcher.actions.Action;
import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
Expand Down Expand Up @@ -85,6 +88,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -1072,6 +1076,33 @@ public void testManualWatchExecutionContextGetsAlwaysExecuted() throws Exception
assertThat(watchRecord.state(), is(ExecutionState.EXECUTED));
}

public void testLoadingWatchExecutionUser() throws Exception {
DateTime now = now(UTC);
Watch watch = mock(Watch.class);
WatchStatus status = mock(WatchStatus.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);

// Should be null
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
assertNull(context.getUser());

// Should still be null, header is not yet set
when(watch.status()).thenReturn(status);
context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
assertNull(context.getUser());

Authentication authentication = new Authentication(new User("joe", "admin"),
new Authentication.RealmRef("native_realm", "native", "node1"), null);

// Should no longer be null now that the proper header is set
when(status.getHeaders()).thenReturn(Collections.singletonMap(AuthenticationField.AUTHENTICATION_KEY, authentication.encode()));
context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
assertThat(context.getUser(), equalTo("joe"));
}

private WatchExecutionContext createMockWatchExecutionContext(String watchId, DateTime executionTime) {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(ctx.id()).thenReturn(new Wid(watchId, executionTime));
Expand Down
1 change: 1 addition & 0 deletions x-pack/qa/smoke-test-watcher-with-security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ watcher_manager:
run_as:
- powerless_user
- watcher_manager
- x_pack_rest_user

watcher_monitor:
cluster:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,63 @@ teardown:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "watcher_manager" }




---
"Test watch is runas user properly recorded":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Successfully ran my_watch to test for search input"
}
}
}
}
- match: { _id: "my_watch" }

- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { _id: "my_watch" }
- is_false: watch.status.headers

- do:
headers: { es-security-runas-user: x_pack_rest_user }
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "x_pack_rest_user" }


---
"Test watch search input does not work against index user is not allowed to read":

Expand Down Expand Up @@ -130,6 +183,7 @@ teardown:
- match: { watch_record.watch_id: "my_watch" }
# because we are not allowed to read the index, there wont be any data
- match: { watch_record.state: "execution_not_needed" }
- match: { watch_record.user: "watcher_manager" }


---
Expand Down Expand Up @@ -272,6 +326,7 @@ teardown:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "watcher_manager" }

- do:
get:
Expand Down Expand Up @@ -320,6 +375,7 @@ teardown:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "watcher_manager" }

- do:
get:
Expand Down