Skip to content

Commit

Permalink
feat(tasks): introduce an Assert tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 21, 2025
1 parent 4d074cb commit 4d4963a
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class Property<T> {
.copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);

@Getter
private String expression;
private T value;

Expand Down Expand Up @@ -347,11 +348,6 @@ public int hashCode() {
return Objects.hash(expression);
}

// used only by the serializer
String getExpression() {
return this.expression;
}

// used only by the value extractor
T getValue() {
return value;
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/kestra/core/utils/TruthUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ abstract public class TruthUtils {
public static boolean isTruthy(String condition) {
return condition != null && !FALSE_VALUES.contains(condition);
}

public static boolean isFalsy(String condition) {
return condition != null && FALSE_VALUES.contains(condition);
}
}
114 changes: 114 additions & 0 deletions core/src/main/java/io/kestra/plugin/core/execution/Assert.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.kestra.plugin.core.execution;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Metric;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.TruthUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Assert some conditions.",
description = "Used to control outputs data emitted from previous task on this execution."
)
@Plugin(
examples = {
@Example(
title = "Assert based on inputs data",
full = true,
code = {
"id: assert\n" +
"namespace: company.team\n" +
"\n" +
"inputs:\n" +
" - id: param\n" +
" type: STRING\n" +
" required: true\n" +
"\n" +
"tasks:\n" +
" - id: fail\\n\" +\n" +
" type: io.kestra.plugin.core.execution.Assert\n" +
" conditions:\n" +
" - \"{{ inputs.param == 'ok' }}\"\n" +
" - \"{{ 1 + 1 == 3 }}\"\n"
}
)
},
metrics = {
@Metric(name = "failed", type = Counter.TYPE),
@Metric(name = "success", type = Counter.TYPE)
}
)
public class Assert extends Task implements RunnableTask<VoidOutput> {
@Schema(
title = "List of assertion condition, must coerce to a boolean.",
description = "Boolean coercion allows 0, -0, and '' to coerce to false, all other values to coerce to true."
)
@NotNull
@PluginProperty(dynamic = true)
private List<String> conditions;

@Schema(title = "Optional error message.")
private Property<String> errorMessage;

@Override
public VoidOutput run(RunContext runContext) throws Exception {
AtomicInteger failed = new AtomicInteger(0);
AtomicInteger success = new AtomicInteger(0);

conditions
.forEach(s -> {
try {
String renderer = runContext.render(s);

if (TruthUtils.isFalsy(renderer)) {
runContext.logger().error("Assertion `{}` failed!", s, renderer);
failed.incrementAndGet();
} else {
success.incrementAndGet();
}

} catch (IllegalVariableEvaluationException e) {
runContext.logger().error("Assertion `{}` failed, failed to render `{}`", s, e.getMessage());
failed.incrementAndGet();
}
});

runContext.metric(Counter.of("success", success.get()));
runContext.metric(Counter.of("failed", failed.get()));

if (failed.get() > 0) {
throw new Exception(
failed + " assertion" + (failed.get() > 1 ? "s" : "") + " failed!" +
runContext.render(errorMessage).as(String.class).map(r -> "\n" + r).orElse("")
);
}

return null;
}
}
95 changes: 95 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/execution/AssertTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.kestra.plugin.core.execution;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThrows;

@KestraTest(startRunner = true)
public class AssertTest {
@Inject
RunContextFactory runContextFactory;

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;

@Test
void success() throws Exception {
Assert task = Assert.builder()
.id(IdUtils.create())
.type(Assert.class.getName())
.conditions(List.of(
"{{ inputs.key == 'value' }}",
"{{ 42 == 42 }}"
))
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of("key", "value"));
task.run(runContext);

assertThat(runContext.metrics().stream().filter(e -> e.getName().equals("success")).findFirst().orElseThrow().getValue(), is(2d));
assertThat(runContext.metrics().stream().filter(e -> e.getName().equals("failed")).findFirst().orElseThrow().getValue(), is(0d));
}

@Test
void failed() {
List<LogEntry> logs = new ArrayList<>();
Flux<LogEntry> receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));


Assert task = Assert.builder()
.id(IdUtils.create())
.type(Assert.class.getName())
.conditions(List.of(
"{{ 42 == 42 }}",
"{{ inputs.key == 'value1' }}",
"{{ 42 == 42 }}",
"{{ inputs.key == 'value2' }}",
"{{ 42 == 42 }}"
))
.build();

RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of("key", "value"));

Exception exception = assertThrows(Exception.class, () -> task.run(runContext));

assertThat(exception.getMessage(), containsString("2 assertions failed"));

List<LogEntry> matchingLog = TestsUtils.awaitLogs(logs, 2);
receive.blockLast();


assertThat(matchingLog.stream().filter(logEntry -> logEntry.getMessage().contains("inputs.key == 'value1'")).count(), is(1L));
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getMessage().contains("inputs.key == 'value2'")).count(), is(1L));

assertThat(runContext.metrics().stream().filter(e -> e.getName().equals("success")).findFirst().orElseThrow().getValue(), is(3d));
assertThat(runContext.metrics().stream().filter(e -> e.getName().equals("failed")).findFirst().orElseThrow().getValue(), is(2d));
}

@Test
@ExecuteFlow("flows/valids/assert.yaml")
void dontFailOnCondition(Execution execution) {
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}
}
16 changes: 16 additions & 0 deletions core/src/test/resources/flows/valids/assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
id: assert
namespace: io.kestra.tests

tasks:
- id: before
type: io.kestra.plugin.core.output.OutputValues
values:
output1: xyz
output2: abc
taskInfo: "{{ task.id }} > {{ taskrun.startDate }}"
- id: fail
type: io.kestra.plugin.core.execution.Assert
conditions:
- "{{ 42 == 42 }}"
- "{{ outputs.before.values.output1 == 'xyz' }}"
- "{{ outputs.before.values.output2 == 'xyz' }}"

0 comments on commit 4d4963a

Please sign in to comment.