Skip to content

Commit

Permalink
Implement mathematical operators
Browse files Browse the repository at this point in the history
* count - emits the number of items emitted by the upstream
* index - emits `Tuple2<Long, T>` for each item from the upstream. The first element of the tuple is the index (0-based), and the second if the item
* min / max - emits the min/max item from the upstream, using Java comparator
* top(x) - emits the top x items from the upstream, a new ranking is emitted every time it changes.
* sum - emits the sum of all the items emitted by the upstream
* average - emits the average of all the items emitted by the upstream
  • Loading branch information
cescoffier committed May 10, 2021
1 parent 604c650 commit c14d8e4
Show file tree
Hide file tree
Showing 31 changed files with 1,703 additions and 0 deletions.
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>mutiny-test-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-math</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
31 changes: 31 additions & 0 deletions math/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Mutiny Math Operators

This library provides a set of _operators_ for [Mutiny](https://smallrye.io/smallrye-mutiny).

You can use these operators using the `plug` method offered by Mutiny:

```java
Multi<Long> counts = Multi.createFrom().items("a", "b", "c", "d", "e")
.plug(Math.count())
```

These operators provide streams (`Multi`) emitting new values when the computed value changes. To get the last value, use `collect().last()`:

```java
String max = Multi.createFrom().items("e", "b", "c", "f", "g", "e")
.plug(Math.max())
.collect().last()
.await().indefinitely();
```

The `io.smallrye.math.Math` class provides the entry point to the offered operators:

* count - emits the number of items emitted by the upstream
* index - emits `Tuple2<Long, T>` for each item from the upstream. The first element of the tuple is the index (0-based), and the second if the item
* min / max - emits the min/max item from the upstream, using Java comparator
* top(x) - emits the top x items from the upstream, a new ranking is emitted every time it changes.
* sum - emits the sum of all the items emitted by the upstream
* average - emits the average of all the items emitted by the upstream



149 changes: 149 additions & 0 deletions math/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-project</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<name>SmallRye Mutiny :: Math Operators</name>
<artifactId>mutiny-math</artifactId>

<dependencies>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>reactive-streams-junit5-tck</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${reactive-streams.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<goals>
<goal>test</goal>
</goals>
<configuration>
<!-- Disable TestNG -->
<testNGArtifactName>none:none</testNGArtifactName>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.moditect</groupId>
<artifactId>moditect-maven-plugin</artifactId>
<executions>
<execution>
<id>add-module-infos</id>
<phase>package</phase>
<goals>
<goal>add-module-info</goal>
</goals>
<configuration>
<module>
<moduleInfoFile>src/main/module/module-info.java</moduleInfoFile>
</module>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.revapi</groupId>
<artifactId>revapi-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<activation>
<jdk>[9,)</jdk>
</activation>
<id>java-9+</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- wonderful reliability of javadoc used with modules breaking on non exposed packages -->
<source>8</source>
<release>8</release>
<detectJavaApiLink>false</detectJavaApiLink>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
37 changes: 37 additions & 0 deletions math/revapi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[ {
"extension" : "revapi.java",
"id" : "java",
"configuration" : {
"missing-classes" : {
"behavior" : "report",
"ignoreMissingAnnotations" : false
},
"filter" : {
"packages" : {
"regex" : true,
"include" : [ "io\\.smallrye\\.mutiny\\.math(\\..+)?" ]
}
}
}
}, {
"extension" : "revapi.differences",
"id" : "breaking-changes",
"configuration" : {
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [

]
}
}, {
"extension" : "revapi.reporter.json",
"configuration" : {
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"output" : "target/compatibility.json",
"indent" : true,
"append" : false,
"keepEmptyFile" : true
}
} ]
30 changes: 30 additions & 0 deletions math/src/main/java/io/smallrye/mutiny/math/AverageOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.smallrye.mutiny.math;

import java.util.function.Function;

import io.smallrye.mutiny.Multi;

/**
* Average operator emitting the current count.
* Everytime it gets an item from upstream, it emits the <em>average</em> of the already received items.
* If the stream emits the completion event without having emitting any item before, 0 is emitted, following by the
* completion event.
* If the upstream emits a failure, the failure is propagated.
*/
public class AverageOperator<T extends Number>
implements Function<Multi<T>, Multi<Double>> {

private double sum = 0;
private long count = 0;

@Override
public Multi<Double> apply(Multi<T> multi) {
return multi
.onItem().transform(x -> {
count = count + 1;
sum = sum + x.doubleValue();
return sum / count;
})
.onCompletion().ifEmpty().continueWith(0.0);
}
}
30 changes: 30 additions & 0 deletions math/src/main/java/io/smallrye/mutiny/math/CountOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.smallrye.mutiny.math;

import java.util.function.Function;

import io.smallrye.mutiny.Multi;

/**
* Count operator emitting the current count.
* Everytime it gets an item from upstream, it emits the <em>count</em>.
* If the stream emits the completion event without having emitting any item before, 0 is emitted, following by the
* completion event.
* If the upstream emits a failure, the failure is propagated.
*
* @param <T> type of the incoming items.
*/
public class CountOperator<T>
implements Function<Multi<T>, Multi<Long>> {

private long count = 0;

@Override
public Multi<Long> apply(Multi<T> multi) {
return multi
.onItem().transform(x -> {
count = count + 1;
return count;
})
.onCompletion().ifEmpty().continueWith(0L);
}
}
26 changes: 26 additions & 0 deletions math/src/main/java/io/smallrye/mutiny/math/IndexOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.smallrye.mutiny.math;

import java.util.function.Function;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.tuples.Tuple2;

/**
* Index operator emitting a {@link io.smallrye.mutiny.tuples.Tuple2 Tuple2&lt;Long, T&gt;}, with the index (0-based) of the
* element
* and the element from upstream.
* If the upstream emits a failure, the failure is propagated.
*
* @param <T> type of the incoming items.
*/
public class IndexOperator<T>
implements Function<Multi<T>, Multi<Tuple2<Long, T>>> {

private long index = 0;

@Override
public Multi<Tuple2<Long, T>> apply(Multi<T> multi) {
return multi
.onItem().transform(x -> Tuple2.of(index++, x));
}
}
Loading

0 comments on commit c14d8e4

Please sign in to comment.