Skip to content

Commit

Permalink
merge: #13585
Browse files Browse the repository at this point in the history
13585: [Backport stable/8.2] Ban process instances through admin API r=oleschoenburg a=rodrigo-lourenco-lopes

## Description

Manual backport of #13550


Co-authored-by: Christopher Zell <[email protected]>
Co-authored-by: rodrigolourencolopes <[email protected]>
Co-authored-by: Ole Schönburg <[email protected]>
  • Loading branch information
4 people authored Jul 26, 2023
2 parents 5e9033d + f49c6a5 commit ca29ed2
Show file tree
Hide file tree
Showing 21 changed files with 453 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import static java.util.Objects.requireNonNull;

import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.ActorFutureCollector;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -62,6 +64,20 @@ public ActorFuture<Void> resumeProcessing() {
return callOnEachPartition(PartitionAdminAccess::resumeProcessing);
}

@Override
public ActorFuture<Void> banInstance(final long processInstanceKey) {
final var partitionId = Protocol.decodePartitionId(processInstanceKey);
final var partition = partitions.get(partitionId);
if (partition == null) {
return CompletableActorFuture.completedExceptionally(
new RuntimeException(
"Could not ban process instance %s, partition %s does not exist"
.formatted(processInstanceKey, partitionId)));
}

return partition.banInstance(processInstanceKey);
}

