Skip to content

Commit

Permalink
feat: permit abandoned data transfers to be re-started by another dat…
Browse files Browse the repository at this point in the history
…a-plane runtime (#4650)

feat: permit interrupted data transfers to be started by another replica
  • Loading branch information
ndr-brt authored Dec 5, 2024
1 parent ebd43aa commit 81e2cba
Show file tree
Hide file tree
Showing 33 changed files with 636 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public void initialize() {
runtimeId = getSetting(RUNTIME_ID, null);
if (runtimeId != null) {
getMonitor().warning("A configuration value for '%s' was found. Explicitly configuring this is not supported anymore and may get removed in the future. A random value will be assigned.".formatted(RUNTIME_ID));
} else {
// runtime-id should always be randomized to guarantee a working lease mechanism
runtimeId = UUID.randomUUID().toString();
}

componentId = getSetting(COMPONENT_ID, null);
Expand All @@ -128,8 +131,6 @@ public void initialize() {
});
}

// runtime-id should always be randomized to guarantee a working lease mechanism
runtimeId = UUID.randomUUID().toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void boot(boolean addShutdownHook) {
healthCheckService.addStartupStatusProvider(startupStatusRef::get);
}

} catch (Exception e) {
} catch (Throwable e) {
onError(e);
}

Expand Down Expand Up @@ -147,7 +147,7 @@ protected ServiceExtensionContext createServiceExtensionContext(Config config) {
/**
* Callback for any error that happened during runtime initialization
*/
protected void onError(Exception e) {
protected void onError(Throwable e) {
monitor.severe(String.format("Error booting runtime: %s", e.getMessage()), e);
throw new EdcException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,24 @@ void registerService_throwsWhenFrozen() {
@Nested
class GetRuntimeId {
@Test
void shouldReturnRandomUuid_whenNotConfigured_andComponentIdNull() {
void shouldReturnRandomUuid_whenNotConfigured() {
when(config.getConfig(any())).thenReturn(ConfigFactory.empty());
context.initialize();

var runtimeId = context.getRuntimeId();
assertThat(UUID.fromString(runtimeId)).isNotNull();

verify(monitor, times(2)).warning(and(isA(String.class), argThat(message -> !message.contains(RUNTIME_ID))));
verify(monitor, times(1)).warning(and(isA(String.class), argThat(message -> !message.contains(RUNTIME_ID))));
}


@Test
void shouldReturnRandomId_whenComponentIdNotConfigured() {
void shouldReturnConfiguredRuntimeId() {
when(config.getConfig(any())).thenReturn(ConfigFactory.fromMap(Map.of(RUNTIME_ID, "runtime-id")));
context.initialize();

var runtimeId = context.getRuntimeId();

assertThat(UUID.fromString(runtimeId)).isNotNull();
assertThat(runtimeId).isEqualTo("runtime-id");
}
}

Expand All @@ -125,7 +124,9 @@ void shouldReturnRandomUuid_whenNotConfigured() {
void shouldUseRuntimeId_whenComponentIdNotConfigured() {
when(config.getConfig(any())).thenReturn(ConfigFactory.fromMap(Map.of(RUNTIME_ID, "runtime-id")));
context.initialize();

var componentId = context.getComponentId();

assertThat(componentId).isEqualTo("runtime-id");
}

Expand All @@ -138,7 +139,7 @@ void shouldUseConfiguredValue_whenBothAreConfigured() {
var componentId = context.getComponentId();
var runtimeId = context.getRuntimeId();

assertThat(UUID.fromString(runtimeId)).isNotNull();
assertThat(runtimeId).isEqualTo("runtime-id");
assertThat(componentId).isEqualTo("component-id");
verify(monitor).warning(and(isA(String.class), argThat(message -> !message.contains(RUNTIME_ID))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -48,6 +49,7 @@ public class EmbeddedRuntime extends BaseRuntime {
private final MultiSourceServiceLocator serviceLocator;
private final URL[] classPathEntries;
private Future<?> runtimeThread;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

public EmbeddedRuntime(String name, Map<String, String> properties, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules));
Expand Down Expand Up @@ -87,6 +89,8 @@ public void boot(boolean addShutdownHook) {
super.boot(false);

latch.countDown();

isRunning.set(true);
} catch (Exception e) {
runtimeException.set(e);
throw new EdcException(e);
Expand All @@ -113,6 +117,7 @@ public void shutdown() {
if (runtimeThread != null && !runtimeThread.isDone()) {
runtimeThread.cancel(true);
}
isRunning.set(false);
}

@Override
Expand Down Expand Up @@ -148,4 +153,8 @@ public <T> T getService(Class<T> clazz) {
public ServiceExtensionContext getContext() {
return context;
}

public boolean isRunning() {
return isRunning.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.junit.matchers;

import org.mockito.internal.matchers.Equals;

import java.util.Arrays;
import java.util.HashSet;

import static org.mockito.ArgumentMatchers.argThat;

/**
* Matches if the items contained in the "wanted" array are contained in the actual one.
* Heavily inspired on {@link org.mockito.internal.matchers.ArrayEquals}
*/
public class ArrayContainsMatcher extends Equals {

@SuppressWarnings("unchecked")
public static <T> T[] arrayContains(T[] items) {
return (T[]) argThat(new ArrayContainsMatcher(items));
}

public ArrayContainsMatcher(Object[] wanted) {
super(wanted);
}

@Override
public boolean matches(Object actual) {
var wanted = getWanted();
if (wanted instanceof Object[] wantedArray && actual instanceof Object[] actualArray) {
return new HashSet<>(Arrays.asList(actualArray)).containsAll(Arrays.asList(wantedArray));
}

return super.matches(actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.function.Predicate;

@Deprecated(since = "0.11.0") // as not used anywhere
public class PredicateMatcher<T> implements ArgumentMatcher<T> {

private final Predicate<T> predicate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.runtime.metamodel.annotation.Configuration;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.runtime.metamodel.annotation.Settings;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -40,6 +42,8 @@
import java.time.Clock;
import java.util.concurrent.Executors;

import static org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager.DEFAULT_FLOW_LEASE_FACTOR;
import static org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager.DEFAULT_FLOW_LEASE_TIME;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_ITERATION_WAIT;
import static org.eclipse.edc.statemachine.AbstractStateEntityManager.DEFAULT_SEND_RETRY_BASE_DELAY;
Expand Down Expand Up @@ -89,6 +93,9 @@ public class DataPlaneFrameworkExtension implements ServiceExtension {
)
private int numThreads;

@Configuration
private FlowLeaseConfiguration flowLeaseConfiguration;

private DataPlaneManagerImpl dataPlaneManager;

@Inject
Expand Down Expand Up @@ -135,6 +142,8 @@ public void initialize(ServiceExtensionContext context) {
.transferProcessClient(transferProcessApiClient)
.monitor(monitor)
.telemetry(telemetry)
.runtimeId(context.getRuntimeId())
.flowLeaseConfiguration(flowLeaseConfiguration)
.build();

context.registerService(DataPlaneManager.class, dataPlaneManager);
Expand Down Expand Up @@ -165,4 +174,32 @@ public DataTransferExecutorServiceContainer dataTransferExecutorServiceContainer
private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration() {
return new EntityRetryProcessConfiguration(sendRetryLimit, () -> new ExponentialWaitStrategy(sendRetryBaseDelay));
}

@Settings
public record FlowLeaseConfiguration(
@Setting(
key = "edc.dataplane.state-machine.flow.lease.time",
description = "The time in milliseconds after which a runtime renews its ownership on a started data flow.",
defaultValue = DEFAULT_FLOW_LEASE_TIME + "")
long time,
@Setting(
key = "edc.dataplane.state-machine.flow.lease.factor",
description = "After flow lease time * factor a started data flow will be considered abandoned by the owner and so another runtime can caught it up and start it again.",
defaultValue = DEFAULT_FLOW_LEASE_FACTOR + "")
int factor
) {

public FlowLeaseConfiguration() {
this(DEFAULT_FLOW_LEASE_TIME, DEFAULT_FLOW_LEASE_FACTOR);
}

/**
* After this time has passed, a DataFlow can be considered "abandoned" and it can be picked up by another runtime.
*
* @return the abandoned time.
*/
public long abandonTime() {
return time * factor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.connector.dataplane.framework.manager;

import org.eclipse.edc.connector.controlplane.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.DataPlaneFrameworkExtension.FlowLeaseConfiguration;
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
Expand All @@ -35,10 +36,14 @@
import org.eclipse.edc.statemachine.StateMachineManager;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
Expand All @@ -59,6 +64,8 @@ public class DataPlaneManagerImpl extends AbstractStateEntityManager<DataFlow, D
private DataPlaneAuthorizationService authorizationService;
private TransferServiceRegistry transferServiceRegistry;
private TransferProcessApiClient transferProcessClient;
private String runtimeId;
private FlowLeaseConfiguration flowLeaseConfiguration = new FlowLeaseConfiguration();

private DataPlaneManagerImpl() {

Expand Down Expand Up @@ -88,7 +95,8 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage startMessage)
.callbackAddress(startMessage.getCallbackAddress())
.traceContext(telemetry.getCurrentTraceContext())
.properties(startMessage.getProperties())
.transferType(startMessage.getTransferType());
.transferType(startMessage.getTransferType())
.runtimeId(runtimeId);

var response = switch (startMessage.getFlowType()) {
case PULL -> handlePull(startMessage, dataFlowBuilder);
Expand Down Expand Up @@ -135,23 +143,41 @@ public StatusResult<Void> restartFlows() {
new Criterion("transferType.flowType", "=", PUSH.toString())
);

toBeRestarted.forEach(dataFlow -> {
dataFlow.transitToReceived();
processReceived(dataFlow);
});
toBeRestarted.forEach(this::restartFlow);
} while (!toBeRestarted.isEmpty());

return StatusResult.success();
}

@Override
protected StateMachineManager.Builder configureStateMachineManager(StateMachineManager.Builder builder) {
Supplier<Criterion> ownedByThisRuntime = () -> new Criterion("runtimeId", "=", runtimeId);
Supplier<Criterion> ownedByAnotherRuntime = () -> new Criterion("runtimeId", "!=", runtimeId);
Supplier<Criterion> flowLeaseNeedsToBeUpdated = () -> new Criterion("updatedAt", "<", clock.millis() - flowLeaseConfiguration.time());
Supplier<Criterion> danglingTransfer = () -> new Criterion("updatedAt", "<", clock.millis() - flowLeaseConfiguration.abandonTime());

return builder
.processor(processDataFlowInState(STARTED, this::updateFlowLease, ownedByThisRuntime, flowLeaseNeedsToBeUpdated))
.processor(processDataFlowInState(STARTED, this::restartFlow, ownedByAnotherRuntime, danglingTransfer))
.processor(processDataFlowInState(RECEIVED, this::processReceived))
.processor(processDataFlowInState(COMPLETED, this::processCompleted))
.processor(processDataFlowInState(FAILED, this::processFailed));
}

private boolean updateFlowLease(DataFlow dataFlow) {
dataFlow.transitToReceived();
dataFlow.transitionToStarted(runtimeId);
store.save(dataFlow);
return true;
}

private boolean restartFlow(DataFlow dataFlow) {
monitor.debug("Restarting interrupted flow %s, it was owned by runtime %s".formatted(dataFlow.getId(), dataFlow.getRuntimeId()));
dataFlow.transitToReceived();
processReceived(dataFlow);
return true;
}

private StatusResult<DataFlow> stop(String dataFlowId) {
return stop(dataFlowId, null);
}
Expand Down Expand Up @@ -225,8 +251,9 @@ private boolean processReceived(DataFlow dataFlow) {
return true;
}

dataFlow.transitionToStarted();
store.save(dataFlow);
dataFlow.transitionToStarted(runtimeId);
monitor.info("UPDATE dataflow %s. RuntimeId %s, UpdatedAt %s".formatted(dataFlow.getId(), dataFlow.getRuntimeId(), dataFlow.getUpdatedAt()));
update(dataFlow);

return entityRetryProcessFactory.doAsyncProcess(dataFlow, () -> transferService.transfer(request))
.entityRetrieve(id -> store.findByIdAndLease(id).orElse(f -> null))
Expand Down Expand Up @@ -277,9 +304,16 @@ private boolean processFailed(DataFlow dataFlow) {
return true;
}

private Processor processDataFlowInState(DataFlowStates state, Function<DataFlow, Boolean> function) {
var filter = new Criterion[]{ hasState(state.code()) };
return ProcessorImpl.Builder.newInstance(() -> store.nextNotLeased(batchSize, filter))
@SafeVarargs
private Processor processDataFlowInState(DataFlowStates state, Function<DataFlow, Boolean> function, Supplier<Criterion>... additionalCriteria) {
Supplier<Collection<DataFlow>> entitiesSupplier = () -> {
var additional = Arrays.stream(additionalCriteria).map(Supplier::get);
var filter = Stream.concat(Stream.of(new Criterion[]{ hasState(state.code()) }), additional)
.toArray(Criterion[]::new);
return store.nextNotLeased(batchSize, filter);
};

return ProcessorImpl.Builder.newInstance(entitiesSupplier)
.process(telemetry.contextPropagationMiddleware(function))
.onNotProcessed(this::breakLease)
.build();
Expand Down Expand Up @@ -322,6 +356,15 @@ public Builder authorizationService(DataPlaneAuthorizationService authorizationS
return this;
}

public Builder runtimeId(String runtimeId) {
manager.runtimeId = runtimeId;
return this;
}

public Builder flowLeaseConfiguration(FlowLeaseConfiguration flowLeaseConfiguration) {
manager.flowLeaseConfiguration = flowLeaseConfiguration;
return this;
}
}

}
Loading

0 comments on commit 81e2cba

Please sign in to comment.