Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TestingFailureInjector* to test classpath #23233

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,115 +13,42 @@
*/
package io.trino.execution;

import com.google.common.cache.CacheBuilder;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.cache.NonEvictableCache;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.ErrorType;
import io.trino.spi.TrinoException;

import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_FAILURE;
import static io.trino.spi.ErrorType.EXTERNAL;
import static io.trino.spi.ErrorType.INSUFFICIENT_RESOURCES;
import static io.trino.spi.ErrorType.INTERNAL_ERROR;
import static io.trino.spi.ErrorType.USER_ERROR;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class FailureInjector
public interface FailureInjector
{
public static final String FAILURE_INJECTION_MESSAGE = "This error is injected by the failure injection service";

private final NonEvictableCache<Key, InjectedFailure> failures;
private final Duration requestTimeout;

@Inject
public FailureInjector(FailureInjectionConfig config)
{
this(
config.getExpirationPeriod(),
config.getRequestTimeout());
}

public FailureInjector(Duration expirationPeriod, Duration requestTimeout)
{
failures = buildNonEvictableCache(CacheBuilder.newBuilder()
.expireAfterWrite(expirationPeriod.toMillis(), MILLISECONDS));
this.requestTimeout = requireNonNull(requestTimeout, "requestTimeout is null");
}

public void injectTaskFailure(
void injectTaskFailure(
String traceToken,
int stageId,
int partitionId,
int attemptId,
InjectedFailureType injectionType,
Optional<ErrorType> errorType)
{
failures.put(new Key(traceToken, stageId, partitionId, attemptId), new InjectedFailure(injectionType, errorType));
}
FailureInjector.InjectedFailureType injectionType,
Optional<ErrorType> errorType);

public Optional<InjectedFailure> getInjectedFailure(
Optional<InjectedFailure> getInjectedFailure(
String traceToken,
int stageId,
int partitionId,
int attemptId)
{
if (failures.size() == 0) {
return Optional.empty();
}
return Optional.ofNullable(failures.getIfPresent(new Key(traceToken, stageId, partitionId, attemptId)));
}

public Duration getRequestTimeout()
{
return requestTimeout;
}
int attemptId);

private static class Key
{
private final String traceToken;
private final int stageId;
private final int partitionId;
private final int attemptId;

private Key(String traceToken, int stageId, int partitionId, int attemptId)
{
this.traceToken = requireNonNull(traceToken, "traceToken is null");
this.stageId = stageId;
this.partitionId = partitionId;
this.attemptId = attemptId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Key key = (Key) o;
return stageId == key.stageId && partitionId == key.partitionId && attemptId == key.attemptId && Objects.equals(traceToken, key.traceToken);
}

@Override
public int hashCode()
{
return Objects.hash(traceToken, stageId, partitionId, attemptId);
}
}
Duration getRequestTimeout();

public enum InjectedFailureType
enum InjectedFailureType
{
TASK_MANAGEMENT_REQUEST_FAILURE,
TASK_MANAGEMENT_REQUEST_TIMEOUT,
Expand All @@ -130,7 +57,9 @@ public enum InjectedFailureType
TASK_FAILURE,
}

public static class InjectedFailure
String FAILURE_INJECTION_MESSAGE = "This error is injected by the failure injection service";

class InjectedFailure
{
private final InjectedFailureType injectedFailureType;
private final Optional<ErrorType> taskFailureErrorType;
Expand Down Expand Up @@ -173,7 +102,7 @@ public String toString()
}
}

