A simple event-based framework for developing highly concurrent applications with an event-based domain model.
Add the following dependency to your pom.xml:
<dependency>
    <groupId>io.github.imashtak</groupId>
    <artifactId>echo-core</artifactId>
    <version>0.7.0</version>
</dependency>
If you use this library in a Spring application, you can add a dedicated dependency that provides an annotation-based approach for registering events and their handlers on the bus:
<dependency>
    <groupId>io.github.imashtak</groupId>
    <artifactId>echo-spring</artifactId>
    <version>0.7.0</version>
</dependency>
For the same integration with Quarkus you need the following:
<dependency>
    <groupId>io.github.imashtak</groupId>
    <artifactId>echo-quarkus</artifactId>
    <version>0.7.0</version>
</dependency>
For more information on these integrations, see Usage with Spring or Usage with Quarkus.
There are two main concepts: Event and Bus. You can publish an event on the bus and subscribe to that event's type to handle it. Not every event on the bus has to be handled.
Let's start with a simple event named SignInInitiated:
@Getter
public class SignInInitiated extends Event {
    private final String username;
    private final String password;

    public SignInInitiated(String username, String password) {
        super();
        this.username = username;
        this.password = password;
    }
}
Then we can create the bus and add a subscription to it. The subscription (handling) method is called each time an event of type SignInInitiated is published on the bus:
public class Main {
    public static void main(String[] args) {
        // Bus instance required
        var bus = new Bus();
        // Adding handling method and exception resolver
        bus.subscribeOn(SignInInitiated.class,
            e -> {
                System.out.println("User '%s' signing in...".formatted(e.getUsername()));
            },
            (e, ex) -> {
                System.out.println("Something went wrong!");
            }
        );
        // Publishing event onto bus
        bus.publish(new SignInInitiated("user", "passwd"));
    }
}
The Bus::subscribeOn method is non-blocking, and all handlers are executed in separate threads. An Event is therefore just a piece of data that can be handled asynchronously.
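To illustrate what asynchronous handling means in practice, here is a minimal self-contained sketch of a bus that hands every handler call to a thread pool. It is not the library's implementation: SketchBus, its methods and the pool size are hypothetical, and the real Bus may schedule work differently.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a bus; it only illustrates asynchronous dispatch.
class SketchBus {
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Registers a handler for one exact event class (no subtype matching here).
    <T> void subscribeOn(Class<T> type, Consumer<T> handler) {
        handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(e -> handler.accept(type.cast(e)));
    }

    // Returns immediately; each handler runs later on a pool thread.
    void publish(Object event) {
        for (Consumer<Object> h : handlers.getOrDefault(event.getClass(), List.of())) {
            pool.execute(() -> h.accept(event));
        }
    }

    // Stops accepting new work and waits for already queued handlers to finish.
    boolean shutdownAndWait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because publish only enqueues work, the publishing thread never waits for a handler; a caller that needs the outcome has to wait for it explicitly.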
There are several ways to subscribe:
- first, you can subscribe on an interface class, which means that all events implementing that interface will be handled. This is the default behavior of the subscribeOn method;
- second, you can subscribe on classes annotated with a given annotation; the method for that is subscribeOnAnnotated;
- third, you can supply your own predicate and subscribe on events that match it; see one of the overloads of the subscribeOn method.
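The three matching styles above can be thought of as three kinds of predicate over a published event. The sketch below expresses them in plain Java; it does not call the library, and UserEvent, Audited, PasswordChanged, CacheCleared and SubscriptionFilters are hypothetical names used only for illustration.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.function.Predicate;

// Hypothetical marker interface and annotation for the example events.
interface UserEvent {}

@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@interface Audited {}

@Audited
class PasswordChanged implements UserEvent {}

class CacheCleared {}

class SubscriptionFilters {
    // 1. Type-based: matches the type itself, its subclasses and its implementors.
    static Predicate<Object> byType(Class<?> type) {
        return type::isInstance;
    }

    // 2. Annotation-based: matches events whose class carries the annotation.
    static Predicate<Object> byAnnotation(Class<? extends Annotation> annotation) {
        return e -> e.getClass().isAnnotationPresent(annotation);
    }

    // 3. Custom: any condition, here a check on the simple class name.
    static Predicate<Object> byNamePrefix(String prefix) {
        return e -> e.getClass().getSimpleName().startsWith(prefix);
    }
}
```

With these definitions, a PasswordChanged event matches byType(UserEvent.class) and byAnnotation(Audited.class), while a CacheCleared event matches only byNamePrefix("Cache").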
Each event carries a Flow: a small piece of data that holds information about the chain of events, mostly for tracing and debugging. A flow can also hold any useful information in its Flow::context field. One possible use case is storing information about the user who initiated the request being processed.
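As a rough mental model, a flow can be pictured as an identifier shared by every event in a chain, plus a free-form context map. The sketch below is an assumption-laden illustration, not the library's Flow class: SketchFlow, SketchEvent and the two event classes are hypothetical, and only the accessor names flow() and id() mirror calls that appear elsewhere in this document.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical flow: one id for a whole chain of events plus free-form context.
final class SketchFlow {
    private final UUID id = UUID.randomUUID();
    private final Map<String, Object> context = new ConcurrentHashMap<>();

    UUID id() { return id; }
    Map<String, Object> context() { return context; }
}

// Hypothetical event base class: a root event opens a new flow,
// a follow-up event joins the flow of the event that caused it.
abstract class SketchEvent {
    private final SketchFlow flow;

    protected SketchEvent() { this.flow = new SketchFlow(); }
    protected SketchEvent(SketchEvent cause) { this.flow = cause.flow(); }

    SketchFlow flow() { return flow; }
}

final class RequestReceived extends SketchEvent {
    RequestReceived(String user) {
        super();
        // Remember who started the chain so later handlers can read it.
        flow().context().put("user", user);
    }
}

final class RequestProcessed extends SketchEvent {
    RequestProcessed(RequestReceived cause) { super(cause); }
}
```

In this model, a handler of RequestProcessed can read the initiating user from the context even though the event itself does not carry that field.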
What if we need to do something synchronously? For example, not just react to a user's sign-in, but also validate the password, then generate and return a session token? This is where the Task, Result, Success and Failure classes come in.
First, we change the source code of SignInInitiated to the following:
@Getter
public class SignInInitiated extends Task<SignInInitiated.Failed, SignInInitiated.Succeed> {
    private final String username;
    private final String password;

    public SignInInitiated(String username, String password) {
        super();
        this.username = username;
        this.password = password;
    }

    @Getter
    public static class Succeed extends Success {
        private final String token;

        public Succeed(SignInInitiated task, String token) {
            super(task);
            this.token = token;
        }
    }

    public static class Failed extends Failure {
        public Failed(SignInInitiated task, Throwable cause) {
            super(task, cause);
        }
    }
}
After that, we can await the result of the task, which is either a success or a failure. The bus-related code looks as follows:
public class Main {
    public static void main(String[] args) {
        // Bus instance required
        var bus = new Bus();
        // Adding handling method and exception resolver
        bus.subscribeOn(SignInInitiated.class,
            e -> {
                System.out.println("User '%s' signing in...".formatted(e.getUsername()));
                bus.publish(new SignInInitiated.Succeed(e, "token"));
            },
            (e, ex) -> {
                System.out.println("Something went wrong!");
                bus.publish(new SignInInitiated.Failed(e, ex));
            }
        );
        // Publishing task onto bus
        var task = new SignInInitiated("user", "passwd");
        bus.publish(task);
        // Awaiting result
        Mono<Result> resultAsync = bus.await(task);
        // Dealing with result
        var result = resultAsync.block();
        if (result.isSuccess()) {...}
        else {...}
    }
}
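One way to picture how a result finds its way back to the awaiting caller is a table of pending futures keyed by task identity. The sketch below uses the JDK's CompletableFuture instead of the Mono shown above and is not the library's mechanism; SketchTask, SketchResult and ResultAwaiter are hypothetical names.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical task; its id is what links a result back to it.
class SketchTask {
    final UUID id = UUID.randomUUID();
}

// Hypothetical result that remembers which task it belongs to.
class SketchResult {
    final UUID taskId;
    final boolean success;
    final String payload;

    SketchResult(SketchTask task, boolean success, String payload) {
        this.taskId = task.id;
        this.success = success;
        this.payload = payload;
    }
}

// Hypothetical correlation table: one pending future per task id.
class ResultAwaiter {
    private final Map<UUID, CompletableFuture<SketchResult>> pending = new ConcurrentHashMap<>();

    // Returns a future that completes once a result for this task arrives.
    CompletableFuture<SketchResult> await(SketchTask task) {
        return pending.computeIfAbsent(task.id, k -> new CompletableFuture<>());
    }

    // Called when a result is published; also works if nobody awaits yet.
    void onResult(SketchResult result) {
        pending.computeIfAbsent(result.taskId, k -> new CompletableFuture<>())
               .complete(result);
    }
}
```

Creating the future in both methods means the order of await and onResult does not matter. A production version would also remove completed entries, which this sketch omits.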
Sometimes it is easier to be more "object-oriented" and place the handling method next to the data. The SelfHandler interface provides this feature:
@Getter
public class SignInInitiated
    extends Event
    implements SelfHandler
{
    private final String username;
    private final String password;

    public SignInInitiated(String username, String password) {
        super();
        this.username = username;
        this.password = password;
    }

    @Override
    public void handleSelf(Bus bus) {
    }

    @Override
    public void onException(Bus bus, Throwable ex) {
    }
}
After that, the following code is enough to register the handler on the bus:
bus.subscribeOn(SignInInitiated.class);
The bus provides a number of hooks:
- bus.onBeforeHandle is called right before the event handling method is called;
- bus.onAfterHandle is called right after the event handling method and (or) its exception handling method is called.
Such hooks are useful for tracing. For example, you can put the event's flow ID into the logging MDC for use in a logging pattern:
bus.onBeforeHandle(e -> {
    MDC.put("flowId", e.flow().id().toString());
});
bus.onAfterHandle(e -> {
    MDC.remove("flowId");
});
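The ordering described for these hooks can be sketched as a wrapper around the handler call. The following is a plain-Java illustration under the assumption that after-hooks run whether or not the handler failed; HookedDispatcher and its dispatch method are hypothetical and are not part of the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical dispatcher that wraps every handler call with before/after hooks.
class HookedDispatcher<E> {
    private final List<Consumer<E>> before = new ArrayList<>();
    private final List<Consumer<E>> after = new ArrayList<>();

    void onBeforeHandle(Consumer<E> hook) { before.add(hook); }
    void onAfterHandle(Consumer<E> hook) { after.add(hook); }

    void dispatch(E event, Consumer<E> handler, BiConsumer<E, Throwable> onException) {
        before.forEach(h -> h.accept(event));
        try {
            try {
                handler.accept(event);
            } catch (RuntimeException ex) {
                // The exception handler takes over when the handler fails.
                onException.accept(event, ex);
            }
        } finally {
            // Runs after the handler and, if it failed, after the exception handler too.
            after.forEach(h -> h.accept(event));
        }
    }
}
```

Putting the after-hooks in a finally block is what makes cleanup such as MDC.remove reliable: the value set in the before-hook is removed even when handling fails.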
Three annotations and one utility method support better code organization and make it possible to create subscriptions automatically:
- @Handler marks a class as containing handlers for a number of events;
- @Handles marks a method as the handler of some event;
- @HandlesExceptionsOf marks a method that must be triggered when an exception occurs in a @Handles method. The scope of such a method is limited to the methods of its @Handler class;
- AutoRegistration::auto is a helper method that scans the provided classes for the annotations mentioned above.
The auto registration method also adds subscriptions for SelfHandler implementations.
A simple event handler may be written like this:
@Handler
@RequiredArgsConstructor
public class ExampleEventHandler {
    private final Bus bus;

    @Handles(ExampleEvent.class)
    public void handles(ExampleEvent e) {
    }

    @HandlesExceptionsOf({ExampleEvent.class})
    public void onException(ExampleEvent e, Throwable ex) {
    }
}
Then you can use the auto registration method to create subscriptions:
var bus = new Bus();
AutoRegistration.auto(
    bus,
    List.of(ExampleEventHandler.class),
    (clazz) -> Optional.of(new ExampleEventHandler(bus))
);
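Annotation-driven registration of this kind usually relies on reflection. As a minimal sketch of the scanning step only, the code below finds annotated methods in a class; it does not reproduce AutoRegistration, and SketchHandles, GreetingHandler and HandlerScanner are hypothetical names invented for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical annotation; it only mimics the role of @Handles in this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchHandles {
    Class<?> value();
}

class GreetingHandler {
    @SketchHandles(String.class)
    public void onText(String text) {}

    @SketchHandles(Integer.class)
    public void onNumber(Integer number) {}

    // Not annotated, so the scanner ignores it.
    public void notAHandler() {}
}

class HandlerScanner {
    // Maps the name of each handled event type to the method that handles it.
    static Map<String, Method> scan(Class<?> handlerClass) {
        Map<String, Method> found = new TreeMap<>();
        for (Method m : handlerClass.getDeclaredMethods()) {
            SketchHandles h = m.getAnnotation(SketchHandles.class);
            if (h != null) {
                found.put(h.value().getName(), m);
            }
        }
        return found;
    }
}
```

A registration step would then obtain a handler instance, for example from a factory function like the third argument above, and subscribe a lambda that invokes the discovered method on it.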
The Spring integration requires the echo.packages.to.scan option (a comma-separated list of packages) to discover the locations of events and event handlers. It can be placed in any Spring config source. Example:
java -Decho.packages.to.scan=com.example.one,com.example.two ...
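The value is a plain comma-separated string. As an illustration of that format only, the sketch below splits such a value into package names; the integration reads the option through its own configuration mechanism, and PackagesToScan is a hypothetical helper, not a class from the library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that interprets the comma-separated option value.
class PackagesToScan {
    // Splits the raw value into trimmed, non-empty package names.
    static List<String> parse(String raw) {
        if (raw == null) {
            return List.of();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Reads the value from a JVM system property (set with -D on the command line).
    static List<String> fromSystemProperty() {
        return parse(System.getProperty("echo.packages.to.scan"));
    }
}
```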
The package provides a Spring configuration class called EchoSpringConfiguration, which can be included in the Spring application context, for example, in the following way:
@SpringBootApplication(scanBasePackageClasses = {EchoSpringConfiguration.class})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
After that, the application context will contain a bean of type Bus with event handlers registered automatically based on the annotations mentioned above.
An event handler may itself be a bean. Example:
@Component
@Handler
public class SomeHandler {
    private final Bus bus;

    public SomeHandler(@Lazy Bus bus) {
        this.bus = bus;
    }

    @Handles
    public void handler(SomeEvent e) {}

    @HandlesExceptionsOf({SomeEvent.class})
    public void onException(SomeEvent e, Throwable ex) {}
}
It is important to wire the Bus dependency lazily, as the @Lazy annotation on the constructor parameter does.
The Quarkus integration requires the echo.packages.to.scan option (a comma-separated list of packages) to discover the locations of events and event handlers. It can be placed in any Quarkus config source. Example:
java -Decho.packages.to.scan=com.example.one,com.example.two ...
The package provides a Quarkus configuration class called EchoQuarkusConfiguration, which provides a singleton instance of Bus to CDI. It automatically registers event handlers on the bus.