diff --git a/docs/changelog/94724.yaml b/docs/changelog/94724.yaml new file mode 100644 index 0000000000000..a821fe1fa71e4 --- /dev/null +++ b/docs/changelog/94724.yaml @@ -0,0 +1,5 @@ +pr: 94724 +summary: Expose authorization failure as transform health issue +area: Transform +type: enhancement +issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java index 2c7964ccac457..be99bc7d2106c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.tasks.BaseTasksRequest; @@ -19,6 +20,7 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; @@ -46,6 +48,7 @@ public static class Request extends BaseTasksRequest { private final String id; private final boolean deferValidation; private TransformConfig config; + private AuthorizationState authState; public Request(TransformConfigUpdate update, String id, boolean deferValidation, TimeValue timeout) { this.update = update; @@ -62,6 +65,11 @@ public Request(StreamInput in) throws IOException { if (in.readBoolean()) { this.config = new TransformConfig(in); } + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + if (in.readBoolean()) { + this.authState = new AuthorizationState(in); + } + } } public static Request fromXContent( @@ -124,6 +132,14 @@ public void setConfig(TransformConfig config) { this.config = config; } + public AuthorizationState getAuthState() { + return authState; + } + + public void setAuthState(AuthorizationState authState) { + this.authState = authState; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -136,12 +152,20 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); config.writeTo(out); } + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + if (authState == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + authState.writeTo(out); + } + } } @Override public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves - return Objects.hash(getTimeout(), update, id, deferValidation, config); + return Objects.hash(getTimeout(), update, id, deferValidation, config, authState); } @Override @@ -159,6 +183,7 @@ public boolean equals(Object obj) { && this.deferValidation == other.deferValidation && this.id.equals(other.id) && Objects.equals(config, other.config) + && Objects.equals(authState, other.authState) && getTimeout().equals(other.getTimeout()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationState.java new file mode 100644 index 0000000000000..0d7c4de935592 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationState.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.transforms; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.Instant; +import java.util.Locale; +import java.util.Objects; + +/** + * {@link AuthorizationState} holds the state of the authorization performed in the past. + * By examining the instance of this class the caller can learn whether or not the user was authorized to access the source/dest indices + * present in the {@link TransformConfig}. + * + * This class is immutable. + */ +public class AuthorizationState implements Writeable, ToXContentObject { + + public static AuthorizationState green() { + return new AuthorizationState(System.currentTimeMillis(), HealthStatus.GREEN, null); + } + + public static boolean isNullOrGreen(AuthorizationState authState) { + return authState == null || HealthStatus.GREEN.equals(authState.getStatus()); + } + + public static AuthorizationState red(Exception e) { + return new AuthorizationState(System.currentTimeMillis(), HealthStatus.RED, e != null ? e.getMessage() : "unknown exception"); + } + + public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField STATUS = new ParseField("status"); + public static final ParseField LAST_AUTH_ERROR = new ParseField("last_auth_error"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "transform_authorization_state", + true, + a -> new AuthorizationState((Long) a[0], (HealthStatus) a[1], (String) a[2]) + ); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP); + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return HealthStatus.valueOf(p.text().toUpperCase(Locale.ROOT)); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, STATUS, ObjectParser.ValueType.STRING); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LAST_AUTH_ERROR); + } + + private final long timestampMillis; + private final HealthStatus status; + @Nullable + private final String lastAuthError; + + public AuthorizationState(Long timestamp, HealthStatus status, @Nullable String lastAuthError) { + this.timestampMillis = timestamp; + this.status = status; + this.lastAuthError = lastAuthError; + } + + public AuthorizationState(StreamInput in) throws IOException { + this.timestampMillis = in.readLong(); + this.status = in.readEnum(HealthStatus.class); + this.lastAuthError = in.readOptionalString(); + } + + public Instant getTimestamp() { + return Instant.ofEpochMilli(timestampMillis); + } + + public HealthStatus getStatus() { + return status; + } + + public String getLastAuthError() { + return lastAuthError; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(TIMESTAMP.getPreferredName(), timestampMillis); + builder.field(STATUS.getPreferredName(), status.xContentValue()); + if (lastAuthError != null) { + builder.field(LAST_AUTH_ERROR.getPreferredName(), lastAuthError); + } + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(timestampMillis); + status.writeTo(out); + out.writeOptionalString(lastAuthError); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + AuthorizationState that = (AuthorizationState) other; + + return this.timestampMillis == that.timestampMillis + && this.status.value() == that.status.value() + && Objects.equals(this.lastAuthError, that.lastAuthError); + } + + @Override + public int hashCode() { + return Objects.hash(timestampMillis, status, lastAuthError); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java index e6e73fec5893c..b122e96339e9e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java @@ -263,7 +263,7 @@ public static TransformCheckpoint fromXContent(final XContentParser parser, bool public static String documentId(String transformId, long checkpoint) { if (checkpoint < 0) { - throw new IllegalArgumentException("checkpoint must be a positive number"); + throw new IllegalArgumentException("checkpoint must be a non-negative number"); } return NAME + "-" + transformId + "-" + checkpoint; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java index dfe0fabf00f4e..25e25dfe80e0f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java @@ -34,7 +34,7 @@ public class TransformConfigUpdate implements Writeable { public static final String NAME = "data_frame_transform_config_update"; - public static TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + public static final TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null, null); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, @@ -235,6 +235,10 @@ public boolean changesSettings(TransformConfig config) { return isNullOrEqual(settings, config.getSettings()) == false; } + public boolean changesHeaders(TransformConfig config) { + return isNullOrEqual(headers, config.getHeaders()) == false; + } + private boolean isNullOrEqual(Object lft, Object rgt) { return lft == null || lft.equals(rgt); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssue.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssue.java index 97764067990c9..d53df56e5d9a3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssue.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssue.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.transforms; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -21,18 +22,23 @@ public class TransformHealthIssue implements Writeable, ToXContentObject { + private static final String TYPE = "type"; private static final String ISSUE = "issue"; private static final String DETAILS = "details"; private static final String COUNT = "count"; private static final String FIRST_OCCURRENCE = "first_occurrence"; private static final String FIRST_OCCURRENCE_HUMAN_READABLE = FIRST_OCCURRENCE + "_string"; + private static final String DEFAULT_TYPE_PRE_8_8 = "unknown"; + + private final String type; private final String issue; private final String details; private final int count; private final Instant firstOccurrence; - public TransformHealthIssue(String issue, String details, int count, Instant firstOccurrence) { + public TransformHealthIssue(String type, String issue, String details, int count, Instant firstOccurrence) { + this.type = Objects.requireNonNull(type); this.issue = Objects.requireNonNull(issue); this.details = details; if (count < 1) { @@ -43,12 +49,21 @@ public TransformHealthIssue(String issue, String details, int count, Instant fir } public TransformHealthIssue(StreamInput in) throws IOException { + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + this.type = in.readString(); + } else { + this.type = DEFAULT_TYPE_PRE_8_8; + } this.issue = in.readString(); this.details = in.readOptionalString(); this.count = in.readVInt(); this.firstOccurrence = in.readOptionalInstant(); } + public String getType() { + return type; + } + public String getIssue() { return issue; } @@ -68,6 +83,7 @@ public Instant getFirstOccurrence() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field(TYPE, type); builder.field(ISSUE, issue); if (Strings.isNullOrEmpty(details) == false) { builder.field(DETAILS, details); @@ -81,6 +97,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + out.writeString(type); + } out.writeString(issue); out.writeOptionalString(details); out.writeVInt(count); @@ -100,6 +119,7 @@ public boolean equals(Object other) { TransformHealthIssue that = (TransformHealthIssue) other; return this.count == that.count + && Objects.equals(this.type, that.type) && Objects.equals(this.issue, that.issue) && Objects.equals(this.details, that.details) && Objects.equals(this.firstOccurrence, that.firstOccurrence); @@ -107,7 +127,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(issue, details, count, firstOccurrence); + return Objects.hash(type, issue, details, count, firstOccurrence); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java index 50f4ba56aaabc..f09291d5a9a7d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java @@ -45,6 +45,8 @@ public class TransformState implements Task.Status, PersistentTaskState { private NodeAttributes node; private final boolean shouldStopAtNextCheckpoint; + @Nullable + private final AuthorizationState authState; public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -57,6 +59,7 @@ public class TransformState implements Task.Status, PersistentTaskState { public static final ParseField PROGRESS = new ParseField("progress"); public static final ParseField NODE = new ParseField("node"); public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint"); + public static final ParseField AUTH_STATE = new ParseField("auth_state"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, args -> { @@ -75,6 +78,7 @@ public class TransformState implements Task.Status, PersistentTaskState { TransformProgress progress = (TransformProgress) args[6]; NodeAttributes node = (NodeAttributes) args[7]; boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean) args[8]; + AuthorizationState authState = (AuthorizationState) args[9]; return new TransformState( taskState, @@ -84,7 +88,8 @@ public class TransformState implements Task.Status, PersistentTaskState { reason, progress, node, - shouldStopAtNextCheckpoint + shouldStopAtNextCheckpoint, + authState ); }); @@ -98,6 +103,7 @@ public class TransformState implements Task.Status, PersistentTaskState { PARSER.declareField(optionalConstructorArg(), TransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT); + PARSER.declareField(optionalConstructorArg(), AuthorizationState.PARSER::apply, AUTH_STATE, ValueType.OBJECT); } public TransformState( @@ -108,7 +114,8 @@ public TransformState( @Nullable String reason, @Nullable TransformProgress progress, @Nullable NodeAttributes node, - boolean shouldStopAtNextCheckpoint + boolean shouldStopAtNextCheckpoint, + @Nullable AuthorizationState authState ) { this.taskState = taskState; this.indexerState = indexerState; @@ -118,29 +125,7 @@ public TransformState( this.progress = progress; this.node = node; this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint; - } - - public TransformState( - TransformTaskState taskState, - IndexerState indexerState, - @Nullable TransformIndexerPosition position, - long checkpoint, - @Nullable String reason, - @Nullable TransformProgress progress, - @Nullable NodeAttributes node - ) { - this(taskState, indexerState, position, checkpoint, reason, progress, node, false); - } - - public TransformState( - TransformTaskState taskState, - IndexerState indexerState, - @Nullable TransformIndexerPosition position, - long checkpoint, - @Nullable String reason, - @Nullable TransformProgress progress - ) { - this(taskState, indexerState, position, checkpoint, reason, progress, null); + this.authState = authState; } public TransformState(StreamInput in) throws IOException { @@ -165,6 +150,11 @@ public TransformState(StreamInput in) throws IOException { } else { shouldStopAtNextCheckpoint = false; } + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + authState = in.readOptionalWriteable(AuthorizationState::new); + } else { + authState = null; + } } public TransformTaskState getTaskState() { @@ -204,6 +194,10 @@ public boolean shouldStopAtNextCheckpoint() { return shouldStopAtNextCheckpoint; } + public AuthorizationState getAuthState() { + return authState; + } + public static TransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -231,6 +225,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(NODE.getPreferredName(), node); } builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint); + if (authState != null) { + builder.field(AUTH_STATE.getPreferredName(), authState); + } builder.endObject(); return builder; } @@ -258,6 +255,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersion.V_7_6_0)) { out.writeBoolean(shouldStopAtNextCheckpoint); } + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + out.writeOptionalWriteable(authState); + } } @Override @@ -279,12 +279,13 @@ public boolean equals(Object other) { && Objects.equals(this.reason, that.reason) && Objects.equals(this.progress, that.progress) && Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) - && Objects.equals(this.node, that.node); + && Objects.equals(this.node, that.node) + && Objects.equals(this.authState, that.authState); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint, authState); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index d79c9219ead5e..09e0837256a35 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -49,16 +49,12 @@ public class TransformStats implements Writeable, ToXContentObject { private final TransformHealth health; public static TransformStats initialStats(String id) { - return stoppedStats(id, new TransformIndexerStats()); - } - - public static TransformStats stoppedStats(String id, TransformIndexerStats indexerTransformStats) { return new TransformStats( id, State.STOPPED, null, null, - indexerTransformStats, + new TransformIndexerStats(), TransformCheckpointingInfo.EMPTY, TransformHealth.GREEN ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java index 555383a9fdc0b..bcfe2b1728cbf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationStateTests; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; @@ -33,6 +34,9 @@ protected Request createTestInstance() { if (randomBoolean()) { request.setConfig(TransformConfigTests.randomTransformConfig()); } + if (randomBoolean()) { + request.setAuthState(AuthorizationStateTests.randomAuthorizationState()); + } return request; } @@ -70,5 +74,4 @@ protected Request mutateInstance(Request instance) { return new Request(update, id, deferValidation, timeout); } - } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationStateTests.java new file mode 100644 index 0000000000000..0173d67116774 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationStateTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.transforms; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class AuthorizationStateTests extends AbstractSerializingTransformTestCase { + + public static AuthorizationState randomAuthorizationState() { + return new AuthorizationState( + randomNonNegativeLong(), + randomFrom(HealthStatus.values()), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100) + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return AuthorizationState::new; + } + + @Override + protected AuthorizationState createTestInstance() { + return randomAuthorizationState(); + } + + @Override + protected AuthorizationState mutateInstance(AuthorizationState instance) { + int statusCount = HealthStatus.values().length; + assert statusCount > 1; + return new AuthorizationState( + instance.getTimestamp().toEpochMilli() + 1, + HealthStatus.values()[(instance.getStatus().ordinal() + 1) % statusCount], + instance.getLastAuthError() == null ? randomAlphaOfLengthBetween(1, 100) : null + ); + } + + @Override + protected AuthorizationState doParseInstance(XContentParser parser) throws IOException { + return AuthorizationState.PARSER.apply(parser, null); + } + + public void testGreen() { + AuthorizationState authState = AuthorizationState.green(); + assertThat(authState.getStatus(), is(equalTo(HealthStatus.GREEN))); + assertThat(authState.getLastAuthError(), is(nullValue())); + } + + public void testRed() { + Exception e = new Exception("some exception"); + AuthorizationState authState = AuthorizationState.red(e); + assertThat(authState.getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(authState.getLastAuthError(), is(equalTo("some exception"))); + + authState = AuthorizationState.red(null); + assertThat(authState.getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(authState.getLastAuthError(), is(equalTo("unknown exception"))); + } + + public void testIsNullOrGreen() { + assertThat(AuthorizationState.isNullOrGreen(null), is(true)); + assertThat(AuthorizationState.isNullOrGreen(AuthorizationState.green()), is(true)); + Exception e = new Exception("some exception"); + assertThat(AuthorizationState.isNullOrGreen(AuthorizationState.red(e)), is(false)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java index 5e8e0d0cdb561..390211a43daf1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java @@ -25,6 +25,7 @@ import static org.elasticsearch.test.TestMatchers.matchesPattern; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; public class TransformCheckpointTests extends AbstractSerializingTransformTestCase { @@ -283,6 +284,14 @@ public void testGetChangedIndices() { ); } + public void testDocumentId() { + assertThat(TransformCheckpoint.documentId("my-transform", 0), is(equalTo("data_frame_transform_checkpoint-my-transform-0"))); + assertThat(TransformCheckpoint.documentId("my-transform", 1), is(equalTo("data_frame_transform_checkpoint-my-transform-1"))); + assertThat(TransformCheckpoint.documentId("my-transform", 2), is(equalTo("data_frame_transform_checkpoint-my-transform-2"))); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> TransformCheckpoint.documentId("my-transform", -1)); + assertThat(e.getMessage(), is(equalTo("checkpoint must be a non-negative number"))); + } + private static Map randomCheckpointsByIndex() { Map checkpointsByIndex = new TreeMap<>(); int indices = randomIntBetween(1, 10); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index c0858574be981..14c28e6407f9e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -22,6 +22,7 @@ import java.time.Instant; import java.util.Collections; import java.util.Map; +import java.util.Optional; import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; @@ -67,8 +68,9 @@ protected Reader instanceReader() { public void testIsNoop() { for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { TransformConfig config = randomTransformConfig(); - TransformConfigUpdate update = TransformConfigUpdate.EMPTY; - assertTrue("null update is not noop", update.isNoop(config)); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertTrue("null update should be no-op", update.isNoop(config)); + update = new TransformConfigUpdate( config.getSource(), config.getDestination(), @@ -79,7 +81,7 @@ public void testIsNoop() { config.getMetadata(), config.getRetentionPolicyConfig() ); - assertTrue("equal update is not noop", update.isNoop(config)); + assertTrue("equal update should be no-op", update.isNoop(config)); update = new TransformConfigUpdate( config.getSource(), @@ -91,10 +93,38 @@ public void testIsNoop() { config.getMetadata(), config.getRetentionPolicyConfig() ); - assertFalse("true update is noop", update.isNoop(config)); + assertFalse("true update should not be no-op", update.isNoop(config)); } } + public void testChangesSettings() { + TransformConfig config = randomTransformConfig(); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertFalse("null update does not change settings", update.changesSettings(config)); + + update = new TransformConfigUpdate(null, null, null, null, null, config.getSettings(), null, null); + assertFalse("equal update does not change settings", update.changesSettings(config)); + + SettingsConfig newSettings = new SettingsConfig.Builder(config.getSettings()).setMaxPageSearchSize( + Optional.ofNullable(config.getSettings().getMaxPageSearchSize()).orElse(0) + 1 + ).build(); + update = new TransformConfigUpdate(null, null, null, null, null, newSettings, null, null); + assertTrue("true update changes settings", update.changesSettings(config)); + } + + public void testChangesHeaders() { + TransformConfig config = randomTransformConfig(); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertFalse("null update does not change headers", update.changesHeaders(config)); + + update.setHeaders(config.getHeaders()); + assertFalse("equal update does not change headers", update.changesHeaders(config)); + + Map newHeaders = Map.of("new-key", "new-value"); + update.setHeaders(newHeaders); + assertTrue("true update changes headers", update.changesHeaders(config)); + } + public void testApply() { TransformConfig config = new TransformConfig( "time-transform", diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssueTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssueTests.java index 487de07bf096e..9595ad9f35787 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssueTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformHealthIssueTests.java @@ -7,15 +7,21 @@ package org.elasticsearch.xpack.core.transform.transforms; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.io.IOException; import java.time.Instant; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + public class TransformHealthIssueTests extends AbstractWireSerializingTestCase { public static TransformHealthIssue randomTransformHealthIssue() { return new TransformHealthIssue( + randomAlphaOfLengthBetween(10, 200), randomAlphaOfLengthBetween(10, 200), randomBoolean() ? randomAlphaOfLengthBetween(10, 200) : null, randomIntBetween(1, 10), @@ -37,4 +43,17 @@ protected TransformHealthIssue createTestInstance() { protected TransformHealthIssue mutateInstance(TransformHealthIssue instance) { return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 } + + public void testMissingTypePre88() throws IOException { + TransformHealthIssue originalIssue = new TransformHealthIssue("some-type", "some-issue", null, 1, null); + assertThat(originalIssue.getType(), is(equalTo("some-type"))); + TransformHealthIssue deserializedIssue = copyInstance( + originalIssue, + getNamedWriteableRegistry(), + (out, value) -> value.writeTo(out), + in -> new TransformHealthIssue(in), + TransportVersion.V_8_7_0 + ); + assertThat(deserializedIssue.getType(), is(equalTo("unknown"))); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java index 159109a8e11df..b8a71359c2def 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java @@ -32,7 +32,8 @@ public static TransformState randomTransformState() { randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomTransformProgress(), randomBoolean() ? null : randomNodeAttributes(), - randomBoolean() + randomBoolean(), + randomBoolean() ? null : AuthorizationStateTests.randomAuthorizationState() ); } @@ -75,15 +76,28 @@ public void testBackwardsSerialization() throws IOException { randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomTransformProgress(), randomBoolean() ? null : randomNodeAttributes(), - false - ); // Will be false after BWC deserialization + false, + randomBoolean() ? null : AuthorizationStateTests.randomAuthorizationState() + ); + // auth_state will be null after BWC deserialization + TransformState expectedState = new TransformState( + state.getTaskState(), + state.getIndexerState(), + state.getPosition(), + state.getCheckpoint(), + state.getReason(), + state.getProgress(), + state.getNode(), + state.shouldStopAtNextCheckpoint(), + null + ); try (BytesStreamOutput output = new BytesStreamOutput()) { output.setTransportVersion(TransportVersion.V_7_5_0); state.writeTo(output); try (StreamInput in = output.bytes().streamInput()) { in.setTransportVersion(TransportVersion.V_7_5_0); TransformState streamedState = new TransformState(in); - assertEquals(state, streamedState); + assertEquals(expectedState, streamedState); } } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java index 4c118f1b6dbb6..82938b76765da 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java @@ -24,17 +24,21 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; import org.junit.After; +import java.io.IOException; import java.util.List; -import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class TransformInsufficientPermissionsIT extends TransformRestTestCase { @@ -49,28 +53,34 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase { private static final int NUM_USERS = 28; + // Transform Health statuses + private static final String GREEN = "green"; + private static final String RED = "red"; + @After public void cleanTransforms() throws Exception { cleanUp(); } /** - * defer_validation = false - * unattended = false + * defer_validation = false + * unattended = false + * pre-existing dest index = false */ - public void testTransformPermissionsNoDeferValidationNoUnattended() throws Exception { - testTransformPermissionsNoDeferValidation(false); + public void testTransformPermissionsNoDeferNoUnattended() throws Exception { + testTransformPermissionsNoDefer(false); } /** - * defer_validation = false - * unattended = true + * defer_validation = false + * unattended = true + * pre-existing dest index = false */ - public void testTransformPermissionsNoDeferValidationUnattended() throws Exception { - testTransformPermissionsNoDeferValidation(true); + public void testTransformPermissionsNoDeferUnattended() throws Exception { + testTransformPermissionsNoDefer(true); } - private void testTransformPermissionsNoDeferValidation(boolean unattended) throws Exception { + private void testTransformPermissionsNoDefer(boolean unattended) throws Exception { String transformId = "transform-permissions-nodefer-" + (unattended ? 1 : 0); String sourceIndexName = transformId + "-index"; String destIndexName = sourceIndexName + "-dest"; @@ -93,8 +103,7 @@ private void testTransformPermissionsNoDeferValidation(boolean unattended) throw assertThat( e.getMessage(), containsString( - String.format( - Locale.ROOT, + Strings.format( "Cannot create transform [%s] because user %s lacks the required permissions " + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", transformId, @@ -113,14 +122,16 @@ private void testTransformPermissionsNoDeferValidation(boolean unattended) throw .addParameter("defer_validation", String.valueOf(false)) .build() ); + + assertGreen(transformId); } /** - * defer_validation = true - * unattended = false + * defer_validation = true + * unattended = false + * pre-existing dest index = false */ - @SuppressWarnings("unchecked") - public void testTransformPermissionsDeferValidationNoUnattended() throws Exception { + public void testTransformPermissionsDeferNoUnattendedNoDest() throws Exception { String transformId = "transform-permissions-defer-nounattended"; String sourceIndexName = transformId + "-index"; String destIndexName = sourceIndexName + "-dest"; @@ -132,51 +143,109 @@ public void testTransformPermissionsDeferValidationNoUnattended() throws Excepti Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() ); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + String authIssue = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertRed(transformId, authIssue); ResponseException e = expectThrows( ResponseException.class, () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build()) ); - assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(500))); - assertThat( - e.getMessage(), - containsString( - String.format(Locale.ROOT, "Could not create destination index [%s] for transform [%s]", destIndexName, transformId) - ) - ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(authIssue)); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + assertRed(transformId, authIssue); e = expectThrows( ResponseException.class, () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()) ); - assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(500))); - assertThat( - e.getMessage(), - containsString( - String.format(Locale.ROOT, "Could not create destination index [%s] for transform [%s]", destIndexName, transformId) - ) + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(authIssue)); + + assertRed(transformId, authIssue); + + // update transform's credentials so that the transform has permission to access source/dest indices + updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + + assertGreen(transformId); + + // _start API now works + startTransform(config.getId(), RequestOptions.DEFAULT); + waitUntilCheckpoint(transformId, 1); + + assertGreen(transformId); + } + + /** + * defer_validation = true + * unattended = false + * pre-existing dest index = true + */ + public void testTransformPermissionsDeferNoUnattendedDest() throws Exception { + String transformId = "transform-permissions-defer-nounattended-dest-exists"; + String sourceIndexName = transformId + "-index"; + String destIndexName = sourceIndexName + "-dest"; + createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + createIndex(adminClient(), destIndexName, Settings.EMPTY); + + TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false); + putTransform( + transformId, + Strings.toString(config), + RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() + ); + String authIssue = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName ); + assertRed(transformId, authIssue); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + ResponseException e = expectThrows( + ResponseException.class, + () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build()) + ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(authIssue)); + + assertRed(transformId, authIssue); + + e = expectThrows( + ResponseException.class, + () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()) + ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(authIssue)); + + assertRed(transformId, authIssue); // update transform's credentials so that the transform has permission to access source/dest indices updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + assertGreen(transformId); + // _start API now works startTransform(config.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(transformId, 1); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + assertGreen(transformId); } /** * defer_validation = true * unattended = false */ - @SuppressWarnings("unchecked") public void testNoTransformAdminRoleInSecondaryAuth() throws Exception { String transformId = "transform-permissions-no-admin-role"; String sourceIndexName = transformId + "-index"; @@ -207,11 +276,11 @@ public void testNoTransformAdminRoleInSecondaryAuth() throws Exception { } /** - * defer_validation = true - * unattended = true + * defer_validation = true + * unattended = true + * pre-existing dest index = false */ - @SuppressWarnings("unchecked") - public void testTransformPermissionsDeferValidationUnattended() throws Exception { + public void testTransformPermissionsDeferUnattendedNoDest() throws Exception { String transformId = "transform-permissions-defer-unattended"; String sourceIndexName = transformId + "-index"; String destIndexName = sourceIndexName + "-dest"; @@ -223,28 +292,70 @@ public void testTransformPermissionsDeferValidationUnattended() throws Exception Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() ); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + String authIssue = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertRed(transformId, authIssue); startTransform(config.getId(), RequestOptions.DEFAULT); - // transform is yellow - assertBusy(() -> { - Map stats = getTransformStats(transformId); - assertThat(extractValue(stats, "health", "status"), is(equalTo("yellow"))); - List issues = (List) extractValue(stats, "health", "issues"); - assertThat(issues, hasSize(1)); - assertThat( - (String) extractValue((Map) issues.get(0), "details"), - containsString(String.format(Locale.ROOT, "no such index [%s]", destIndexName)) - ); - }, 10, TimeUnit.SECONDS); + // transform is red with two issues + String noSuchIndexIssue = Strings.format("org.elasticsearch.index.IndexNotFoundException: no such index [%s]", destIndexName); + assertBusy(() -> assertRed(transformId, authIssue, noSuchIndexIssue), 10, TimeUnit.SECONDS); // update transform's credentials so that the transform has permission to access source/dest indices updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); waitUntilCheckpoint(transformId, 1); // transform is green again - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + assertGreen(transformId); + } + + /** + * defer_validation = true + * unattended = true + * pre-existing dest index = true + */ + public void testTransformPermissionsDeferUnattendedDest() throws Exception { + String transformId = "transform-permissions-defer-unattended-dest-exists"; + String sourceIndexName = transformId + "-index"; + String destIndexName = sourceIndexName + "-dest"; + createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + createIndex(adminClient(), destIndexName, Settings.EMPTY); + + TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, true); + putTransform( + transformId, + Strings.toString(config), + RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() + ); + String authIssue = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertRed(transformId, authIssue); + + startTransform(config.getId(), RequestOptions.DEFAULT); + + // transform's auth state status is still RED, but the health status is GREEN (because dest index exists) + assertRed(transformId, authIssue); + + // update transform's credentials so that the transform has permission to access source/dest indices + updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + waitUntilCheckpoint(transformId, 1); + + // transform is green again + assertGreen(transformId); } public void testPreviewRequestFailsPermissionsCheck() throws Exception { @@ -263,8 +374,7 @@ public void testPreviewRequestFailsPermissionsCheck() throws Exception { assertThat( e.getMessage(), containsString( - String.format( - Locale.ROOT, + Strings.format( "Cannot preview transform [%s] because user %s lacks the required permissions " + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", transformId, @@ -311,4 +421,22 @@ private TransformConfig createConfig(String transformId, String sourceIndexName, return config; } + + private void assertGreen(String transformId) throws IOException { + Map stats = getTransformStats(transformId); + assertThat("Stats were: " + stats, extractValue(stats, "health", "status"), is(equalTo(GREEN))); + assertThat("Stats were: " + stats, extractValue(stats, "health", "issues"), is(nullValue())); + } + + @SuppressWarnings("unchecked") + private void assertRed(String transformId, String... expectedHealthIssueDetails) throws IOException { + Map stats = getTransformStats(transformId); + assertThat("Stats were: " + stats, extractValue(stats, "health", "status"), is(equalTo(RED))); + List issues = (List) extractValue(stats, "health", "issues"); + assertThat("Stats were: " + stats, issues, hasSize(expectedHealthIssueDetails.length)); + Set actualHealthIssueDetailsSet = issues.stream() + .map(issue -> (String) extractValue((Map) issue, "details")) + .collect(toSet()); + assertThat("Stats were: " + stats, actualHealthIssueDetailsSet, containsInAnyOrder(expectedHealthIssueDetails)); + } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java index 317fad1f99aa3..5e7522636514e 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java @@ -77,10 +77,10 @@ public void testUsage() throws Exception { + "be prevented by default" ) ); - // Verify that we have one stat document + // Verify that we have 4 stat documents, one per transform assertBusy(() -> { Map hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest)); - assertEquals(1, XContentMapValues.extractValue("hits.total.value", hasStatsMap)); + assertEquals(4, XContentMapValues.extractValue("hits.total.value", hasStatsMap)); }); startAndWaitForContinuousTransform("test_usage_continuous", "pivot_reviews_continuous", null); diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtilsTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtilsTests.java new file mode 100644 index 0000000000000..c6c98c2708d86 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtilsTests.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.persistence; + +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.TestIndexNameExpressionResolver; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; +import org.junit.Before; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +public class AuthorizationStatePersistenceUtilsTests extends TransformSingleNodeTestCase { + + private ClusterService clusterService; + private IndexBasedTransformConfigManager transformConfigManager; + + @Before + public void createComponents() { + clusterService = mock(ClusterService.class); + transformConfigManager = new IndexBasedTransformConfigManager( + clusterService, + TestIndexNameExpressionResolver.newInstance(), + client(), + xContentRegistry() + ); + } + + public void testAuthState() throws InterruptedException { + String transformId = "transform_test_auth_state_create_read_update"; + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + (AuthorizationState) null + ); + + AuthorizationState greenAuthState = AuthorizationState.green(); + assertAsync( + listener -> AuthorizationStatePersistenceUtils.persistAuthState( + Settings.EMPTY, + transformConfigManager, + transformId, + greenAuthState, + listener + ), + (Void) null + ); + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + greenAuthState + ); + + AuthorizationState redAuthState = AuthorizationState.red(new ElasticsearchSecurityException("missing privileges")); + assertAsync( + listener -> AuthorizationStatePersistenceUtils.persistAuthState( + Settings.EMPTY, + transformConfigManager, + transformId, + redAuthState, + listener + ), + (Void) null + ); + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + redAuthState + ); + + AuthorizationState otherRedAuthState = AuthorizationState.red(new ElasticsearchSecurityException("other missing privileges")); + assertAsync( + listener -> AuthorizationStatePersistenceUtils.persistAuthState( + Settings.EMPTY, + transformConfigManager, + transformId, + otherRedAuthState, + listener + ), + (Void) null + ); + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + otherRedAuthState + ); + } + + private void assertAsync(Consumer> function, T expected) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> assertThat(r, is(equalTo(expected))), e -> fail("got unexpected exception: " + e.getMessage())), + latch + ); + function.accept(listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java index 453877e0ed319..b401068087b0d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java @@ -13,7 +13,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.RemoteClusterLicenseChecker; +import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -40,6 +42,7 @@ final class TransformPrivilegeChecker { static void checkPrivileges( String operationName, + Settings settings, SecurityContext securityContext, IndexNameExpressionResolver indexNameExpressionResolver, ClusterState clusterState, @@ -48,6 +51,8 @@ static void checkPrivileges( boolean checkDestIndexPrivileges, ActionListener listener ) { + assert XPackSettings.SECURITY_ENABLED.get(settings); + useSecondaryAuthIfAvailable(securityContext, () -> { final String username = securityContext.getUser().principal(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java index a0d06066afb35..2a087486ac6dc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; @@ -24,6 +25,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; @@ -57,24 +59,35 @@ public enum Status { } // the new config after the update + @Nullable private final TransformConfig config; + // the auth state to persist after the update + @Nullable + private final AuthorizationState authState; + // the action taken for the upgrade private final Status status; - UpdateResult(final TransformConfig config, final Status status) { + UpdateResult(final TransformConfig config, final AuthorizationState authState, final Status status) { this.config = config; + this.authState = authState; this.status = status; } - public Status getStatus() { - return status; - } - @Nullable public TransformConfig getConfig() { return config; } + + @Nullable + public AuthorizationState getAuthState() { + return authState; + } + + public Status getStatus() { + return status; + } } /** @@ -115,14 +128,15 @@ public static void updateTransform( ActionListener listener ) { // rewrite config into a new format if necessary - TransformConfig rewrittenConfig = TransformConfig.rewriteForUpdate(config); - TransformConfig updatedConfig = update != null ? update.apply(rewrittenConfig) : rewrittenConfig; + final TransformConfig rewrittenConfig = TransformConfig.rewriteForUpdate(config); + final TransformConfig updatedConfig = update != null ? update.apply(rewrittenConfig) : rewrittenConfig; + final SetOnce authStateHolder = new SetOnce<>(); // <5> Update checkpoints ActionListener updateStateListener = ActionListener.wrap(lastCheckpoint -> { // config was updated, but the transform has no state or checkpoint if (lastCheckpoint == null || lastCheckpoint == -1) { - listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)); + listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.UPDATED)); return; } @@ -131,7 +145,7 @@ public static void updateTransform( lastCheckpoint, transformConfigManager, ActionListener.wrap( - r -> listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)), + r -> listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.UPDATED)), listener::onFailure ) ); @@ -153,12 +167,12 @@ public static void updateTransform( if (config.getVersion() != null && config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED) && updatedConfig.equals(config)) { - listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NONE)); + listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.NONE)); return; } if (dryRun) { - listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NEEDS_UPDATE)); + listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.NEEDS_UPDATE)); return; } @@ -175,22 +189,29 @@ public static void updateTransform( }, listener::onFailure); // <2> Validate source and destination indices - ActionListener checkPrivilegesListener = ActionListener.wrap( - aVoid -> validateTransform(updatedConfig, client, deferValidation, timeout, validateTransformListener), - listener::onFailure - ); + ActionListener checkPrivilegesListener = ActionListener.wrap(authState -> { + authStateHolder.set(authState); + validateTransform(updatedConfig, client, deferValidation, timeout, validateTransformListener); + }, listener::onFailure); // <1> Early check to verify that the user can create the destination index and can read from the source - if (checkAccess && XPackSettings.SECURITY_ENABLED.get(settings) && deferValidation == false) { + if (checkAccess && XPackSettings.SECURITY_ENABLED.get(settings)) { TransformPrivilegeChecker.checkPrivileges( "update", + settings, securityContext, indexNameExpressionResolver, clusterState, client, updatedConfig, true, - checkPrivilegesListener + ActionListener.wrap(aVoid -> checkPrivilegesListener.onResponse(AuthorizationState.green()), e -> { + if (deferValidation) { + checkPrivilegesListener.onResponse(AuthorizationState.red(e)); + } else { + checkPrivilegesListener.onFailure(e); + } + }) ); } else { // No security enabled, just move on checkPrivilegesListener.onResponse(null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index cfa532735d32a..c7b443fc14063 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -36,7 +36,6 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction.Response; import org.elasticsearch.xpack.core.transform.transforms.NodeAttributes; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; -import org.elasticsearch.xpack.core.transform.transforms.TransformHealth; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformStats; import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; @@ -247,7 +246,7 @@ static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpo null, task.getStats(), checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo, - TransformHealthChecker.checkTransform(task) + TransformHealthChecker.checkTransform(task, transformState.getAuthState()) ); } @@ -359,7 +358,11 @@ private void addCheckpointingInfoForTransformsWithoutTasks( null, stat.getTransformStats(), checkpointingInfo, - TransformHealthChecker.checkUnassignedTransform(stat.getId(), clusterState) + TransformHealthChecker.checkUnassignedTransform( + stat.getId(), + clusterState, + stat.getTransformState().getAuthState() + ) ) ); } else { @@ -371,7 +374,7 @@ private void addCheckpointingInfoForTransformsWithoutTasks( null, stat.getTransformStats(), checkpointingInfo, - TransformHealth.GREEN + TransformHealthChecker.checkTransform(stat.getTransformState().getAuthState()) ) ); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index 47d768e01daf0..edf9fefb8416a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -174,6 +174,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (XPackSettings.SECURITY_ENABLED.get(nodeSettings)) { TransformPrivilegeChecker.checkPrivileges( "preview", + nodeSettings, securityContext, indexNameExpressionResolver, clusterState, @@ -184,7 +185,7 @@ protected void doExecute(Task task, Request request, ActionListener li DUMMY_DEST_INDEX_FOR_PREVIEW.equals(config.getDestination().getIndex()) == false, checkPrivilegesListener ); - } else { // No security enabled, just create the transform + } else { // No security enabled, just move on checkPrivilegesListener.onResponse(null); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 780aac0c1ee5b..14ccfcf0111d3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -35,9 +35,11 @@ import org.elasticsearch.xpack.core.transform.action.PutTransformAction; import org.elasticsearch.xpack.core.transform.action.PutTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.FunctionFactory; @@ -123,16 +125,38 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS ); // <1> Early check to verify that the user can create the destination index and can read from the source - if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { + if (XPackSettings.SECURITY_ENABLED.get(settings)) { TransformPrivilegeChecker.checkPrivileges( "create", + settings, securityContext, indexNameExpressionResolver, clusterState, client, config, true, - checkPrivilegesListener + ActionListener.wrap( + aVoid -> AuthorizationStatePersistenceUtils.persistAuthState( + settings, + transformConfigManager, + transformId, + AuthorizationState.green(), + checkPrivilegesListener + ), + e -> { + if (request.isDeferValidation()) { + AuthorizationStatePersistenceUtils.persistAuthState( + settings, + transformConfigManager, + transformId, + AuthorizationState.red(e), + checkPrivilegesListener + ); + } else { + checkPrivilegesListener.onFailure(e); + } + } + ) ); } else { // No security enabled, just move on checkPrivilegesListener.onResponse(null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index a8bccbf5d0077..7848271cc9c89 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -24,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; @@ -34,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -41,6 +45,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformIndex; import org.elasticsearch.xpack.transform.transforms.TransformNodes; @@ -48,7 +53,6 @@ import java.time.Clock; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -125,8 +129,8 @@ protected void masterOperation( ) { TransformNodes.warnIfNoTransformNodes(state); - final AtomicReference transformTaskParamsHolder = new AtomicReference<>(); - final AtomicReference transformConfigHolder = new AtomicReference<>(); + final SetOnce transformTaskParamsHolder = new SetOnce<>(); + final SetOnce transformConfigHolder = new SetOnce<>(); // <5> Wait for the allocated task's state to STARTED ActionListener> newPersistentTaskActionListener = ActionListener @@ -192,7 +196,16 @@ protected void masterOperation( }); // <2> run transform validations - ActionListener getTransformListener = ActionListener.wrap(config -> { + ActionListener fetchAuthStateListener = ActionListener.wrap(authState -> { + if (authState != null && HealthStatus.RED.equals(authState.getStatus())) { + // AuthorizationState status is RED which means there was permission check error during PUT or _update. + // Since this transform is *not* unattended (otherwise authState would be null), we fail immediately. + listener.onFailure(new ElasticsearchSecurityException(authState.getLastAuthError(), RestStatus.FORBIDDEN)); + return; + } + + TransformConfig config = transformConfigHolder.get(); + ActionRequestValidationException validationException = config.validate(null); if (request.from() != null && config.getSyncConfig() == null) { validationException = addValidationError( @@ -222,7 +235,6 @@ protected void masterOperation( config.getSource().requiresRemoteCluster() ) ); - transformConfigHolder.set(config); ClientHelper.executeAsyncWithOrigin( client, ClientHelper.TRANSFORM_ORIGIN, @@ -232,7 +244,20 @@ protected void masterOperation( ); }, listener::onFailure); - // <1> Get the config to verify it exists and is valid + // <1> Check if there is an auth error stored for this transform + ActionListener getTransformListener = ActionListener.wrap(config -> { + transformConfigHolder.set(config); + + if (Boolean.TRUE.equals(config.getSettings().getUnattended())) { + // We do not fail the _start request of the unattended transform due to permission issues, + // we just let it run + fetchAuthStateListener.onResponse(null); + } else { + AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, request.getId(), fetchAuthStateListener); + } + }, listener::onFailure); + + // <0> Get the config to verify it exists and is valid transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index ef410affcc4b1..3b2c35b4fe9bb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -34,12 +34,14 @@ import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Response; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.FunctionFactory; @@ -138,14 +140,18 @@ protected void doExecute(Task task, Request request, ActionListener li false, // dryRun true, // checkAccess request.getTimeout(), - ActionListener.wrap(updateResponse -> { - TransformConfig updatedConfig = updateResponse.getConfig(); + ActionListener.wrap(updateResult -> { + TransformConfig originalConfig = configAndVersion.v1(); + TransformConfig updatedConfig = updateResult.getConfig(); + AuthorizationState authState = updateResult.getAuthState(); auditor.info(updatedConfig.getId(), "Updated transform."); - logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); + logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResult.getStatus()); checkTransformConfigAndLogWarnings(updatedConfig); - if (update.changesSettings(configAndVersion.v1())) { + boolean updateChangesSettings = update.changesSettings(originalConfig); + boolean updateChangesHeaders = update.changesHeaders(originalConfig); + if (updateChangesSettings || updateChangesHeaders) { PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( request.getId(), clusterState @@ -187,11 +193,21 @@ protected void doExecute(Task task, Request request, ActionListener li request.setNodes(transformTask.getExecutorNode()); request.setConfig(updatedConfig); + request.setAuthState(authState); super.doExecute(task, request, taskUpdateListener); return; + } else if (updateChangesHeaders) { + AuthorizationStatePersistenceUtils.persistAuthState( + settings, + transformConfigManager, + updatedConfig.getId(), + authState, + ActionListener.wrap(aVoid -> listener.onResponse(new Response(updatedConfig)), listener::onFailure) + ); } + } else { + listener.onResponse(new Response(updatedConfig)); } - listener.onResponse(new Response(updatedConfig)); }, listener::onFailure) ), listener::onFailure @@ -211,8 +227,8 @@ private void checkTransformConfigAndLogWarnings(TransformConfig config) { @Override protected void taskOperation(Task actionTask, Request request, TransformTask transformTask, ActionListener listener) { - // apply the settings transformTask.applyNewSettings(request.getConfig().getSettings()); + transformTask.applyNewAuthState(request.getAuthState()); listener.onResponse(new Response(request.getConfig())); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java index e8a6a243a3f68..222d2f0f8c9f7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java @@ -170,7 +170,7 @@ private void updateOneTransform(String id, boolean dryRun, TimeValue timeout, Ac }, failure -> { // ignore if transform got deleted while upgrade was running if (failure instanceof ResourceNotFoundException) { - listener.onResponse(new UpdateResult(null, UpdateResult.Status.DELETED)); + listener.onResponse(new UpdateResult(null, null, UpdateResult.Status.DELETED)); } else { listener.onFailure(failure); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtils.java new file mode 100644 index 0000000000000..2b3bc52d32f87 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtils.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.persistence; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; + +/** + * {@link AuthorizationStatePersistenceUtils} is a utility class built on top of {@link TransformConfigManager}. + * It allows fetching and persisting {@link AuthorizationState} objects. + */ +public final class AuthorizationStatePersistenceUtils { + + private static final Logger logger = LogManager.getLogger(AuthorizationStatePersistenceUtils.class); + + /** + * Fetches persisted authorization state object and returns it back to the caller. + */ + public static void fetchAuthState( + TransformConfigManager transformConfigManager, + String transformId, + ActionListener listener + ) { + ActionListener> getTransformStoredDocListener = ActionListener.wrap( + stateAndStatsAndSeqNoPrimaryTermAndIndex -> { + if (stateAndStatsAndSeqNoPrimaryTermAndIndex == null) { + listener.onResponse(null); + return; + } + TransformState transformState = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1().getTransformState(); + if (transformState == null) { + listener.onResponse(null); + return; + } + AuthorizationState authState = transformState.getAuthState(); + if (authState == null) { + listener.onResponse(null); + return; + } + listener.onResponse(authState); + }, + listener::onFailure + ); + + transformConfigManager.getTransformStoredDoc(transformId, true, getTransformStoredDocListener); + } + + /** + * Persists the authorization state object given as an argument. + */ + public static void persistAuthState( + Settings settings, + TransformConfigManager transformConfigManager, + String transformId, + AuthorizationState authState, + ActionListener listener + ) { + assert XPackSettings.SECURITY_ENABLED.get(settings); + + logger.trace("Started persisting auth state: {}", authState); + + ActionListener persistListener = ActionListener.wrap(unusedResponse -> { + logger.trace("Finished persisting auth state: {}", authState); + listener.onResponse(null); + }, e -> { + logger.trace("Failed persisting auth state: {}, exception = {}", authState, e); + listener.onFailure(e); + }); + + ActionListener> transformStatsActionListener = ActionListener.wrap( + stateAndStatsAndSeqNoPrimaryTermAndIndex -> { + // Stored doc does not exist yet, we need to create one + if (stateAndStatsAndSeqNoPrimaryTermAndIndex == null) { + TransformState state = new TransformState( + TransformTaskState.STOPPED, + IndexerState.STOPPED, + null, + 0, + null, + null, + null, + false, + authState + ); + TransformIndexerStats stats = new TransformIndexerStats(); + transformConfigManager.putOrUpdateTransformStoredDoc( + new TransformStoredDoc(transformId, state, stats), + null, + persistListener + ); + return; + } + // Stored doc already exists, we just need to update its authState field + TransformStoredDoc stateAndStats = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1(); + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = stateAndStatsAndSeqNoPrimaryTermAndIndex.v2(); + TransformState oldState = stateAndStats.getTransformState(); + TransformState newState = new TransformState( + oldState.getTaskState(), + oldState.getIndexerState(), + oldState.getPosition(), + oldState.getCheckpoint(), + oldState.getReason(), + oldState.getProgress(), + oldState.getNode(), + oldState.shouldStopAtNextCheckpoint(), + authState + ); + TransformIndexerStats stats = stateAndStats.getTransformStats(); + transformConfigManager.putOrUpdateTransformStoredDoc( + new TransformStoredDoc(transformId, newState, stats), + seqNoPrimaryTermAndIndex, + persistListener + ); + }, + error -> { + String msg = TransformMessages.getMessage(TransformMessages.FAILED_TO_LOAD_TRANSFORM_STATE, transformId); + logger.error(msg, error); + listener.onFailure(error); + } + ); + + transformConfigManager.getTransformStoredDoc(transformId, true, transformStatsActionListener); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 4e20ec4dcb615..9ef9823e10a07 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.transform.transforms; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.Transform; @@ -41,6 +42,7 @@ public interface Listener { private volatile Instant changesLastDetectedAt; private volatile Instant lastSearchTime; private volatile boolean shouldStopAtCheckpoint = false; + private volatile AuthorizationState authState; private volatile int pageSize = 0; // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ @@ -169,6 +171,14 @@ public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; } + public AuthorizationState getAuthState() { + return authState; + } + + public void setAuthState(AuthorizationState authState) { + this.authState = authState; + } + int getPageSize() { return pageSize; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java index a85f034d09d2d..86d8ce4a6173c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java @@ -9,36 +9,88 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.core.Nullable; import org.elasticsearch.health.HealthStatus; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformHealth; import org.elasticsearch.xpack.core.transform.transforms.TransformHealthIssue; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Locale; /** * Check the health of a transform. */ public final class TransformHealthChecker { + /** + * Describes all the known transform health issue types. + * + * The list of issue type can be extended over time, when we add additional health checks. + */ + public enum IssueType { + ASSIGNMENT_FAILED("Failed to assign transform to a node"), + PRIVILEGES_CHECK_FAILED("Privileges check failed"), + TRANSFORM_TASK_FAILED("Transform task state is [failed]"), + TRANSFORM_INDEXER_FAILED("Transform indexer failed"), + TRANSFORM_INTERNAL_STATE_UPDATE_FAILED("Task encountered failures updating internal state"); + + private final String issue; + + IssueType(String issue) { + this.issue = issue; + } + + public TransformHealthIssue newIssue(@Nullable String details, int count, @Nullable Instant firstOccurrence) { + String type = name().toLowerCase(Locale.ROOT); + return new TransformHealthIssue(type, issue, details, count, firstOccurrence); + } + } + // simple boundary to decide when to report a red status vs. a yellow status after consecutive retries static int RED_STATUS_FAILURE_COUNT_BOUNDARY = 5; - public static TransformHealth checkUnassignedTransform(String transformId, ClusterState clusterState) { + public static TransformHealth checkUnassignedTransform( + String transformId, + ClusterState clusterState, + @Nullable AuthorizationState authState + ) { final Assignment assignment = TransformNodes.getAssignment(transformId, clusterState); + final List issues = new ArrayList<>(2); + issues.add(IssueType.ASSIGNMENT_FAILED.newIssue(assignment.getExplanation(), 1, null)); + if (AuthorizationState.isNullOrGreen(authState) == false) { + issues.add(IssueType.PRIVILEGES_CHECK_FAILED.newIssue(authState.getLastAuthError(), 1, null)); + } + return new TransformHealth(HealthStatus.RED, Collections.unmodifiableList(issues)); + } + + public static TransformHealth checkTransform(@Nullable AuthorizationState authState) { + // quick check + if (AuthorizationState.isNullOrGreen(authState)) { + return TransformHealth.GREEN; + } + return new TransformHealth( - HealthStatus.RED, - List.of(new TransformHealthIssue("Failed to assign transform to a node", assignment.getExplanation(), 1, null)) + authState.getStatus(), + List.of(IssueType.PRIVILEGES_CHECK_FAILED.newIssue(authState.getLastAuthError(), 1, null)) ); } public static TransformHealth checkTransform(TransformTask transformTask) { + return checkTransform(transformTask, null); + } + + public static TransformHealth checkTransform(TransformTask transformTask, @Nullable AuthorizationState authState) { // quick check if (TransformTaskState.FAILED.equals(transformTask.getState().getTaskState()) == false && transformTask.getContext().getFailureCount() == 0 - && transformTask.getContext().getStatePersistenceFailureCount() == 0) { + && transformTask.getContext().getStatePersistenceFailureCount() == 0 + && AuthorizationState.isNullOrGreen(authState)) { return TransformHealth.GREEN; } @@ -46,15 +98,15 @@ public static TransformHealth checkTransform(TransformTask transformTask) { List issues = new ArrayList<>(); HealthStatus maxStatus = HealthStatus.GREEN; + if (AuthorizationState.isNullOrGreen(authState) == false) { + maxStatus = authState.getStatus(); + issues.add(IssueType.PRIVILEGES_CHECK_FAILED.newIssue(authState.getLastAuthError(), 1, null)); + } + if (TransformTaskState.FAILED.equals(transformTask.getState().getTaskState())) { maxStatus = HealthStatus.RED; issues.add( - new TransformHealthIssue( - "Transform task state is [failed]", - transformTask.getState().getReason(), - 1, - transformContext.getStateFailureTime() - ) + IssueType.TRANSFORM_TASK_FAILED.newIssue(transformTask.getState().getReason(), 1, transformContext.getStateFailureTime()) ); } @@ -69,8 +121,7 @@ public static TransformHealth checkTransform(TransformTask transformTask) { } issues.add( - new TransformHealthIssue( - "Transform indexer failed", + IssueType.TRANSFORM_INDEXER_FAILED.newIssue( lastFailureMessage, transformContext.getFailureCount(), transformContext.getLastFailureStartTime() @@ -86,8 +137,7 @@ public static TransformHealth checkTransform(TransformTask transformTask) { } issues.add( - new TransformHealthIssue( - "Task encountered failures updating internal state", + IssueType.TRANSFORM_INTERNAL_STATE_UPDATE_FAILED.newIssue( transformContext.getLastStatePersistenceFailure().getMessage(), transformContext.getStatePersistenceFailureCount(), transformContext.getLastStatePersistenceFailureStartTime() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 7bfbc3f078dfb..95f5747935217 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -733,7 +733,8 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p context.getStateReason(), getProgress(), null, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + context.getAuthState() ); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); @@ -845,7 +846,8 @@ private boolean addSetStopAtCheckpointListener(boolean shouldStopAtCheckpoint, A context.getStateReason(), getProgress(), null, - newIndexerState == IndexerState.STARTED + newIndexerState == IndexerState.STARTED, + context.getAuthState() ); // because save state is async we need to block the call until state is persisted, so that the job can not diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 4f48176f24cca..9d8173ca562bc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -214,8 +215,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa } final long lastCheckpoint = stateHolder.get().getCheckpoint(); + final AuthorizationState authState = stateHolder.get().getAuthState(); - startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); + startTask(buildTask, indexerBuilder, authState, lastCheckpoint, startTaskListener); }, error -> { // TODO: do not use the same error message as for loading the last checkpoint String msg = TransformMessages.getMessage(TransformMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); @@ -282,7 +284,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa markAsFailed(buildTask, msg); } else { logger.trace("[{}] No stats found (new transform), starting the task", transformId); - startTask(buildTask, indexerBuilder, null, startTaskListener); + startTask(buildTask, indexerBuilder, null, null, startTaskListener); } } ); @@ -377,12 +379,14 @@ private void markAsFailed(TransformTask task, String reason) { private void startTask( TransformTask buildTask, ClientTransformIndexerBuilder indexerBuilder, + AuthorizationState authState, Long previousCheckpoint, ActionListener listener ) { // switch the threadpool to generic, because the caller is on the system_read threadpool threadPool.generic().execute(() -> { buildTask.initializeIndexer(indexerBuilder); + buildTask.setAuthState(authState); // TransformTask#start will fail if the task state is FAILED buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); }); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 4ade3a31608af..8478525cb4a53 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; @@ -115,6 +116,9 @@ public TransformTask( this.initialPosition = initialPosition; this.context = new TransformContext(initialTaskState, initialReason, initialCheckpoint, transform.from(), this); + if (state != null) { + this.context.setAuthState(state.getAuthState()); + } } public ParentTaskAssigningClient getParentTaskClient() { @@ -147,7 +151,8 @@ public TransformState getState() { context.getStateReason(), null, null, - false + false, + context.getAuthState() ); } else { return new TransformState( @@ -158,7 +163,8 @@ public TransformState getState() { context.getStateReason(), getIndexer().getProgress(), null, - context.shouldStopAtCheckpoint() + context.shouldStopAtCheckpoint(), + context.getAuthState() ); } } @@ -210,7 +216,6 @@ public void getCheckpointingInfo( /** * Starts the transform and schedules it to be triggered in the future. * - * * @param startingCheckpoint The starting checkpoint, could null. Null indicates that there is no starting checkpoint * @param listener The listener to alert once started */ @@ -265,7 +270,8 @@ void start(Long startingCheckpoint, ActionListener void ActionListener listener ) { if (request instanceof HasPrivilegesRequest) { - listener.onResponse((Response) new HasPrivilegesResponse()); + HasPrivilegesRequest hasPrivilegesRequest = (HasPrivilegesRequest) request; + switch (hasPrivilegesRequest.username()) { + case BOB: + // bob has all the privileges + listener.onResponse((Response) new HasPrivilegesResponse()); + break; + case JOHN: + // john does not have required privileges + listener.onFailure(new ElasticsearchSecurityException("missing privileges")); + break; + default: + fail("Unexpected username = " + hasPrivilegesRequest.username()); + } } else if (request instanceof ValidateTransformAction.Request) { listener.onResponse((Response) new ValidateTransformAction.Response(Collections.emptyMap())); } else { @@ -116,7 +134,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -134,6 +152,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { updateResult -> { assertEquals(UpdateResult.Status.NONE, updateResult.getStatus()); assertEquals(maxCompatibleConfig, updateResult.getConfig()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(maxCompatibleConfig.getId(), listener), config -> { @@ -149,7 +168,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -167,6 +186,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { updateResult -> { assertEquals(UpdateResult.Status.NONE, updateResult.getStatus()); assertEquals(minCompatibleConfig, updateResult.getConfig()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(minCompatibleConfig.getId(), listener), config -> { @@ -207,7 +227,8 @@ public void testTransformUpdateRewrite() throws InterruptedException { null, // reason null, // progress null, // node attributes - false // shouldStopAtNextCheckpoint + false,// shouldStopAtNextCheckpoint + null // auth state ), TransformIndexerStatsTests.randomStats() ); @@ -218,7 +239,7 @@ public void testTransformUpdateRewrite() throws InterruptedException { TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -236,6 +257,7 @@ public void testTransformUpdateRewrite() throws InterruptedException { updateResult -> { assertEquals(UpdateResult.Status.UPDATED, updateResult.getStatus()); assertNotEquals(oldConfig, updateResult.getConfig()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(oldConfig.getId(), listener), config -> { @@ -283,7 +305,7 @@ public void testTransformUpdateDryRun() throws InterruptedException { TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -302,6 +324,7 @@ public void testTransformUpdateDryRun() throws InterruptedException { assertEquals(UpdateResult.Status.NEEDS_UPDATE, updateResult.getStatus()); assertNotEquals(oldConfigForDryRunUpdate, updateResult.getConfig()); assertEquals(Version.CURRENT, updateResult.getConfig().getVersion()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration( @@ -313,6 +336,115 @@ public void testTransformUpdateDryRun() throws InterruptedException { ); } + public void testTransformUpdateCheckAccessSuccess() throws InterruptedException { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED) + ) + ); + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.noop()); + + assertUpdate( + listener -> TransformUpdater.updateTransform( + bobSecurityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + TransformConfigUpdate.EMPTY, + null, // seqNoPrimaryTermAndIndex + false, + false, + true, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + listener + ), + updateResult -> { + assertThat(updateResult.getStatus(), is(equalTo(UpdateResult.Status.UPDATED))); + assertThat(updateResult.getConfig(), is(not(equalTo(oldConfig)))); + assertThat(updateResult.getConfig().getVersion(), is(equalTo(Version.CURRENT))); + assertThat(updateResult.getAuthState(), is(notNullValue())); + assertThat(updateResult.getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + assertThat(updateResult.getAuthState().getLastAuthError(), is(nullValue())); + } + ); + } + + public void testTransformUpdateCheckAccessFailureDeferValidation() throws InterruptedException { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED) + ) + ); + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.noop()); + + assertUpdate( + listener -> TransformUpdater.updateTransform( + johnSecurityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + TransformConfigUpdate.EMPTY, + null, // seqNoPrimaryTermAndIndex + true, + false, + true, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + listener + ), + updateResult -> { + assertThat(updateResult.getStatus(), is(equalTo(UpdateResult.Status.UPDATED))); + assertThat(updateResult.getConfig(), is(not(equalTo(oldConfig)))); + assertThat(updateResult.getConfig().getVersion(), is(equalTo(Version.CURRENT))); + assertThat(updateResult.getAuthState(), is(notNullValue())); + assertThat(updateResult.getAuthState().getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(updateResult.getAuthState().getLastAuthError(), is(equalTo("missing privileges"))); + } + ); + } + + public void testTransformUpdateCheckAccessFailureNoDeferValidation() { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig(); + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.noop()); + + TransformUpdater.updateTransform( + johnSecurityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + TransformConfigUpdate.EMPTY, + null, // seqNoPrimaryTermAndIndex + false, + false, + true, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + ActionListener.wrap( + r -> fail("Should fail due to missing privileges"), + e -> assertThat(e.getMessage(), is(equalTo("missing privileges"))) + ) + ); + } + private void assertUpdate(Consumer> function, Consumer furtherTests) throws InterruptedException { assertAsync(function, furtherTests); @@ -349,4 +481,13 @@ private void assertAsync(Consumer> function, Consumer f function.accept(listener); assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); } + + private static SecurityContext newSecurityContextFor(String username) { + return new SecurityContext(Settings.EMPTY, new ThreadContext(Settings.EMPTY)) { + @Override + public User getUser() { + return new User(username); + } + }; + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java index 6f990ca420811..a3f56efa9e138 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java @@ -6,19 +6,21 @@ */ package org.elasticsearch.xpack.transform.action; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.health.HealthStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformHealth; -import org.elasticsearch.xpack.core.transform.transforms.TransformHealthIssue; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.transforms.TransformContext; +import org.elasticsearch.xpack.transform.transforms.TransformHealthChecker; import org.elasticsearch.xpack.transform.transforms.TransformTask; import java.time.Instant; @@ -30,7 +32,14 @@ public class TransportGetTransformStatsActionTests extends ESTestCase { - private TransformTask task = mock(TransformTask.class); + private static final TransformCheckpointingInfo CHECKPOINTING_INFO = new TransformCheckpointingInfo( + new TransformCheckpointStats(1, null, null, 1, 1), + new TransformCheckpointStats(2, null, null, 2, 5), + 2, + Instant.now(), + Instant.now() + ); + private final TransformTask task = mock(TransformTask.class); public void testDeriveStatsStopped() { String transformId = "transform-with-stats"; @@ -44,17 +53,10 @@ public void testDeriveStatsStopped() { reason, null, null, - true + true, + null ); withIdStateAndStats(transformId, stoppedState, stats); - TransformCheckpointingInfo info = new TransformCheckpointingInfo( - new TransformCheckpointStats(1, null, null, 1, 1), - new TransformCheckpointStats(2, null, null, 2, 5), - 2, - Instant.now(), - Instant.now() - ); - assertThat( TransportGetTransformStatsAction.deriveStats(task, null), equalTo( @@ -70,12 +72,22 @@ public void testDeriveStatsStopped() { ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info, TransformHealth.GREEN)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPED, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN + ) + ) ); reason = "foo"; - stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true); + stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true, null); withIdStateAndStats(transformId, stoppedState, stats); assertThat( @@ -92,9 +104,61 @@ public void testDeriveStatsStopped() { ) ) ); + assertThat( + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPED, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN + ) + ) + ); + } + + public void testDeriveStatsStoppedWithAuthStateGreen() { + testDeriveStatsStoppedWithAuthState(null, AuthorizationState.green(), TransformCheckpointingInfo.EMPTY, TransformHealth.GREEN); + testDeriveStatsStoppedWithAuthState(CHECKPOINTING_INFO, AuthorizationState.green(), CHECKPOINTING_INFO, TransformHealth.GREEN); + } + + public void testDeriveStatsStoppedWithAuthStateRed() { + AuthorizationState redAuthState = AuthorizationState.red(new ElasticsearchSecurityException("missing privileges")); + TransformHealth expectedHealth = new TransformHealth( + HealthStatus.RED, + List.of(TransformHealthChecker.IssueType.PRIVILEGES_CHECK_FAILED.newIssue("missing privileges", 1, null)) + ); + testDeriveStatsStoppedWithAuthState(null, redAuthState, TransformCheckpointingInfo.EMPTY, expectedHealth); + testDeriveStatsStoppedWithAuthState(CHECKPOINTING_INFO, redAuthState, CHECKPOINTING_INFO, expectedHealth); + } + + private void testDeriveStatsStoppedWithAuthState( + TransformCheckpointingInfo info, + AuthorizationState authState, + TransformCheckpointingInfo expectedInfo, + TransformHealth expectedHealth + ) { + String transformId = "transform-with-auth-state"; + TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); + TransformState stoppedState = new TransformState( + TransformTaskState.STOPPED, + IndexerState.STOPPED, + null, + 0, + null, + null, + null, + true, + authState + ); + withIdStateAndStats(transformId, stoppedState, stats); + assertThat( TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info, TransformHealth.GREEN)) + equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, null, null, stats, expectedInfo, expectedHealth)) ); } @@ -103,20 +167,22 @@ public void testDeriveStatsFailed() { String reason = null; TransformHealth expectedHealth = new TransformHealth( HealthStatus.RED, - List.of(new TransformHealthIssue("Transform task state is [failed]", null, 1, null)) + List.of(TransformHealthChecker.IssueType.TRANSFORM_TASK_FAILED.newIssue(null, 1, null)) ); TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); - TransformState failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true); - withIdStateAndStats(transformId, failedState, stats); - TransformCheckpointingInfo info = new TransformCheckpointingInfo( - new TransformCheckpointStats(1, null, null, 1, 1), - new TransformCheckpointStats(2, null, null, 2, 5), - 2, - Instant.now(), - Instant.now() + TransformState failedState = new TransformState( + TransformTaskState.FAILED, + IndexerState.STOPPED, + null, + 0, + reason, + null, + null, + true, + null ); - + withIdStateAndStats(transformId, failedState, stats); assertThat( TransportGetTransformStatsAction.deriveStats(task, null), equalTo( @@ -132,16 +198,16 @@ public void testDeriveStatsFailed() { ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info, expectedHealth)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, CHECKPOINTING_INFO, expectedHealth)) ); reason = "the task is failed"; expectedHealth = new TransformHealth( HealthStatus.RED, - List.of(new TransformHealthIssue("Transform task state is [failed]", reason, 1, null)) + List.of(TransformHealthChecker.IssueType.TRANSFORM_TASK_FAILED.newIssue(reason, 1, null)) ); - failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true); + failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true, null); withIdStateAndStats(transformId, failedState, stats); assertThat( @@ -159,8 +225,8 @@ public void testDeriveStatsFailed() { ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info, expectedHealth)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, CHECKPOINTING_INFO, expectedHealth)) ); } @@ -176,16 +242,10 @@ public void testDeriveStats() { reason, null, null, - true + true, + null ); withIdStateAndStats(transformId, runningState, stats); - TransformCheckpointingInfo info = new TransformCheckpointingInfo( - new TransformCheckpointStats(1, null, null, 1, 1), - new TransformCheckpointStats(2, null, null, 2, 5), - 2, - Instant.now(), - Instant.now() - ); assertThat( TransportGetTransformStatsAction.deriveStats(task, null), @@ -202,7 +262,7 @@ public void testDeriveStats() { ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), equalTo( new TransformStats( transformId, @@ -210,14 +270,14 @@ public void testDeriveStats() { "transform is set to stop at the next checkpoint", null, stats, - info, + CHECKPOINTING_INFO, TransformHealth.GREEN ) ) ); reason = "foo"; - runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true); + runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true, null); withIdStateAndStats(transformId, runningState, stats); assertThat( @@ -235,12 +295,22 @@ public void testDeriveStats() { ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info, TransformHealth.GREEN)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPING, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN + ) + ) ); // Stop at next checkpoint is false. - runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false); + runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false, null); withIdStateAndStats(transformId, runningState, stats); assertThat( @@ -258,8 +328,18 @@ public void testDeriveStats() { ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info, TransformHealth.GREEN)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.INDEXING, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN + ) + ) ); } @@ -269,5 +349,4 @@ private void withIdStateAndStats(String transformId, TransformState state, Trans when(task.getStats()).thenReturn(stats); when(task.getContext()).thenReturn(new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class))); } - } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java index a4c9b858a8233..c9d929fce0047 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java @@ -62,7 +62,7 @@ public void testTaskStateValidationWithTransformTasks() { // test again with a non failed task but this time it has internal state pTasksBuilder.updateTaskState( "non-failed-task", - new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null) + new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null, null, false, null) ); csBuilder = ClusterState.builder(new ClusterName("_name")).metadata(buildMetadata(pTasksBuilder.build())); @@ -76,7 +76,7 @@ public void testTaskStateValidationWithTransformTasks() { ) .updateTaskState( "failed-task", - new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0L, "task has failed", null) + new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0L, "task has failed", null, null, false, null) ); final ClusterState cs = ClusterState.builder(new ClusterName("_name")).metadata(buildMetadata(pTasksBuilder.build())).build(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java index b2ca0769ea976..86193ef511618 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java @@ -7,7 +7,10 @@ package org.elasticsearch.xpack.transform.transforms; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.junit.After; import org.junit.Before; @@ -16,6 +19,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -91,6 +95,22 @@ public void testStateReason() { verify(listener).failureCountChanged(); } + public void testAuthState() { + TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, listener); + assertThat(context.getAuthState(), is(nullValue())); + + context.setAuthState(AuthorizationState.green()); + assertThat(context.getAuthState(), is(notNullValue())); + assertThat(context.getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + + context.setAuthState(AuthorizationState.red(new ElasticsearchSecurityException("missing privileges"))); + assertThat(context.getAuthState(), is(notNullValue())); + assertThat(context.getAuthState().getStatus(), is(equalTo(HealthStatus.RED))); + + context.setAuthState(null); + assertThat(context.getAuthState(), is(nullValue())); + } + public void testFrom() { Instant from = Instant.ofEpochMilli(randomLongBetween(0, 1_000_000_000_000L)); TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, from, listener); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java index 96678b629b435..3b4604caca5cd 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java @@ -112,7 +112,7 @@ private static Instant getNow() { private static void withIdStateAndContext(TransformTask task, String transformId, TransformContext context) { when(task.getTransformId()).thenReturn(transformId); when(task.getState()).thenReturn( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, "", null, null, false) + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, "", null, null, false, null) ); when(task.getContext()).thenReturn(context); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java index cedd8c8651317..afa10afbdf638 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java @@ -245,7 +245,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -257,7 +257,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -269,7 +269,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -327,7 +327,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -340,7 +340,7 @@ public void fail(String failureMessage, ActionListener listener) { // succeed this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { @@ -352,7 +352,7 @@ public void fail(String failureMessage, ActionListener listener) { // fail again this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -364,7 +364,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -376,7 +376,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -458,7 +458,7 @@ public void fail(String failureMessage, ActionListener listener) { // succeed this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { @@ -472,7 +472,7 @@ public void fail(String failureMessage, ActionListener listener) { listener -> configManager.putOrUpdateTransformStoredDoc( new TransformStoredDoc( config.getId(), - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), indexer.getStats() ), indexer.getSeqNoPrimaryTermAndIndex(), @@ -487,7 +487,7 @@ public void fail(String failureMessage, ActionListener listener) { // state persistence should fail with a version conflict this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -500,7 +500,7 @@ public void fail(String failureMessage, ActionListener listener) { // recovered this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { @@ -513,7 +513,7 @@ public void fail(String failureMessage, ActionListener listener) { // succeed this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index abc2cd14d1990..317e4c5616ce2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -146,7 +146,8 @@ class MockedTransformIndexer extends TransformIndexer { context.getStateReason(), getProgress(), null, - context.shouldStopAtCheckpoint() + context.shouldStopAtCheckpoint(), + null ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index ec286b3af3e6c..bcb92adaf2003 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -389,7 +389,7 @@ private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { } - public TransformPersistentTasksExecutor buildTaskExecutor() { + private TransformPersistentTasksExecutor buildTaskExecutor() { ClusterService clusterService = mock(ClusterService.class); Client client = mock(Client.class); TransformAuditor mockAuditor = mock(TransformAuditor.class); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 44c3d82fbc381..ffe65df048c51 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.transforms; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -19,6 +20,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; @@ -30,6 +32,7 @@ import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -117,7 +120,8 @@ public void testStopOnFailedTaskWithStoppedIndexer() { "because", null, null, - false + false, + null ); TransformTask transformTask = new TransformTask( @@ -195,7 +199,8 @@ public void testStopOnFailedTaskWithoutIndexer() { "because", null, null, - false + false, + null ); TransformTask transformTask = new TransformTask( @@ -401,6 +406,51 @@ public void testFindTransformTasks() { assertThat(TransformTask.findTransformTasks(Set.of("transform-4", "transform-5"), clusterState), is(empty())); } + public void testApplyNewAuthState() { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); + + TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders(); + TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + + TransformState transformState = new TransformState( + TransformTaskState.FAILED, + IndexerState.STOPPED, + null, + 0L, + "because", + null, + null, + false, + AuthorizationState.green() + ); + + TransformTask transformTask = new TransformTask( + 42, + "some_type", + "some_action", + TaskId.EMPTY_TASK_ID, + client, + createTransformTaskParams(transformConfig.getId()), + transformState, + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY), + auditor, + threadPool, + Collections.emptyMap() + ); + assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + + transformTask.applyNewAuthState(AuthorizationState.red(new ElasticsearchSecurityException("missing permissions"))); + assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(transformTask.getContext().getAuthState().getLastAuthError(), is(equalTo("missing permissions"))); + + transformTask.applyNewAuthState(AuthorizationState.green()); + assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + + transformTask.applyNewAuthState(null); + assertThat(transformTask.getContext().getAuthState(), is(nullValue())); + } + private static TransformTaskParams createTransformTaskParams(String transformId) { return new TransformTaskParams(transformId, Version.CURRENT, TimeValue.timeValueSeconds(10), false); } diff --git a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_health.schema.json b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_health.schema.json index 9ff8a24a27d5e..903814ac96dcd 100644 --- a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_health.schema.json +++ b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_health.schema.json @@ -29,9 +29,14 @@ "description": "describes a single issue", "additionalProperties": false, "required": [ + "type", "issue" ], "properties": { + "type": { + "type": "string", + "description": "unique issue type" + }, "issue": { "type": "string", "description": "single issue description"