public enum InjectedErrorCode
enum InjectedErrorCode
implements ErrorCodeSupplier
{
INJECTED_USER_ERROR(1, USER_ERROR),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import io.airlift.units.Duration;
import io.trino.spi.ErrorType;

import java.util.Optional;

public class NoOpFailureInjector
implements FailureInjector
{
@Override
public void injectTaskFailure(String traceToken, int stageId, int partitionId, int attemptId, InjectedFailureType injectionType, Optional<ErrorType> errorType)
{
}

@Override
public Optional<InjectedFailure> getInjectedFailure(String traceToken, int stageId, int partitionId, int attemptId)
{
return Optional.empty();
}

@Override
public Duration getRequestTimeout()
{
throw new IllegalArgumentException("getRequestTimeout should not be called");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import io.trino.event.SplitMonitor;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.ExplainAnalyzeContext;
import io.trino.execution.FailureInjectionConfig;
import io.trino.execution.FailureInjector;
import io.trino.execution.LocationFactory;
import io.trino.execution.MemoryRevokingScheduler;
import io.trino.execution.NoOpFailureInjector;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.SqlTaskManager;
Expand Down Expand Up @@ -277,8 +277,7 @@ protected void setup(Binder binder)
new TopologyAwareNodeSelectorModule()));

// task execution
configBinder(binder).bindConfig(FailureInjectionConfig.class);
binder.bind(FailureInjector.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, FailureInjector.class).setDefault().to(NoOpFailureInjector.class).in(Scopes.SINGLETON);
jaxrsBinder(binder).bind(TaskResource.class);
newExporter(binder).export(TaskResource.class).withGeneratedName();
binder.bind(TaskManagementExecutor.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,5 @@ void injectTaskFailure(

void loadExchangeManager(String name, Map<String, String> properties);

record MaterializedResultWithPlan(QueryId queryId, Optional<Plan> queryPlan, MaterializedResult result) {}}
record MaterializedResultWithPlan(QueryId queryId, Optional<Plan> queryPlan, MaterializedResult result) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.concurrent.TimeUnit.MINUTES;

public class TestFailureInjectionConfig
public class TestTestingFailureInjectionConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(FailureInjectionConfig.class)
assertRecordedDefaults(recordDefaults(TestingFailureInjectionConfig.class)
.setRequestTimeout(new Duration(2, MINUTES))
.setExpirationPeriod(new Duration(10, MINUTES)));
}
Expand All @@ -42,7 +42,7 @@ public void testExplicitPropertyMappings()
.put("failure-injection.expiration-period", "7m")
.buildOrThrow();

FailureInjectionConfig expected = new FailureInjectionConfig()
TestingFailureInjectionConfig expected = new TestingFailureInjectionConfig()
.setRequestTimeout(new Duration(12, MINUTES))
.setExpirationPeriod(new Duration(7, MINUTES));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import static java.util.concurrent.TimeUnit.MINUTES;

public class FailureInjectionConfig
public class TestingFailureInjectionConfig
{
private Duration expirationPeriod = new Duration(10, MINUTES);
private Duration requestTimeout = new Duration(2, MINUTES);
Expand All @@ -35,7 +35,7 @@ public Duration getExpirationPeriod()
@ConfigHidden // not supposed to be used outside of tests
@Config("failure-injection.expiration-period")
@ConfigDescription("Period after which an injected failure is considered expired and will no longer be triggering a failure")
public FailureInjectionConfig setExpirationPeriod(Duration expirationPeriod)
public TestingFailureInjectionConfig setExpirationPeriod(Duration expirationPeriod)
{
this.expirationPeriod = expirationPeriod;
return this;
Expand All @@ -50,7 +50,7 @@ public Duration getRequestTimeout()
@ConfigHidden // not supposed to be used outside of tests
@Config("failure-injection.request-timeout")
@ConfigDescription("Period after which requests blocked to emulate a timeout are released")
public FailureInjectionConfig setRequestTimeout(Duration requestTimeout)
public TestingFailureInjectionConfig setRequestTimeout(Duration requestTimeout)
{
this.requestTimeout = requestTimeout;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.google.common.cache.CacheBuilder;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.cache.NonEvictableCache;
import io.trino.spi.ErrorType;

import java.util.Objects;
import java.util.Optional;

import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class TestingFailureInjector
implements FailureInjector
{
private final NonEvictableCache<Key, InjectedFailure> failures;
private final Duration requestTimeout;

@Inject
public TestingFailureInjector(TestingFailureInjectionConfig config)
{
this(
config.getExpirationPeriod(),
config.getRequestTimeout());
}

public TestingFailureInjector(Duration expirationPeriod, Duration requestTimeout)
{
failures = buildNonEvictableCache(CacheBuilder.newBuilder()
.expireAfterWrite(expirationPeriod.toMillis(), MILLISECONDS));
this.requestTimeout = requireNonNull(requestTimeout, "requestTimeout is null");
}

@Override
public void injectTaskFailure(
String traceToken,
int stageId,
int partitionId,
int attemptId,
InjectedFailureType injectionType,
Optional<ErrorType> errorType)
{
failures.put(new Key(traceToken, stageId, partitionId, attemptId), new InjectedFailure(injectionType, errorType));
}

@Override
public Optional<InjectedFailure> getInjectedFailure(
String traceToken,
int stageId,
int partitionId,
int attemptId)
{
if (failures.size() == 0) {
return Optional.empty();
}
return Optional.ofNullable(failures.getIfPresent(new Key(traceToken, stageId, partitionId, attemptId)));
}

@Override
public Duration getRequestTimeout()
{
return requestTimeout;
}

private static class Key
{
private final String traceToken;
private final int stageId;
private final int partitionId;
private final int attemptId;

private Key(String traceToken, int stageId, int partitionId, int attemptId)
{
this.traceToken = requireNonNull(traceToken, "traceToken is null");
this.stageId = stageId;
this.partitionId = partitionId;
this.attemptId = attemptId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Key key = (Key) o;
return stageId == key.stageId && partitionId == key.partitionId && attemptId == key.attemptId && Objects.equals(traceToken, key.traceToken);
}

@Override
public int hashCode()
{
return Objects.hash(traceToken, stageId, partitionId, attemptId);
}
}
}
Loading
Loading