diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java index 2c7964ccac457..be99bc7d2106c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.tasks.BaseTasksRequest; @@ -19,6 +20,7 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; @@ -46,6 +48,7 @@ public static class Request extends BaseTasksRequest { private final String id; private final boolean deferValidation; private TransformConfig config; + private AuthorizationState authState; public Request(TransformConfigUpdate update, String id, boolean deferValidation, TimeValue timeout) { this.update = update; @@ -62,6 +65,11 @@ public Request(StreamInput in) throws IOException { if (in.readBoolean()) { this.config = new TransformConfig(in); } + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + if (in.readBoolean()) { + this.authState = new AuthorizationState(in); + } + } } public static Request fromXContent( @@ -124,6 +132,14 @@ public void setConfig(TransformConfig config) { this.config = config; } + public AuthorizationState getAuthState() { + return authState; + } + + public void setAuthState(AuthorizationState authState) { + this.authState = authState; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -136,12 +152,20 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); config.writeTo(out); } + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + if (authState == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + authState.writeTo(out); + } + } } @Override public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves - return Objects.hash(getTimeout(), update, id, deferValidation, config); + return Objects.hash(getTimeout(), update, id, deferValidation, config, authState); } @Override @@ -159,6 +183,7 @@ public boolean equals(Object obj) { && this.deferValidation == other.deferValidation && this.id.equals(other.id) && Objects.equals(config, other.config) + && Objects.equals(authState, other.authState) && getTimeout().equals(other.getTimeout()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationState.java new file mode 100644 index 0000000000000..5ca7165199d89 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationState.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.transforms; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.Instant; +import java.util.Locale; +import java.util.Objects; + +/** + * {@link AuthorizationState} holds the state of the authorization performed in the past. + * By examining the instance of this class the caller can learn whether or not the user was authorized to access the source/dest indices + * present in the {@link TransformConfig}. + * + * This class is immutable. + */ +public class AuthorizationState implements Writeable, ToXContentObject { + + public static AuthorizationState green() { + return new AuthorizationState(System.currentTimeMillis(), HealthStatus.GREEN, null); + } + + public static AuthorizationState red(Exception e) { + return new AuthorizationState(System.currentTimeMillis(), HealthStatus.RED, e != null ? e.getMessage() : "unknown exception"); + } + + public static final ParseField TIMESTAMP = new ParseField("timestamp"); + public static final ParseField STATUS = new ParseField("status"); + public static final ParseField LAST_AUTH_ERROR = new ParseField("last_auth_error"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "transform_authorization_state", + true, + a -> new AuthorizationState((Long) a[0], (HealthStatus) a[1], (String) a[2]) + ); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP); + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return HealthStatus.valueOf(p.text().toUpperCase(Locale.ROOT)); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, STATUS, ObjectParser.ValueType.STRING); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LAST_AUTH_ERROR); + } + + private final long timestampMillis; + private final HealthStatus status; + @Nullable + private final String lastAuthError; + + public AuthorizationState(Long timestamp, HealthStatus status, @Nullable String lastAuthError) { + this.timestampMillis = timestamp; + this.status = status; + this.lastAuthError = lastAuthError; + } + + public AuthorizationState(StreamInput in) throws IOException { + this.timestampMillis = in.readLong(); + this.status = in.readEnum(HealthStatus.class); + this.lastAuthError = in.readOptionalString(); + } + + public Instant getTimestamp() { + return Instant.ofEpochMilli(timestampMillis); + } + + public HealthStatus getStatus() { + return status; + } + + public String getLastAuthError() { + return lastAuthError; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(TIMESTAMP.getPreferredName(), timestampMillis); + builder.field(STATUS.getPreferredName(), status.xContentValue()); + if (lastAuthError != null) { + builder.field(LAST_AUTH_ERROR.getPreferredName(), lastAuthError); + } + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(timestampMillis); + status.writeTo(out); + out.writeOptionalString(lastAuthError); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + AuthorizationState that = (AuthorizationState) other; + + return this.timestampMillis == that.timestampMillis + && this.status.value() == that.status.value() + && Objects.equals(this.lastAuthError, that.lastAuthError); + } + + @Override + public int hashCode() { + return Objects.hash(timestampMillis, status, lastAuthError); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java index e6e73fec5893c..b122e96339e9e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java @@ -263,7 +263,7 @@ public static TransformCheckpoint fromXContent(final XContentParser parser, bool public static String documentId(String transformId, long checkpoint) { if (checkpoint < 0) { - throw new IllegalArgumentException("checkpoint must be a positive number"); + throw new IllegalArgumentException("checkpoint must be a non-negative number"); } return NAME + "-" + transformId + "-" + checkpoint; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java index dfe0fabf00f4e..25e25dfe80e0f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java @@ -34,7 +34,7 @@ public class TransformConfigUpdate implements Writeable { public static final String NAME = "data_frame_transform_config_update"; - public static TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + public static final TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null, null); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, @@ -235,6 +235,10 @@ public boolean changesSettings(TransformConfig config) { return isNullOrEqual(settings, config.getSettings()) == false; } + public boolean changesHeaders(TransformConfig config) { + return isNullOrEqual(headers, config.getHeaders()) == false; + } + private boolean isNullOrEqual(Object lft, Object rgt) { return lft == null || lft.equals(rgt); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java index 50f4ba56aaabc..f09291d5a9a7d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java @@ -45,6 +45,8 @@ public class TransformState implements Task.Status, PersistentTaskState { private NodeAttributes node; private final boolean shouldStopAtNextCheckpoint; + @Nullable + private final AuthorizationState authState; public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -57,6 +59,7 @@ public class TransformState implements Task.Status, PersistentTaskState { public static final ParseField PROGRESS = new ParseField("progress"); public static final ParseField NODE = new ParseField("node"); public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint"); + public static final ParseField AUTH_STATE = new ParseField("auth_state"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, args -> { @@ -75,6 +78,7 @@ public class TransformState implements Task.Status, PersistentTaskState { TransformProgress progress = (TransformProgress) args[6]; NodeAttributes node = (NodeAttributes) args[7]; boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean) args[8]; + AuthorizationState authState = (AuthorizationState) args[9]; return new TransformState( taskState, @@ -84,7 +88,8 @@ public class TransformState implements Task.Status, PersistentTaskState { reason, progress, node, - shouldStopAtNextCheckpoint + shouldStopAtNextCheckpoint, + authState ); }); @@ -98,6 +103,7 @@ public class TransformState implements Task.Status, PersistentTaskState { PARSER.declareField(optionalConstructorArg(), TransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT); + PARSER.declareField(optionalConstructorArg(), AuthorizationState.PARSER::apply, AUTH_STATE, ValueType.OBJECT); } public TransformState( @@ -108,7 +114,8 @@ public TransformState( @Nullable String reason, @Nullable TransformProgress progress, @Nullable NodeAttributes node, - boolean shouldStopAtNextCheckpoint + boolean shouldStopAtNextCheckpoint, + @Nullable AuthorizationState authState ) { this.taskState = taskState; this.indexerState = indexerState; @@ -118,29 +125,7 @@ public TransformState( this.progress = progress; this.node = node; this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint; - } - - public TransformState( - TransformTaskState taskState, - IndexerState indexerState, - @Nullable TransformIndexerPosition position, - long checkpoint, - @Nullable String reason, - @Nullable TransformProgress progress, - @Nullable NodeAttributes node - ) { - this(taskState, indexerState, position, checkpoint, reason, progress, node, false); - } - - public TransformState( - TransformTaskState taskState, - IndexerState indexerState, - @Nullable TransformIndexerPosition position, - long checkpoint, - @Nullable String reason, - @Nullable TransformProgress progress - ) { - this(taskState, indexerState, position, checkpoint, reason, progress, null); + this.authState = authState; } public TransformState(StreamInput in) throws IOException { @@ -165,6 +150,11 @@ public TransformState(StreamInput in) throws IOException { } else { shouldStopAtNextCheckpoint = false; } + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + authState = in.readOptionalWriteable(AuthorizationState::new); + } else { + authState = null; + } } public TransformTaskState getTaskState() { @@ -204,6 +194,10 @@ public boolean shouldStopAtNextCheckpoint() { return shouldStopAtNextCheckpoint; } + public AuthorizationState getAuthState() { + return authState; + } + public static TransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -231,6 +225,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(NODE.getPreferredName(), node); } builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint); + if (authState != null) { + builder.field(AUTH_STATE.getPreferredName(), authState); + } builder.endObject(); return builder; } @@ -258,6 +255,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersion.V_7_6_0)) { out.writeBoolean(shouldStopAtNextCheckpoint); } + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + out.writeOptionalWriteable(authState); + } } @Override @@ -279,12 +279,13 @@ public boolean equals(Object other) { && Objects.equals(this.reason, that.reason) && Objects.equals(this.progress, that.progress) && Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) - && Objects.equals(this.node, that.node); + && Objects.equals(this.node, that.node) + && Objects.equals(this.authState, that.authState); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint, authState); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index d79c9219ead5e..61e5353155fb3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -37,6 +37,7 @@ public class TransformStats implements Writeable, ToXContentObject { public static final ParseField REASON_FIELD = new ParseField("reason"); public static final ParseField NODE_FIELD = new ParseField("node"); public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing"); + public static final ParseField AUTH_STATE = new ParseField("auth_state"); private final String id; private final State state; @@ -47,20 +48,19 @@ public class TransformStats implements Writeable, ToXContentObject { private final TransformIndexerStats indexerStats; private final TransformCheckpointingInfo checkpointingInfo; private final TransformHealth health; + @Nullable + private final AuthorizationState authState; public static TransformStats initialStats(String id) { - return stoppedStats(id, new TransformIndexerStats()); - } - - public static TransformStats stoppedStats(String id, TransformIndexerStats indexerTransformStats) { return new TransformStats( id, State.STOPPED, null, null, - indexerTransformStats, + new TransformIndexerStats(), TransformCheckpointingInfo.EMPTY, - TransformHealth.GREEN + TransformHealth.GREEN, + null ); } @@ -71,7 +71,8 @@ public TransformStats( @Nullable NodeAttributes node, TransformIndexerStats stats, TransformCheckpointingInfo checkpointingInfo, - TransformHealth health + TransformHealth health, + @Nullable AuthorizationState authState ) { this.id = Objects.requireNonNull(id); this.state = Objects.requireNonNull(state); @@ -80,6 +81,7 @@ public TransformStats( this.indexerStats = Objects.requireNonNull(stats); this.checkpointingInfo = Objects.requireNonNull(checkpointingInfo); this.health = Objects.requireNonNull(health); + this.authState = authState; } public TransformStats(StreamInput in) throws IOException { @@ -103,6 +105,12 @@ public TransformStats(StreamInput in) throws IOException { } else { this.health = null; } + + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + this.authState = in.readOptionalWriteable(AuthorizationState::new); + } else { + this.authState = null; + } } @Override @@ -121,6 +129,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (health != null) { builder.field(HEALTH_FIELD.getPreferredName(), health); } + if (authState != null) { + builder.field(AUTH_STATE.getPreferredName(), authState); + } builder.endObject(); return builder; } @@ -146,11 +157,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } } + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + out.writeOptionalWriteable(authState); + } } @Override public int hashCode() { - return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo, health); + return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo, health, authState); } @Override @@ -171,7 +185,8 @@ public boolean equals(Object other) { && Objects.equals(this.node, that.node) && Objects.equals(this.indexerStats, that.indexerStats) && Objects.equals(this.checkpointingInfo, that.checkpointingInfo) - && Objects.equals(this.health, that.health); + && Objects.equals(this.health, that.health) + && Objects.equals(this.authState, that.authState); } public String getId() { @@ -204,6 +219,11 @@ public TransformCheckpointingInfo getCheckpointingInfo() { return checkpointingInfo; } + @Nullable + public AuthorizationState getAuthState() { + return authState; + } + @Override public String toString() { return Strings.toString(this); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java index 555383a9fdc0b..bcfe2b1728cbf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationStateTests; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; @@ -33,6 +34,9 @@ protected Request createTestInstance() { if (randomBoolean()) { request.setConfig(TransformConfigTests.randomTransformConfig()); } + if (randomBoolean()) { + request.setAuthState(AuthorizationStateTests.randomAuthorizationState()); + } return request; } @@ -70,5 +74,4 @@ protected Request mutateInstance(Request instance) { return new Request(update, id, deferValidation, timeout); } - } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationStateTests.java new file mode 100644 index 0000000000000..cc07ce49c1103 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/AuthorizationStateTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.transform.transforms; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class AuthorizationStateTests extends AbstractSerializingTransformTestCase { + + public static AuthorizationState randomAuthorizationState() { + return new AuthorizationState( + randomNonNegativeLong(), + randomFrom(HealthStatus.values()), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100) + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return AuthorizationState::new; + } + + @Override + protected AuthorizationState createTestInstance() { + return randomAuthorizationState(); + } + + @Override + protected AuthorizationState mutateInstance(AuthorizationState instance) { + int statusCount = HealthStatus.values().length; + assert statusCount > 1; + return new AuthorizationState( + instance.getTimestamp().toEpochMilli() + 1, + HealthStatus.values()[(instance.getStatus().ordinal() + 1) % statusCount], + instance.getLastAuthError() == null ? randomAlphaOfLengthBetween(1, 100) : null + ); + } + + @Override + protected AuthorizationState doParseInstance(XContentParser parser) throws IOException { + return AuthorizationState.PARSER.apply(parser, null); + } + + public void testGreen() { + AuthorizationState authState = AuthorizationState.green(); + assertThat(authState.getStatus(), is(equalTo(HealthStatus.GREEN))); + assertThat(authState.getLastAuthError(), is(nullValue())); + } + + public void testRed() { + Exception e = new Exception("some exception"); + AuthorizationState authState = AuthorizationState.red(e); + assertThat(authState.getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(authState.getLastAuthError(), is(equalTo("some exception"))); + + authState = AuthorizationState.red(null); + assertThat(authState.getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(authState.getLastAuthError(), is(equalTo("unknown exception"))); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java index 5e8e0d0cdb561..390211a43daf1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java @@ -25,6 +25,7 @@ import static org.elasticsearch.test.TestMatchers.matchesPattern; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; public class TransformCheckpointTests extends AbstractSerializingTransformTestCase { @@ -283,6 +284,14 @@ public void testGetChangedIndices() { ); } + public void testDocumentId() { + assertThat(TransformCheckpoint.documentId("my-transform", 0), is(equalTo("data_frame_transform_checkpoint-my-transform-0"))); + assertThat(TransformCheckpoint.documentId("my-transform", 1), is(equalTo("data_frame_transform_checkpoint-my-transform-1"))); + assertThat(TransformCheckpoint.documentId("my-transform", 2), is(equalTo("data_frame_transform_checkpoint-my-transform-2"))); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> TransformCheckpoint.documentId("my-transform", -1)); + assertThat(e.getMessage(), is(equalTo("checkpoint must be a non-negative number"))); + } + private static Map randomCheckpointsByIndex() { Map checkpointsByIndex = new TreeMap<>(); int indices = randomIntBetween(1, 10); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index c0858574be981..14c28e6407f9e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -22,6 +22,7 @@ import java.time.Instant; import java.util.Collections; import java.util.Map; +import java.util.Optional; import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; @@ -67,8 +68,9 @@ protected Reader instanceReader() { public void testIsNoop() { for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { TransformConfig config = randomTransformConfig(); - TransformConfigUpdate update = TransformConfigUpdate.EMPTY; - assertTrue("null update is not noop", update.isNoop(config)); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertTrue("null update should be no-op", update.isNoop(config)); + update = new TransformConfigUpdate( config.getSource(), config.getDestination(), @@ -79,7 +81,7 @@ public void testIsNoop() { config.getMetadata(), config.getRetentionPolicyConfig() ); - assertTrue("equal update is not noop", update.isNoop(config)); + assertTrue("equal update should be no-op", update.isNoop(config)); update = new TransformConfigUpdate( config.getSource(), @@ -91,10 +93,38 @@ public void testIsNoop() { config.getMetadata(), config.getRetentionPolicyConfig() ); - assertFalse("true update is noop", update.isNoop(config)); + assertFalse("true update should not be no-op", update.isNoop(config)); } } + public void testChangesSettings() { + TransformConfig config = randomTransformConfig(); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertFalse("null update does not change settings", update.changesSettings(config)); + + update = new TransformConfigUpdate(null, null, null, null, null, config.getSettings(), null, null); + assertFalse("equal update does not change settings", update.changesSettings(config)); + + SettingsConfig newSettings = new SettingsConfig.Builder(config.getSettings()).setMaxPageSearchSize( + Optional.ofNullable(config.getSettings().getMaxPageSearchSize()).orElse(0) + 1 + ).build(); + update = new TransformConfigUpdate(null, null, null, null, null, newSettings, null, null); + assertTrue("true update changes settings", update.changesSettings(config)); + } + + public void testChangesHeaders() { + TransformConfig config = randomTransformConfig(); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertFalse("null update does not change headers", update.changesHeaders(config)); + + update.setHeaders(config.getHeaders()); + assertFalse("equal update does not change headers", update.changesHeaders(config)); + + Map newHeaders = Map.of("new-key", "new-value"); + update.setHeaders(newHeaders); + assertTrue("true update changes headers", update.changesHeaders(config)); + } + public void testApply() { TransformConfig config = new TransformConfig( "time-transform", diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java index 159109a8e11df..b8a71359c2def 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java @@ -32,7 +32,8 @@ public static TransformState randomTransformState() { randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomTransformProgress(), randomBoolean() ? null : randomNodeAttributes(), - randomBoolean() + randomBoolean(), + randomBoolean() ? null : AuthorizationStateTests.randomAuthorizationState() ); } @@ -75,15 +76,28 @@ public void testBackwardsSerialization() throws IOException { randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomTransformProgress(), randomBoolean() ? null : randomNodeAttributes(), - false - ); // Will be false after BWC deserialization + false, + randomBoolean() ? null : AuthorizationStateTests.randomAuthorizationState() + ); + // auth_state will be null after BWC deserialization + TransformState expectedState = new TransformState( + state.getTaskState(), + state.getIndexerState(), + state.getPosition(), + state.getCheckpoint(), + state.getReason(), + state.getProgress(), + state.getNode(), + state.shouldStopAtNextCheckpoint(), + null + ); try (BytesStreamOutput output = new BytesStreamOutput()) { output.setTransportVersion(TransportVersion.V_7_5_0); state.writeTo(output); try (StreamInput in = output.bytes().streamInput()) { in.setTransportVersion(TransportVersion.V_7_5_0); TransformState streamedState = new TransformState(in); - assertEquals(state, streamedState); + assertEquals(expectedState, streamedState); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java index c684a566a39a8..5e02c9539f182 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java @@ -20,7 +20,8 @@ public static TransformStats randomTransformStats() { randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(), TransformIndexerStatsTests.randomStats(), TransformCheckpointingInfoTests.randomTransformCheckpointingInfo(), - TransformHealthTests.randomTransformHealth() + TransformHealthTests.randomTransformHealth(), + randomBoolean() ? null : AuthorizationStateTests.randomAuthorizationState() ); } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java index 4c118f1b6dbb6..f61789dfb6da0 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java @@ -24,8 +24,8 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource; import org.junit.After; +import java.io.IOException; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -49,28 +49,35 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase { private static final int NUM_USERS = 28; + // Transform Health statuses + private static final String GREEN = "green"; + private static final String YELLOW = "yellow"; + private static final String RED = "red"; + @After public void cleanTransforms() throws Exception { cleanUp(); } /** - * defer_validation = false - * unattended = false + * defer_validation = false + * unattended = false + * pre-existing dest index = false */ - public void testTransformPermissionsNoDeferValidationNoUnattended() throws Exception { - testTransformPermissionsNoDeferValidation(false); + public void testTransformPermissionsNoDeferNoUnattended() throws Exception { + testTransformPermissionsNoDefer(false); } /** - * defer_validation = false - * unattended = true + * defer_validation = false + * unattended = true + * pre-existing dest index = false */ - public void testTransformPermissionsNoDeferValidationUnattended() throws Exception { - testTransformPermissionsNoDeferValidation(true); + public void testTransformPermissionsNoDeferUnattended() throws Exception { + testTransformPermissionsNoDefer(true); } - private void testTransformPermissionsNoDeferValidation(boolean unattended) throws Exception { + private void testTransformPermissionsNoDefer(boolean unattended) throws Exception { String transformId = "transform-permissions-nodefer-" + (unattended ? 1 : 0); String sourceIndexName = transformId + "-index"; String destIndexName = sourceIndexName + "-dest"; @@ -93,8 +100,7 @@ private void testTransformPermissionsNoDeferValidation(boolean unattended) throw assertThat( e.getMessage(), containsString( - String.format( - Locale.ROOT, + Strings.format( "Cannot create transform [%s] because user %s lacks the required permissions " + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", transformId, @@ -113,14 +119,17 @@ private void testTransformPermissionsNoDeferValidation(boolean unattended) throw .addParameter("defer_validation", String.valueOf(false)) .build() ); + + assertHealthAndAuthState(transformId, GREEN, GREEN, null); } /** - * defer_validation = true - * unattended = false + * defer_validation = true + * unattended = false + * pre-existing dest index = false */ @SuppressWarnings("unchecked") - public void testTransformPermissionsDeferValidationNoUnattended() throws Exception { + public void testTransformPermissionsDeferNoUnattendedNoDest() throws Exception { String transformId = "transform-permissions-defer-nounattended"; String sourceIndexName = transformId + "-index"; String destIndexName = sourceIndexName + "-dest"; @@ -132,44 +141,104 @@ public void testTransformPermissionsDeferValidationNoUnattended() throws Excepti Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() ); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + String expectedErrorMessage = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); ResponseException e = expectThrows( ResponseException.class, () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build()) ); - assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(500))); - assertThat( - e.getMessage(), - containsString( - String.format(Locale.ROOT, "Could not create destination index [%s] for transform [%s]", destIndexName, transformId) - ) - ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(expectedErrorMessage)); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); e = expectThrows( ResponseException.class, () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()) ); - assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(500))); - assertThat( - e.getMessage(), - containsString( - String.format(Locale.ROOT, "Could not create destination index [%s] for transform [%s]", destIndexName, transformId) - ) + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(expectedErrorMessage)); + + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); + + // update transform's credentials so that the transform has permission to access source/dest indices + updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + + assertHealthAndAuthState(transformId, GREEN, GREEN, null); + + // _start API now works + startTransform(config.getId(), RequestOptions.DEFAULT); + waitUntilCheckpoint(transformId, 1); + + assertHealthAndAuthState(transformId, GREEN, GREEN, null); + } + + /** + * defer_validation = true + * unattended = false + * pre-existing dest index = true + */ + @SuppressWarnings("unchecked") + public void testTransformPermissionsDeferNoUnattendedDest() throws Exception { + String transformId = "transform-permissions-defer-nounattended-dest-exists"; + String sourceIndexName = transformId + "-index"; + String destIndexName = sourceIndexName + "-dest"; + createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + createIndex(adminClient(), destIndexName, Settings.EMPTY); + + TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false); + putTransform( + transformId, + Strings.toString(config), + RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() ); + String expectedErrorMessage = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + ResponseException e = expectThrows( + ResponseException.class, + () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build()) + ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(expectedErrorMessage)); + + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); + + e = expectThrows( + ResponseException.class, + () -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()) + ); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403))); + assertThat(e.getMessage(), containsString(expectedErrorMessage)); + + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); // update transform's credentials so that the transform has permission to access source/dest indices updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + assertHealthAndAuthState(transformId, GREEN, GREEN, null); + // _start API now works startTransform(config.getId(), RequestOptions.DEFAULT); waitUntilCheckpoint(transformId, 1); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + assertHealthAndAuthState(transformId, GREEN, GREEN, null); } /** @@ -207,11 +276,12 @@ public void testNoTransformAdminRoleInSecondaryAuth() throws Exception { } /** - * defer_validation = true - * unattended = true + * defer_validation = true + * unattended = true + * pre-existing dest index = false */ @SuppressWarnings("unchecked") - public void testTransformPermissionsDeferValidationUnattended() throws Exception { + public void testTransformPermissionsDeferUnattendedNoDest() throws Exception { String transformId = "transform-permissions-defer-unattended"; String sourceIndexName = transformId + "-index"; String destIndexName = sourceIndexName + "-dest"; @@ -223,19 +293,27 @@ public void testTransformPermissionsDeferValidationUnattended() throws Exception Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() ); - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + String expectedErrorMessage = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); startTransform(config.getId(), RequestOptions.DEFAULT); // transform is yellow assertBusy(() -> { Map stats = getTransformStats(transformId); - assertThat(extractValue(stats, "health", "status"), is(equalTo("yellow"))); + assertThat(extractValue(stats, "health", "status"), is(equalTo(YELLOW))); List issues = (List) extractValue(stats, "health", "issues"); assertThat(issues, hasSize(1)); assertThat( (String) extractValue((Map) issues.get(0), "details"), - containsString(String.format(Locale.ROOT, "no such index [%s]", destIndexName)) + containsString(Strings.format("no such index [%s]", destIndexName)) ); }, 10, TimeUnit.SECONDS); @@ -244,7 +322,50 @@ public void testTransformPermissionsDeferValidationUnattended() throws Exception waitUntilCheckpoint(transformId, 1); // transform is green again - assertThat(extractValue(getTransformStats(transformId), "health", "status"), is(equalTo("green"))); + assertHealthAndAuthState(transformId, GREEN, GREEN, null); + } + + /** + * defer_validation = true + * unattended = true + * pre-existing dest index = true + */ + @SuppressWarnings("unchecked") + public void testTransformPermissionsDeferUnattendedDest() throws Exception { + String transformId = "transform-permissions-defer-unattended-dest-exists"; + String sourceIndexName = transformId + "-index"; + String destIndexName = sourceIndexName + "-dest"; + createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + + createIndex(adminClient(), destIndexName, Settings.EMPTY); + + TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, true); + putTransform( + transformId, + Strings.toString(config), + RequestOptions.DEFAULT.toBuilder().addParameter("defer_validation", String.valueOf(true)).build() + ); + String expectedErrorMessage = Strings.format( + "Cannot create transform [%s] because user %s lacks the required permissions " + + "[%s:[read, view_index_metadata], %s:[index, read]]", + transformId, + JUNIOR_USERNAME, + sourceIndexName, + destIndexName + ); + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); + + startTransform(config.getId(), RequestOptions.DEFAULT); + + // transform's auth state status is still RED, but the health status is GREEN (because dest index exists) + assertHealthAndAuthState(transformId, GREEN, RED, expectedErrorMessage); + + // update transform's credentials so that the transform has permission to access source/dest indices + updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build()); + waitUntilCheckpoint(transformId, 1); + + // transform is green again + assertHealthAndAuthState(transformId, GREEN, GREEN, null); } public void testPreviewRequestFailsPermissionsCheck() throws Exception { @@ -263,8 +384,7 @@ public void testPreviewRequestFailsPermissionsCheck() throws Exception { assertThat( e.getMessage(), containsString( - String.format( - Locale.ROOT, + Strings.format( "Cannot preview transform [%s] because user %s lacks the required permissions " + "[%s:[read, view_index_metadata], %s:[create_index, index, read]]", transformId, @@ -311,4 +431,16 @@ private TransformConfig createConfig(String transformId, String sourceIndexName, return config; } + + private void assertHealthAndAuthState( + String transformId, + String expectedHealthStatus, + String expectedAuthStatus, + String expectedLastAuthError + ) throws IOException { + Map stats = getTransformStats(transformId); + assertThat("Stats were: " + stats, extractValue(stats, "health", "status"), is(equalTo(expectedHealthStatus))); + assertThat("Stats were: " + stats, extractValue(stats, "auth_state", "status"), is(equalTo(expectedAuthStatus))); + assertThat("Stats were: " + stats, extractValue(stats, "auth_state", "last_auth_error"), is(equalTo(expectedLastAuthError))); + } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java index 317fad1f99aa3..5e7522636514e 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java @@ -77,10 +77,10 @@ public void testUsage() throws Exception { + "be prevented by default" ) ); - // Verify that we have one stat document + // Verify that we have 4 stat documents, one per transform assertBusy(() -> { Map hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest)); - assertEquals(1, XContentMapValues.extractValue("hits.total.value", hasStatsMap)); + assertEquals(4, XContentMapValues.extractValue("hits.total.value", hasStatsMap)); }); startAndWaitForContinuousTransform("test_usage_continuous", "pivot_reviews_continuous", null); diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtilsTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtilsTests.java new file mode 100644 index 0000000000000..c6c98c2708d86 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtilsTests.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.persistence; + +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.TestIndexNameExpressionResolver; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; +import org.junit.Before; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +public class AuthorizationStatePersistenceUtilsTests extends TransformSingleNodeTestCase { + + private ClusterService clusterService; + private IndexBasedTransformConfigManager transformConfigManager; + + @Before + public void createComponents() { + clusterService = mock(ClusterService.class); + transformConfigManager = new IndexBasedTransformConfigManager( + clusterService, + TestIndexNameExpressionResolver.newInstance(), + client(), + xContentRegistry() + ); + } + + public void testAuthState() throws InterruptedException { + String transformId = "transform_test_auth_state_create_read_update"; + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + (AuthorizationState) null + ); + + AuthorizationState greenAuthState = AuthorizationState.green(); + assertAsync( + listener -> AuthorizationStatePersistenceUtils.persistAuthState( + Settings.EMPTY, + transformConfigManager, + transformId, + greenAuthState, + listener + ), + (Void) null + ); + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + greenAuthState + ); + + AuthorizationState redAuthState = AuthorizationState.red(new ElasticsearchSecurityException("missing privileges")); + assertAsync( + listener -> AuthorizationStatePersistenceUtils.persistAuthState( + Settings.EMPTY, + transformConfigManager, + transformId, + redAuthState, + listener + ), + (Void) null + ); + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + redAuthState + ); + + AuthorizationState otherRedAuthState = AuthorizationState.red(new ElasticsearchSecurityException("other missing privileges")); + assertAsync( + listener -> AuthorizationStatePersistenceUtils.persistAuthState( + Settings.EMPTY, + transformConfigManager, + transformId, + otherRedAuthState, + listener + ), + (Void) null + ); + + assertAsync( + listener -> AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, transformId, listener), + otherRedAuthState + ); + } + + private void assertAsync(Consumer> function, T expected) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> assertThat(r, is(equalTo(expected))), e -> fail("got unexpected exception: " + e.getMessage())), + latch + ); + function.accept(listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java index 453877e0ed319..b401068087b0d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java @@ -13,7 +13,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.RemoteClusterLicenseChecker; +import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -40,6 +42,7 @@ final class TransformPrivilegeChecker { static void checkPrivileges( String operationName, + Settings settings, SecurityContext securityContext, IndexNameExpressionResolver indexNameExpressionResolver, ClusterState clusterState, @@ -48,6 +51,8 @@ static void checkPrivileges( boolean checkDestIndexPrivileges, ActionListener listener ) { + assert XPackSettings.SECURITY_ENABLED.get(settings); + useSecondaryAuthIfAvailable(securityContext, () -> { final String username = securityContext.getUser().principal(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java index a0d06066afb35..2a087486ac6dc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; @@ -24,6 +25,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; @@ -57,24 +59,35 @@ public enum Status { } // the new config after the update + @Nullable private final TransformConfig config; + // the auth state to persist after the update + @Nullable + private final AuthorizationState authState; + // the action taken for the upgrade private final Status status; - UpdateResult(final TransformConfig config, final Status status) { + UpdateResult(final TransformConfig config, final AuthorizationState authState, final Status status) { this.config = config; + this.authState = authState; this.status = status; } - public Status getStatus() { - return status; - } - @Nullable public TransformConfig getConfig() { return config; } + + @Nullable + public AuthorizationState getAuthState() { + return authState; + } + + public Status getStatus() { + return status; + } } /** @@ -115,14 +128,15 @@ public static void updateTransform( ActionListener listener ) { // rewrite config into a new format if necessary - TransformConfig rewrittenConfig = TransformConfig.rewriteForUpdate(config); - TransformConfig updatedConfig = update != null ? update.apply(rewrittenConfig) : rewrittenConfig; + final TransformConfig rewrittenConfig = TransformConfig.rewriteForUpdate(config); + final TransformConfig updatedConfig = update != null ? update.apply(rewrittenConfig) : rewrittenConfig; + final SetOnce authStateHolder = new SetOnce<>(); // <5> Update checkpoints ActionListener updateStateListener = ActionListener.wrap(lastCheckpoint -> { // config was updated, but the transform has no state or checkpoint if (lastCheckpoint == null || lastCheckpoint == -1) { - listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)); + listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.UPDATED)); return; } @@ -131,7 +145,7 @@ public static void updateTransform( lastCheckpoint, transformConfigManager, ActionListener.wrap( - r -> listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)), + r -> listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.UPDATED)), listener::onFailure ) ); @@ -153,12 +167,12 @@ public static void updateTransform( if (config.getVersion() != null && config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED) && updatedConfig.equals(config)) { - listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NONE)); + listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.NONE)); return; } if (dryRun) { - listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NEEDS_UPDATE)); + listener.onResponse(new UpdateResult(updatedConfig, authStateHolder.get(), UpdateResult.Status.NEEDS_UPDATE)); return; } @@ -175,22 +189,29 @@ public static void updateTransform( }, listener::onFailure); // <2> Validate source and destination indices - ActionListener checkPrivilegesListener = ActionListener.wrap( - aVoid -> validateTransform(updatedConfig, client, deferValidation, timeout, validateTransformListener), - listener::onFailure - ); + ActionListener checkPrivilegesListener = ActionListener.wrap(authState -> { + authStateHolder.set(authState); + validateTransform(updatedConfig, client, deferValidation, timeout, validateTransformListener); + }, listener::onFailure); // <1> Early check to verify that the user can create the destination index and can read from the source - if (checkAccess && XPackSettings.SECURITY_ENABLED.get(settings) && deferValidation == false) { + if (checkAccess && XPackSettings.SECURITY_ENABLED.get(settings)) { TransformPrivilegeChecker.checkPrivileges( "update", + settings, securityContext, indexNameExpressionResolver, clusterState, client, updatedConfig, true, - checkPrivilegesListener + ActionListener.wrap(aVoid -> checkPrivilegesListener.onResponse(AuthorizationState.green()), e -> { + if (deferValidation) { + checkPrivilegesListener.onResponse(AuthorizationState.red(e)); + } else { + checkPrivilegesListener.onFailure(e); + } + }) ); } else { // No security enabled, just move on checkPrivilegesListener.onResponse(null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index cfa532735d32a..d22baa4c4c9b7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -247,7 +247,8 @@ static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpo null, task.getStats(), checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo, - TransformHealthChecker.checkTransform(task) + TransformHealthChecker.checkTransform(task), + transformState.getAuthState() ); } @@ -359,7 +360,8 @@ private void addCheckpointingInfoForTransformsWithoutTasks( null, stat.getTransformStats(), checkpointingInfo, - TransformHealthChecker.checkUnassignedTransform(stat.getId(), clusterState) + TransformHealthChecker.checkUnassignedTransform(stat.getId(), clusterState), + stat.getTransformState().getAuthState() ) ); } else { @@ -371,7 +373,8 @@ private void addCheckpointingInfoForTransformsWithoutTasks( null, stat.getTransformStats(), checkpointingInfo, - TransformHealth.GREEN + TransformHealth.GREEN, + stat.getTransformState().getAuthState() ) ); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index 47d768e01daf0..edf9fefb8416a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -174,6 +174,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (XPackSettings.SECURITY_ENABLED.get(nodeSettings)) { TransformPrivilegeChecker.checkPrivileges( "preview", + nodeSettings, securityContext, indexNameExpressionResolver, clusterState, @@ -184,7 +185,7 @@ protected void doExecute(Task task, Request request, ActionListener li DUMMY_DEST_INDEX_FOR_PREVIEW.equals(config.getDestination().getIndex()) == false, checkPrivilegesListener ); - } else { // No security enabled, just create the transform + } else { // No security enabled, just move on checkPrivilegesListener.onResponse(null); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 780aac0c1ee5b..14ccfcf0111d3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -35,9 +35,11 @@ import org.elasticsearch.xpack.core.transform.action.PutTransformAction; import org.elasticsearch.xpack.core.transform.action.PutTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.FunctionFactory; @@ -123,16 +125,38 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS ); // <1> Early check to verify that the user can create the destination index and can read from the source - if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { + if (XPackSettings.SECURITY_ENABLED.get(settings)) { TransformPrivilegeChecker.checkPrivileges( "create", + settings, securityContext, indexNameExpressionResolver, clusterState, client, config, true, - checkPrivilegesListener + ActionListener.wrap( + aVoid -> AuthorizationStatePersistenceUtils.persistAuthState( + settings, + transformConfigManager, + transformId, + AuthorizationState.green(), + checkPrivilegesListener + ), + e -> { + if (request.isDeferValidation()) { + AuthorizationStatePersistenceUtils.persistAuthState( + settings, + transformConfigManager, + transformId, + AuthorizationState.red(e), + checkPrivilegesListener + ); + } else { + checkPrivilegesListener.onFailure(e); + } + } + ) ); } else { // No security enabled, just move on checkPrivilegesListener.onResponse(null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index abe8450b3a374..b68fdcd4a5fe0 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -24,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; @@ -34,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -41,6 +45,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformIndex; import org.elasticsearch.xpack.transform.transforms.TransformNodes; @@ -48,7 +53,6 @@ import java.time.Clock; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -125,8 +129,8 @@ protected void masterOperation( ) { TransformNodes.warnIfNoTransformNodes(state); - final AtomicReference transformTaskParamsHolder = new AtomicReference<>(); - final AtomicReference transformConfigHolder = new AtomicReference<>(); + final SetOnce transformTaskParamsHolder = new SetOnce<>(); + final SetOnce transformConfigHolder = new SetOnce<>(); // <5> Wait for the allocated task's state to STARTED ActionListener> newPersistentTaskActionListener = ActionListener @@ -203,7 +207,16 @@ protected void masterOperation( ); // <2> run transform validations - ActionListener getTransformListener = ActionListener.wrap(config -> { + ActionListener fetchAuthStateListener = ActionListener.wrap(authState -> { + if (authState != null && HealthStatus.RED.equals(authState.getStatus())) { + // AuthorizationState status is RED which means there was permission check error during PUT or _update. + // Since this transform is *not* unattended (otherwise authState would be null), we fail immediately. + listener.onFailure(new ElasticsearchSecurityException(authState.getLastAuthError(), RestStatus.FORBIDDEN)); + return; + } + + TransformConfig config = transformConfigHolder.get(); + ActionRequestValidationException validationException = config.validate(null); if (request.from() != null && config.getSyncConfig() == null) { validationException = addValidationError( @@ -233,7 +246,6 @@ protected void masterOperation( config.getSource().requiresRemoteCluster() ) ); - transformConfigHolder.set(config); ClientHelper.executeAsyncWithOrigin( client, ClientHelper.TRANSFORM_ORIGIN, @@ -243,7 +255,20 @@ protected void masterOperation( ); }, listener::onFailure); - // <1> Get the config to verify it exists and is valid + // <1> Check if there is an auth error stored for this transform + ActionListener getTransformListener = ActionListener.wrap(config -> { + transformConfigHolder.set(config); + + if (Boolean.TRUE.equals(config.getSettings().getUnattended())) { + // We do not fail the _start request of the unattended transform due to permission issues, + // we just let it run + fetchAuthStateListener.onResponse(null); + } else { + AuthorizationStatePersistenceUtils.fetchAuthState(transformConfigManager, request.getId(), fetchAuthStateListener); + } + }, listener::onFailure); + + // <0> Get the config to verify it exists and is valid transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index ef410affcc4b1..3b2c35b4fe9bb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -34,12 +34,14 @@ import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Response; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.FunctionFactory; @@ -138,14 +140,18 @@ protected void doExecute(Task task, Request request, ActionListener li false, // dryRun true, // checkAccess request.getTimeout(), - ActionListener.wrap(updateResponse -> { - TransformConfig updatedConfig = updateResponse.getConfig(); + ActionListener.wrap(updateResult -> { + TransformConfig originalConfig = configAndVersion.v1(); + TransformConfig updatedConfig = updateResult.getConfig(); + AuthorizationState authState = updateResult.getAuthState(); auditor.info(updatedConfig.getId(), "Updated transform."); - logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); + logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResult.getStatus()); checkTransformConfigAndLogWarnings(updatedConfig); - if (update.changesSettings(configAndVersion.v1())) { + boolean updateChangesSettings = update.changesSettings(originalConfig); + boolean updateChangesHeaders = update.changesHeaders(originalConfig); + if (updateChangesSettings || updateChangesHeaders) { PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( request.getId(), clusterState @@ -187,11 +193,21 @@ protected void doExecute(Task task, Request request, ActionListener li request.setNodes(transformTask.getExecutorNode()); request.setConfig(updatedConfig); + request.setAuthState(authState); super.doExecute(task, request, taskUpdateListener); return; + } else if (updateChangesHeaders) { + AuthorizationStatePersistenceUtils.persistAuthState( + settings, + transformConfigManager, + updatedConfig.getId(), + authState, + ActionListener.wrap(aVoid -> listener.onResponse(new Response(updatedConfig)), listener::onFailure) + ); } + } else { + listener.onResponse(new Response(updatedConfig)); } - listener.onResponse(new Response(updatedConfig)); }, listener::onFailure) ), listener::onFailure @@ -211,8 +227,8 @@ private void checkTransformConfigAndLogWarnings(TransformConfig config) { @Override protected void taskOperation(Task actionTask, Request request, TransformTask transformTask, ActionListener listener) { - // apply the settings transformTask.applyNewSettings(request.getConfig().getSettings()); + transformTask.applyNewAuthState(request.getAuthState()); listener.onResponse(new Response(request.getConfig())); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java index e8a6a243a3f68..222d2f0f8c9f7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpgradeTransformsAction.java @@ -170,7 +170,7 @@ private void updateOneTransform(String id, boolean dryRun, TimeValue timeout, Ac }, failure -> { // ignore if transform got deleted while upgrade was running if (failure instanceof ResourceNotFoundException) { - listener.onResponse(new UpdateResult(null, UpdateResult.Status.DELETED)); + listener.onResponse(new UpdateResult(null, null, UpdateResult.Status.DELETED)); } else { listener.onFailure(failure); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtils.java new file mode 100644 index 0000000000000..2b3bc52d32f87 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/AuthorizationStatePersistenceUtils.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.persistence; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; + +/** + * {@link AuthorizationStatePersistenceUtils} is a utility class built on top of {@link TransformConfigManager}. + * It allows fetching and persisting {@link AuthorizationState} objects. + */ +public final class AuthorizationStatePersistenceUtils { + + private static final Logger logger = LogManager.getLogger(AuthorizationStatePersistenceUtils.class); + + /** + * Fetches persisted authorization state object and returns it back to the caller. + */ + public static void fetchAuthState( + TransformConfigManager transformConfigManager, + String transformId, + ActionListener listener + ) { + ActionListener> getTransformStoredDocListener = ActionListener.wrap( + stateAndStatsAndSeqNoPrimaryTermAndIndex -> { + if (stateAndStatsAndSeqNoPrimaryTermAndIndex == null) { + listener.onResponse(null); + return; + } + TransformState transformState = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1().getTransformState(); + if (transformState == null) { + listener.onResponse(null); + return; + } + AuthorizationState authState = transformState.getAuthState(); + if (authState == null) { + listener.onResponse(null); + return; + } + listener.onResponse(authState); + }, + listener::onFailure + ); + + transformConfigManager.getTransformStoredDoc(transformId, true, getTransformStoredDocListener); + } + + /** + * Persists the authorization state object given as an argument. + */ + public static void persistAuthState( + Settings settings, + TransformConfigManager transformConfigManager, + String transformId, + AuthorizationState authState, + ActionListener listener + ) { + assert XPackSettings.SECURITY_ENABLED.get(settings); + + logger.trace("Started persisting auth state: {}", authState); + + ActionListener persistListener = ActionListener.wrap(unusedResponse -> { + logger.trace("Finished persisting auth state: {}", authState); + listener.onResponse(null); + }, e -> { + logger.trace("Failed persisting auth state: {}, exception = {}", authState, e); + listener.onFailure(e); + }); + + ActionListener> transformStatsActionListener = ActionListener.wrap( + stateAndStatsAndSeqNoPrimaryTermAndIndex -> { + // Stored doc does not exist yet, we need to create one + if (stateAndStatsAndSeqNoPrimaryTermAndIndex == null) { + TransformState state = new TransformState( + TransformTaskState.STOPPED, + IndexerState.STOPPED, + null, + 0, + null, + null, + null, + false, + authState + ); + TransformIndexerStats stats = new TransformIndexerStats(); + transformConfigManager.putOrUpdateTransformStoredDoc( + new TransformStoredDoc(transformId, state, stats), + null, + persistListener + ); + return; + } + // Stored doc already exists, we just need to update its authState field + TransformStoredDoc stateAndStats = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1(); + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = stateAndStatsAndSeqNoPrimaryTermAndIndex.v2(); + TransformState oldState = stateAndStats.getTransformState(); + TransformState newState = new TransformState( + oldState.getTaskState(), + oldState.getIndexerState(), + oldState.getPosition(), + oldState.getCheckpoint(), + oldState.getReason(), + oldState.getProgress(), + oldState.getNode(), + oldState.shouldStopAtNextCheckpoint(), + authState + ); + TransformIndexerStats stats = stateAndStats.getTransformStats(); + transformConfigManager.putOrUpdateTransformStoredDoc( + new TransformStoredDoc(transformId, newState, stats), + seqNoPrimaryTermAndIndex, + persistListener + ); + }, + error -> { + String msg = TransformMessages.getMessage(TransformMessages.FAILED_TO_LOAD_TRANSFORM_STATE, transformId); + logger.error(msg, error); + listener.onFailure(error); + } + ); + + transformConfigManager.getTransformStoredDoc(transformId, true, transformStatsActionListener); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 4e20ec4dcb615..9ef9823e10a07 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.transform.transforms; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.Transform; @@ -41,6 +42,7 @@ public interface Listener { private volatile Instant changesLastDetectedAt; private volatile Instant lastSearchTime; private volatile boolean shouldStopAtCheckpoint = false; + private volatile AuthorizationState authState; private volatile int pageSize = 0; // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ @@ -169,6 +171,14 @@ public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; } + public AuthorizationState getAuthState() { + return authState; + } + + public void setAuthState(AuthorizationState authState) { + this.authState = authState; + } + int getPageSize() { return pageSize; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 7bfbc3f078dfb..95f5747935217 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -733,7 +733,8 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p context.getStateReason(), getProgress(), null, - shouldStopAtCheckpoint + shouldStopAtCheckpoint, + context.getAuthState() ); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); @@ -845,7 +846,8 @@ private boolean addSetStopAtCheckpointListener(boolean shouldStopAtCheckpoint, A context.getStateReason(), getProgress(), null, - newIndexerState == IndexerState.STARTED + newIndexerState == IndexerState.STARTED, + context.getAuthState() ); // because save state is async we need to block the call until state is persisted, so that the job can not diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 4f48176f24cca..9d8173ca562bc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.TransformMetadata; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -214,8 +215,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa } final long lastCheckpoint = stateHolder.get().getCheckpoint(); + final AuthorizationState authState = stateHolder.get().getAuthState(); - startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); + startTask(buildTask, indexerBuilder, authState, lastCheckpoint, startTaskListener); }, error -> { // TODO: do not use the same error message as for loading the last checkpoint String msg = TransformMessages.getMessage(TransformMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); @@ -282,7 +284,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa markAsFailed(buildTask, msg); } else { logger.trace("[{}] No stats found (new transform), starting the task", transformId); - startTask(buildTask, indexerBuilder, null, startTaskListener); + startTask(buildTask, indexerBuilder, null, null, startTaskListener); } } ); @@ -377,12 +379,14 @@ private void markAsFailed(TransformTask task, String reason) { private void startTask( TransformTask buildTask, ClientTransformIndexerBuilder indexerBuilder, + AuthorizationState authState, Long previousCheckpoint, ActionListener listener ) { // switch the threadpool to generic, because the caller is on the system_read threadpool threadPool.generic().execute(() -> { buildTask.initializeIndexer(indexerBuilder); + buildTask.setAuthState(authState); // TransformTask#start will fail if the task state is FAILED buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); }); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 4ade3a31608af..8478525cb4a53 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; @@ -115,6 +116,9 @@ public TransformTask( this.initialPosition = initialPosition; this.context = new TransformContext(initialTaskState, initialReason, initialCheckpoint, transform.from(), this); + if (state != null) { + this.context.setAuthState(state.getAuthState()); + } } public ParentTaskAssigningClient getParentTaskClient() { @@ -147,7 +151,8 @@ public TransformState getState() { context.getStateReason(), null, null, - false + false, + context.getAuthState() ); } else { return new TransformState( @@ -158,7 +163,8 @@ public TransformState getState() { context.getStateReason(), getIndexer().getProgress(), null, - context.shouldStopAtCheckpoint() + context.shouldStopAtCheckpoint(), + context.getAuthState() ); } } @@ -210,7 +216,6 @@ public void getCheckpointingInfo( /** * Starts the transform and schedules it to be triggered in the future. * - * * @param startingCheckpoint The starting checkpoint, could null. Null indicates that there is no starting checkpoint * @param listener The listener to alert once started */ @@ -265,7 +270,8 @@ void start(Long startingCheckpoint, ActionListener void ActionListener listener ) { if (request instanceof HasPrivilegesRequest) { - listener.onResponse((Response) new HasPrivilegesResponse()); + HasPrivilegesRequest hasPrivilegesRequest = (HasPrivilegesRequest) request; + switch (hasPrivilegesRequest.username()) { + case BOB: + // bob has all the privileges + listener.onResponse((Response) new HasPrivilegesResponse()); + break; + case JOHN: + // john does not have required privileges + listener.onFailure(new ElasticsearchSecurityException("missing privileges")); + break; + default: + fail("Unexpected username = " + hasPrivilegesRequest.username()); + } } else if (request instanceof ValidateTransformAction.Request) { listener.onResponse((Response) new ValidateTransformAction.Response(Collections.emptyMap())); } else { @@ -116,7 +134,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -134,6 +152,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { updateResult -> { assertEquals(UpdateResult.Status.NONE, updateResult.getStatus()); assertEquals(maxCompatibleConfig, updateResult.getConfig()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(maxCompatibleConfig.getId(), listener), config -> { @@ -149,7 +168,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -167,6 +186,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { updateResult -> { assertEquals(UpdateResult.Status.NONE, updateResult.getStatus()); assertEquals(minCompatibleConfig, updateResult.getConfig()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(minCompatibleConfig.getId(), listener), config -> { @@ -207,7 +227,8 @@ public void testTransformUpdateRewrite() throws InterruptedException { null, // reason null, // progress null, // node attributes - false // shouldStopAtNextCheckpoint + false,// shouldStopAtNextCheckpoint + null // auth state ), TransformIndexerStatsTests.randomStats() ); @@ -218,7 +239,7 @@ public void testTransformUpdateRewrite() throws InterruptedException { TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -236,6 +257,7 @@ public void testTransformUpdateRewrite() throws InterruptedException { updateResult -> { assertEquals(UpdateResult.Status.UPDATED, updateResult.getStatus()); assertNotEquals(oldConfig, updateResult.getConfig()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(oldConfig.getId(), listener), config -> { @@ -283,7 +305,7 @@ public void testTransformUpdateDryRun() throws InterruptedException { TransformConfigUpdate update = TransformConfigUpdate.EMPTY; assertUpdate( listener -> TransformUpdater.updateTransform( - securityContext, + bobSecurityContext, indexNameExpressionResolver, ClusterState.EMPTY_STATE, settings, @@ -302,6 +324,7 @@ public void testTransformUpdateDryRun() throws InterruptedException { assertEquals(UpdateResult.Status.NEEDS_UPDATE, updateResult.getStatus()); assertNotEquals(oldConfigForDryRunUpdate, updateResult.getConfig()); assertEquals(Version.CURRENT, updateResult.getConfig().getVersion()); + assertNull(updateResult.getAuthState()); } ); assertConfiguration( @@ -313,6 +336,115 @@ public void testTransformUpdateDryRun() throws InterruptedException { ); } + public void testTransformUpdateCheckAccessSuccess() throws InterruptedException { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED) + ) + ); + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.noop()); + + assertUpdate( + listener -> TransformUpdater.updateTransform( + bobSecurityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + TransformConfigUpdate.EMPTY, + null, // seqNoPrimaryTermAndIndex + false, + false, + true, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + listener + ), + updateResult -> { + assertThat(updateResult.getStatus(), is(equalTo(UpdateResult.Status.UPDATED))); + assertThat(updateResult.getConfig(), is(not(equalTo(oldConfig)))); + assertThat(updateResult.getConfig().getVersion(), is(equalTo(Version.CURRENT))); + assertThat(updateResult.getAuthState(), is(notNullValue())); + assertThat(updateResult.getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + assertThat(updateResult.getAuthState().getLastAuthError(), is(nullValue())); + } + ); + } + + public void testTransformUpdateCheckAccessFailureDeferValidation() throws InterruptedException { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig( + randomAlphaOfLengthBetween(1, 10), + VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED) + ) + ); + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.noop()); + + assertUpdate( + listener -> TransformUpdater.updateTransform( + johnSecurityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + TransformConfigUpdate.EMPTY, + null, // seqNoPrimaryTermAndIndex + true, + false, + true, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + listener + ), + updateResult -> { + assertThat(updateResult.getStatus(), is(equalTo(UpdateResult.Status.UPDATED))); + assertThat(updateResult.getConfig(), is(not(equalTo(oldConfig)))); + assertThat(updateResult.getConfig().getVersion(), is(equalTo(Version.CURRENT))); + assertThat(updateResult.getAuthState(), is(notNullValue())); + assertThat(updateResult.getAuthState().getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(updateResult.getAuthState().getLastAuthError(), is(equalTo("missing privileges"))); + } + ); + } + + public void testTransformUpdateCheckAccessFailureNoDeferValidation() { + InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager(); + + TransformConfig oldConfig = TransformConfigTests.randomTransformConfig(); + transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.noop()); + + TransformUpdater.updateTransform( + johnSecurityContext, + indexNameExpressionResolver, + ClusterState.EMPTY_STATE, + settings, + client, + transformConfigManager, + oldConfig, + TransformConfigUpdate.EMPTY, + null, // seqNoPrimaryTermAndIndex + false, + false, + true, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, + ActionListener.wrap( + r -> fail("Should fail due to missing privileges"), + e -> assertThat(e.getMessage(), is(equalTo("missing privileges"))) + ) + ); + } + private void assertUpdate(Consumer> function, Consumer furtherTests) throws InterruptedException { assertAsync(function, furtherTests); @@ -349,4 +481,13 @@ private void assertAsync(Consumer> function, Consumer f function.accept(listener); assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); } + + private static SecurityContext newSecurityContextFor(String username) { + return new SecurityContext(Settings.EMPTY, new ThreadContext(Settings.EMPTY)) { + @Override + public User getUser() { + return new User(username); + } + }; + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java index 6f990ca420811..569be566941fd 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java @@ -6,9 +6,11 @@ */ package org.elasticsearch.xpack.transform.action; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.health.HealthStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformHealth; @@ -30,7 +32,14 @@ public class TransportGetTransformStatsActionTests extends ESTestCase { - private TransformTask task = mock(TransformTask.class); + private static final TransformCheckpointingInfo CHECKPOINTING_INFO = new TransformCheckpointingInfo( + new TransformCheckpointStats(1, null, null, 1, 1), + new TransformCheckpointStats(2, null, null, 2, 5), + 2, + Instant.now(), + Instant.now() + ); + private final TransformTask task = mock(TransformTask.class); public void testDeriveStatsStopped() { String transformId = "transform-with-stats"; @@ -44,17 +53,10 @@ public void testDeriveStatsStopped() { reason, null, null, - true + true, + null ); withIdStateAndStats(transformId, stoppedState, stats); - TransformCheckpointingInfo info = new TransformCheckpointingInfo( - new TransformCheckpointStats(1, null, null, 1, 1), - new TransformCheckpointStats(2, null, null, 2, 5), - 2, - Instant.now(), - Instant.now() - ); - assertThat( TransportGetTransformStatsAction.deriveStats(task, null), equalTo( @@ -65,17 +67,29 @@ public void testDeriveStatsStopped() { null, stats, TransformCheckpointingInfo.EMPTY, - TransformHealth.GREEN + TransformHealth.GREEN, + null ) ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info, TransformHealth.GREEN)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPED, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN, + null + ) + ) ); reason = "foo"; - stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true); + stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true, null); withIdStateAndStats(transformId, stoppedState, stats); assertThat( @@ -88,13 +102,73 @@ public void testDeriveStatsStopped() { null, stats, TransformCheckpointingInfo.EMPTY, - TransformHealth.GREEN + TransformHealth.GREEN, + null + ) + ) + ); + assertThat( + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPED, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN, + null ) ) ); + } + + public void testDeriveStatsStoppedWithAuthStateGreen() { + testDeriveStatsStoppedWithAuthState(null, AuthorizationState.green(), TransformCheckpointingInfo.EMPTY); + testDeriveStatsStoppedWithAuthState(CHECKPOINTING_INFO, AuthorizationState.green(), CHECKPOINTING_INFO); + } + + public void testDeriveStatsStoppedWithAuthStateRed() { + AuthorizationState redAuthState = AuthorizationState.red(new ElasticsearchSecurityException("missing privileges")); + testDeriveStatsStoppedWithAuthState(null, redAuthState, TransformCheckpointingInfo.EMPTY); + testDeriveStatsStoppedWithAuthState(CHECKPOINTING_INFO, redAuthState, CHECKPOINTING_INFO); + } + + private void testDeriveStatsStoppedWithAuthState( + TransformCheckpointingInfo info, + AuthorizationState authState, + TransformCheckpointingInfo expectedInfo + ) { + String transformId = "transform-with-auth-state"; + TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); + TransformState stoppedState = new TransformState( + TransformTaskState.STOPPED, + IndexerState.STOPPED, + null, + 0, + null, + null, + null, + true, + authState + ); + withIdStateAndStats(transformId, stoppedState, stats); + assertThat( TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info, TransformHealth.GREEN)) + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPED, + null, + null, + stats, + expectedInfo, + TransformHealth.GREEN, + authState + ) + ) ); } @@ -107,16 +181,18 @@ public void testDeriveStatsFailed() { ); TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); - TransformState failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true); - withIdStateAndStats(transformId, failedState, stats); - TransformCheckpointingInfo info = new TransformCheckpointingInfo( - new TransformCheckpointStats(1, null, null, 1, 1), - new TransformCheckpointStats(2, null, null, 2, 5), - 2, - Instant.now(), - Instant.now() + TransformState failedState = new TransformState( + TransformTaskState.FAILED, + IndexerState.STOPPED, + null, + 0, + reason, + null, + null, + true, + null ); - + withIdStateAndStats(transformId, failedState, stats); assertThat( TransportGetTransformStatsAction.deriveStats(task, null), equalTo( @@ -127,13 +203,16 @@ public void testDeriveStatsFailed() { null, stats, TransformCheckpointingInfo.EMPTY, - expectedHealth + expectedHealth, + null ) ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info, expectedHealth)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, CHECKPOINTING_INFO, expectedHealth, null) + ) ); reason = "the task is failed"; @@ -141,7 +220,7 @@ public void testDeriveStatsFailed() { HealthStatus.RED, List.of(new TransformHealthIssue("Transform task state is [failed]", reason, 1, null)) ); - failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true); + failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true, null); withIdStateAndStats(transformId, failedState, stats); assertThat( @@ -154,13 +233,16 @@ public void testDeriveStatsFailed() { null, stats, TransformCheckpointingInfo.EMPTY, - expectedHealth + expectedHealth, + null ) ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info, expectedHealth)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, CHECKPOINTING_INFO, expectedHealth, null) + ) ); } @@ -176,16 +258,10 @@ public void testDeriveStats() { reason, null, null, - true + true, + null ); withIdStateAndStats(transformId, runningState, stats); - TransformCheckpointingInfo info = new TransformCheckpointingInfo( - new TransformCheckpointStats(1, null, null, 1, 1), - new TransformCheckpointStats(2, null, null, 2, 5), - 2, - Instant.now(), - Instant.now() - ); assertThat( TransportGetTransformStatsAction.deriveStats(task, null), @@ -197,12 +273,13 @@ public void testDeriveStats() { null, stats, TransformCheckpointingInfo.EMPTY, - TransformHealth.GREEN + TransformHealth.GREEN, + null ) ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), equalTo( new TransformStats( transformId, @@ -210,14 +287,15 @@ public void testDeriveStats() { "transform is set to stop at the next checkpoint", null, stats, - info, - TransformHealth.GREEN + CHECKPOINTING_INFO, + TransformHealth.GREEN, + null ) ) ); reason = "foo"; - runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true); + runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true, null); withIdStateAndStats(transformId, runningState, stats); assertThat( @@ -230,17 +308,29 @@ public void testDeriveStats() { null, stats, TransformCheckpointingInfo.EMPTY, - TransformHealth.GREEN + TransformHealth.GREEN, + null ) ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info, TransformHealth.GREEN)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.STOPPING, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN, + null + ) + ) ); // Stop at next checkpoint is false. - runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false); + runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false, null); withIdStateAndStats(transformId, runningState, stats); assertThat( @@ -253,13 +343,25 @@ public void testDeriveStats() { null, stats, TransformCheckpointingInfo.EMPTY, - TransformHealth.GREEN + TransformHealth.GREEN, + null ) ) ); assertThat( - TransportGetTransformStatsAction.deriveStats(task, info), - equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info, TransformHealth.GREEN)) + TransportGetTransformStatsAction.deriveStats(task, CHECKPOINTING_INFO), + equalTo( + new TransformStats( + transformId, + TransformStats.State.INDEXING, + reason, + null, + stats, + CHECKPOINTING_INFO, + TransformHealth.GREEN, + null + ) + ) ); } @@ -269,5 +371,4 @@ private void withIdStateAndStats(String transformId, TransformState state, Trans when(task.getStats()).thenReturn(stats); when(task.getContext()).thenReturn(new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class))); } - } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java index a4c9b858a8233..c9d929fce0047 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java @@ -62,7 +62,7 @@ public void testTaskStateValidationWithTransformTasks() { // test again with a non failed task but this time it has internal state pTasksBuilder.updateTaskState( "non-failed-task", - new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null) + new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null, null, false, null) ); csBuilder = ClusterState.builder(new ClusterName("_name")).metadata(buildMetadata(pTasksBuilder.build())); @@ -76,7 +76,7 @@ public void testTaskStateValidationWithTransformTasks() { ) .updateTaskState( "failed-task", - new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0L, "task has failed", null) + new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0L, "task has failed", null, null, false, null) ); final ClusterState cs = ClusterState.builder(new ClusterName("_name")).metadata(buildMetadata(pTasksBuilder.build())).build(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java index b2ca0769ea976..86193ef511618 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java @@ -7,7 +7,10 @@ package org.elasticsearch.xpack.transform.transforms; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.junit.After; import org.junit.Before; @@ -16,6 +19,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -91,6 +95,22 @@ public void testStateReason() { verify(listener).failureCountChanged(); } + public void testAuthState() { + TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, listener); + assertThat(context.getAuthState(), is(nullValue())); + + context.setAuthState(AuthorizationState.green()); + assertThat(context.getAuthState(), is(notNullValue())); + assertThat(context.getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + + context.setAuthState(AuthorizationState.red(new ElasticsearchSecurityException("missing privileges"))); + assertThat(context.getAuthState(), is(notNullValue())); + assertThat(context.getAuthState().getStatus(), is(equalTo(HealthStatus.RED))); + + context.setAuthState(null); + assertThat(context.getAuthState(), is(nullValue())); + } + public void testFrom() { Instant from = Instant.ofEpochMilli(randomLongBetween(0, 1_000_000_000_000L)); TransformContext context = new TransformContext(TransformTaskState.STARTED, null, 0, from, listener); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java index 96678b629b435..3b4604caca5cd 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java @@ -112,7 +112,7 @@ private static Instant getNow() { private static void withIdStateAndContext(TransformTask task, String transformId, TransformContext context) { when(task.getTransformId()).thenReturn(transformId); when(task.getState()).thenReturn( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, "", null, null, false) + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, "", null, null, false, null) ); when(task.getContext()).thenReturn(context); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java index cedd8c8651317..afa10afbdf638 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java @@ -245,7 +245,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -257,7 +257,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -269,7 +269,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -327,7 +327,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -340,7 +340,7 @@ public void fail(String failureMessage, ActionListener listener) { // succeed this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { @@ -352,7 +352,7 @@ public void fail(String failureMessage, ActionListener listener) { // fail again this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -364,7 +364,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -376,7 +376,7 @@ public void fail(String failureMessage, ActionListener listener) { this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -458,7 +458,7 @@ public void fail(String failureMessage, ActionListener listener) { // succeed this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { @@ -472,7 +472,7 @@ public void fail(String failureMessage, ActionListener listener) { listener -> configManager.putOrUpdateTransformStoredDoc( new TransformStoredDoc( config.getId(), - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), indexer.getStats() ), indexer.getSeqNoPrimaryTermAndIndex(), @@ -487,7 +487,7 @@ public void fail(String failureMessage, ActionListener listener) { // state persistence should fail with a version conflict this.assertAsyncFailure( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), e -> { @@ -500,7 +500,7 @@ public void fail(String failureMessage, ActionListener listener) { // recovered this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { @@ -513,7 +513,7 @@ public void fail(String failureMessage, ActionListener listener) { // succeed this.assertAsync( listener -> indexer.persistState( - new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false), + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 42, null, null, null, false, null), listener ), r -> { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 7759f153ee753..a9ea0f0244e86 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -146,7 +146,8 @@ class MockedTransformIndexer extends TransformIndexer { context.getStateReason(), getProgress(), null, - context.shouldStopAtCheckpoint() + context.shouldStopAtCheckpoint(), + null ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index ec286b3af3e6c..bcb92adaf2003 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -389,7 +389,7 @@ private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { } - public TransformPersistentTasksExecutor buildTaskExecutor() { + private TransformPersistentTasksExecutor buildTaskExecutor() { ClusterService clusterService = mock(ClusterService.class); Client client = mock(Client.class); TransformAuditor mockAuditor = mock(TransformAuditor.class); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 9885f247f255d..af95a15a13e44 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.transforms; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -19,6 +20,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.health.HealthStatus; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; @@ -30,6 +32,7 @@ import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -117,7 +120,8 @@ public void testStopOnFailedTaskWithStoppedIndexer() { "because", null, null, - false + false, + null ); TransformTask transformTask = new TransformTask( @@ -195,7 +199,8 @@ public void testStopOnFailedTaskWithoutIndexer() { "because", null, null, - false + false, + null ); TransformTask transformTask = new TransformTask( @@ -405,6 +410,51 @@ public void testFindTransformTasks() { assertThat(TransformTask.findTransformTasks(Set.of("transform-4", "transform-5"), clusterState), is(empty())); } + public void testApplyNewAuthState() { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); + + TransformConfig transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders(); + TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + + TransformState transformState = new TransformState( + TransformTaskState.FAILED, + IndexerState.STOPPED, + null, + 0L, + "because", + null, + null, + false, + AuthorizationState.green() + ); + + TransformTask transformTask = new TransformTask( + 42, + "some_type", + "some_action", + TaskId.EMPTY_TASK_ID, + client, + createTransformTaskParams(transformConfig.getId()), + transformState, + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY), + auditor, + threadPool, + Collections.emptyMap() + ); + assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + + transformTask.applyNewAuthState(AuthorizationState.red(new ElasticsearchSecurityException("missing permissions"))); + assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.RED))); + assertThat(transformTask.getContext().getAuthState().getLastAuthError(), is(equalTo("missing permissions"))); + + transformTask.applyNewAuthState(AuthorizationState.green()); + assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN))); + + transformTask.applyNewAuthState(null); + assertThat(transformTask.getContext().getAuthState(), is(nullValue())); + } + private static TransformTaskParams createTransformTaskParams(String transformId) { return new TransformTaskParams(transformId, Version.CURRENT, TimeValue.timeValueSeconds(10), false); } diff --git a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_authorization_state.schema.json b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_authorization_state.schema.json new file mode 100644 index 0000000000000..e23d12c8aa518 --- /dev/null +++ b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_authorization_state.schema.json @@ -0,0 +1,33 @@ +{ + "definitions": {}, + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://raw.githubusercontent.com/elastic/elasticsearch/master/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_authorization_state.schema.json", + "description": "schema definition for transform authorization state", + "additionalProperties": false, + "title": "Root", + "type": "object", + "required": [ + "timestamp", + "status" + ], + "properties": { + "timestamp": { + "type": "integer", + "description": "when the permissions where last checked" + }, + "status": { + "type": "string", + "enum": [ + "green", + "yellow", + "red", + "unknown" + ], + "description": "The transform authorization status" + }, + "last_auth_error": { + "type": "string", + "description": "last error message if auth error occurred" + } + } +} diff --git a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_stats.schema.json b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_stats.schema.json index 8e520e4f09e15..cc2234f35c923 100644 --- a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_stats.schema.json +++ b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_stats.schema.json @@ -66,6 +66,10 @@ }, "health": { "$ref": "file:transform_health.schema.json" + }, + "auth_state": { + "$ref": "file:transform_authorization_state.schema.json" } } } +