Step functions are where most business logic happens. They are expected to have no side effects and to concern themselves only with their return value.

A Step is typically a class with a public method annotated with @StepConfig, and that method is expected to return a Result subtype. Beyond these requirements, it can be pretty much anything you want.

In the README demonstration pipeline, we wrote the following:

public class Tokenizer {
    @StepConfig(id = "tokenizer")
    public TokenizedSentence tokenize(@Input String sentence) {
        return new TokenizedSentence(Stream.of(sentence.split("[^\\p{L}]+")).toList());
    }

    public record TokenizedSentence(
        List<String> tokens
    ) implements Result {}
}

This Step has an entrypoint: the tokenize method, annotated with @StepConfig. The id argument is optional, but if present it will be used in tags, metrics and logging, pretty much everywhere the step's identity is concerned. The @StepConfig annotation accepts a few more optional parameters, which we dive into in the "Configuration" section.

It has an @Input argument that will be mapped to the pipeline's input. You can add as many arguments as you need; check out the available options in the "Possible Inputs" section.
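To get a feel for what the step body does with its input, here is a minimal sketch that isolates the split expression from any pipeline machinery. TokenizerSketch is a hypothetical name used for illustration only; nothing in it is data-pipeline API.

```java
import java.util.List;
import java.util.stream.Stream;

public class TokenizerSketch {
    /* Same split expression as the Tokenizer step, without annotations or Result types. */
    public static List<String> tokenize(String sentence) {
        return Stream.of(sentence.split("[^\\p{L}]+")).toList();
    }

    public static void main(String[] args) {
        // Runs of non-letter characters act as separators; the trailing "." leaves no empty token.
        System.out.println(tokenize("This is a sentence."));
        // A leading separator yields an empty first token.
        System.out.println(tokenize("  leading spaces"));
    }
}
```

Note the second call: String.split keeps a leading empty string when the input starts with a separator, so a real step may want to filter empty tokens out.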

It returns a TokenizedSentence, whose type is an inner record. There is no single way to manage Result subtypes, but it's important to keep in mind that with data-pipeline's result data model these types are used for addressing. Here, the inner type is closely tied to the Step logic, so it makes sense to simply declare it alongside the Step. In other circumstances, you might need a common result type that stands on its own. Together with a ResultEvaluator, result types can pull a lot of modeling weight.

A Step can return several results simultaneously by using the MultiResult wrapper. Each Result in the wrapper will be indexed individually, as if it had been produced by a sequence of steps.

Note however that a MultiResult output is considered as a single entity by any registered ResultEvaluator; this is so you can implement your own logic for resolving the StepStrategy from the result collection. For the same reason, if an Interruption is returned as part of a MultiResult, it won't be picked up by the default X_ON_INTERRUPT evaluator implementations.


Step functions can be supplied to a pipeline builder "as-is", meaning you simply registerStep the step instance itself:

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(new Tokenizer())
    /* ...and others */

This is fine for simple setups and already gets you some of the data-pipeline feature-set (component architecture, various integrations, pipeline-level default behaviours, etc.). But at some point you will want finer-grained configuration at the component level, and this is where the StepAssembler steps in.

The StepAssembler accepts a Step (the one you would be providing directly to the pipeline) and offers you a way to plug in as many modifiers as you need:

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(builder -> builder
        .step(new Tokenizer())
        .withWrapper(new RetryWrapper<>(RetryConfig.custom().maxAttempts(3).build()))
    )
    /* ...and others */

In the snippet above, we added a retry on the tokenizer; an error handler can be attached in the same way with withErrorHandler. This is the kind of thing you could be doing if your step has to interact with an unreliable database or external API, for instance.

As we will show in the following sections, some of these options can also be set through the @StepConfig annotation. Note that StepAssembler options take precedence over @StepConfig, which makes the annotation a good place for your step defaults.
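The precedence rule can be pictured as "explicit option first, annotation default otherwise". The sketch below is not data-pipeline code: the Defaults annotation, FlakyStep and resolveMaxAttempts are hypothetical names, and we only assume that annotation values are read reflectively and used as a fallback.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.OptionalInt;

public class PrecedenceSketch {
    /* Hypothetical stand-in for an annotation carrying step defaults. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Defaults {
        int maxAttempts() default 1;
    }

    public static class FlakyStep {
        @Defaults(maxAttempts = 3)
        public String call() {
            return "ok";
        }
    }

    /* An explicit (assembler-like) option wins; otherwise we fall back on the annotation value. */
    public static int resolveMaxAttempts(Class<?> stepType, String methodName, OptionalInt explicit) {
        try {
            Defaults defaults = stepType.getMethod(methodName).getAnnotation(Defaults.class);
            int fallback = defaults != null ? defaults.maxAttempts() : 1;
            return explicit.orElse(fallback);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such step method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        // No explicit option: the annotation default applies.
        System.out.println(resolveMaxAttempts(FlakyStep.class, "call", OptionalInt.empty()));
        // Explicit option: it overrides the annotation.
        System.out.println(resolveMaxAttempts(FlakyStep.class, "call", OptionalInt.of(5)));
    }
}
```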

Function Modifiers

The StepAssembler accepts a variety of function modifiers which will alter how the Step is executed, or how its output (whether on the successful or error path) is handled. All of these are optional, but can be very useful in implementing more sophisticated patterns.


The StepCondition is a predicate whose role is to determine whether the Step will be executed based on two types of information:

In the example below, we leverage the generic MetadataCondition implementation for performing a check on the "tokenizer" key:

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(builder -> builder
        .step(new Tokenizer())
        .withCondition(new MetadataCondition("tokenizer", BASIC))
    )
    .registerStep(builder -> builder
        .step(new SuperDuperTokenizer())
        .withCondition(new MetadataCondition("tokenizer", SUPER_DUPER))
    )
    /* ...and others */
;

…("This is a sentence.", ctx -> ctx.set("tokenizer", BASIC)); // this would match step 1
…("This is a sentence.", ctx -> ctx.set("tokenizer", SUPER_DUPER)); // this would match step 2
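The selection logic in the example above boils down to testing a predicate against per-run metadata before each step. Here is a self-contained sketch of that idea; GuardedStep, metadataEquals and selectSteps are hypothetical stand-ins, and the metadata is modeled as a plain Map rather than the pipeline's actual context.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ConditionSketch {
    /* Hypothetical stand-in: a step name guarded by a predicate over the run's metadata. */
    public record GuardedStep(String name, Predicate<Map<String, Object>> condition) {}

    /* Matches when the metadata holds the expected value under the given key. */
    public static Predicate<Map<String, Object>> metadataEquals(String key, Object expected) {
        return metadata -> expected.equals(metadata.get(key));
    }

    /* Returns the names of the steps whose condition accepts the metadata, in registration order. */
    public static List<String> selectSteps(List<GuardedStep> steps, Map<String, Object> metadata) {
        return steps.stream()
            .filter(step -> step.condition().test(metadata))
            .map(GuardedStep::name)
            .toList();
    }

    public static final List<GuardedStep> STEPS = List.of(
        new GuardedStep("tokenizer", metadataEquals("tokenizer", "BASIC")),
        new GuardedStep("super-duper-tokenizer", metadataEquals("tokenizer", "SUPER_DUPER"))
    );

    public static void main(String[] args) {
        System.out.println(selectSteps(STEPS, Map.of("tokenizer", "BASIC")));
        System.out.println(selectSteps(STEPS, Map.of("tokenizer", "SUPER_DUPER")));
        // Without the key, neither condition matches and no step is selected.
        System.out.println(selectSteps(STEPS, Map.of()));
    }
}
```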

A condition can also be applied via the @StepConfig annotation if the StepCondition has a default constructor:

@StepConfig(condition = MyCondition.class)
public MyResult doStuff() { /**/ }

...or, you can use the default type-based condition for filtering objects (in the @Object sense) of a given type:

/* This step will only run when the execution object is a MyType subclass */
@StepConfig(conditionOnClass = MyType.class)
public MyResult doStuff() { /**/ }

Error Handlers

The StepErrorHandler is a wrapper whose role is to act on exceptions thrown by the Step it is applied to. StepErrorHandlers are typically used in two ways:

  • as exception wrappers: their contract gives access to the original Step exception, so you can wrap exceptions in order to standardize their signature, or introduce an exception type that can encapsulate metadata
  • as error recovery procedures: handlers can return a Result, which can be leveraged for running fallback code or converting an exception into an "error result", for instance if you want non-blocking errors

💡 Error handlers also have a dedicated documentation section.

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(builder -> builder
        .step(new Tokenizer())
        .withErrorHandler((ex, in, payload, results, ctx) -> new ErrorResult("This didn't go as planned: " + ex.getMessage(), ex))
    )
    /* If we register other steps after the above, they will get executed */

We could then gather all triggered errors within a Sink, for instance:

public class ErrorGatherer {
    public void gatherErrors(@Current Stream<ErrorResult> errors) {
        errors.forEach(error -> System.out.println(error.message()));
    }
}
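Putting the two snippets together, the recovery flow can be sketched without the library as follows. Everything here is a hypothetical stand-in (Result, Tokens, ErrorResult, run, gatherErrors, demo); steps are modeled as plain functions and the handler is inlined in the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ErrorRecoverySketch {
    /* Hypothetical stand-ins for result types. */
    public interface Result {}
    public record Tokens(List<String> tokens) implements Result {}
    public record ErrorResult(String message, Exception cause) implements Result {}

    /* Runs each step in order; an exception becomes an ErrorResult instead of aborting the run. */
    public static List<Result> run(String input, List<Function<String, Result>> steps) {
        List<Result> results = new ArrayList<>();
        for (Function<String, Result> step : steps) {
            try {
                results.add(step.apply(input));
            } catch (RuntimeException e) {
                results.add(new ErrorResult("This didn't go as planned: " + e.getMessage(), e));
            }
        }
        return results;
    }

    /* Sink-like consumer: keeps only the error messages. */
    public static List<String> gatherErrors(List<Result> results) {
        return results.stream()
            .filter(ErrorResult.class::isInstance)
            .map(ErrorResult.class::cast)
            .map(ErrorResult::message)
            .toList();
    }

    /* One failing step followed by one working step. */
    public static List<Result> demo() {
        List<Function<String, Result>> steps = List.of(
            in -> { throw new IllegalStateException("database unreachable"); },
            in -> new Tokens(List.of(in.split(" ")))
        );
        return run("hello world", steps);
    }

    public static void main(String[] args) {
        List<Result> results = demo();
        System.out.println(results.size());        // both steps produced a result
        System.out.println(gatherErrors(results)); // only the first one is an error
    }
}
```

The point of the design is that the failing step does not block the second one: the error travels as data and can be inspected at the end of the run.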

An error handler can be applied via the @StepConfig annotation if the StepErrorHandler has a default constructor:

@StepConfig(errorHandler = MyErrorHandler.class)
public MyResult doStuff() { /**/ }

Result Evaluators

A ResultEvaluator is a function that takes the step's return value as input, and can then inform the Pipeline about what to do next through a StepStrategy.

At the time of this writing, a StepStrategy can be one of the following, with CONTINUE being the default success strategy and EXIT the default exception strategy.

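| strategy                  | register result | retain object | run remaining steps | run pinned steps | run sinks |
|---------------------------|-----------------|---------------|---------------------|------------------|-----------|
| CONTINUE                  | yes             | yes           | yes                 | yes              | yes       |
| SKIP                      | no              | yes           | yes                 | yes              | yes       |
| DISCARD_AND_CONTINUE      | yes             | no            | yes                 | yes              | yes       |
| STOP                      | yes             | -             | no                  | yes              | yes       |
| ABORT                     | yes             | -             | no                  | no               | yes       |
| EXIT (or exception throw) | yes             | -             | no                  | no               | no        |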
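One way to read the table is as a set of flags attached to each strategy. The enum below is a hypothetical mirror written for illustration, not the library's StepStrategy; the "-" entries are modeled as an empty Optional.

```java
import java.util.Optional;

public class StrategySketch {
    public enum Strategy {
        // register result, retain object, run remaining steps, run pinned steps, run sinks
        CONTINUE(true, true, true, true, true),
        SKIP(false, true, true, true, true),
        DISCARD_AND_CONTINUE(true, false, true, true, true),
        STOP(true, null, false, true, true),
        ABORT(true, null, false, false, true),
        EXIT(true, null, false, false, false);

        private final boolean registerResult;
        private final Boolean retainObject; // null where the table shows "-"
        private final boolean runRemainingSteps;
        private final boolean runPinnedSteps;
        private final boolean runSinks;

        Strategy(boolean registerResult, Boolean retainObject, boolean runRemainingSteps, boolean runPinnedSteps, boolean runSinks) {
            this.registerResult = registerResult;
            this.retainObject = retainObject;
            this.runRemainingSteps = runRemainingSteps;
            this.runPinnedSteps = runPinnedSteps;
            this.runSinks = runSinks;
        }

        public boolean registersResult() { return registerResult; }
        public Optional<Boolean> retainsObject() { return Optional.ofNullable(retainObject); }
        public boolean runsRemainingSteps() { return runRemainingSteps; }
        public boolean runsPinnedSteps() { return runPinnedSteps; }
        public boolean runsSinks() { return runSinks; }
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + ": remaining=" + strategy.runsRemainingSteps()
                + ", pinned=" + strategy.runsPinnedSteps()
                + ", sinks=" + strategy.runsSinks());
        }
    }
}
```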

For instance, if we want a ResultEvaluator that will stop the pipeline as soon as a specific Result is found:

public class MyResultEvaluator implements ResultEvaluator {
    public StepStrategy evaluate(Result result, Indexable object, Object input, Context ctx) {
        if (result instanceof MyStopConditionResult)
            return StepStrategy.STOP;
        return StepStrategy.CONTINUE;
    }
}

From there, we would likely apply it as the default evaluator on the whole pipeline:

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .setDefaultEvaluator(new MyResultEvaluator())
    /* ...and others */

...or on a single Step:

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(builder -> builder
        .step(new Tokenizer())
        .withEvaluator(new MyResultEvaluator())
    )
    /* ...and others */

A result evaluator can be applied via the @StepConfig annotation if the ResultEvaluator has a default constructor:

@StepConfig(evaluator = MyResultEvaluator.class)
public MyResult doStuff() { /**/ }


A StepWrapper is a function that takes a Step as input and returns a Step as output. The main use for wrappers is to apply generic policies on your business logic, such as resilience patterns like retries or circuit-breakers.

💡 Wrappers also have a dedicated documentation section.

A simple wrapper implementation can look like this:

public static class MyWrapper<T extends Indexable, I> implements StepWrapper<T, I> {
    public Step<T, I> wrap(Step<T, I> step) {
        return (obj, in, payload, res, ctx) -> {
            // do stuff
            return step.execute(obj, in, payload, res, ctx);
        };
    }
}

Then, the wrapper can be applied as follows:

Pipeline<String> pipeline = Pipeline.<String>of("string-processor")
    .registerStep(builder -> builder
        .step(new Tokenizer())
        .withWrapper(new MyWrapper<>())
    )
    /* ...and others */
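To make the "Step in, Step out" idea more concrete, here is a minimal retry policy written as a wrapper over plain java.util.function.Function. This is a sketch under simplifying assumptions (steps take a single input, only RuntimeException is retried, no back-off); withRetry and flakyLength are hypothetical names and this is not how the bundled RetryWrapper is implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class RetryWrapperSketch {
    /* Wraps a step so that a RuntimeException triggers a new attempt, up to maxAttempts in total. */
    public static <I, O> Function<I, O> withRetry(Function<I, O> step, int maxAttempts) {
        if (maxAttempts < 1)
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        return input -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return step.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // all attempts failed: surface the last exception
        };
    }

    /* A step that fails a given number of times before succeeding, to exercise the wrapper. */
    public static Function<String, Integer> flakyLength(int failures) {
        AtomicInteger remaining = new AtomicInteger(failures);
        return input -> {
            if (remaining.getAndDecrement() > 0)
                throw new IllegalStateException("temporary failure");
            return input.length();
        };
    }

    public static void main(String[] args) {
        // Two failures, three attempts allowed: the third attempt succeeds.
        System.out.println(withRetry(flakyLength(2), 3).apply("hello"));
    }
}
```

The wrapped function has the same shape as the original one, which is what lets wrappers be stacked without the business logic knowing about them.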

Possible Inputs

Step functions accept a variety of inputs, which can be combined as needed. Some will be mapped by type directly as they are related to Pipeline internals (e.g. Results or Context), others will need additional semantics via annotations.


When an argument is annotated with @Input, the pipeline will attempt to map it to its own input:

public MyResult doStuff(@Input SomeType in) { /**/ }

If the requested input type does not match the pipeline's input type, an IllegalArgumentException will be thrown at execution time.


When an argument is annotated with @Current, the pipeline will attempt to map it to a "current result" in the ResultContainer.

By type

If the argument annotated is a Result subtype, the pipeline will attempt to recover the latest "current result" from the ResultContainer matching the specified type.

If the annotation is put on a non-Result argument type, the pipeline will throw an IllegalStateException at build time.

If no "current result" can be found for that type, a NoSuchElementException will be thrown at execution time.

public MyResult doStuff(@Current SomeResult result) { /**/ }

/* The argument can be an `Optional`, it will be empty if no match can be found */
public MyResult doStuff(@Current Optional<SomeResult> result) { /**/ }

/* The argument can be a `Stream`, it will contain all matches for that result type */
public MyResult doStuff(@Current Stream<SomeResult> results) { /**/ }
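The three argument shapes correspond to three lookup behaviours: fail when missing, empty when missing, or all matches. The sketch below models them over a simple list; Container, require, find and all are hypothetical stand-ins for illustration, not the ResultContainer API, and returning matches in insertion order is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;

public class TypeLookupSketch {
    public interface Result {}
    public record Tokens(List<String> tokens) implements Result {}
    public record Score(double value) implements Result {}

    /* Hypothetical stand-in for a result container: results are kept in production order. */
    public static class Container {
        private final List<Result> results = new ArrayList<>();

        public Container add(Result result) {
            results.add(result);
            return this;
        }

        /* Stream argument: every match for the type, in insertion order. */
        public <T extends Result> Stream<T> all(Class<T> type) {
            return results.stream().filter(type::isInstance).map(type::cast);
        }

        /* Optional argument: the most recent match, or empty. */
        public <T extends Result> Optional<T> find(Class<T> type) {
            return all(type).reduce((first, second) -> second);
        }

        /* Plain argument: the most recent match, or a NoSuchElementException. */
        public <T extends Result> T require(Class<T> type) {
            return find(type).orElseThrow(() -> new NoSuchElementException("No result of type " + type.getSimpleName()));
        }
    }

    public static void main(String[] args) {
        Container container = new Container().add(new Score(0.1)).add(new Score(0.9));
        System.out.println(container.require(Score.class));
        System.out.println(container.all(Score.class).count());
        System.out.println(container.find(Tokens.class).isPresent());
    }
}
```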

By name

If the argument annotated has a specified name, the pipeline will attempt to recover the latest "current result" from the ResultContainer matching the specified name. This can be most helpful in cases where you want to standardize Result types and reuse a single type for different semantics.

If the annotation is put on a non-Result argument type, the pipeline will throw an IllegalStateException at build time.

If no "current result" can be found for that name, a NoSuchElementException will be thrown at execution time.

If a "current result" can be found for that name, but the match has a type mismatch, an IllegalArgumentException will be thrown at execution time.

public MyResult doStuff(@Current(name = "my_name") SomeResult result) { /**/ }

/* The argument can be an `Optional`, it will be empty if no match can be found */
public MyResult doStuff(@Current(name = "my_name") Optional<SomeResult> result) { /**/ }

/* The argument can be a `Stream`, it will contain all matches for that name */
public MyResult doStuff(@Current(name = "my_name") Stream<SomeResult> results) { /**/ }
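Name-based lookup is what lets a single Result type carry different meanings. The sketch below reuses one Count type under two names and reproduces the two failure modes described above (no match, type mismatch). NameLookupSketch, put and require are hypothetical names, and results are stored in a plain map rather than the actual ResultContainer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class NameLookupSketch {
    public interface Result {}
    public record Count(int value) implements Result {}
    public record Label(String text) implements Result {}

    /* Most recent result registered under each name. */
    private final Map<String, Result> latestByName = new HashMap<>();

    public NameLookupSketch put(String name, Result result) {
        latestByName.put(name, result);
        return this;
    }

    /* Looks a result up by name, then checks that it has the requested type. */
    public <T extends Result> T require(String name, Class<T> type) {
        Result match = latestByName.get(name);
        if (match == null)
            throw new NoSuchElementException("No result named " + name);
        if (!type.isInstance(match))
            throw new IllegalArgumentException("Result " + name + " is a " + match.getClass().getSimpleName() + ", not a " + type.getSimpleName());
        return type.cast(match);
    }

    public static void main(String[] args) {
        NameLookupSketch results = new NameLookupSketch()
            .put("word_count", new Count(4))
            .put("char_count", new Count(19));
        // Same type, two different semantics, told apart by name.
        System.out.println(results.require("word_count", Count.class).value());
        System.out.println(results.require("char_count", Count.class).value());
    }
}
```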


When an argument is annotated with @Latest, the pipeline will attempt to map it to a "latest result" in the ResultContainer.

By type

If the argument annotated is a Result subtype, the pipeline will attempt to recover the latest "latest result" from the ResultContainer matching the specified type.

If the annotation is put on a non-Result argument type, the pipeline will throw an IllegalStateException at build time.

If no "latest result" can be found for that type, a NoSuchElementException will be thrown at execution time.

public MyResult doStuff(@Latest SomeResult result) { /**/ }

/* The argument can be an `Optional`, it will be empty if no match can be found */
public MyResult doStuff(@Latest Optional<SomeResult> result) { /**/ }

/* The argument can be a `Stream`, it will contain all matches for that result type */
public MyResult doStuff(@Latest Stream<SomeResult> results) { /**/ }

By name

If the argument annotated has a specified name, the pipeline will attempt to recover the latest "latest result" from the ResultContainer matching the specified name. This can be most helpful in cases where you want to standardize Result types and reuse a single type for different semantics.

If the annotation is put on a non-Result argument type, the pipeline will throw an IllegalStateException at build time.

If no "latest result" can be found for that name, a NoSuchElementException will be thrown at execution time.

If a "latest result" can be found for that name, but the match has a type mismatch, an IllegalArgumentException will be thrown at execution time.

public MyResult doStuff(@Latest(name = "my_name") SomeResult result) { /**/ }

/* The argument can be an `Optional`, it will be empty if no match can be found */
public MyResult doStuff(@Latest(name = "my_name") Optional<SomeResult> result) { /**/ }

/* The argument can be a `Stream`, it will contain all matches for that name */
public MyResult doStuff(@Latest(name = "my_name") Stream<SomeResult> results) { /**/ }

Results and ResultView

The Results and ResultView arguments give you access to the whole ResultContainer. 💡 More information on their respective feature sets can be found in the "Result Data Model" section.

They are mapped by type so no specific annotation is required:

public MyResult doStuff(Results results) { /**/ }


The pipeline's payload can be passed as an argument with the @Payload annotation. 💡 More information on payloads can be found in the initializer section.

If the requested and actual types do not match, an IllegalArgumentException will be thrown at execution time.

public MyResult doStuff(@Payload MyPayload payload) { /**/ }


The current object can be passed as an argument with the @Object annotation. 💡 More information on objects can be found in the initializer section.

If the requested and actual types do not match, an IllegalArgumentException will be thrown at execution time.

public MyResult doStuff(@Object SomeIndexable object) { /**/ }


The PipelineTag can be passed as an argument; it is mapped by type, so no specific annotation is required.

Pipeline tags are generated at the very start of the pipeline and contain the following properties:

@StepConfig(id = "my-step")
public MyResult doStuff(PipelineTag tag) {
    /*
     * The following would print something along the lines of:
     * PipelineTag[uid=2zongloalw6vwnbrs7joqgf0cxh, pipeline=my-pipeline, author=anonymous]
     */
    System.out.println(tag);
}


The ComponentTag can be passed as an argument; it is mapped by type, so no specific annotation is required.

Component tags are generated at the start of each component run and contain the following properties:

@StepConfig(id = "my-step")
public MyResult doStuff(ComponentTag tag) {
    /*
     * The following would print something along the lines of:
     * ComponentTag[uid=2zongm8nvgu7jrlc5tl0tbgcexk, pipelineTag=PipelineTag[uid=2zongloalw6vwnbrs7joqgf0cxh, pipeline=my-pipeline, author=anonymous], id=my-step, family=STEP]
     */
    System.out.println(tag);
}


The Context can be mapped by type, and gives you access to the pipeline's context:

public MyResult doStuff(Context context) {
    //context.get("my_metadata_key", SomeType.class).orElseThrow();
}

💡 More info on the context in the pipeline's section.

Single entries in the context can be passed via the @Context annotation:

public MyResult doStuff(@Context("some_entry") String someEntry) { /**/ }

/* The argument can be an `Optional`, it will be empty if no match can be found */
public MyResult doStuff(@Context("some_entry") Optional<String> someEntry) { /**/ }


You can access the UIDGenerator currently in use by the pipeline by requesting it as an argument:

public MyResult doStuff(UIDGenerator generator) { /**/ }

This can be useful if you want to harmonize the generation of UIDs between data-pipeline managed data and more business-centric data.

Typically, if data-lineage is a concern, you might want to persist the PipelineTag or ComponentTag in some data store. It may then be relevant to use the same UID generation strategy for other data models, as these UIDs can have properties such as being time-stamped or lexicographically sortable.
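As an illustration of what "time-stamped and lexicographically sortable" can mean in practice, here is a small sketch that prefixes a random suffix with a fixed-width base-36 timestamp. SortableUidSketch is a hypothetical example written for this page; it is not the UIDGenerator shipped with data-pipeline and makes no claim about the format of the UIDs shown elsewhere in this document.

```java
import java.security.SecureRandom;

public class SortableUidSketch {
    private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz";
    private static final int TIME_WIDTH = 9;   // fixed width, so string order follows time order
    private static final int RANDOM_WIDTH = 8; // random suffix to tell apart UIDs from the same millisecond
    private static final SecureRandom RANDOM = new SecureRandom();

    public static String generate(long epochMillis) {
        if (epochMillis < 0)
            throw new IllegalArgumentException("epochMillis must not be negative");
        String time = Long.toString(epochMillis, 36);
        if (time.length() > TIME_WIDTH)
            throw new IllegalArgumentException("timestamp does not fit in " + TIME_WIDTH + " base-36 digits");

        StringBuilder uid = new StringBuilder(TIME_WIDTH + RANDOM_WIDTH);
        // Left-pad with zeros: '0'-'9' sort before 'a'-'z', matching numeric order at equal width.
        uid.append("0".repeat(TIME_WIDTH - time.length())).append(time);
        for (int i = 0; i < RANDOM_WIDTH; i++)
            uid.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        return uid.toString();
    }

    public static void main(String[] args) {
        System.out.println(generate(System.currentTimeMillis()));
    }
}
```

With this layout, sorting UIDs as strings sorts them by creation time, except for UIDs created within the same millisecond, whose relative order is random.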


The LogMarker is a component that can produce a LabelMarker out of the TagResolver currently in use by the pipeline.

Its use is encouraged for annotating your own logs with contextual information (💡 see the relevant TagResolver section).

public MyResult doStuff(LogMarker marker) {
    …, "This is my log: {}", 123);
    …"my_local_key", "my_value"), "This is my log: {}", 234);
}


All Results returned by step functions are tracked by the pipeline, and can be used by subsequent steps, sinks, or returned as part of the final Output.

The Result interface is a very simple contract with a single name() method, which has a default implementation that simply returns the class name. The value returned by name() can be used for addressing results within the ResultContainer. 💡 More information on that can be found in the dedicated section.
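The contract described above can be pictured with the following sketch. The Result interface here is a hypothetical stand-in: we assume the default name is the value of getClass().getName(), which may differ from what the library actually returns. The second record shows one way a single type could carry per-instance names, by declaring a name component whose accessor overrides the default.

```java
public class ResultNameSketch {
    /* Hypothetical stand-in for the Result contract; the default name is assumed to be the class name. */
    public interface Result {
        default String name() {
            return getClass().getName();
        }
    }

    /* Relies on the default: every instance is addressed by its class name. */
    public record WordCount(int value) implements Result {}

    /* The record accessor name() overrides the default, so each instance carries its own name. */
    public record Count(int value, String name) implements Result {}

    public static void main(String[] args) {
        System.out.println(new WordCount(4).name());
        System.out.println(new Count(4, "word_count").name());
        System.out.println(new Count(19, "char_count").name());
    }
}
```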