private ActorFuture<Void> callOnEachPartition(
final Function<PartitionAdminAccess, ActorFuture<Void>> functionToCall) {
final ActorFuture<Void> response = concurrencyControl.createFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public ActorFuture<Void> resumeProcessing() {
return CompletableActorFuture.completed(null);
}

@Override
public ActorFuture<Void> banInstance(final long processInstanceKey) {
logCall();
return CompletableActorFuture.completed(null);
}

private void logCall() {
LOG.warn("Received call on NoOp implementation of PartitionAdminAccess");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface PartitionAdminAccess {
ActorFuture<Void> pauseProcessing();

ActorFuture<Void> resumeProcessing();

ActorFuture<Void> banInstance(final long processInstanceKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
package io.camunda.zeebe.broker.system.partitions;

import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import java.io.IOException;

public interface PartitionAdminControl {
StreamProcessor getStreamProcessor();

ZeebeDb getZeebeDb();

LogStream getLogStream();

ExporterDirector getExporterDirector();

void triggerSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import java.io.IOException;
import java.util.function.Supplier;
Expand All @@ -20,23 +22,39 @@ public class PartitionAdminControlImpl implements PartitionAdminControl {
private final Supplier<ExporterDirector> exporterDirectorSupplier;
private final Supplier<AsyncSnapshotDirector> snapshotDirectorSupplier;
private final Supplier<PartitionProcessingState> partitionProcessingStateSupplier;
private final Supplier<ZeebeDb> zeebeDbSupplier;
private final Supplier<LogStream> logStreamSupplier;

public PartitionAdminControlImpl(
final Supplier<StreamProcessor> streamProcessorSupplier,
final Supplier<ExporterDirector> exporterDirectorSupplier,
final Supplier<AsyncSnapshotDirector> snapshotDirectorSupplier,
final Supplier<PartitionProcessingState> partitionProcessingStateSupplier) {
final Supplier<PartitionProcessingState> partitionProcessingStateSupplier,
final Supplier<ZeebeDb> zeebeDbSupplier,
final Supplier<LogStream> logStreamSupplier) {
this.streamProcessorSupplier = streamProcessorSupplier;
this.exporterDirectorSupplier = exporterDirectorSupplier;
this.snapshotDirectorSupplier = snapshotDirectorSupplier;
this.partitionProcessingStateSupplier = partitionProcessingStateSupplier;
this.zeebeDbSupplier = zeebeDbSupplier;
this.logStreamSupplier = logStreamSupplier;
}

@Override
public StreamProcessor getStreamProcessor() {
return streamProcessorSupplier.get();
}

@Override
public ZeebeDb getZeebeDb() {
return zeebeDbSupplier.get();
}

@Override
public LogStream getLogStream() {
return logStreamSupplier.get();
}

@Override
public ExporterDirector getExporterDirector() {
return exporterDirectorSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public PartitionAdminControl getPartitionAdminControl() {
() -> getPartitionContext().getStreamProcessor(),
() -> getPartitionContext().getExporterDirector(),
() -> snapshotDirector,
() -> partitionProcessingState);
() -> partitionProcessingState,
() -> zeebeDb,
() -> logStream);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.engine.state.processing.DbBannedInstanceState;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.LogStreamWriter.WriteFailure;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.stream.impl.records.RecordBatchEntry;
import io.camunda.zeebe.util.Either;
import java.io.IOException;
import java.util.Optional;
import org.slf4j.Logger;
Expand Down Expand Up @@ -140,4 +151,88 @@ public ActorFuture<Void> resumeProcessing() {
});
return completed;
}

@Override
public ActorFuture<Void> banInstance(final long processInstanceKey) {
final ActorFuture<Void> future = concurrencyControl.createFuture();
concurrencyControl.run(
() -> {
try {
adminControl
.getLogStream()
.newLogStreamWriter()
.onComplete(
(writer, error) -> {
if (error != null) {
LOG.error(
"Could not retrieve writer to write error record for process instance.",
error);
future.completeExceptionally(error);
return;
}

writeErrorEventAndBanInstance(processInstanceKey, writer, future);
});
} catch (final Exception e) {
LOG.error("Could not resume processing", e);
future.completeExceptionally(e);
}
});
return future;
}

private void writeErrorEventAndBanInstance(
final long processInstanceKey, final LogStreamWriter writer, final ActorFuture<Void> future) {
tryWriteErrorEvent(writer, processInstanceKey)
.ifRightOrLeft(
(position) -> {
LOG.info("Wrote error record on position {}", position);
// we only want to make the state change after we wrote the event
banInstanceInState(processInstanceKey);
LOG.info("Successfully banned instance with key {}", processInstanceKey);
future.complete(null);
},
writeFailure -> {
final String errorMsg =
String.format(
"Failure %s on writing error record to ban instance %d",
writeFailure, processInstanceKey);
future.completeExceptionally(new IllegalStateException(errorMsg));
LOG.error(errorMsg);
});
}

private void banInstanceInState(final long processInstanceKey) {
final var zeebeDb = adminControl.getZeebeDb();
final var context = zeebeDb.createContext();
final var dbBannedInstanceState = new DbBannedInstanceState(zeebeDb, context, partitionId);

dbBannedInstanceState.banProcessInstance(processInstanceKey);
}

private static Either<WriteFailure, Long> tryWriteErrorEvent(
final LogStreamWriter writer, final long processInstanceKey) {
final var errorRecord = new ErrorRecord();
errorRecord.initErrorRecord(new Exception("Instance was banned from outside."), -1);
errorRecord.setProcessInstanceKey(processInstanceKey);

final var recordMetadata =
new RecordMetadata()
.recordType(RecordType.EVENT)
.valueType(ValueType.ERROR)
.intent(ErrorIntent.CREATED)
.rejectionType(RejectionType.NULL_VAL)
.rejectionReason("");
final var entry =
RecordBatchEntry.createEntry(
processInstanceKey,
-1,
recordMetadata.getRecordType(),
recordMetadata.getIntent(),
recordMetadata.getRejectionType(),
recordMetadata.getRejectionReason(),
recordMetadata.getValueType(),
errorRecord);
return writer.tryWrite(entry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,37 @@ protected ActorFuture<Either<ErrorResponseWriter, ApiResponseWriter>> handleAsyn
stepDownIfNotPrimary(responseWriter, partitionId, errorWriter));
case PAUSE_EXPORTING -> pauseExporting(responseWriter, partitionId, errorWriter);
case RESUME_EXPORTING -> resumeExporting(responseWriter, partitionId, errorWriter);
case BAN_INSTANCE -> banInstance(requestReader, responseWriter, partitionId, errorWriter);
default -> unknownRequest(errorWriter, requestReader.getMessageDecoder().type());
};
}

private ActorFuture<Either<ErrorResponseWriter, ApiResponseWriter>> banInstance(
final ApiRequestReader requestReader,
final ApiResponseWriter responseWriter,
final int partitionId,
final ErrorResponseWriter errorWriter) {
final long key = requestReader.key();

final ActorFuture<Either<ErrorResponseWriter, ApiResponseWriter>> result = actor.createFuture();
adminAccess
.banInstance(requestReader.key())
.onComplete(
(r, t) -> {
if (t == null) {
result.complete(Either.right(responseWriter));
} else {
LOG.error("Failed to ban instance {} on partition {}", key, partitionId, t);
result.complete(
Either.left(
errorWriter.internalError(
"Failed to ban instance %s, on partition %s", key, partitionId)));
}
});

return result;
}

private ActorFuture<Either<ErrorResponseWriter, ApiResponseWriter>> unknownRequest(
final ErrorResponseWriter errorWriter, final AdminRequestType type) {
errorWriter.unsupportedMessage(type, AdminRequestType.values());
Expand Down Expand Up @@ -135,7 +162,7 @@ private Either<ErrorResponseWriter, ApiResponseWriter> stepDownIfNotPrimary(
final int partitionId,
final ErrorResponseWriter errorWriter) {
final var partition = partitionManager.getPartitionGroup().getPartition(partitionId);
if (partition instanceof RaftPartition raftPartition) {
if (partition instanceof final RaftPartition raftPartition) {
if (raftPartition.getRole() == Role.LEADER) {
raftPartition.stepDownIfNotPrimary();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public AdminRequestDecoder getMessageDecoder() {
return messageDecoder;
}

public long key() {
return messageDecoder.key();
}

@Override
public void wrap(final DirectBuffer buffer, final int offset, final int length) {
messageDecoder.wrapAndApplyHeader(buffer, offset, headerDecoder);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.shared.management;

import io.camunda.zeebe.gateway.Loggers;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.boot.actuate.endpoint.annotation.Selector.Match;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import org.springframework.boot.actuate.endpoint.web.WebEndpointResponse;
import org.springframework.boot.actuate.endpoint.web.annotation.WebEndpoint;
import org.springframework.stereotype.Component;

@Component
@WebEndpoint(id = "banning")
public final class BanInstanceEndpoint {
private static final Logger LOG = Loggers.GATEWAY_LOGGER;
final BanInstanceService banInstanceService;

@Autowired
public BanInstanceEndpoint(final BanInstanceService banInstanceService) {
this.banInstanceService = banInstanceService;
}

@WriteOperation
public WebEndpointResponse<?> post(
@Selector(match = Match.SINGLE) final long processInstanceKey) {
try {
LOG.info("Send AdminRequest to ban instance with key {}", processInstanceKey);
banInstanceService.banInstance(processInstanceKey);
return new WebEndpointResponse<>(WebEndpointResponse.STATUS_NO_CONTENT);
} catch (final CompletionException e) {
return new WebEndpointResponse<>(
e.getCause(), WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR);
} catch (final Exception e) {
return new WebEndpointResponse<>(e, WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.shared.management;

import io.camunda.zeebe.gateway.admin.BrokerAdminRequest;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public final class BanInstanceService {

private final BrokerClient client;

@Autowired
public BanInstanceService(final BrokerClient client) {
this.client = client;
}

public void banInstance(final long key) {
client.sendRequest(new BrokerAdminRequest().banInstance(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,4 @@
public interface BannedInstanceState extends StreamProcessorLifecycleAware {

boolean isBanned(final TypedRecord record);

boolean isEmpty();
}
Loading

0 comments on commit ca29ed2

Please sign in to comment.