Skip to content

Commit

Permalink
feat: implement policy-monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Sep 14, 2023
1 parent f467cff commit 97397ff
Show file tree
Hide file tree
Showing 18 changed files with 730 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public void started(TransferProcess process, TransferProcessStartedData addition
.transferProcessId(process.getId())
.dataAddress(additionalData.getDataAddress())
.callbackAddresses(process.getCallbackAddresses())
.contractId(process.getContractId())
.type(process.getType().name())
.build();

publish(event);
Expand Down
30 changes: 30 additions & 0 deletions core/policy-monitor/policy-monitor-core/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
}

dependencies {
api(project(":spi:policy-monitor:policy-monitor-spi"))
api(project(":spi:control-plane:control-plane-spi"))
api(project(":spi:control-plane:policy-spi"))
api(project(":spi:control-plane:transfer-spi"))
implementation(project(":core:common:state-machine"))
implementation(project(":core:common:connector-core"))

testImplementation(project(":core:common:junit"))
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.policy.monitor;

import org.eclipse.edc.connector.policy.monitor.manager.PolicyMonitorManagerImpl;
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorManager;
import org.eclipse.edc.connector.policy.monitor.subscriber.StartMonitoring;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.telemetry.Telemetry;

import java.time.Clock;

import static org.eclipse.edc.connector.policy.monitor.PolicyMonitorExtension.NAME;

@Extension(value = NAME)
@Provides({ PolicyMonitorManager.class })
public class PolicyMonitorExtension implements ServiceExtension {

public static final String NAME = "Policy Monitor";

public static final long DEFAULT_ITERATION_WAIT = 1000;
public static final int DEFAULT_BATCH_SIZE = 20;
public static final int DEFAULT_SEND_RETRY_LIMIT = 7;
public static final long DEFAULT_SEND_RETRY_BASE_DELAY = 1000L;

@Inject
private ExecutorInstrumentation executorInstrumentation;

@Inject
private Telemetry telemetry;

@Inject
private Clock clock;

@Inject
private EventRouter eventRouter;

@Inject
private ContractAgreementService contractAgreementService;

@Inject
private PolicyEngine policyEngine;

@Inject
private TransferProcessService transferProcessService;

private PolicyMonitorManagerImpl manager;

@Override
public void initialize(ServiceExtensionContext context) {
manager = PolicyMonitorManagerImpl.Builder.newInstance()
.clock(clock)
.executorInstrumentation(executorInstrumentation)
.monitor(context.getMonitor())
.telemetry(telemetry)
.contractAgreementService(contractAgreementService)
.policyEngine(policyEngine)
.transferProcessService(transferProcessService)
.build();

context.registerService(PolicyMonitorManager.class, manager);

eventRouter.registerSync(TransferProcessStarted.class, new StartMonitoring(manager));
}

@Override
public void start() {
manager.start();
}

@Override
public void shutdown() {
if (manager != null) {
manager.stop();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.policy.monitor.manager;

import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry;
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorManager;
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore;
import org.eclipse.edc.connector.policy.monitor.store.InMemoryPolicyMonitorStore;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.policy.engine.spi.PolicyContextImpl;
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.retry.WaitStrategy;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.eclipse.edc.statemachine.retry.EntityRetryProcessConfiguration;
import org.eclipse.edc.statemachine.retry.EntityRetryProcessFactory;
import org.jetbrains.annotations.NotNull;

import java.time.Clock;
import java.time.Instant;
import java.util.function.Function;

import static org.eclipse.edc.connector.policy.monitor.PolicyMonitorExtension.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.connector.policy.monitor.PolicyMonitorExtension.DEFAULT_ITERATION_WAIT;
import static org.eclipse.edc.connector.policy.monitor.PolicyMonitorExtension.DEFAULT_SEND_RETRY_BASE_DELAY;
import static org.eclipse.edc.connector.policy.monitor.PolicyMonitorExtension.DEFAULT_SEND_RETRY_LIMIT;

public class PolicyMonitorManagerImpl implements PolicyMonitorManager {

private int batchSize = DEFAULT_BATCH_SIZE;
private WaitStrategy waitStrategy = () -> DEFAULT_ITERATION_WAIT;
private Monitor monitor;
private Clock clock;
private Telemetry telemetry;
private ExecutorInstrumentation executorInstrumentation;
private EntityRetryProcessFactory entityRetryProcessFactory;
private PolicyMonitorStore store = new InMemoryPolicyMonitorStore();
private EntityRetryProcessConfiguration entityRetryProcessConfiguration = defaultEntityRetryProcessConfiguration();
private StateMachineManager stateMachineManager;
private PolicyEngine policyEngine;
private TransferProcessService transferProcessService;
private ContractAgreementService contractAgreementService;

private PolicyMonitorManagerImpl() {

}

public void start() {
entityRetryProcessFactory = new EntityRetryProcessFactory(monitor, clock, entityRetryProcessConfiguration);
stateMachineManager = StateMachineManager.Builder.newInstance("policy-monitor", monitor, executorInstrumentation, waitStrategy)
.processor(processEntriesInState(this::processMonitoring))
.build();
stateMachineManager.start();
}

public void stop() {
if (stateMachineManager != null) {
stateMachineManager.stop();
}
}

@Override
public void startMonitoring(String transferProcessId, String contractId) {
var entry = PolicyMonitorEntry.Builder.newInstance()
.id(transferProcessId)
.contractId(contractId)
.traceContext(telemetry.getCurrentTraceContext())
.state(100) // TODO: state needed
.build();

store.save(entry);
}

private boolean processMonitoring(PolicyMonitorEntry entry) {
monitor.debug("[POLICYMONITORING] GONNA GET THE POLICY FOR TP " + entry.getId());
var contractAgreement = contractAgreementService.findById(entry.getContractId());
var policy = contractAgreement.getPolicy();
monitor.debug("[POLICYMONITORING] EVALUATE THA POLICY " + policy);
var build = PolicyContextImpl.Builder.newInstance()
.additional(Instant.class, Instant.now(clock))
.additional(ContractAgreement.class, contractAgreement)
.build();
// TODO: it needs the whole contract agreement !!! 0_0
var result = policyEngine.evaluate("transfer.process", policy, build);
monitor.debug("[POLICYMONITORING] POLICY EVALUATE RESULT. SUCCEEDED? " + result.succeeded());
if (result.failed()) {
monitor.debug("[POLICYMONITORING] POLICY VER FAILED: " + result.getFailureDetail());
var completeResult = transferProcessService.complete(entry.getId());
monitor.debug("[POLICYMONITORING] COMPLETED TP? " + completeResult.succeeded() + " " + completeResult.getFailureDetail());
// TODO: change status and store
}
return false; // TODO: this should happen only if the policyengine returns succeed
}

private Processor processEntriesInState(Function<PolicyMonitorEntry, Boolean> function) {
var filter = new Criterion[]{}; // TODO, once one gets completed, should not be fetched anymore (deleted?)
return ProcessorImpl.Builder.newInstance(() -> store.nextNotLeased(batchSize, filter))
.process(telemetry.contextPropagationMiddleware(function))
.onNotProcessed(this::breakLease)
.build();
}

private void breakLease(PolicyMonitorEntry entry) {
store.save(entry);
}

public static class Builder {
private final PolicyMonitorManagerImpl manager;

public static Builder newInstance() {
return new Builder();
}

private Builder() {
manager = new PolicyMonitorManagerImpl();
}

public Builder batchSize(int size) {
manager.batchSize = size;
return this;
}

public Builder waitStrategy(WaitStrategy waitStrategy) {
manager.waitStrategy = waitStrategy;
return this;
}

public Builder monitor(Monitor monitor) {
manager.monitor = monitor;
return this;
}

public Builder telemetry(Telemetry telemetry) {
manager.telemetry = telemetry;
return this;
}

public Builder executorInstrumentation(ExecutorInstrumentation executorInstrumentation) {
manager.executorInstrumentation = executorInstrumentation;
return this;
}

public Builder clock(Clock clock) {
manager.clock = clock;
return this;
}

public Builder entityRetryProcessConfiguration(EntityRetryProcessConfiguration entityRetryProcessConfiguration) {
manager.entityRetryProcessConfiguration = entityRetryProcessConfiguration;
return this;
}

public Builder contractAgreementService(ContractAgreementService contractAgreementService) {
manager.contractAgreementService = contractAgreementService;
return this;
}

public Builder policyEngine(PolicyEngine policyEngine) {
manager.policyEngine = policyEngine;
return this;
}

public Builder transferProcessService(TransferProcessService transferProcessService) {
manager.transferProcessService = transferProcessService;
return this;
}

public PolicyMonitorManagerImpl build() {
return manager;
}
}

@NotNull
private EntityRetryProcessConfiguration defaultEntityRetryProcessConfiguration() {
return new EntityRetryProcessConfiguration(DEFAULT_SEND_RETRY_LIMIT, () -> new ExponentialWaitStrategy(DEFAULT_SEND_RETRY_BASE_DELAY));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.policy.monitor.store;

import org.eclipse.edc.connector.core.store.InMemoryStatefulEntityStore;
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorEntry;
import org.eclipse.edc.connector.policy.monitor.spi.PolicyMonitorStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.result.StoreResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

public class InMemoryPolicyMonitorStore implements PolicyMonitorStore {

private final InMemoryStatefulEntityStore<PolicyMonitorEntry> store;

public InMemoryPolicyMonitorStore() {
store = new InMemoryStatefulEntityStore<>(PolicyMonitorEntry.class, UUID.randomUUID().toString(), Clock.systemUTC(), new HashMap<>());
}

@Override
public @Nullable PolicyMonitorEntry findById(String id) {
return store.find(id);
}

@Override
public @NotNull List<PolicyMonitorEntry> nextNotLeased(int max, Criterion... criteria) {
return store.leaseAndGet(max, criteria);
}

@Override
public StoreResult<PolicyMonitorEntry> findByIdAndLease(String id) {
return store.leaseAndGet(id);
}

@Override
public void save(PolicyMonitorEntry entity) {
store.upsert(entity);
}
}
Loading

0 comments on commit 97397ff

Please sign in to comment.